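If an item is updated with exactly the same values it already has, no stream record is emitted, so the stream will not be triggered. Tip: write a lastUpdated attribute set to the current timestamp (Date.now()) on every update to make sure an event is always triggered.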
If processing takes too long:
check the batch size, i.e. how many items are processed per invocation.
check the Lambda concurrency.
check the Lambda Monitor tab for invocations, duration, and other metrics.
A disadvantage of DynamoDB Streams shows up when we want to replay events for testing: the condition filtering is not flexible.
Be very careful with JavaScript async functions: they can easily lead to inconsistent data. It is recommended to use await so that each step finishes before the next one starts.
DynamoDB Streams triggers Lambda asynchronously. If the computation needs to process data synchronously, that will cause trouble.
// DynamoDB Streams client shared by every stream call in this script
const dynamoDBStreams = new AWS.DynamoDBStreams();
// ARN of the stream to replay; substitute your real Stream ARN here
const streamArn = 'arn:aws:dynamodb:ap-southeast-1::table//stream/2024-10-02T08:47:30.281';
// Function to replay stream events
async function replayStreamEvents(startTime) {
try {
// Describe the stream to get the shard ID
const streamDescription = await dynamoDBStreams.describeStream({ StreamArn: streamArn }).promise();
const shardId = streamDescription.StreamDescription.Shards[0].ShardId;
// Get the shard iterator to start reading the records
const iteratorResponse = await dynamoDBStreams.getShardIterator({
StreamArn: streamArn,
ShardId: shardId,
ShardIteratorType: 'TRIM_HORIZON' // Start from the oldest record
}).promise();
let shardIterator = iteratorResponse.ShardIterator;
// Loop to read records from the stream
while (shardIterator) {
const recordsResponse = await dynamoDBStreams.getRecords({ ShardIterator: shardIterator }).promise();
const records = recordsResponse.Records;
// Process the records
records.forEach(record => {
if (isCreateOrEditEvent(record)) {
const newImage = record.dynamodb.NewImage;
const clientId = newImage.client_id.S;
const sampledTime = newImage.sampled_time.N;
// Main scope to process record.
}
});
// Update the shard iterator for the next iteration
shardIterator = recordsResponse.NextShardIterator;
// Exit if there are no more records to process
if (records.length === 0) {
break;
}
// Optional: Sleep for a moment to avoid excessive calls
await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second if no records
}
} catch (error) {
console.error("Error replaying stream events:", error);
}
}
/**
 * Tells whether a stream record should be treated as a create/edit event.
 *
 * @param {object} record - Stream record; only its `eventName` is read.
 * @returns {boolean} `false` when `eventName` is "REMOVE", `true` for any
 *   other value (including a missing `eventName`).
 */
function isCreateOrEditEvent(record) {
  const { eventName } = record;
  const isRemoval = eventName === "REMOVE";
  return !isRemoval;
}
// Kick off the replay. The value looks like a Unix timestamp in seconds
// (1729526400 = 2024-10-21T16:00:00Z) — TODO confirm the intended unit.
const replayStartTime = 1729526400;
replayStreamEvents(replayStartTime);