
I have a small function that logs get sent to. In the function, I am trying to send the logs to AWS CloudWatch, but I have a few issues. Once one log has been sent, I cannot send another until the first one has finished, because I need the next sequenceToken so the next log knows where to add itself. I know that async iterators are the key here, but I am not sure how to implement them in my code. Here is my current code, which fails after the first log is sent:

const build = require('pino-abstract-transport');
const { PutLogEventsCommand } = require('@aws-sdk/client-cloudwatch-logs');

const stream = async (options) => {
    // Creates the AWS connection
    const client = await createClient();
 
    // Gets the first token
    let sequenceToken = await getInitSequenceToken(client);

    return build(function (source) {
        source.on("data", async function (obj) {
            // Every time a log is sent to my worker thread it arrives here; obj contains the log's info
            const command = new PutLogEventsCommand({
                logGroupName: "api",
                logStreamName: `executive-${Config.env}-${Config.location}`,
                logEvents: [
                    {
                        message: obj.msg,
                        timestamp: obj.time,
                    },
                ],
                sequenceToken,
            });

            // Here I am sending the log to CloudWatch
            const response = await client.send(command);
        
            // Here I update the token, but this fails because the next log is already sending
            sequenceToken = response.nextSequenceToken;
        });
    });
};
  • What is source, a websocket? Commented Sep 7, 2021 at 9:18
  • Logs from pino using custom transport: github.com/pinojs/pino-abstract-transport Commented Sep 7, 2021 at 9:24
  • Use the first example from that link, the one with for await; that is the "sync" version (a sketch of that pattern follows this comment list). Commented Sep 7, 2021 at 9:26
  • I know I can use the first example and send one request at a time, but surely it is better to batch them, and that is where I am falling over as I am really unsure how to do that. Commented Sep 7, 2021 at 9:36
  • You can store the responses in an outside variable (an array), then once the stream closes (source.on('end')) you'll loop with a reduce over the client.send calls. Commented Sep 7, 2021 at 9:41
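For reference, here is a minimal sketch of that for await pattern, reusing the question's own helpers (createClient, getInitSequenceToken, Config) and assuming PutLogEventsCommand comes from @aws-sdk/client-cloudwatch-logs (AWS SDK v3). Because each send is awaited before the loop pulls the next log, sequenceToken is always up to date:

const build = require('pino-abstract-transport');
const { PutLogEventsCommand } = require('@aws-sdk/client-cloudwatch-logs');

const stream = async (options) => {
  // Creates the AWS connection (question's own helper)
  const client = await createClient();

  // Gets the first token (question's own helper)
  let sequenceToken = await getInitSequenceToken(client);

  return build(async function (source) {
    // The async iterator only pulls the next log after the previous
    // send has resolved, so the token is updated before it is reused.
    for await (const obj of source) {
      const command = new PutLogEventsCommand({
        logGroupName: "api",
        logStreamName: `executive-${Config.env}-${Config.location}`,
        logEvents: [{ message: obj.msg, timestamp: obj.time }],
        sequenceToken,
      });

      const response = await client.send(command);
      sequenceToken = response.nextSequenceToken;
    }
  });
};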

2 Answers


You can wait for the stream (source) to finish and store all the objects in an array, then send them in one request. Since logEvents accepts an array with multiple objects, that is probably the better solution: the objects carry their own timestamps and they will all be grouped under one sequenceToken:

const build = require('pino-abstract-transport');

const stream = async(options) => {
  // Creates the AWS connection
  const client = await createClient();

  // Gets the first token
  let sequenceToken = await getInitSequenceToken(client);
  let storeStreamObjects = [];
    
  return build(function(source) {
    source.on("data", async function(obj) {
      storeStreamObjects.push({
        message: obj.msg,
        timestamp: obj.time,
      });
    });
    source.on("close", async () => {

      const command = new PutLogEventsCommand({
        logGroupName: "api",
        logStreamName: `executive-${Config.env}-${Config.location}`,
        logEvents: storeStreamObjects, // all objects are here
        sequenceToken,
      });

      return await client.send(command);

    })
  });
};

3 Comments

Cannot believe I did not see this solution, typical overthinking of a problem. Thanks
Oversight here on my part: the stream does not end unless the server shuts down. There is data being sent to the stream on and off the whole time the server is live (see the timer-based sketch after these comments).
I've changed the finish event from end to close since they say it's a split2 instance.
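Since the stream here only closes when the server shuts down, one option is to flush the buffer on a timer instead of waiting for the close event. This is only a sketch, again reusing the question's createClient, getInitSequenceToken and Config helpers and assuming PutLogEventsCommand comes from @aws-sdk/client-cloudwatch-logs; the 5-second interval is arbitrary, and flushes are chained on a promise so each batch's nextSequenceToken is applied before the next batch is sent:

const build = require('pino-abstract-transport');
const { PutLogEventsCommand } = require('@aws-sdk/client-cloudwatch-logs');

const stream = async (options) => {
  // Creates the AWS connection (question's own helper)
  const client = await createClient();

  // Gets the first token (question's own helper)
  let sequenceToken = await getInitSequenceToken(client);

  let buffer = [];
  let chain = Promise.resolve(); // serialises flushes so the token never races

  const flush = () => {
    chain = chain
      .catch(() => {}) // a failed batch is simply dropped in this sketch
      .then(async () => {
        if (buffer.length === 0) return;
        const batch = buffer;
        buffer = [];

        const command = new PutLogEventsCommand({
          logGroupName: "api",
          logStreamName: `executive-${Config.env}-${Config.location}`,
          logEvents: batch,
          sequenceToken,
        });

        const response = await client.send(command);
        sequenceToken = response.nextSequenceToken;
      });
    return chain;
  };

  const timer = setInterval(flush, 5000); // arbitrary interval

  return build(function (source) {
    source.on("data", (obj) => {
      buffer.push({ message: obj.msg, timestamp: obj.time });
    });
    source.on("close", async () => {
      clearInterval(timer);
      await flush(); // send whatever is still buffered on shutdown
    });
  });
};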

If the logs can be sent to the cloud later, you may do something like the code below.

const build = require("pino-abstract-stream");

const stream = async options => {
  // Creates the AWS connection
  const client = await createClient();

  // Gets the first token
  let sequenceToken = await getInitSequenceToken(client);

  const streamData = [];

  return build(function (source) {
    source.on("data", async function (obj) {
      steamData.push(obj);
    });

    source.on("end", async function () {
      // Every time the log is sent to my worker thread it prints out here. Obj contains the info from the log
      for (const data of steamData) {
        const command = new PutLogEventsCommand({
          logGroupName: "api",
          logStreamName: `executive-${Config.env}-${Config.location}`,
          logEvents: [
            {
              message: data.msg,
              timestamp: data.time,
            },
          ],
          sequenceToken,
        });

        // Here I am sending the log to CloudWatch
        const response = await client.send(command);

        sequenceToken = response.nextSequenceToken;
      }
    });
  });
};

