SQS Audio Anomaly Worker - High Level Design (HLD)

Implementation-level details live in:

  • docs/AUDIO_ANOMALY_PIPELINE_IMPLEMENTATION.md

1. Purpose

Build a resilient, horizontally scalable background processing service that:

  • discovers audio assessment records in Mongo for a specific test_date
  • enqueues them to SQS for decoupled processing
  • performs anomaly inference in batches
  • persists final results to a dedicated Mongo result collection with idempotent guarantees

The service is designed for 10K to 900K records/day and runs as an independent ECS Fargate worker process.

2. Scope

In scope:

  • Mongo discovery with backfill and incremental watermarking
  • SQS message consumption and batching
  • Audio download and inference
  • Idempotent MongoDB result persistence
  • Safe SQS acknowledgement and retry behavior
  • ECS-friendly lifecycle and autoscaling

Out of scope:

  • Upstream producer logic that enqueues messages
  • Model training and model deployment workflow
  • UI/reporting dashboards

3. Functional Requirements Mapping

  • Mongo discovery for one test_date
  • Backfill mode and incremental mode
  • Continuous SQS polling with long-poll receive
  • Receive messages in batches of up to 10
  • Aggregate to inference batches (default 100)
  • Run batched model inference
  • Bulk upsert MongoDB results in chunks of 500
  • Delete only successfully processed messages from SQS
  • Concurrency throttling using semaphore
  • Idempotency: process only if a result document does not already exist
  • Graceful SIGTERM shutdown
  • Structured logging and throughput metrics
  • Failures remain in queue for retry and DLQ routing

4. High-Level Architecture

graph TD
A[Mongo Source Collection] --> B[Discovery Service]
B --> C[anomaly_pipeline_state]
B --> D[SQS Queue]
D --> E[ECS Fargate Worker Tasks]
E --> F[In-Memory Buffer]
F --> G[Batch Orchestrator]
G --> H[Async Audio Downloader]
H --> I[Feature Extraction + Scaler]
I --> J[Model Batch Inference]
J --> K[anomaly_detection_results]
K --> L[SQS Delete Ack]
E --> M[Structured Logs + Throughput Metrics]
D --> N[DLQ]

5. Runtime Components

  • Worker Process: one process per ECS task, asynchronous event loop.
  • Discovery Service: scans source Mongo documents for a single test_date, normalizes both teacher_record and admin_record entries into one message shape, and advances the enqueue watermark.
  • SQS Poller: long-poll receiver pulling up to 10 messages per request.
  • Batch Buffer: accumulates messages until BATCH_SIZE or flush timeout.
  • Batch Executor: runs batch pipeline under MAX_CONCURRENCY guard.
  • Inference Engine: local artifacts loaded once at startup.
  • Persistence Layer: source existence check plus result upsert.
  • Ack Manager: deletes only the SQS messages whose records are completed or intentionally skipped (a result document already exists).
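
The Batch Buffer's "BATCH_SIZE or flush timeout" rule can be illustrated with a minimal sketch. It assumes messages arrive on a bounded asyncio.Queue (the bound is what provides backpressure); the function name collect_batch and the flush_timeout parameter are hypothetical, not names taken from the implementation.

```python
import asyncio
from typing import Any, List


async def collect_batch(
    queue: asyncio.Queue, batch_size: int, flush_timeout: float
) -> List[Any]:
    """Return up to batch_size items, or whatever arrived before flush_timeout."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + flush_timeout
    batch: List[Any] = []
    while len(batch) < batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break  # flush timeout reached: hand over a partial batch
        try:
            item = await asyncio.wait_for(queue.get(), timeout=remaining)
        except asyncio.TimeoutError:
            break  # nothing more arrived in time
        batch.append(item)
    return batch
```

A full batch is returned as soon as batch_size items are available; a partial batch is returned when the timeout expires, so a slow trickle of messages is not held indefinitely.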

6. Data Flow

  1. Discovery service loads watermark from anomaly_pipeline_state.
  2. It scans source Mongo documents for the selected test_date, covering both teacher_record.subjects and admin_record.subjects.
  3. It filters to the configured reading subject, default subject_name = "वाचन".
  4. It uses subjects[].subject_created_at as the incremental watermark timestamp and sorts by (subject_created_at, mongo_doc_id, subject_id).
  5. It skips records that already have a result document in anomaly_detection_results.
  6. It enqueues normalized messages to SQS and advances watermark only after successful enqueue.
  7. Worker polls SQS and parses message payload.
  8. Worker rechecks source existence and result idempotency.
  9. Worker downloads audio and runs batched inference.
  10. Worker bulk upserts final results into anomaly_detection_results.
  11. Worker deletes corresponding SQS messages only after durable result write.
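
Steps 4 and 6 (the composite sort key, and advancing the watermark only after a successful enqueue) can be sketched as follows. This is an illustration under stated assumptions: the Candidate class, the discover function, and an enqueue callback that returns True on success are all hypothetical, and the result-existence check from step 5 is omitted for brevity.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Iterable, Optional, Tuple

# (subject_created_at, mongo_doc_id, subject_id), compared left to right
Watermark = Tuple[datetime, str, str]


@dataclass(frozen=True)
class Candidate:
    subject_created_at: datetime
    mongo_doc_id: str
    subject_id: str

    @property
    def key(self) -> Watermark:
        return (self.subject_created_at, self.mongo_doc_id, self.subject_id)


def discover(
    candidates: Iterable[Candidate],
    watermark: Optional[Watermark],
    enqueue: Callable[[Candidate], bool],  # hypothetical: True means SQS accepted it
) -> Optional[Watermark]:
    """Enqueue candidates past the watermark in key order; return the new watermark."""
    pending = sorted(
        (c for c in candidates if watermark is None or c.key > watermark),
        key=lambda c: c.key,
    )
    for candidate in pending:
        if not enqueue(candidate):
            break  # keep the watermark at the last success so this one is retried
        watermark = candidate.key
    return watermark
```

Because the watermark never moves past a failed enqueue, a rerun resumes at the first record that was not delivered. The tie-breaking fields in the key keep the order deterministic when several subjects share the same timestamp.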

7. Scalability Strategy

  • Horizontal: multiple ECS tasks process the same queue safely.
  • Discovery can also be scaled horizontally, but the simplest deployment runs a single discovery task per test_date.
  • Vertical within task: controlled parallelism via MAX_CONCURRENCY.
  • Backpressure: bounded buffer prevents unbounded memory growth.
  • Batch I/O: bulk read/filter and bulk write to reduce DB round-trips.

Recommended autoscaling input:

  • CloudWatch ApproximateNumberOfMessagesVisible
  • Preferred custom metric: BacklogPerTask = VisibleMessages / RunningTasks
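
As a rough illustration of the BacklogPerTask metric, the sketch below computes it and derives a task count from a target backlog. The zero-task guard, the desired_tasks helper, and the target and min/max values used in the example are assumptions for illustration, not settings from this design.

```python
import math


def backlog_per_task(visible_messages: int, running_tasks: int) -> float:
    """BacklogPerTask = VisibleMessages / RunningTasks (treating 0 tasks as 1)."""
    return visible_messages / max(running_tasks, 1)


def desired_tasks(
    visible_messages: int,
    target_backlog_per_task: int,  # hypothetical tuning value
    min_tasks: int,
    max_tasks: int,
) -> int:
    """Task count that brings the backlog per task down to the target, clamped."""
    needed = math.ceil(visible_messages / target_backlog_per_task)
    return max(min_tasks, min(max_tasks, needed))
```

For example, 9,000 visible messages across 3 running tasks gives a backlog of 3,000 per task; with a hypothetical target of 2,000 per task, desired_tasks(9000, 2000, 1, 10) asks for 5 tasks.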

8. Reliability Strategy

  • At-least-once processing semantics.
  • Idempotent result upsert keeps redelivered messages from producing duplicate results.
  • Message delete occurs only after durable DB update.
  • Poison messages move to the DLQ once their receive count exceeds the redrive policy's maxReceiveCount.
  • Graceful shutdown drains buffer and in-flight tasks before exit.
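
The "delete only after durable DB update" rule implies a partial acknowledgement: messages whose records failed stay in the queue. A minimal sketch, assuming each received message is a dict carrying the MessageId and ReceiptHandle fields that SQS returns; the delete_batches helper and the ackable_ids set are hypothetical names.

```python
from typing import Dict, Iterator, List, Set

SQS_BATCH_LIMIT = 10  # DeleteMessageBatch accepts at most 10 entries per request


def delete_batches(
    messages: List[Dict[str, str]], ackable_ids: Set[str]
) -> Iterator[List[Dict[str, str]]]:
    """Yield DeleteMessageBatch entry lists for ackable messages only."""
    entries = [
        # Id only needs to be unique within one request, so an index is enough.
        {"Id": str(index), "ReceiptHandle": message["ReceiptHandle"]}
        for index, message in enumerate(messages)
        if message["MessageId"] in ackable_ids
    ]
    for start in range(0, len(entries), SQS_BATCH_LIMIT):
        yield entries[start : start + SQS_BATCH_LIMIT]
```

With boto3, each yielded list could be passed as the Entries argument of delete_message_batch. Messages left out of ackable_ids become visible again after the visibility timeout and are retried, eventually reaching the DLQ if they keep failing.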

9. Performance and Capacity

  • Expected load shape: nightly burst after 7 PM.
  • Controls:
    • BATCH_SIZE default 100
    • MAX_CONCURRENCY default 4
    • Mongo bulk write chunk 500
    • S3 download timeout 3s
    • Inference batch timeout 5s
  • Throughput metric: records/minute emitted every minute.
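
The 500-document bulk write chunking can be sketched as below. The upsert specs are kept as plain dicts so the example runs without a database driver; with pymongo, each one would typically become an UpdateOne(filter, update, upsert=True) passed to bulk_write. The idempotency key (mongo_doc_id, subject_id) and the use of $setOnInsert are assumptions about one way to make the write idempotent, not a description of the actual schema.

```python
from typing import Any, Dict, Iterator, List

MONGO_CHUNK_SIZE = 500  # bulk write chunk size from the controls above
KEY_FIELDS = ("mongo_doc_id", "subject_id")  # hypothetical idempotency key


def upsert_spec(result: Dict[str, Any]) -> Dict[str, Any]:
    """Build an insert-if-absent upsert: an existing result is left untouched."""
    key = {field: result[field] for field in KEY_FIELDS}
    payload = {k: v for k, v in result.items() if k not in KEY_FIELDS}
    return {"filter": key, "update": {"$setOnInsert": payload}, "upsert": True}


def chunked(items: List[Any], size: int = MONGO_CHUNK_SIZE) -> Iterator[List[Any]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start : start + size]
```

A batch of 1,200 results would be written as three bulk operations of 500, 500, and 200 specs, which bounds the size of any single request to Mongo.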

10. Security and Compliance

  • IAM least privilege for SQS and S3 access.
  • Secrets (Mongo URI) are injected through the ECS integration with AWS Secrets Manager.
  • Private network path to Mongo (VPC peering/private endpoint).
  • Structured logs avoid sensitive payload fields.

11. Deployment Topology

  • The worker may share the API's repository and container image.
  • Separate ECS service/task definition for worker command:
    • python -m app.workers.sqs_anomaly_worker
  • API and worker scale independently.

12. Risks and Mitigations

  • Slow audio/model path:
    • mitigate with semaphore and timeout controls.
  • Mongo overload under spikes:
    • mitigate via bulk writes, capped concurrency, autoscaling.
  • Duplicate message delivery:
    • mitigate with idempotent update condition.
  • Queue backlog growth:
    • mitigate with target-tracking autoscaling and nightly pre-scale.
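
The semaphore and timeout mitigation for the slow audio/model path can be sketched with asyncio alone. The names run_guarded and run_all are hypothetical, and returning None on timeout is an assumed convention standing for "leave the message in the queue for retry".

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


async def run_guarded(
    work: Callable[[Any], Awaitable[Any]],
    item: Any,
    semaphore: asyncio.Semaphore,
    timeout_s: float,
) -> Optional[Any]:
    """Run work(item) under the concurrency cap; return None if it times out."""
    async with semaphore:  # at most max_concurrency bodies run at once
        try:
            return await asyncio.wait_for(work(item), timeout=timeout_s)
        except asyncio.TimeoutError:
            return None  # assumed convention: caller skips the ack for this item


async def run_all(
    work: Callable[[Any], Awaitable[Any]],
    items: List[Any],
    max_concurrency: int,
    timeout_s: float,
) -> List[Optional[Any]]:
    """Process all items, preserving input order in the returned list."""
    semaphore = asyncio.Semaphore(max_concurrency)
    return await asyncio.gather(
        *(run_guarded(work, item, semaphore, timeout_s) for item in items)
    )
```

With the documented defaults this would be called with max_concurrency=4 and, for downloads, timeout_s=3. A single slow item then costs at most one slot for the duration of the timeout rather than stalling the whole batch.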