SQS Async Mastery

November 19, 2024|25 min read
SQS, Async, Patterns

title: "Preferred Patterns with SQS"
date: "2025-08-23"
readTime: "18 min read"
tags: ["Distributed Systems", "AWS", "SQS", "Async Processing"]
description: "Patterns for reliable, idempotent, and cost-effective message processing at low to high scale."
status: "published"

Resilient systems must handle failures gracefully, avoid data corruption, and scale economically. These patterns distinguish production-grade systems from prototypes built to test product-market fit.

Maybe you started with a monolith on a home server and are now ready to move to the cloud. SQS is a key part of the AWS stack.

Here are some patterns for problems you may bump into.

1. The S3 Batch Delay Pattern: Delaying Millions of Messages Economically

The Problem

SQS caps per-message delay at 15 minutes and message retention at 14 days. When you need to delay messages for hours, days, or even months, keeping them in SQS becomes expensive and impractical. Enter the S3 batch delay pattern.

The Solution: S3 + Step Functions

Serialize batches of messages to S3, use Step Functions for orchestration (Standard workflows can wait up to a year), then rehydrate and process the messages when the delay has elapsed.

Architecture Overview

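// Step 1: Lambda to batch messages to S3
// Assumes the AWS SDK for JavaScript v2; calculateDelayTimestamp,
// groupMessagesByDelay, and deleteMessagesFromQueue are helpers defined elsewhere.
import { S3, StepFunctions } from 'aws-sdk';
import type { SQSEvent } from 'aws-lambda';
import { v4 as uuid } from 'uuid';

const s3 = new S3();
const stepFunctions = new StepFunctions();

export const batchDelayHandler = async (event: SQSEvent) => {
  const messages = event.Records;
  
  // Group messages by delay time for efficient batching
  const messagesByDelay = groupMessagesByDelay(messages);
  
  for (const [delayKey, batch] of messagesByDelay) {
    // Compute the release time per batch; a single value for the whole
    // event would give every group the same delay
    const delayUntil = calculateDelayTimestamp(batch);
    
    // Serialize batch to S3
    const s3Key = `delayed-messages/${delayKey}/${Date.now()}-${uuid()}.json`;
    await s3.putObject({
      Bucket: process.env.DELAY_BUCKET!,
      Key: s3Key,
      Body: JSON.stringify({
        messages: batch,
        delayUntil,
        originalReceiptHandles: batch.map(m => m.receiptHandle)
      }),
      Metadata: {
        'message-count': String(batch.length),
        'delay-until': delayUntil.toISOString()
      }
    }).promise();
    
    // Start Step Function for this batch
    await stepFunctions.startExecution({
      stateMachineArn: process.env.DELAY_STATE_MACHINE_ARN!,
      input: JSON.stringify({
        s3Bucket: process.env.DELAY_BUCKET,
        s3Key,
        delayUntil: delayUntil.toISOString()
      })
    }).promise();
    
    // Delete original messages from SQS only after the batch is stored and
    // scheduled, so a failure in a later batch does not redeliver this one
    await deleteMessagesFromQueue(batch);
  }
};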
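
The handler above relies on a `groupMessagesByDelay` helper that the article does not show. One possible shape for it, as a minimal sketch: it assumes a simplified message type (`DelayableMessage`, hypothetical) that already carries its release time, and it rounds each release time up to a bucket boundary so that messages due at similar times share one S3 object and one Step Functions execution. The one-hour default bucket is an assumption, not something the original specifies.

```typescript
// Hypothetical message shape: only the fields this sketch needs.
interface DelayableMessage {
  body: string;
  delayUntilMs: number; // epoch milliseconds at which the message should be released
}

// Round a release time UP to the next bucket boundary, so a message is
// never released early and is at most one bucket width late.
function delayBucketKey(delayUntilMs: number, bucketMs: number): string {
  const bucketEnd = Math.ceil(delayUntilMs / bucketMs) * bucketMs;
  return new Date(bucketEnd).toISOString();
}

// Group messages by bucket key; insertion order of the Map follows first appearance.
function groupMessagesByDelay(
  messages: DelayableMessage[],
  bucketMs: number = 60 * 60 * 1000 // one-hour buckets (assumed default)
): Map<string, DelayableMessage[]> {
  const groups = new Map<string, DelayableMessage[]>();
  for (const message of messages) {
    const key = delayBucketKey(message.delayUntilMs, bucketMs);
    const group = groups.get(key) ?? [];
    group.push(message);
    groups.set(key, group);
  }
  return groups;
}
```

Wider buckets mean fewer S3 objects and executions at the price of coarser release times, so the bucket width is the main tuning knob here.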

Step Function Definition

{
  "Comment": "Delay messages up to one year",
  "StartAt": "CalculateWaitTime",
  "States": {
    "CalculateWaitTime": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:calculateWaitTime",
      "ResultPath": "$.waitSeconds",
      "Next": "WaitForDelay"
    },
    "WaitForDelay": {
      "Type": "Wait",
      "SecondsPath": "$.waitSeconds",
      "Next": "RehydrateMessages"
    },
    "RehydrateMessages": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:rehydrateMessages",
      "Parameters": {
        "s3Bucket.$": "$.s3Bucket",
        "s3Key.$": "$.s3Key"
      },
      "Retry": [
        {
          "ErrorEquals": ["States.TaskFailed"],
          "IntervalSeconds": 2,
          "MaxAttempts": 3,
          "BackoffRate": 2
        }
      ],
      "End": true
    }
  }
}
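
The `CalculateWaitTime` task is referenced in the state machine but not shown. A minimal sketch of what it might compute, assuming the execution input carries `delayUntil` as an ISO-8601 string (as the batching Lambda passes it); the function and handler names are hypothetical. The result is a whole number of seconds, which is what `SecondsPath` expects to find at `$.waitSeconds`.

```typescript
interface WaitTimeInput {
  delayUntil: string; // ISO-8601 timestamp from the execution input
}

// Whole seconds from `nowMs` until `delayUntil`, rounded up and never negative.
function calculateWaitSeconds(input: WaitTimeInput, nowMs: number = Date.now()): number {
  const target = Date.parse(input.delayUntil);
  if (Number.isNaN(target)) {
    throw new Error(`Invalid delayUntil timestamp: ${input.delayUntil}`);
  }
  return Math.max(0, Math.ceil((target - nowMs) / 1000));
}

// Lambda-style handler: its return value is what ResultPath writes to $.waitSeconds.
const calculateWaitTimeHandler = async (event: WaitTimeInput): Promise<number> =>
  calculateWaitSeconds(event);
```

The Wait state also accepts a `TimestampPath` field, so pointing it directly at `$.delayUntil` may remove the need for this Lambda altogether.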

Rehydration Lambda

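// Lambda event records carry attributes as { dataType, stringValue };
// SendMessageBatch expects { DataType, StringValue }. Binary attributes
// would additionally need base64 decoding and are not handled here.
const toSendAttributes = (attrs: Record<string, any> = {}) =>
  Object.fromEntries(Object.entries(attrs).map(([name, a]) => [
    name,
    { DataType: a.dataType, StringValue: a.stringValue }
  ]));

export const rehydrateMessages = async (event: RehydrateEvent) => {
  // Fetch batch from S3
  const s3Object = await s3.getObject({
    Bucket: event.s3Bucket,
    Key: event.s3Key
  }).promise();
  
  const batch = JSON.parse(s3Object.Body!.toString());
  const messages = batch.messages;
  
  // Re-queue messages in chunks (SQS batch limit is 10)
  const chunks = chunkArray(messages, 10);
  
  for (const chunk of chunks) {
    const entries = chunk.map((msg, idx) => ({
      Id: String(idx), // Only needs to be unique within one batch request
      MessageBody: msg.body,
      MessageAttributes: toSendAttributes(msg.messageAttributes)
    }));
    
    const result = await sqs.sendMessageBatch({
      QueueUrl: process.env.TARGET_QUEUE_URL!,
      Entries: entries
    }).promise();
    
    // The call can succeed while individual entries fail. Throw so the
    // state machine retries; already-sent chunks are re-sent, which is
    // safe only because consumers are idempotent.
    const failed = result.Failed ?? [];
    if (failed.length > 0) {
      throw new Error(`${failed.length} messages failed to re-queue`);
    }
  }
  
  // Clean up S3 object only after every chunk was accepted
  await s3.deleteObject({
    Bucket: event.s3Bucket,
    Key: event.s3Key
  }).promise();
  
  // Log completion
  await cloudwatch.putMetricData({
    Namespace: 'AsyncProcessing',
    MetricData: [{
      MetricName: 'MessagesRehydrated',
      Value: messages.length,
      Unit: 'Count'
    }]
  }).promise();
};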
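
The rehydration Lambda calls a `chunkArray` helper that is not shown in the article. A minimal sketch of one way to write it; the name matches the call site, but the implementation is an assumption.

```typescript
// Split `items` into consecutive slices of at most `size` elements.
function chunkArray<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new Error(`Chunk size must be a positive integer, got ${size}`);
  }
  const chunks: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    chunks.push(items.slice(start, start + size));
  }
  return chunks;
}

// Example: 25 messages become chunks of 10, 10, and 5,
// each small enough for one SendMessageBatch call.
const exampleChunkSizes = chunkArray(Array.from({ length: 25 }, (_, i) => i), 10)
  .map(chunk => chunk.length);
```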

Cost Analysis

Estimated cost for 1 million messages delayed 30 days:

  • SQS approach: ~$360/month (visibility timeout extensions)
  • S3 + Step Functions: ~$3/month (S3 storage + minimal Step Function cost)
  • Savings: roughly 99.2% cost reduction

2. Idempotency First: Design for Eventual Success or Intervention

Core Principles

  • Every operation must be idempotent
  • Assume messages will be processed multiple times
  • Dead Letter Queues are mandatory, not optional
  • Monitor queue age and volume proactively

Idempotent Message Processor

interface ProcessingResult {
  success: boolean;
  idempotent: boolean;
  error?: Error;
}

export class IdempotentProcessor {
  private readonly maxRetries = 5;
  
  async processMessage(message: SQSMessage): Promise<ProcessingResult> {
    const messageId = this.extractIdempotencyKey(message);
    
    try {
      // Check if already processed
      const existingRecord = await this.checkProcessingRecord(messageId);
      if (existingRecord?.status === 'completed') {
        return { success: true, idempotent: true };
      }
      
      // Create or update processing record
      await this.createProcessingRecord(messageId, 'processing');
      
      // Execute idempotent operations
      const operations = [
        this.updateUserAccount(message),
        this.chargePayment(message),
        this.sendNotification(message),
        this.updateAnalytics(message)
      ];
      
      // All operations are idempotent, safe to retry
      await Promise.all(operations);
      
      // Mark as complete
      await this.updateProcessingRecord(messageId, 'completed');
      
      return { success: true, idempotent: false };
      
    } catch (error) {
      // Record permanent failures so they are easy to find during investigation
      if (error instanceof Error && this.isPermanentFailure(error)) {
        await this.updateProcessingRecord(messageId, 'failed', error);
      }
      
      // Rethrow in both cases. SQS redelivers the message until
      // maxReceiveCount is reached, then moves it to the DLQ.
      throw error;
    }
  }
  
  private extractIdempotencyKey(message: SQSMessage): string {
    // CRITICAL: Key must be derivable from message content
    const body = JSON.parse(message.Body);
    return `${body.userId}#${body.actionId}#${body.timestamp}`;
  }
  
  private isPermanentFailure(error: Error): boolean {
    // Business logic failures that won't resolve with retries
    const permanentErrors = [
      'INVALID_USER',
      'INSUFFICIENT_FUNDS',
      'ACCOUNT_SUSPENDED',
      'INVALID_PAYLOAD'
    ];
    
    return permanentErrors.some(e => error.message.includes(e));
  }
}
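
Matching on `error.message.includes(...)` can misfire when a transient error happens to mention one of these strings. A possible alternative, sketched under the assumption that errors carry a machine-readable `code` property (a convention this example introduces, not one the original code defines):

```typescript
// Codes that will not resolve with retries (same list as the processor above).
const PERMANENT_ERROR_CODES = new Set<string>([
  'INVALID_USER',
  'INSUFFICIENT_FUNDS',
  'ACCOUNT_SUSPENDED',
  'INVALID_PAYLOAD'
]);

// Classify by an exact `code` match instead of searching the message text.
function isPermanentFailureByCode(error: unknown): boolean {
  if (typeof error !== 'object' || error === null) return false;
  const code = (error as { code?: unknown }).code;
  return typeof code === 'string' && PERMANENT_ERROR_CODES.has(code);
}

// A coded business error is permanent; a transient error whose message
// merely mentions a code is not.
const codedError = Object.assign(new Error('User does not exist'), { code: 'INVALID_USER' });
const transientError = new Error('Timeout while checking INVALID_USER cache');
```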

DLQ Configuration & Monitoring

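// CDK configuration (aws-cdk-lib v2), inside a Stack or Construct
import { Duration } from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
import * as actions from 'aws-cdk-lib/aws-cloudwatch-actions';
import * as sns from 'aws-cdk-lib/aws-sns';

// Declare the DLQ first so the main queue can reference it
const dlq = new sqs.Queue(this, 'DeadLetterQueue', {
  // IMPORTANT: keep messages long enough to investigate after the alarm fires
  retentionPeriod: Duration.days(14)
});

const mainQueue = new sqs.Queue(this, 'MainQueue', {
  visibilityTimeout: Duration.minutes(5),
  retentionPeriod: Duration.days(14),
  deadLetterQueue: {
    queue: dlq,
    maxReceiveCount: 5 // After 5 failed receives, move to DLQ
  }
});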

// Monitoring Setup
const ageAlarm = new cloudwatch.Alarm(this, 'QueueAgeAlarm', {
  metric: mainQueue.metricApproximateAgeOfOldestMessage(),
  threshold: 300, // 5 minutes
  evaluationPeriods: 2,
  treatMissingData: cloudwatch.TreatMissingData.NOT_BREACHING
});

const dlqAlarm = new cloudwatch.Alarm(this, 'DLQDepthAlarm', {
  metric: dlq.metricApproximateNumberOfMessagesVisible(),
  threshold: 10,
  evaluationPeriods: 1
});

// Progressive alerting
const warningTopic = new sns.Topic(this, 'WarningTopic');
const criticalTopic = new sns.Topic(this, 'CriticalTopic');

// Low priority warning (Slack/Email)
ageAlarm.addAlarmAction(new actions.SnsAction(warningTopic));

// High priority page once the DLQ holds 10 or more visible messages
dlqAlarm.addAlarmAction(new actions.SnsAction(criticalTopic));

Anti-Pattern: Time-Based Idempotency

// ❌ NEVER DO THIS
async processMessage(message: Message) {
  const processed = await db.query({
    TableName: 'ProcessedMessages',
    KeyConditionExpression: 'messageType = :type AND processedAt > :time',
    ExpressionAttributeValues: {
      ':type': message.type,
      ':time': Date.now() - 3600000 // Last hour
    }
  });
  
  if (processed.Items.length > 0) {
    return; // Skip - already processed
  }
  
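  // THIS IS BROKEN! The check keys on message type and a time window, not on
  // the message's identity: distinct messages of the same type get skipped,
  // redeliveries after the window run twice, and concurrent consumers race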
}
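
To make the failure modes concrete, here is a small in-memory sketch that contrasts the time-window check with a check keyed on message identity. The record shape and function names are hypothetical; a real implementation would query a table rather than scan an array.

```typescript
interface ProcessedRecord {
  type: string;
  key: string; // deterministic idempotency key derived from message content
  processedAtMs: number;
}

// The anti-pattern: "was anything of this type processed recently?"
function skippedByTimeWindow(
  history: ProcessedRecord[], type: string, nowMs: number, windowMs: number
): boolean {
  return history.some(r => r.type === type && r.processedAtMs > nowMs - windowMs);
}

// The fix: "was this exact message processed, ever?"
function skippedByKey(history: ProcessedRecord[], key: string): boolean {
  return history.some(r => r.key === key);
}

const HOUR_MS = 3_600_000;
const processedHistory: ProcessedRecord[] = [
  { type: 'charge', key: 'user-1#action-1', processedAtMs: 1_000 }
];

// A different user's charge arrives one second later.
const distinctSkippedByTime = skippedByTimeWindow(processedHistory, 'charge', 2_000, HOUR_MS);
const distinctSkippedByKey = skippedByKey(processedHistory, 'user-2#action-7');

// The original message is redelivered just after the window has passed.
const replaySkippedByTime = skippedByTimeWindow(processedHistory, 'charge', 1_000 + HOUR_MS + 1, HOUR_MS);
const replaySkippedByKey = skippedByKey(processedHistory, 'user-1#action-1');
```

The time-window check drops the second user's charge and lets the late redelivery through, while the key-based check processes the new message and skips the replay.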

Key Takeaways

  • Always configure a DLQ with maxReceiveCount
  • Set DLQ retention > alarm threshold to prevent data loss
  • Use progressive alerting: warning → critical → page
  • Design for idempotent retries at every level
  • Never use time-based idempotency checks

3. Guarding Side Effects: Preventing Double Charges, Emails, and Rewards

The Challenge

Side effects like sending emails, charging cards, or granting rewards must happen exactly once, even when messages are retried multiple times. Each external system needs its own idempotency strategy.

Multi-System Idempotent Orchestration

interface RewardGrantEvent {
  userId: string;
  rewardId: string;
  amount: number;
  campaignId: string;
  timestamp: string;
}

export class RewardOrchestrator {
  async processRewardGrant(event: RewardGrantEvent): Promise<void> {
    // Derive idempotency key from event data
    const idempotencyKey = `${event.userId}#${event.rewardId}#${event.campaignId}`;
    
    // Step 1: Grant reward (idempotent)
    await this.grantReward(idempotencyKey, event);
    
    // Step 2: Send email notification (idempotent)
    await this.sendRewardEmail(idempotencyKey, event);
    
    // Step 3: Deduct from campaign budget (idempotent)
    await this.deductCampaignBudget(idempotencyKey, event);
    
    // All operations completed successfully
  }
  
  private async grantReward(idempotencyKey: string, event: RewardGrantEvent) {
    const params = {
      TableName: 'UserRewards',
      Item: {
        userId: event.userId,
        rewardKey: idempotencyKey, // Composite key ensures uniqueness
        amount: event.amount,
        grantedAt: new Date().toISOString(),
        status: 'granted'
      },
      ConditionExpression: 'attribute_not_exists(rewardKey)'
    };
    
    try {
      // `dynamodb` is assumed to be a DocumentClient, since Item uses plain JS values
      await dynamodb.put(params).promise();
      console.log('Reward granted:', idempotencyKey);
    } catch (error) {
      if (error.code === 'ConditionalCheckFailedException') {
        console.log('Reward already granted:', idempotencyKey);
        return; // Idempotent success
      }
      throw error;
    }
  }
  
  private async sendRewardEmail(idempotencyKey: string, event: RewardGrantEvent) {
    // Email service with its own idempotency
    const emailClient = new IdempotentEmailClient();
    
    await emailClient.sendEmail({
      idempotencyKey: `email#${idempotencyKey}`,
      to: await this.getUserEmail(event.userId),
      template: 'reward-granted',
      data: {
        amount: event.amount,
        rewardId: event.rewardId
      }
    });
  }
  
  private async deductCampaignBudget(idempotencyKey: string, event: RewardGrantEvent) {
    const deductionKey = `deduction#${idempotencyKey}`;
    
    // Record the deduction and update the budget in ONE transaction.
    // Written separately, a failure between the two steps would leave the
    // record in place, and the retry would skip the budget update forever.
    try {
      await dynamodb.transactWrite({
        TransactItems: [
          {
            Put: {
              TableName: 'CampaignDeductions',
              Item: {
                deductionId: deductionKey,
                campaignId: event.campaignId,
                amount: event.amount,
                timestamp: new Date().toISOString()
              },
              ConditionExpression: 'attribute_not_exists(deductionId)'
            }
          },
          {
            Update: {
              TableName: 'Campaigns',
              Key: { campaignId: event.campaignId },
              UpdateExpression: 'ADD remainingBudget :amount',
              ExpressionAttributeValues: { ':amount': -event.amount }
            }
          }
        ]
      }).promise();
    } catch (error: any) {
      if (error.code !== 'TransactionCanceledException') throw error;
      
      // Canceled: either the deduction already exists or the transaction conflicted
      const existing = await dynamodb.get({
        TableName: 'CampaignDeductions',
        Key: { deductionId: deductionKey },
        ConsistentRead: true
      }).promise();
      if (!existing.Item) throw error; // Conflict, not a duplicate: let SQS retry
      console.log('Budget already deducted:', deductionKey);
    }
  }
}
}

Side Effect Best Practices

  • Each external system needs its own idempotency mechanism
  • Use deterministic keys derived from event data
  • Record attempts before executing side effects
  • Design for "at least once" delivery with "exactly once" side effects
  • Keep idempotency records with appropriate TTLs
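
The "record before executing" and "deterministic keys" practices can be sketched without any AWS dependency. The in-memory store below stands in for a conditional put (`attribute_not_exists`), and all names are hypothetical. One assumption worth flagging: the sketch removes the record when the effect throws, which covers in-process failures only; a crash between recording and executing would still leave the key behind, so real systems need a status field or an expiry on the record.

```typescript
// Stand-in for a table with conditional writes.
class InMemoryIdempotencyStore {
  private readonly keys = new Set<string>();

  // Mimics a conditional put: true only for the first caller of a given key.
  putIfAbsent(key: string): boolean {
    if (this.keys.has(key)) return false;
    this.keys.add(key);
    return true;
  }

  remove(key: string): void {
    this.keys.delete(key);
  }
}

// One key per external system, derived only from event data.
function sideEffectKey(system: string, userId: string, rewardId: string, campaignId: string): string {
  return `${system}#${userId}#${rewardId}#${campaignId}`;
}

// Runs `effect` at most once per key. Returns false when the key was already recorded.
function runSideEffectOnce(
  store: InMemoryIdempotencyStore, key: string, effect: () => void
): boolean {
  if (!store.putIfAbsent(key)) return false;
  try {
    effect();
    return true;
  } catch (error) {
    store.remove(key); // Allow a retry after an in-process failure
    throw error;
  }
}
```

Because the email key and the deduction key differ only in their `system` prefix, each downstream system gets its own record and a retry can resume at whichever side effect has not completed yet.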

4. Individualized Expiring Locks: The DynamoDB Pattern

The Pattern

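Use DynamoDB with TTL to create expiring locks that clean themselves up if processing fails. DynamoDB deletes expired items in the background some time after they expire, not at the exact expiry second, so the acquisition condition should also compare the stored expiry against the current time instead of relying on the item being gone. Combine this with transaction conditions to avoid reprocessing completed work without explicit cleanup code.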

Expiring Lock Implementation

export class ExpiringLockService {
  private readonly lockTable = 'ProcessingLocks';
  private readonly historyTable = 'ProcessingHistory';
  
  async acquireLockAndProcess<T>(
    lockKey: string,
    visibilityTimeout: number,
    processor: () => Promise<T>
  ): Promise<T> {
    // Step 1: Attempt to acquire lock with TTL
    const lockItem = {
      lockId: lockKey,
      status: 'locked',
      lockedAt: new Date().toISOString(),
      ttl: Math.floor(Date.now() / 1000) + visibilityTimeout
    };
    
    try {
      // Use transaction to ensure atomicity
      await dynamodb.transactWrite({
        TransactItems: [
          {
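            Put: {
              TableName: this.lockTable,
              Item: lockItem,
              // TTL deletion is lazy, so an expired lock also counts as free
              ConditionExpression: 'attribute_not_exists(lockId) OR #ttl < :now',
              ExpressionAttributeNames: { '#ttl': 'ttl' },
              ExpressionAttributeValues: { ':now': Math.floor(Date.now() / 1000) }
            }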
          },
          {
            ConditionCheck: {
              TableName: this.historyTable,
              Key: { processId: lockKey },
              ConditionExpression: 'attribute_not_exists(processId)'
            }
          }
        ]
      }).promise();
    } catch (error) {
      if (error.code === 'TransactionCanceledException') {
        // Either locked or already processed
        const history = await this.checkHistory(lockKey);
        if (history) {
          console.log('Already processed:', lockKey);
          return history.result as T;
        }
        throw new Error('Resource is locked');
      }
      throw error;
    }
    
    // Step 2: Execute processing
    try {
      const result = await processor();
      
      // Step 3: Record success and release lock atomically
      await this.recordSuccess(lockKey, result);
      
      return result;
    } catch (error) {
      // Lock will auto-expire via TTL
      console.error('Processing failed, lock will expire:', lockKey);
      throw error;
    }
  }
  
  private async recordSuccess<T>(lockKey: string, result: T) {
    await dynamodb.transactWrite({
      TransactItems: [
        {
          Put: {
            TableName: this.historyTable,
            Item: {
              processId: lockKey,
              status: 'completed',
              completedAt: new Date().toISOString(),
              result: result,
              ttl: Math.floor(Date.now() / 1000) + 86400 * 90 // 90 days
            }
          }
        },
        {
          Delete: {
            TableName: this.lockTable,
            Key: { lockId: lockKey }
          }
        }
      ]
    }).promise();
  }
  
  private async checkHistory(lockKey: string) {
    // DocumentClient call, consistent with transactWrite above
    const result = await dynamodb.get({
      TableName: this.historyTable,
      Key: { processId: lockKey }
    }).promise();
    
    return result.Item;
  }
}
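
The acquisition rule is easier to see in isolation. This in-memory sketch mirrors a condition of the form "no lock item exists, or its stored expiry is in the past", and shows why a lock that has expired but has not yet been physically deleted should still be acquirable. Class and method names are hypothetical, and time is passed in explicitly to keep the example deterministic.

```typescript
class InMemoryLockTable {
  // lockId -> expiry time in epoch seconds. Entries are never auto-deleted,
  // which models a table where TTL cleanup has not run yet.
  private readonly expiryByLockId = new Map<string, number>();

  // Acquire when no lock exists or the existing lock has expired.
  tryAcquire(lockId: string, nowSec: number, timeoutSec: number): boolean {
    const expiresAt = this.expiryByLockId.get(lockId);
    if (expiresAt !== undefined && expiresAt >= nowSec) {
      return false; // Held by someone else and still valid
    }
    this.expiryByLockId.set(lockId, nowSec + timeoutSec);
    return true;
  }

  // Explicit release after successful processing.
  release(lockId: string): void {
    this.expiryByLockId.delete(lockId);
  }
}

const lockTable = new InMemoryLockTable();
const firstAttempt = lockTable.tryAcquire('email#u1#c1#e1', 0, 300);     // fresh lock
const duringLock = lockTable.tryAcquire('email#u1#c1#e1', 100, 300);     // still held
const afterExpiry = lockTable.tryAcquire('email#u1#c1#e1', 301, 300);    // expired, not deleted
```

Setting the timeout equal to the SQS visibility timeout means a redelivered message arrives at about the moment the previous holder's lock lapses.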

Using the Lock Service

export class EmailProcessor {
  private lockService = new ExpiringLockService();
  
  async processEmailRequest(message: SQSMessage): Promise<void> {
    const body = JSON.parse(message.Body);
    
    // Derive lock key from message content
    const lockKey = `email#${body.userId}#${body.campaignId}#${body.eventId}`;
    
    // Lock timeout matches SQS visibility timeout
    const visibilityTimeout = 300; // 5 minutes
    
    await this.lockService.acquireLockAndProcess(
      lockKey,
      visibilityTimeout,
      async () => {
        // Skipped on retries once a success has been recorded. It can still
        // run again if the send succeeds but recording the success fails.
        const messageId = await this.sendEmail(body);
        
        // Return any result data
        return {
          messageId,
          sentAt: new Date().toISOString()
        };
      }
    );
  }
  
  private async sendEmail(emailData: any) {
    // Actual email sending logic
    const result = await ses.sendEmail({
      Source: 'noreply@example.com',
      Destination: { ToAddresses: [emailData.to] },
      Message: {
        Subject: { Data: emailData.subject },
        Body: { Text: { Data: emailData.body } }
      }
    }).promise();
    
    return result.MessageId;
  }
}

Key Benefits

  • Automatic cleanup on failure via TTL
  • Atomic lock acquisition and history check
  • No manual lock release required
  • Idempotent by design
  • Works across distributed systems