SQS Audio Anomaly Worker - High Level Design (HLD)
Implementation-level details live in:
docs/AUDIO_ANOMALY_PIPELINE_IMPLEMENTATION.md
1. Purpose
Build a resilient, horizontally scalable background processing service that:
- discovers audio assessment records in Mongo for a specific test_date
- enqueues them to SQS for decoupled processing
- performs anomaly inference in batches
- persists final results to a dedicated Mongo result collection with idempotent guarantees
The service is designed for 10K to 900K records/day and runs as an independent ECS Fargate worker process.
2. Scope
In scope:
- Mongo discovery with backfill and incremental watermarking
- SQS message consumption and batching
- Audio download and inference
- Idempotent MongoDB result persistence
- Safe SQS acknowledgement and retry behavior
- ECS-friendly lifecycle and autoscaling
Out of scope:
- Upstream producer logic that enqueues messages
- Model training and model deployment workflow
- UI/reporting dashboards
3. Functional Requirements Mapping
- Mongo discovery for one test_date
- Backfill mode and incremental mode
- Continuous SQS polling with long-poll receive
- Receive messages in batches of up to 10
- Aggregate to inference batches (default 100)
- Run batched model inference
- Bulk upsert MongoDB results in chunks of 500
- Delete only successfully processed messages from SQS
- Concurrency throttling using a semaphore
- Idempotency: process only if a result document does not already exist
- Graceful SIGTERM shutdown
- Structured logging and throughput metrics
- Failed messages remain in the queue for retry and are routed to the DLQ after repeated failures
4. High-Level Architecture
```mermaid
graph TD
A[Mongo Source Collection] --> B[Discovery Service]
B --> C[anomaly_pipeline_state]
B --> D[SQS Queue]
D --> E[ECS Fargate Worker Tasks]
E --> F[In-Memory Buffer]
F --> G[Batch Orchestrator]
G --> H[Async Audio Downloader]
H --> I[Feature Extraction + Scaler]
I --> J[Model Batch Inference]
J --> K[anomaly_detection_results]
K --> L[SQS Delete Ack]
E --> M[Structured Logs + Throughput Metrics]
D --> N[DLQ]
```
5. Runtime Components
- Worker Process: one process per ECS task, running an asynchronous event loop.
- Discovery Service: scans source Mongo rows for a single test_date, normalizes both teacher_record and admin_record, and advances the enqueue watermark.
- SQS Poller: long-poll receiver pulling up to 10 messages per request.
- Batch Buffer: accumulates messages until BATCH_SIZE is reached or a flush timeout expires.
- Batch Executor: runs the batch pipeline under the MAX_CONCURRENCY guard.
- Inference Engine: local model artifacts loaded once at startup.
- Persistence Layer: source existence check plus result upsert.
- Ack Manager: deletes only messages whose records were completed or intentionally skipped by the source-existence or idempotency check.
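The Batch Buffer's "BATCH_SIZE or flush timeout" rule can be sketched with asyncio as below. This is a minimal illustration, not the worker's actual code: `BatchBuffer`, `next_batch`, and the 2-second default flush timeout are assumptions (the document gives no flush-timeout value).

```python
from __future__ import annotations

import asyncio
from typing import Any


class BatchBuffer:
    """Hypothetical size-or-timeout buffer: a batch is released when it reaches
    batch_size or when flush_timeout seconds pass after its first message."""

    def __init__(self, batch_size: int = 100, flush_timeout: float = 2.0) -> None:
        self.batch_size = batch_size
        self.flush_timeout = flush_timeout  # assumed default, not from the design
        self._queue: asyncio.Queue = asyncio.Queue()

    async def put(self, message: Any) -> None:
        await self._queue.put(message)

    async def next_batch(self) -> list[Any]:
        # Block until at least one message exists; that message starts the flush clock.
        batch = [await self._queue.get()]
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self.flush_timeout
        while len(batch) < self.batch_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                batch.append(await asyncio.wait_for(self._queue.get(), remaining))
            except asyncio.TimeoutError:
                break  # flush timeout hit: release a partial batch
        return batch


async def demo() -> tuple[list[Any], list[Any]]:
    buf = BatchBuffer(batch_size=3, flush_timeout=0.05)
    for i in range(4):
        await buf.put(i)
    full = await buf.next_batch()     # released because it reached batch_size
    partial = await buf.next_batch()  # released by the flush timeout
    return full, partial


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([0, 1, 2], [3])
```

Starting the clock at the first buffered message, rather than at a fixed interval, bounds how long any single message waits in memory before inference during quiet periods.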
6. Data Flow
- Discovery service loads watermark from anomaly_pipeline_state.
- It scans source Mongo rows for the selected test_date, covering both teacher_record.subjects and admin_record.subjects.
- It filters to the configured reading subject, default subject_name = "वाचन".
- It uses subjects[].subject_created_at as the incremental watermark timestamp and sorts by (subject_created_at, mongo_doc_id, subject_id).
- It skips rows that already have a result document in anomaly_detection_results.
- It enqueues normalized messages to SQS and advances watermark only after successful enqueue.
- Worker polls SQS and parses message payload.
- Worker rechecks source existence and result idempotency.
- Worker downloads audio and runs batched inference.
- Worker bulk upserts final results into anomaly_detection_results.
- Worker deletes corresponding SQS messages only after durable result write.
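The discovery steps amount to keyset pagination over a compound watermark. The sketch below works in memory to show the ordering and the "advance only after enqueue" rule; in production the watermark filter and sort would run inside the Mongo query. `SubjectRow`, `normalize`, `discover_once`, the `_id` field used as mongo_doc_id, and the `has_result`/`enqueue` callables are hypothetical. Calling it with `watermark=None` behaves like a backfill pass.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Iterable, Optional, Tuple

# Compound watermark in the same order as the discovery sort.
WatermarkKey = Tuple[datetime, str, str]


@dataclass(frozen=True)
class SubjectRow:
    mongo_doc_id: str
    subject_id: str
    subject_created_at: datetime
    record_type: str  # "teacher_record" or "admin_record"

    @property
    def key(self) -> WatermarkKey:
        return (self.subject_created_at, self.mongo_doc_id, self.subject_id)


def normalize(doc: dict, subject_name: str) -> list[SubjectRow]:
    """Flatten teacher_record.subjects and admin_record.subjects, keeping one subject."""
    rows = []
    for record_type in ("teacher_record", "admin_record"):
        for subject in (doc.get(record_type) or {}).get("subjects") or []:
            if subject.get("subject_name") != subject_name:
                continue
            rows.append(SubjectRow(
                mongo_doc_id=str(doc["_id"]),  # assumption: doc id stored in _id
                subject_id=str(subject["subject_id"]),
                subject_created_at=subject["subject_created_at"],
                record_type=record_type,
            ))
    return rows


def discover_once(
    docs: Iterable[dict],  # source documents already filtered to one test_date
    watermark: Optional[WatermarkKey],
    has_result: Callable[[SubjectRow], bool],
    enqueue: Callable[[list[SubjectRow]], None],
    subject_name: str = "वाचन",
) -> Optional[WatermarkKey]:
    """Run one incremental pass and return the watermark to persist."""
    rows = sorted(
        (r for d in docs for r in normalize(d, subject_name)
         if watermark is None or r.key > watermark),
        key=lambda r: r.key,
    )
    pending = [r for r in rows if not has_result(r)]
    if pending:
        enqueue(pending)  # if this raises, the old watermark is kept and the pass retries
    # Advance past every scanned row, including rows skipped because a result exists.
    return rows[-1].key if rows else watermark
```

Including mongo_doc_id and subject_id in the key breaks ties between subjects created at the same timestamp, so no row is skipped or re-scanned at a page boundary. A real `enqueue` would chunk messages for SQS SendMessageBatch, which accepts at most 10 entries per call.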
7. Scalability Strategy
- Horizontal: multiple ECS tasks process the same queue safely.
- Horizontal discovery is possible, but the simplest deployment is a single discovery task per test_date.
- Vertical within task: controlled parallelism via MAX_CONCURRENCY.
- Backpressure: bounded buffer prevents unbounded memory growth.
- Batch I/O: bulk read/filter and bulk write to reduce DB round-trips.
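Within one task, MAX_CONCURRENCY and the bounded buffer work together: a semaphore caps in-flight batches, and because the consumer stops pulling while all slots are busy, a producer awaiting `put` on a full `asyncio.Queue(maxsize=...)` is paused. The `consume` function and the `None` sentinel below are a hypothetical sketch of that pattern, not the worker's real executor.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Optional, Sequence

BatchFn = Callable[[Sequence[str]], Awaitable[None]]


async def consume(
    queue: asyncio.Queue,
    process_batch: BatchFn,
    max_concurrency: int = 4,
) -> None:
    """Start batches from a bounded queue with at most max_concurrency in flight.

    process_batch is expected to handle its own errors (leaving failed messages
    un-acked) so one bad batch does not stop the consumer."""
    sem = asyncio.Semaphore(max_concurrency)
    tasks: set = set()

    async def guarded(batch: Sequence[str]) -> None:
        try:
            await process_batch(batch)
        finally:
            sem.release()  # free the slot even if the batch failed

    while True:
        batch: Optional[Sequence[str]] = await queue.get()
        if batch is None:  # sentinel from the producer: no more batches
            break
        # While we wait here the queue is not drained, so a producer calling
        # `await queue.put(...)` on a full queue blocks: that is the backpressure.
        await sem.acquire()
        task = asyncio.create_task(guarded(batch))
        tasks.add(task)
        task.add_done_callback(tasks.discard)
    await asyncio.gather(*tasks)  # let in-flight batches finish
```

Acquiring the semaphore before `create_task`, rather than inside the task, is what stops the consumer from spawning an unbounded number of waiting tasks.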
Recommended autoscaling input:
- CloudWatch ApproximateNumberOfMessagesVisible
- Preferred custom metric: BacklogPerTask = VisibleMessages / RunningTasks
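The BacklogPerTask metric and the task count it implies can be computed as below. The target value follows the approach AWS describes for SQS-based scaling (acceptable latency divided by average processing time per message); the function names, the 1 to 50 task bounds, and the zero-task guard are illustrative assumptions. With ECS target tracking, the metric would be published to CloudWatch and Application Auto Scaling would compute the desired count itself; this only shows the arithmetic.

```python
import math


def backlog_per_task(visible_messages: int, running_tasks: int) -> float:
    """BacklogPerTask = VisibleMessages / RunningTasks (zero tasks treated as one)."""
    return visible_messages / max(running_tasks, 1)


def target_backlog(acceptable_latency_s: float, seconds_per_message: float) -> float:
    """Backlog one task can clear within the acceptable latency."""
    return acceptable_latency_s / seconds_per_message


def desired_task_count(
    visible_messages: int,
    target_backlog_per_task: float,
    min_tasks: int = 1,   # illustrative bounds, not from the design
    max_tasks: int = 50,
) -> int:
    """Task count that brings BacklogPerTask down to the target, within bounds."""
    needed = math.ceil(visible_messages / target_backlog_per_task)
    return max(min_tasks, min(max_tasks, needed))
```

For example, if one task clears a message in 0.5 s on average and a 10-minute delay is acceptable, `target_backlog(600, 0.5)` gives 1200 messages per task.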
8. Reliability Strategy
- At-least-once processing semantics.
- Idempotent result upsert prevents duplicate processing effects.
- Message delete occurs only after durable DB update.
- Poison messages move to the DLQ once their receive count exceeds the redrive policy's maxReceiveCount.
- Graceful shutdown drains buffer and in-flight tasks before exit.
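The at-least-once rules above (idempotency check, durable write, then delete only what succeeded) can be sketched as one batch handler. The I/O is injected as plain callables so the ordering is testable; `Message`, `result_key`, and `handle_batch` are hypothetical. In production, `upsert_results` might call PyMongo `bulk_write` with `UpdateOne({"_id": key}, {"$setOnInsert": doc}, upsert=True)` so redelivery is a no-op, and `delete_messages` might call boto3 `delete_message_batch` (at most 10 entries per call); both choices are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class Message:
    message_id: str
    receipt_handle: str
    result_key: str  # hypothetical idempotency key, e.g. "<mongo_doc_id>:<subject_id>"


def handle_batch(
    messages: list[Message],
    existing_keys: Callable[[list[str]], set[str]],
    infer: Callable[[list[Message]], dict[str, dict]],
    upsert_results: Callable[[dict[str, dict]], None],
    delete_messages: Callable[[list[str]], None],
) -> list[str]:
    """Process one batch with at-least-once semantics; return acked receipt handles."""
    done_keys = existing_keys([m.result_key for m in messages])  # one bulk lookup
    skipped = [m for m in messages if m.result_key in done_keys]
    todo = [m for m in messages if m.result_key not in done_keys]

    # key -> result document; records whose download or inference failed are absent.
    results = infer(todo) if todo else {}
    if results:
        # Must be durable before any delete. If this raises, nothing is acked
        # and SQS redelivers the whole batch after the visibility timeout.
        upsert_results(results)

    to_ack = [m.receipt_handle for m in skipped] + [
        m.receipt_handle for m in todo if m.result_key in results
    ]
    if to_ack:
        delete_messages(to_ack)
    # Failed records stay in the queue and eventually reach the DLQ.
    return to_ack
```

Skipped messages are acked because their result already exists, so redelivering them would only repeat the idempotency check.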
9. Performance and Capacity
- Expected shape: nightly burst after 7 PM.
- Controls:
  - BATCH_SIZE default 100
  - MAX_CONCURRENCY default 4
  - Mongo bulk write chunk 500
  - S3 download timeout 3s
  - Inference batch timeout 5s
- Throughput metric: records/minute emitted every minute.
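The controls above can be collected into one config object, with a chunking helper for bulk writes and an asyncio timeout wrapper for the download and inference budgets. `WorkerControls`, `chunked`, `with_timeout`, and `daily_call_counts` are hypothetical names; returning `None` on timeout is one possible convention (the caller then leaves that record un-acked). The call-count helper turns the 900K records/day upper bound into rough volumes: 9,000 inference batches and at least 90,000 ReceiveMessage calls, since SQS returns at most 10 messages per receive.

```python
from __future__ import annotations

import asyncio
import math
from dataclasses import dataclass
from typing import Awaitable, Iterator, Optional, Sequence, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class WorkerControls:
    batch_size: int = 100
    max_concurrency: int = 4
    mongo_write_chunk: int = 500
    s3_download_timeout_s: float = 3.0
    inference_batch_timeout_s: float = 5.0


def chunked(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Split pending write operations into bulk-write chunks."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def with_timeout(aw: Awaitable[T], seconds: float) -> Optional[T]:
    """Return None instead of raising when a step exceeds its time budget."""
    try:
        return await asyncio.wait_for(aw, timeout=seconds)
    except asyncio.TimeoutError:
        return None


def daily_call_counts(records_per_day: int,
                      c: WorkerControls = WorkerControls()) -> dict[str, int]:
    """Rough daily call volumes implied by the controls."""
    return {
        "inference_batches": math.ceil(records_per_day / c.batch_size),
        "min_receive_calls": math.ceil(records_per_day / 10),  # max 10 msgs per receive
    }
```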
10. Security and Compliance
- IAM least privilege for SQS and S3 access.
- Secrets (Mongo URI) are injected through the ECS task definition's secrets integration with AWS Secrets Manager.
- Private network path to Mongo (VPC peering/private endpoint).
- Structured logs avoid sensitive payload fields.
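One way to keep structured logs free of payload fields is an allowlisting JSON formatter built on the standard `logging` module: only named `extra` fields are copied, so anything else attached to a record is dropped. `JsonFormatter` and the `SAFE_FIELDS` contents are assumptions for illustration.

```python
import json
import logging

# Hypothetical allowlist; anything not listed here never reaches the log line.
SAFE_FIELDS = ("event", "message_id", "batch_id", "records", "duration_ms", "status")


class JsonFormatter(logging.Formatter):
    """Emit one JSON object per log line, copying only allowlisted extra fields."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
        }
        for field in SAFE_FIELDS:
            if hasattr(record, field):
                entry[field] = getattr(record, field)
        return json.dumps(entry, ensure_ascii=False, default=str)
```

Usage: attach it with `handler.setFormatter(JsonFormatter())`, then log with `logger.info("batch done", extra={"event": "batch_done", "records": 100})`. An allowlist fails closed: a new payload field added later stays out of the logs until someone deliberately lists it.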
11. Deployment Topology
- Same repository and container image as API is acceptable.
- Separate ECS service/task definition for worker command:
python -m app.workers.sqs_anomaly_worker
- API and worker scale independently.
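The worker command's graceful SIGTERM handling can be sketched as a poll loop that checks a stop event between receives, then drains whatever is buffered. `run_worker`, `main`, and the injected `receive`/`handle` coroutines are hypothetical, not the contents of app.workers.sqs_anomaly_worker. ECS sends SIGTERM when stopping a task and SIGKILL after the container stop timeout (30 seconds by default). After SIGTERM the loop may still wait out one long poll (WaitTimeSeconds is at most 20 s), so that timeout should cover one long poll plus draining the buffer.

```python
from __future__ import annotations

import asyncio
import signal
from typing import Awaitable, Callable, List

Receive = Callable[[], Awaitable[List[str]]]
Handle = Callable[[List[str]], Awaitable[None]]


async def run_worker(stop: asyncio.Event, receive: Receive, handle: Handle,
                     batch_size: int = 100) -> None:
    buffer: List[str] = []
    while not stop.is_set():
        buffer.extend(await receive())  # one long-poll receive (up to 10 messages)
        if len(buffer) >= batch_size:
            await handle(buffer[:batch_size])
            del buffer[:batch_size]
    # Drain on shutdown: process what is already buffered instead of
    # letting those messages wait out their visibility timeout.
    while buffer:
        await handle(buffer[:batch_size])
        del buffer[:batch_size]


async def main(receive: Receive, handle: Handle) -> None:
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    # add_signal_handler is Unix-only, which matches Linux ECS Fargate tasks.
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop.set)
    await run_worker(stop, receive, handle)
```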
12. Risks and Mitigations
- Slow audio/model path:
  - mitigate with semaphore and timeout controls.
- Mongo overload under spikes:
  - mitigate via bulk writes, capped concurrency, autoscaling.
- Duplicate message delivery:
  - mitigate with idempotent update condition.
- Queue backlog growth:
  - mitigate with target-tracking autoscaling and nightly pre-scale.