AWS

Replay dynamo stream events

  • If updating same item with same values, the stream won’t be triggered. Tip: add lastUpdated value as timestamp Date.now() to make sure triggering event.
  • If it takes too long to process:
    • check how many items to run batch for each invocation.
    • check lamda concurrency.
    • check lambda monitor tab to see invocations, duration and other metric.
  • The disadvantages of dynamo stream is that when we want to replay the event for testing. The condition filtering is not flexible .
  • Very careful with javascript async function, it easily leads to inconsistent data. Recommend to use await to force function in sync.
  • Dynamo stream trigger lambda asynchronously. If the computed function needs to process data in sync that will be trouble.

// Initialize the DynamoDB Streams client
const dynamoDBStreams = new AWS.DynamoDBStreams

// Replace with your actual Stream ARN
const streamArn = 'arn:aws:dynamodb:ap-southeast-1::table//stream/2024-10-02T08:47:30.281';

// Function to replay stream events
async function replayStreamEvents(startTime) {
    try {
        // Describe the stream to get the shard ID
        const streamDescription = await dynamoDBStreams.describeStream({ StreamArn: streamArn }).promise();
        const shardId = streamDescription.StreamDescription.Shards[0].ShardId;

        // Get the shard iterator to start reading the records
        const iteratorResponse = await dynamoDBStreams.getShardIterator({
            StreamArn: streamArn,
            ShardId: shardId,
            ShardIteratorType: 'TRIM_HORIZON' // Start from the oldest record
        }).promise();

        let shardIterator = iteratorResponse.ShardIterator;

        // Loop to read records from the stream
        while (shardIterator) {
            const recordsResponse = await dynamoDBStreams.getRecords({ ShardIterator: shardIterator }).promise();
            const records = recordsResponse.Records;

            // Process the records
            records.forEach(record => {
                if (isCreateOrEditEvent(record)) {
                    const newImage = record.dynamodb.NewImage;
                    const clientId = newImage.client_id.S;
                    const sampledTime = newImage.sampled_time.N;

                    // Main scope to process record.
                }
            });

            // Update the shard iterator for the next iteration
            shardIterator = recordsResponse.NextShardIterator;

            // Exit if there are no more records to process
            if (records.length === 0) {
                break;
            }

            // Optional: Sleep for a moment to avoid excessive calls
            await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second if no records
        }
    } catch (error) {
        console.error("Error replaying stream events:", error);
    }
}
function isCreateOrEditEvent(record) {
    return record.eventName !== "REMOVE";
}
// Call the function to start replaying events
replayStreamEvents(1729526400);
0