Audio Anomaly Pipeline Implementation

Purpose

This document describes the implemented Mongo -> SQS -> worker pipeline used for reading-audio anomaly detection.

It covers:

  • current runtime components
  • Mongo data selection rules
  • watermark behavior
  • SQS message contract
  • worker processing flow
  • result persistence
  • run commands
  • operational debugging guidance

This is the implementation-level companion to docs/SQS_WORKER_HLD.md.

Pipeline Summary

The implemented pipeline has two runtime stages:

  1. Discovery / enqueue stage
  • reads source Mongo documents for a given test_date
  • selects only reading subject rows where subject_name == SUBJECT_NAME
  • uses subjects[].subject_created_at as the incremental ordering timestamp
  • pushes normalized work items into SQS
  • persists enqueue watermark in anomaly_pipeline_state
  2. Worker / inference stage
  • reads work items from SQS
  • re-checks source existence and result idempotency
  • downloads audio
  • extracts features
  • runs model inference
  • stores final results in anomaly_detection_results
  • optionally writes a backlink into the source document

Source Data Selection

Source collection:

  • configured by MONGO_SOURCE_COLLECTION

Source documents are expected to contain:

  • root student metadata
  • assessments[]
  • assessments[].test_date
  • assessments[].teacher_record.subjects[]
  • assessments[].admin_record.subjects[]

Discovery currently supports both:

  • teacher_record.subjects
  • admin_record.subjects

The discovery query filters to:

  • assessments.test_date == TEST_DATE
  • subjects.subject_name == SUBJECT_NAME
  • subjects.audio_url is a non-empty string
  • subjects.subject_created_at exists and is a date
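The four filter rules above can be written as a single Mongo filter document. The following is a minimal sketch, not the actual query in the discovery code: the function name build_discovery_filter and its record_type argument are hypothetical, and the stored type of test_date (string or date) is an assumption left to the caller.

```python
from typing import Any


def build_discovery_filter(test_date: Any, subject_name: str, record_type: str) -> dict[str, Any]:
    """Build a filter for one record type (hypothetical helper).

    record_type is assumed to be "teacher_record" or "admin_record".
    test_date is passed through unchanged; its stored type is an assumption.
    """
    subject_match = {
        "subject_name": subject_name,
        # audio_url must be a non-empty string
        "audio_url": {"$type": "string", "$ne": ""},
        # subject_created_at must exist and be a BSON date
        "subject_created_at": {"$type": "date"},
    }
    return {
        "assessments": {
            "$elemMatch": {
                "test_date": test_date,
                f"{record_type}.subjects": {"$elemMatch": subject_match},
            }
        }
    }


# One filter per supported record type.
filters = [
    build_discovery_filter("2025-12-20", "वाचन", record_type)
    for record_type in ("teacher_record", "admin_record")
]
```

A filter like this only selects matching documents. The individual subject rows still have to be extracted from the matched arrays, either in application code or with an aggregation pipeline.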

Current default:

  • SUBJECT_NAME=वाचन

Incremental Watermark Rule

The implemented incremental timestamp is:

  • subjects[].subject_created_at

It is projected into the normalized field:

  • source_updated_at

Watermark ordering is deterministic:

  1. source_updated_at
  2. mongo_doc_id
  3. subject_id

This avoids duplicates and ordering ambiguity when multiple records share the same timestamp.
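The three-part ordering can be illustrated with plain tuple comparison. This is a sketch under assumptions: WatermarkKey and rows_after are hypothetical names, and both ids are assumed to be compared as strings.

```python
from datetime import datetime, timezone
from typing import NamedTuple, Optional


class WatermarkKey(NamedTuple):
    """Ordering key; fields compare left to right, like a plain tuple."""
    source_updated_at: datetime
    mongo_doc_id: str   # assumed to be compared as a string
    subject_id: str     # assumed to be compared as a string


def rows_after(rows: list[WatermarkKey], watermark: Optional[WatermarkKey]) -> list[WatermarkKey]:
    """Return rows strictly after the watermark, in deterministic order."""
    ordered = sorted(rows)
    if watermark is None:
        return ordered
    return [row for row in ordered if row > watermark]


# Three rows share one timestamp; the ids break the tie.
ts = datetime(2025, 12, 20, tzinfo=timezone.utc)
rows = [
    WatermarkKey(ts, "doc2", "s1"),
    WatermarkKey(ts, "doc1", "s2"),
    WatermarkKey(ts, "doc1", "s1"),
]
pending = rows_after(rows, WatermarkKey(ts, "doc1", "s1"))
```

Because the comparison is strict on the full key, a row that equals the watermark is never returned again, even when other rows share its timestamp.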

The watermark is stored in:

  • anomaly_pipeline_state

Stored fields:

  • watermark.last_processed_timestamp
  • watermark.last_processed_doc_id
  • watermark.last_processed_subject_id

Backfill vs Incremental

Backfill

Backfill starts from the beginning of the ordered set for the selected test_date.

Behavior:

  • ignores watermark on the first iteration
  • discovers all matching rows unless MAX_RECORDS is set
  • advances watermark only after successful enqueue
  • exits when no more matching rows remain or the max-record limit is reached

Incremental

Incremental resumes from the saved watermark for the selected test_date.

Behavior:

  • loads watermark from anomaly_pipeline_state
  • reads only records after the watermark
  • sleeps for POLL_INTERVAL_SECONDS when no new rows are available
  • exits when MAX_RECORDS is reached for that run

Max Record Limiting

The discovery script supports a total run-level cap:

  • CLI: --max-records
  • env: MAX_RECORDS

Important:

  • DISCOVERY_BATCH_SIZE controls per-iteration fetch size
  • MAX_RECORDS controls total rows for the whole run

Examples:

  • DISCOVERY_BATCH_SIZE=100, MAX_RECORDS=100
    • one full discovery batch at most
  • DISCOVERY_BATCH_SIZE=100, MAX_RECORDS=250
    • effective discovery limits become 100, 100, 50
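The interaction between the two settings can be sketched as follows. The function name iteration_limits is hypothetical, and the sketch assumes every fetch returns a full page until the cap is reached.

```python
def iteration_limits(batch_size: int, max_records: int) -> list[int]:
    """Per-iteration fetch limits for a run capped at max_records (hypothetical helper)."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    limits = []
    remaining = max_records
    while remaining > 0:
        # Never fetch more than the remaining run-level budget.
        limit = min(batch_size, remaining)
        limits.append(limit)
        remaining -= limit
    return limits


print(iteration_limits(100, 250))  # [100, 100, 50]
print(iteration_limits(100, 100))  # [100]
```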

Mongo Collections

Source Collection

Configured by:

  • MONGO_SOURCE_COLLECTION

Example:

  • student_assessment_record

Pipeline State Collection

Configured by:

  • MONGO_STATE_COLLECTION

Default:

  • anomaly_pipeline_state

Purpose:

  • store enqueue progress
  • store watermark
  • store run status and counters

State document identity:

  • audio_anomaly:enqueue:<source_collection>:<test_date>

Main fields:

  • _id
  • pipeline_name
  • stage
  • source_collection
  • test_date
  • mode
  • status
  • watermark
  • stats
  • last_run
  • created_at
  • updated_at

Result Collection

Configured by:

  • MONGO_RESULT_COLLECTION

Default:

  • anomaly_detection_results

Purpose:

  • store final anomaly inference output
  • provide idempotent completion state

Each result document is keyed by a deterministic SHA-256 hash of:

  • source_collection
  • mongo_doc_id
  • test_date
  • record_type
  • subject_id
  • audio_url

This prevents duplicate result writes for repeated SQS deliveries.
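A deterministic key of this kind can be derived with hashlib. The sketch below is illustrative only: the field order follows the list above, but the separator, the string conversion, and the name result_id are assumptions, so the hashes it produces should not be expected to match those in the real collection.

```python
import hashlib

KEY_FIELDS = (
    "source_collection",
    "mongo_doc_id",
    "test_date",
    "record_type",
    "subject_id",
    "audio_url",
)


def result_id(item: dict) -> str:
    """Hash the identifying fields into a stable hex digest (hypothetical helper)."""
    # Assumption: fields are stringified and joined with a unit separator
    # so that adjacent values cannot run together ambiguously.
    joined = "\x1f".join(str(item[name]) for name in KEY_FIELDS)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


example = {
    "source_collection": "student_assessment_record",
    "mongo_doc_id": "doc1",
    "test_date": "2025-12-20",
    "record_type": "teacher_record",
    "subject_id": "s1",
    "audio_url": "https://example.invalid/a.wav",  # placeholder URL
}
first = result_id(example)
second = result_id(dict(example))  # same input, same id
```

Since the id depends only on the source fields, a redelivered SQS message maps to the same result document as the original delivery.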

SQS Message Contract

Discovery sends one SQS message per audio item.

Current message fields:

  • source_collection
  • mongo_doc_id
  • test_date
  • record_type
  • subject_id
  • subject_name
  • audio_s3_url
  • source_updated_at
  • subject_created_at
  • student_id
  • udise
  • trace_id
  • discovered_at

The worker expects this exact payload shape.
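One way to enforce an exact payload shape is to parse the message body into a frozen dataclass and reject missing or unexpected keys. This is a sketch, not the WorkItem defined in the codebase: the field types (all strings, timestamps as ISO strings) and the parse_work_item helper are assumptions.

```python
import json
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class WorkItem:
    # Field names follow the message contract; str types are an assumption.
    source_collection: str
    mongo_doc_id: str
    test_date: str
    record_type: str
    subject_id: str
    subject_name: str
    audio_s3_url: str
    source_updated_at: str
    subject_created_at: str
    student_id: str
    udise: str
    trace_id: str
    discovered_at: str


def parse_work_item(body: str) -> WorkItem:
    """Parse an SQS message body, rejecting any deviation from the contract."""
    payload = json.loads(body)
    if not isinstance(payload, dict):
        raise ValueError("message body must be a JSON object")
    expected = {f.name for f in fields(WorkItem)}
    actual = set(payload)
    missing = expected - actual
    extra = actual - expected
    if missing or extra:
        raise ValueError(f"payload mismatch: missing={sorted(missing)} extra={sorted(extra)}")
    return WorkItem(**payload)
```

Failing fast at parse time keeps malformed messages out of the download and inference steps.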

Worker Processing Flow

Worker entrypoint:

python3 -m app.workers.sqs_anomaly_worker

Worker flow:

  1. load model artifacts
  2. ensure result collection indexes
  3. poll SQS
  4. parse messages into WorkItem
  5. buffer messages until batch threshold or flush timeout
  6. check whether source row still exists
  7. check whether result document already exists
  8. download audio
  9. extract features
  10. run model inference
  11. upsert result rows
  12. optionally update source backlinks
  13. ack successful messages in SQS

Messages that fail before the durable result write are not acked. They become visible again after the SQS visibility timeout expires and are retried.
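Step 5 of the worker flow (buffer until batch threshold or flush timeout) can be sketched with a small buffer class. The class name MessageBuffer, its parameters, and the choice to measure the timeout from the first buffered message are assumptions, not the worker's actual implementation.

```python
import time
from typing import Any, Callable


class MessageBuffer:
    """Collect messages until a size threshold or an age limit is hit (hypothetical)."""

    def __init__(self, batch_size: int, flush_timeout_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._batch_size = batch_size
        self._timeout = flush_timeout_seconds
        self._clock = clock
        self._items: list[Any] = []
        self._first_at = 0.0

    def add(self, message: Any) -> None:
        if not self._items:
            # Assumption: the timeout starts when the first message is buffered.
            self._first_at = self._clock()
        self._items.append(message)

    def should_flush(self) -> bool:
        if not self._items:
            return False
        if len(self._items) >= self._batch_size:
            return True
        return self._clock() - self._first_at >= self._timeout

    def drain(self) -> list[Any]:
        """Return the buffered messages and reset the buffer."""
        items, self._items = self._items, []
        return items
```

The timeout bounds how long a partially filled batch can wait, which matters when the queue is nearly empty and the size threshold would otherwise never be reached.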

Idempotency Rules

Idempotency is enforced at two levels:

  1. Discovery stage
  • excludes rows that already exist in anomaly_detection_results
  2. Worker stage
  • checks for existing result ids before processing
  • final write is an upsert keyed by deterministic _id

This allows at-least-once SQS delivery without duplicate result documents.

Retry Behavior

Expected retry behavior:

  • if audio download fails, message is not acked
  • if feature extraction fails, message is not acked
  • if inference fails, message is not acked
  • if Mongo result upsert fails, message is not acked

After the SQS visibility timeout expires, the message becomes visible again and may be retried.

This is why repeated receive counts can appear in worker logs.
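The ack rule behind this retry behavior can be sketched as follows: a message is only marked for acknowledgement after its handler finishes without raising. The names process_batch and handle are hypothetical, and the per-item loop is a simplification of the worker's batched processing.

```python
from typing import Callable


def process_batch(
    messages: list[tuple[str, str]],
    handle: Callable[[str], None],
) -> tuple[list[str], list[str]]:
    """Split receipt handles into (to_ack, failed).

    messages holds (receipt_handle, body) pairs. handle stands in for the
    download, feature extraction, inference, and result upsert steps.
    """
    to_ack: list[str] = []
    failed: list[str] = []
    for receipt_handle, body in messages:
        try:
            handle(body)
        except Exception:
            # Not acked: SQS redelivers it after the visibility timeout.
            failed.append(receipt_handle)
            continue
        to_ack.append(receipt_handle)
    return to_ack, failed


def flaky_handler(body: str) -> None:
    """Example handler that fails for one body, to show the split."""
    if body == "bad":
        raise RuntimeError("simulated download timeout")


acked, retried = process_batch([("r1", "ok"), ("r2", "bad"), ("r3", "ok")], flaky_handler)
```

Only the handles in the first list would be deleted from the queue. The rest are left untouched, which is what produces the repeated receive counts.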

Current Failure Mode Observed

One observed runtime issue is a high number of failures logged under the event:

  • item_failed_pre_inference

This means the failure happened before model prediction, most commonly in:

  • audio download
  • feature extraction

Given the current defaults:

  • s3_timeout_seconds = 3.0

the most likely practical cause is slow or timing-out audio downloads.

The worker logs already distinguish:

  • source eligibility issues
  • existing result dedupe issues
  • pre-inference failures
  • successful result writes

Logging and Debugging

Logging is JSON structured.

Useful discovery events:

  • state_run_started
  • watermark_loaded
  • watermark_missing
  • discovery_run_started
  • discovery_run_once_started
  • discovery_page_fetch_started
  • discovery_page_fetch_completed
  • discovery_page_fetched
  • discovery_candidates_resolved
  • enqueue_chunk_started
  • enqueue_chunk_completed
  • enqueue_batch_failed
  • watermark_advanced
  • state_run_finished

Useful worker events:

  • worker_started
  • messages_polled
  • message_buffered
  • batch_dispatched
  • batch_processing_started
  • existing_results_checked
  • eligible_keys_resolved
  • inference_batch_started
  • item_failed_pre_inference
  • feature_build_completed
  • inference_batch_completed
  • results_upserted
  • processed_items_acked
  • batch_processed
  • throughput

Recommended debug setting:

LOG_LEVEL=INFO
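For reference, JSON-structured logging with named events can be produced with the standard logging module alone. This is a generic sketch, not the project's logging setup: the JsonFormatter class, the output keys, and the convention of passing event fields under extra={"fields": ...} are all assumptions.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line (hypothetical formatter)."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            # The log message is treated as the event name.
            "event": record.getMessage(),
        }
        # Assumed convention: structured fields arrive via extra={"fields": {...}}.
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload, ensure_ascii=False)


def build_logger(name: str, level: str = "INFO") -> logging.Logger:
    """Attach a JSON stream handler to the named logger."""
    logger = logging.getLogger(name)
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(level)
    logger.propagate = False
    return logger
```

A call such as logger.info("watermark_advanced", extra={"fields": {"test_date": "2025-12-20"}}) would then emit one JSON line whose event key can be filtered on directly.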

Environment Variables

Important worker variables:

  • SQS_QUEUE_URL
  • MONGO_URI
  • MONGO_DB_NAME
  • MONGO_SOURCE_COLLECTION
  • MONGO_RESULT_COLLECTION
  • PIPELINE_NAME
  • BATCH_SIZE
  • MAX_CONCURRENCY

Important discovery variables:

  • AWS_REGION
  • TEST_DATE
  • SUBJECT_NAME
  • DISCOVERY_BATCH_SIZE
  • MAX_RECORDS
  • POLL_INTERVAL_SECONDS
  • MONGO_TO_SQS_MODE
  • MONGO_STATE_COLLECTION
  • MONGO_RESULT_COLLECTION

Optional:

  • ENABLE_SOURCE_BACKLINK_UPDATE
  • SQS_FIFO_GROUP_ID
  • LOG_LEVEL

Running the Pipeline

Run worker:

cd /Users/pushkar/workspace/projects/vopa/anaomoly-detection/audio-anomaly-detection
PYTHONPATH=. python3 -m app.workers.sqs_anomaly_worker

Run backfill discovery:

cd /Users/pushkar/workspace/projects/vopa/anaomoly-detection/audio-anomaly-detection
PYTHONPATH=. python3 scripts/mongo_to_sqs_stream.py --mode backfill --test-date 2025-12-20 --batch-size 100 --max-records 100

Run incremental discovery:

cd /Users/pushkar/workspace/projects/vopa/anaomoly-detection/audio-anomaly-detection
PYTHONPATH=. python3 scripts/mongo_to_sqs_stream.py --mode incremental --test-date 2025-12-20 --batch-size 100 --max-records 100

Validation Queries

Check state:

db.anomaly_pipeline_state.find({ test_date: "2025-12-20" }).pretty()

Check result count:

db.anomaly_detection_results.countDocuments({
  "source.test_date": ISODate("2025-12-20T00:00:00.000Z")
})

Check sample results:

db.anomaly_detection_results.find({
  "source.test_date": ISODate("2025-12-20T00:00:00.000Z")
}).limit(5).pretty()

Indexes

Source collection:

db.student_assessment_record.createIndex({ "assessments.test_date": 1 })
db.student_assessment_record.createIndex({ "assessments.teacher_record.subjects.subject_name": 1 })
db.student_assessment_record.createIndex({ "assessments.admin_record.subjects.subject_name": 1 })
db.student_assessment_record.createIndex({ "assessments.teacher_record.subjects.subject_created_at": 1 })
db.student_assessment_record.createIndex({ "assessments.admin_record.subjects.subject_created_at": 1 })

State collection:

db.anomaly_pipeline_state.createIndex(
  { pipeline_name: 1, stage: 1, source_collection: 1, test_date: 1 },
  { unique: true }
)

Result collection:

db.anomaly_detection_results.createIndex({
  "source.mongo_doc_id": 1,
  "source.test_date": 1,
  "source.record_type": 1,
  "source.subject_id": 1
})

db.anomaly_detection_results.createIndex({
  "source.test_date": 1,
  "processing.processed_at": 1
})

db.anomaly_detection_results.createIndex({
  "source.test_date": 1,
  "inference.is_anomaly": 1
})

Implementation Files

Main implementation files:

  • app/workers/common.py
  • app/workers/discovery_service.py
  • app/workers/mongo_repository.py
  • app/workers/sqs_client.py
  • app/workers/worker_service.py
  • app/workers/inference_engine.py
  • scripts/mongo_to_sqs_stream.py
  • scripts/seed_sqs.py
  • scripts/push_audio_urls_to_sqs.py

Tests:

  • tests/test_worker_pipeline.py