Audio Anomaly Pipeline Implementation
Purpose
This document describes the implemented Mongo -> SQS -> worker pipeline used for reading-audio anomaly detection.
It covers:
- current runtime components
- Mongo data selection rules
- watermark behavior
- SQS message contract
- worker processing flow
- result persistence
- run commands
- operational debugging guidance
This is the implementation-level companion to docs/SQS_WORKER_HLD.md.
Pipeline Summary
The implemented pipeline has two runtime stages:
- Discovery / enqueue stage
  - reads source Mongo documents for a given test_date
  - selects only reading subject rows where subject_name == SUBJECT_NAME
  - uses subjects[].subject_created_at as the incremental ordering timestamp
  - pushes normalized work items into SQS
  - persists the enqueue watermark in anomaly_pipeline_state
- Worker / inference stage
  - reads work items from SQS
  - re-checks source existence and result idempotency
  - downloads audio
  - extracts features
  - runs model inference
  - stores final results in anomaly_detection_results
  - optionally writes a backlink into the source document
Source Data Selection
Source collection:
- from MONGO_SOURCE_COLLECTION
Source documents are expected to contain:
- root student metadata
- assessments[]
- assessments[].test_date
- assessments[].teacher_record.subjects[]
- assessments[].admin_record.subjects[]
Discovery currently supports both:
- teacher_record.subjects
- admin_record.subjects
The discovery query filters to:
- assessments.test_date == TEST_DATE
- subjects.subject_name == SUBJECT_NAME
- subjects.audio_url is a non-empty string
- subjects.subject_created_at exists and is a date
Current default:
SUBJECT_NAME=वाचन ("reading")
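As a sketch of how these conditions combine, the filter below selects source documents that contain at least one eligible subject. It assumes assessments.test_date is stored as a BSON date at UTC midnight (the validation queries use ISODate values) and that the record types are exactly teacher_record and admin_record. The function names are hypothetical, and the real query in app/workers/mongo_repository.py may instead be an aggregation that also unwinds one row per subject.

```python
from datetime import datetime, timezone

# Hypothetical sketch; the real discovery query may be structured differently.
RECORD_TYPES = ("teacher_record", "admin_record")


def eligible_subject_condition(subject_name: str) -> dict:
    """Conditions that must all hold on a single subjects[] element."""
    return {
        "subject_name": subject_name,
        # audio_url must be a string and not the empty string.
        "audio_url": {"$type": "string", "$ne": ""},
        # $type "date" rejects missing, null, and string-typed timestamps.
        "subject_created_at": {"$type": "date"},
    }


def build_discovery_filter(test_date: datetime, subject_name: str) -> dict:
    """Match documents with an eligible subject on test_date in either record type."""
    return {
        "$or": [
            {
                "assessments": {
                    "$elemMatch": {
                        "test_date": test_date,
                        f"{record_type}.subjects": {
                            "$elemMatch": eligible_subject_condition(subject_name)
                        },
                    }
                }
            }
            for record_type in RECORD_TYPES
        ]
    }


# Example: the filter for the run date used elsewhere in this document.
reading_filter = build_discovery_filter(datetime(2025, 12, 20, tzinfo=timezone.utc), "वाचन")
```

The nested $elemMatch is the important choice: with plain dotted paths, MongoDB could satisfy test_date on one assessment and subject_name on a different assessment or subject.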
Incremental Watermark Rule
The implemented incremental timestamp is:
subjects[].subject_created_at
It is projected into the normalized field:
source_updated_at
Watermark ordering is deterministic:
- source_updated_at
- then mongo_doc_id
- then subject_id
This avoids duplicates and ordering ambiguity when multiple records share the same timestamp.
The watermark is stored in:
anomaly_pipeline_state
Stored fields:
- watermark.last_processed_timestamp
- watermark.last_processed_doc_id
- watermark.last_processed_subject_id
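A minimal sketch of the "after the watermark" predicate, assuming the aggregation has already projected rows with source_updated_at, mongo_doc_id, and subject_id, and that both ids are stored with a consistent type (MongoDB compares mixed BSON types by type order first). The stage builders and is_after are hypothetical helpers, not the pipeline's actual functions.

```python
from datetime import datetime
from typing import Optional

# Sort stage matching the documented ordering; pymongo preserves dict key order.
SORT_STAGE = {"$sort": {"source_updated_at": 1, "mongo_doc_id": 1, "subject_id": 1}}


def after_watermark_match(watermark: Optional[dict]) -> dict:
    """$match stage selecting rows strictly after the stored watermark tuple."""
    if not watermark:
        return {"$match": {}}  # no watermark yet: start from the beginning
    ts = watermark["last_processed_timestamp"]
    doc_id = watermark["last_processed_doc_id"]
    subject_id = watermark["last_processed_subject_id"]
    # Expanded lexicographic comparison of (source_updated_at, mongo_doc_id, subject_id).
    return {"$match": {"$or": [
        {"source_updated_at": {"$gt": ts}},
        {"source_updated_at": ts, "mongo_doc_id": {"$gt": doc_id}},
        {"source_updated_at": ts, "mongo_doc_id": doc_id, "subject_id": {"$gt": subject_id}},
    ]}}


def is_after(row: dict, watermark: dict) -> bool:
    """Pure-Python equivalent of the $match above, handy for unit tests."""
    row_key = (row["source_updated_at"], row["mongo_doc_id"], row["subject_id"])
    wm_key = (
        watermark["last_processed_timestamp"],
        watermark["last_processed_doc_id"],
        watermark["last_processed_subject_id"],
    )
    return row_key > wm_key


wm = {"last_processed_timestamp": datetime(2025, 12, 20, 9, 0),
      "last_processed_doc_id": "doc-b", "last_processed_subject_id": "subj-2"}
pipeline_tail = [after_watermark_match(wm), SORT_STAGE, {"$limit": 100}]
```

Rows that share a timestamp are resumed exactly where the previous page stopped, which is the duplicate and ambiguity protection the tie-breakers provide.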
Backfill vs Incremental
Backfill
Backfill starts from the beginning of the ordered set for the selected test_date.
Behavior:
- ignores watermark on the first iteration
- discovers all matching rows unless MAX_RECORDS is set
- advances the watermark only after a successful enqueue
- exits when no more matching rows remain or the max-record limit is reached
Incremental
Incremental resumes from the saved watermark for the selected test_date.
Behavior:
- loads the watermark from anomaly_pipeline_state
- reads only records after the watermark
- sleeps for POLL_INTERVAL_SECONDS when no new rows are available
- exits when MAX_RECORDS is reached for that run
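The two modes differ only in where the first fetch starts and what happens when a fetch comes back empty. The loop below sketches that, with Mongo, SQS, and state access passed in as callables; every name here is hypothetical, and app/workers/discovery_service.py may be structured differently.

```python
import time
from typing import Callable, Optional

Watermark = tuple  # (source_updated_at, mongo_doc_id, subject_id)


def run_discovery(
    mode: str,
    fetch_after: Callable[[Optional[Watermark], int], list],
    enqueue: Callable[[list], None],
    load_watermark: Callable[[], Optional[Watermark]],
    save_watermark: Callable[[Watermark], None],
    batch_size: int,
    max_records: Optional[int] = None,
    poll_interval_seconds: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Enqueue rows in watermark order and return how many were enqueued."""
    if mode not in ("backfill", "incremental"):
        raise ValueError(f"unknown mode: {mode!r}")
    # Backfill ignores the stored watermark on its first fetch; incremental resumes from it.
    watermark = None if mode == "backfill" else load_watermark()
    enqueued = 0
    while max_records is None or enqueued < max_records:
        limit = batch_size if max_records is None else min(batch_size, max_records - enqueued)
        rows = fetch_after(watermark, limit)
        if not rows:
            if mode == "backfill":
                break  # ordered set exhausted
            sleep(poll_interval_seconds)  # incremental: wait for new rows
            continue
        enqueue(rows)  # if this raises, the watermark below is never advanced
        last = rows[-1]
        watermark = (last["source_updated_at"], last["mongo_doc_id"], last["subject_id"])
        save_watermark(watermark)
        enqueued += len(rows)
    return enqueued
```

In this sketch an incremental run without max_records polls indefinitely, which is how it reads the documented rule that incremental runs exit when MAX_RECORDS is reached.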
Max Record Limiting
The discovery script supports a total run-level cap:
- CLI: --max-records
- env: MAX_RECORDS
Important:
- DISCOVERY_BATCH_SIZE controls the per-iteration fetch size
- MAX_RECORDS controls the total number of rows for the whole run
Examples:
- DISCOVERY_BATCH_SIZE=100, MAX_RECORDS=100: at most one full discovery batch
- DISCOVERY_BATCH_SIZE=100, MAX_RECORDS=250: effective per-iteration limits become 100, 100, 50
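The per-iteration limits in these examples are min(batch size, remaining rows). This hypothetical helper reproduces that arithmetic, assuming every iteration enqueues the full limit it asked for.

```python
def effective_batch_limits(batch_size: int, max_records: int) -> list[int]:
    """Per-iteration fetch limits for a run capped at max_records rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    limits = []
    remaining = max_records
    while remaining > 0:
        step = min(batch_size, remaining)  # never fetch more than the cap allows
        limits.append(step)
        remaining -= step
    return limits


print(effective_batch_limits(100, 100))  # [100]
print(effective_batch_limits(100, 250))  # [100, 100, 50]
```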
Mongo Collections
Source Collection
Configured by:
MONGO_SOURCE_COLLECTION
Example:
student_assessment_record
Pipeline State Collection
Configured by:
MONGO_STATE_COLLECTION
Default:
anomaly_pipeline_state
Purpose:
- store enqueue progress
- store watermark
- store run status and counters
State document identity:
audio_anomaly:enqueue:<source_collection>:<test_date>
Main fields:
- _id
- pipeline_name
- stage
- source_collection
- test_date
- mode
- status
- watermark
- stats
- last_run
- created_at
- updated_at
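The state document can be advanced with a single upsert. This sketch assumes the audio_anomaly and enqueue prefixes in the documented _id correspond to pipeline_name and stage, and that the run counter is named stats.enqueued; both are illustrative guesses, and the helper names are hypothetical.

```python
from datetime import datetime, timezone


def state_identity(source_collection: str, test_date: str,
                   pipeline_name: str = "audio_anomaly", stage: str = "enqueue") -> dict:
    """Fields covered by the unique state index (defaults are assumptions)."""
    return {"pipeline_name": pipeline_name, "stage": stage,
            "source_collection": source_collection, "test_date": test_date}


def state_doc_id(identity: dict) -> str:
    """audio_anomaly:enqueue:<source_collection>:<test_date>"""
    return "{pipeline_name}:{stage}:{source_collection}:{test_date}".format(**identity)


def watermark_update(identity: dict, last_row: dict, enqueued: int, now: datetime) -> dict:
    """Update document that advances the watermark after a successful enqueue."""
    return {
        "$set": {
            "watermark.last_processed_timestamp": last_row["source_updated_at"],
            "watermark.last_processed_doc_id": last_row["mongo_doc_id"],
            "watermark.last_processed_subject_id": last_row["subject_id"],
            "updated_at": now,
        },
        "$inc": {"stats.enqueued": enqueued},  # hypothetical counter name
        # Identity fields are written on first insert so the unique index never sees nulls.
        "$setOnInsert": {**identity, "created_at": now},
    }


ident = state_identity("student_assessment_record", "2025-12-20")
now = datetime.now(timezone.utc)
# state_collection.update_one({"_id": state_doc_id(ident)},
#                             watermark_update(ident, last_row, 100, now), upsert=True)
```

Writing the identity fields in $setOnInsert matters because the recommended unique state index covers them; an upserted document missing them would store nulls in that index and collide with the next test_date.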
Result Collection
Configured by:
MONGO_RESULT_COLLECTION
Default:
anomaly_detection_results
Purpose:
- store final anomaly inference output
- provide idempotent completion state
Each result document is keyed by a deterministic SHA-256 hash of:
- source_collection
- mongo_doc_id
- test_date
- record_type
- subject_id
- audio_url
This prevents duplicate result writes for repeated SQS deliveries.
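The key is a SHA-256 over these six fields, but their serialization is not specified here. The sketch below assumes a compact JSON array of string values, which avoids the ambiguity of joining with a delimiter that could also appear inside a field. The real implementation may serialize differently and would then produce different hashes; result_id is a hypothetical name.

```python
import hashlib
import json


def result_id(source_collection, mongo_doc_id, test_date, record_type, subject_id, audio_url) -> str:
    """Deterministic result _id: SHA-256 hex digest over the identifying fields."""
    parts = [str(p) for p in (source_collection, mongo_doc_id, test_date,
                              record_type, subject_id, audio_url)]
    # A JSON array keeps field boundaries unambiguous (assumed serialization).
    payload = json.dumps(parts, ensure_ascii=False, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
```

Because the same inputs always yield the same 64-character id, a redelivered SQS message maps to the same result document.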
SQS Message Contract
Discovery sends one SQS message per audio item.
Current message fields:
- source_collection
- mongo_doc_id
- test_date
- record_type
- subject_id
- subject_name
- audio_s3_url
- source_updated_at
- subject_created_at
- student_id
- udise
- trace_id
- discovered_at
The worker expects this exact payload shape.
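Because the worker expects this exact payload shape, parsing can reject both missing and unexpected keys. The dataclass below is a hypothetical stand-in for WorkItem: it keeps every value as received, and the str annotations are assumptions that dataclasses do not enforce.

```python
import json
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class WorkItem:
    source_collection: str
    mongo_doc_id: str
    test_date: str
    record_type: str
    subject_id: str
    subject_name: str
    audio_s3_url: str
    source_updated_at: str
    subject_created_at: str
    student_id: str
    udise: str
    trace_id: str
    discovered_at: str

    @classmethod
    def from_message_body(cls, body: str) -> "WorkItem":
        """Parse an SQS message body, rejecting any deviation from the contract."""
        data = json.loads(body)
        if not isinstance(data, dict):
            raise ValueError("message body must be a JSON object")
        expected = {f.name for f in fields(cls)}
        present = set(data)
        missing, unexpected = expected - present, present - expected
        if missing or unexpected:
            raise ValueError(f"bad payload: missing={sorted(missing)} unexpected={sorted(unexpected)}")
        return cls(**data)
```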
Worker Processing Flow
Worker entrypoint:
python3 -m app.workers.sqs_anomaly_worker
Worker flow:
- load model artifacts
- ensure result collection indexes
- poll SQS
- parse messages into WorkItem
- buffer messages until the batch threshold or flush timeout is reached
- check whether source row still exists
- check whether result document already exists
- download audio
- extract features
- run model inference
- upsert result rows
- optionally update source backlinks
- ack successful messages in SQS
Messages that fail before the result is durably written are not acked. They become visible again after the SQS visibility timeout expires and are retried.
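The buffering step flushes on whichever comes first, the batch threshold or the flush timeout. A minimal sketch with an injectable clock for testing; BatchBuffer and its parameters are hypothetical, with BATCH_SIZE presumably feeding batch_size.

```python
import time
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class BatchBuffer(Generic[T]):
    """Signals a flush at batch_size items or once the oldest item is flush_timeout_s old."""

    def __init__(self, batch_size: int, flush_timeout_s: float,
                 clock: Callable[[], float] = time.monotonic):
        self.batch_size = batch_size
        self.flush_timeout_s = flush_timeout_s
        self.clock = clock
        self.items: List[T] = []
        self.first_added_at: Optional[float] = None

    def add(self, item: T) -> None:
        if not self.items:
            self.first_added_at = self.clock()  # start the timeout window
        self.items.append(item)

    def should_flush(self) -> bool:
        if not self.items:
            return False
        if len(self.items) >= self.batch_size:
            return True
        return self.clock() - self.first_added_at >= self.flush_timeout_s

    def drain(self) -> List[T]:
        """Return at most batch_size items; leftovers start a new timeout window."""
        batch = self.items[: self.batch_size]
        self.items = self.items[self.batch_size:]
        self.first_added_at = self.clock() if self.items else None
        return batch
```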
Idempotency Rules
Idempotency is enforced at two levels:
- Discovery stage
  - excludes rows that already exist in anomaly_detection_results
- Worker stage
  - checks for existing result ids before processing
  - makes the final write an upsert keyed by the deterministic _id
This allows at-least-once SQS delivery without duplicate result documents.
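The worker-side guarantees can be illustrated with an in-memory dict standing in for anomaly_detection_results. This is a sketch under assumptions: items already carry the deterministic _id, and a message whose result already exists is treated as done and acked. Against MongoDB the write would be collection.update_one({"_id": rid}, {"$set": doc}, upsert=True); process_idempotently is a hypothetical name.

```python
from typing import Callable


def process_idempotently(items: list, results: dict, infer: Callable[[dict], dict]) -> list:
    """Skip items with an existing result, upsert the rest; return ids safe to ack."""
    ack = []
    for item in items:
        rid = item["_id"]
        if rid in results:  # existing result: no work needed, but still ack
            ack.append(rid)
            continue
        results[rid] = infer(item)  # dict assignment by key behaves like an upsert on _id
        ack.append(rid)
    return ack
```

The existence check only saves work; the upsert keyed by _id is what keeps two deliveries that race past the check from producing two documents.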
Retry Behavior
Expected retry behavior:
- if audio download fails, message is not acked
- if feature extraction fails, message is not acked
- if inference fails, message is not acked
- if Mongo result upsert fails, message is not acked
After the SQS visibility timeout expires, the message becomes visible again and may be retried.
This is why repeated receive counts can appear in worker logs.
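These retry rules reduce to one principle: collect receipt handles only for messages whose result write succeeded, and let SQS redeliver the rest. The sketch runs stages per item for clarity (the real worker batches inference), records which stage failed, and returns the handles to ack, which in SQS terms means deleting them, for example with boto3's delete_message_batch. The function, stage names, and the flattened receipt_handle key are hypothetical.

```python
from typing import Callable, Iterable


def process_batch(
    messages: Iterable[dict],
    download: Callable[[str], bytes],
    extract: Callable[[bytes], list],
    infer: Callable[[list], dict],
    upsert: Callable[[dict, dict], None],
) -> tuple:
    """Return (receipt handles safe to ack, [(receipt handle, failed stage), ...])."""
    to_ack, failed = [], []
    for msg in messages:
        stage = "download"
        try:
            audio = download(msg["audio_s3_url"])
            stage = "feature_extraction"
            features = extract(audio)
            stage = "inference"
            prediction = infer(features)
            stage = "result_upsert"
            upsert(msg, prediction)
        except Exception:  # any failure before the durable write means "do not ack"
            failed.append((msg["receipt_handle"], stage))
            continue
        to_ack.append(msg["receipt_handle"])
    return to_ack, failed
```

In this sketch, download and feature_extraction failures correspond to what the worker logs as item_failed_pre_inference.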
Current Failure Mode Observed
One observed runtime issue is a high number of failures during:
item_failed_pre_inference
This means the failure happened before model prediction, most commonly in:
- audio download
- feature extraction
Given the current defaults:
s3_timeout_seconds = 3.0
the most likely practical cause is slow or timing-out audio downloads.
The worker logs already distinguish:
- source eligibility issues
- existing result dedupe issues
- pre-inference failures
- successful result writes
Logging and Debugging
Logging is JSON structured.
Useful discovery events:
- state_run_started
- watermark_loaded
- watermark_missing
- discovery_run_started
- discovery_run_once_started
- discovery_page_fetch_started
- discovery_page_fetch_completed
- discovery_page_fetched
- discovery_candidates_resolved
- enqueue_chunk_started
- enqueue_chunk_completed
- enqueue_batch_failed
- watermark_advanced
- state_run_finished
Useful worker events:
- worker_started
- messages_polled
- message_buffered
- batch_dispatched
- batch_processing_started
- existing_results_checked
- eligible_keys_resolved
- inference_batch_started
- item_failed_pre_inference
- feature_build_completed
- inference_batch_completed
- results_upserted
- processed_items_acked
- batch_processed
- throughput
Recommended debug setting:
LOG_LEVEL=INFO
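A minimal standard-library sketch of JSON-structured logging in which the event name is the log message and context is passed through extra. The key names (ts, level, logger, event) and the fields convention are assumptions, not the pipeline's actual log schema.

```python
import json
import logging
import os


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),  # e.g. "watermark_advanced"
        }
        # Structured context passed as extra={"fields": {...}} (assumed convention).
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload, ensure_ascii=False, default=str)


def get_logger(name: str) -> logging.Logger:
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger = logging.getLogger(name)
    logger.handlers[:] = [handler]  # avoid stacking handlers on repeated setup
    logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))  # setLevel accepts level names
    return logger


log = get_logger("sqs_anomaly_worker")
log.info("messages_polled", extra={"fields": {"count": 10}})
```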
Environment Variables
Important worker variables:
- SQS_QUEUE_URL
- MONGO_URI
- MONGO_DB_NAME
- MONGO_SOURCE_COLLECTION
- MONGO_RESULT_COLLECTION
- PIPELINE_NAME
- BATCH_SIZE
- MAX_CONCURRENCY
Important discovery variables:
- AWS_REGION
- TEST_DATE
- SUBJECT_NAME
- DISCOVERY_BATCH_SIZE
- MAX_RECORDS
- POLL_INTERVAL_SECONDS
- MONGO_TO_SQS_MODE
- MONGO_STATE_COLLECTION
- MONGO_RESULT_COLLECTION
Optional:
- ENABLE_SOURCE_BACKLINK_UPDATE
- SQS_FIFO_GROUP_ID
- LOG_LEVEL
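A hypothetical typed loader for the discovery variables. The SUBJECT_NAME, MONGO_STATE_COLLECTION, and MONGO_RESULT_COLLECTION defaults come from this document; the defaults for DISCOVERY_BATCH_SIZE, POLL_INTERVAL_SECONDS, and MONGO_TO_SQS_MODE are placeholders, and treating TEST_DATE as the only required variable is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DiscoveryConfig:
    aws_region: Optional[str]
    test_date: str
    subject_name: str
    discovery_batch_size: int
    max_records: Optional[int]  # None means no run-level cap
    poll_interval_seconds: float
    mode: str
    state_collection: str
    result_collection: str


def load_discovery_config(env: Mapping[str, str] = os.environ) -> DiscoveryConfig:
    test_date = env.get("TEST_DATE")
    if not test_date:
        raise ValueError("TEST_DATE is required")
    mode = env.get("MONGO_TO_SQS_MODE", "incremental")  # placeholder default
    if mode not in ("backfill", "incremental"):
        raise ValueError(f"MONGO_TO_SQS_MODE must be backfill or incremental, got {mode!r}")
    max_records = env.get("MAX_RECORDS")
    return DiscoveryConfig(
        aws_region=env.get("AWS_REGION"),  # boto3 can also resolve the region itself
        test_date=test_date,
        subject_name=env.get("SUBJECT_NAME", "वाचन"),  # documented default
        discovery_batch_size=int(env.get("DISCOVERY_BATCH_SIZE", "100")),  # placeholder
        max_records=int(max_records) if max_records else None,
        poll_interval_seconds=float(env.get("POLL_INTERVAL_SECONDS", "30")),  # placeholder
        mode=mode,
        state_collection=env.get("MONGO_STATE_COLLECTION", "anomaly_pipeline_state"),
        result_collection=env.get("MONGO_RESULT_COLLECTION", "anomaly_detection_results"),
    )
```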
Running the Pipeline
Run worker:
cd /Users/pushkar/workspace/projects/vopa/anaomoly-detection/audio-anomaly-detection
PYTHONPATH=. python3 -m app.workers.sqs_anomaly_worker
Run backfill discovery:
cd /Users/pushkar/workspace/projects/vopa/anaomoly-detection/audio-anomaly-detection
PYTHONPATH=. python3 scripts/mongo_to_sqs_stream.py --mode backfill --test-date 2025-12-20 --batch-size 100 --max-records 100
Run incremental discovery:
cd /Users/pushkar/workspace/projects/vopa/anaomoly-detection/audio-anomaly-detection
PYTHONPATH=. python3 scripts/mongo_to_sqs_stream.py --mode incremental --test-date 2025-12-20 --batch-size 100 --max-records 100
Validation Queries
Check state:
db.anomaly_pipeline_state.find({ test_date: "2025-12-20" }).pretty()
Check result count:
db.anomaly_detection_results.countDocuments({
"source.test_date": ISODate("2025-12-20T00:00:00.000Z")
})
Check sample results:
db.anomaly_detection_results.find({
"source.test_date": ISODate("2025-12-20T00:00:00.000Z")
}).limit(5).pretty()
Recommended Mongo Indexes
Source collection:
db.student_assessment_record.createIndex({ "assessments.test_date": 1 })
db.student_assessment_record.createIndex({ "assessments.teacher_record.subjects.subject_name": 1 })
db.student_assessment_record.createIndex({ "assessments.admin_record.subjects.subject_name": 1 })
db.student_assessment_record.createIndex({ "assessments.teacher_record.subjects.subject_created_at": 1 })
db.student_assessment_record.createIndex({ "assessments.admin_record.subjects.subject_created_at": 1 })
State collection:
db.anomaly_pipeline_state.createIndex(
{ pipeline_name: 1, stage: 1, source_collection: 1, test_date: 1 },
{ unique: true }
)
Result collection:
db.anomaly_detection_results.createIndex({
"source.mongo_doc_id": 1,
"source.test_date": 1,
"source.record_type": 1,
"source.subject_id": 1
})
db.anomaly_detection_results.createIndex({
"source.test_date": 1,
"processing.processed_at": 1
})
db.anomaly_detection_results.createIndex({
"source.test_date": 1,
"inference.is_anomaly": 1
})
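The worker's "ensure result collection indexes" step can be expressed with pymongo's Collection.create_index, which takes a list of (field, direction) pairs; MongoDB does not recreate an index that already exists with the same keys and options. This sketch takes an already-connected database object so it imports nothing from pymongo; the helper name is hypothetical and the collection defaults follow this document.

```python
# Index specs mirroring the recommended state and result indexes (1 = ascending).
STATE_INDEX = [("pipeline_name", 1), ("stage", 1), ("source_collection", 1), ("test_date", 1)]
RESULT_INDEXES = [
    [("source.mongo_doc_id", 1), ("source.test_date", 1),
     ("source.record_type", 1), ("source.subject_id", 1)],
    [("source.test_date", 1), ("processing.processed_at", 1)],
    [("source.test_date", 1), ("inference.is_anomaly", 1)],
]


def ensure_pipeline_indexes(db, state_collection: str = "anomaly_pipeline_state",
                            result_collection: str = "anomaly_detection_results") -> list:
    """Create the state and result indexes; returns the index names reported back."""
    names = [db[state_collection].create_index(STATE_INDEX, unique=True)]
    for keys in RESULT_INDEXES:
        names.append(db[result_collection].create_index(keys))
    return names
```

With pymongo this would be called as ensure_pipeline_indexes(MongoClient(uri)[db_name]), using the MONGO_URI and MONGO_DB_NAME values.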
Implementation Files
Main implementation files:
- app/workers/common.py
- app/workers/discovery_service.py
- app/workers/mongo_repository.py
- app/workers/sqs_client.py
- app/workers/worker_service.py
- app/workers/inference_engine.py
- scripts/mongo_to_sqs_stream.py
- scripts/seed_sqs.py
- scripts/push_audio_urls_to_sqs.py
Tests:
tests/test_worker_pipeline.py