
Source to SQS to Destination Flow

Purpose

This document explains the current end-to-end pipeline used in this repository:

  1. Source discovery
  2. SQS enqueue
  3. SQS worker processing
  4. Destination writes

It also includes the commands used to run each stage locally.

Current Source Modes

The worker supports two source modes:

  • mongo
  • postgres

The active source mode is selected by:

DISCOVERY_SOURCE_TYPE=postgres

This is resolved in app/workers/repositories/source_repository_factory.py.
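As an illustration, the selection can be thought of as a small factory keyed on DISCOVERY_SOURCE_TYPE. This is a minimal sketch only: PostgresSourceRepository, MongoSourceRepository and build_source_repository are hypothetical names, not the contents of source_repository_factory.py. Rejecting a missing or unknown value is an assumed behavior.

```python
import os


class PostgresSourceRepository:
    """Hypothetical stand-in for the Postgres-backed source repository."""
    kind = "postgres"


class MongoSourceRepository:
    """Hypothetical stand-in for the Mongo-backed source repository."""
    kind = "mongo"


# Map each supported DISCOVERY_SOURCE_TYPE value to a repository class.
_REPOSITORIES = {
    "postgres": PostgresSourceRepository,
    "mongo": MongoSourceRepository,
}


def build_source_repository(env=None):
    """Pick the source repository from DISCOVERY_SOURCE_TYPE (case-insensitive)."""
    env = os.environ if env is None else env
    source_type = env.get("DISCOVERY_SOURCE_TYPE", "").strip().lower()
    try:
        return _REPOSITORIES[source_type]()
    except KeyError:
        # Assumed: fail fast instead of silently falling back to a default source.
        raise ValueError(f"unsupported DISCOVERY_SOURCE_TYPE: {source_type!r}") from None
```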

End-to-End Summary

Source rows/documents
-> discovery service
-> normalized discovery records
-> SQS messages
-> SQS worker
-> eligibility + idempotency checks
-> audio download
-> feature extraction
-> model inference
-> Mongo result upsert
-> CSV write
-> SQS ack

Stage 1: Source Discovery

Entrypoint:

python3 scripts/mongo_to_sqs_stream.py

Script:

Service:

What discovery does:

  • loads settings from .env
  • reads source records for the configured TEST_DATE
  • filters by SUBJECT_NAME
  • skips records whose result id already exists in Mongo results
  • normalizes each record into a queue payload
  • sends messages to SQS in batches of 10
  • advances the watermark in anomaly_pipeline_state
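The batching step can be sketched as follows, assuming the normalized payloads are plain dicts. SQS SendMessageBatch accepts at most 10 entries per call, each with an Id that is unique within the request and a MessageBody string. The chunked and to_batch_entries helpers are hypothetical, not the script's actual functions.

```python
import json
from typing import Iterable, Iterator, List

SQS_MAX_BATCH = 10  # SendMessageBatch accepts at most 10 entries per call


def chunked(items: Iterable[dict], size: int = SQS_MAX_BATCH) -> Iterator[List[dict]]:
    """Yield consecutive lists of at most `size` items."""
    batch: List[dict] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


def to_batch_entries(payloads: List[dict]) -> List[dict]:
    """Shape payloads as SendMessageBatch entries; Id only needs to be unique within the call."""
    return [
        {"Id": str(i), "MessageBody": json.dumps(p, default=str)}
        for i, p in enumerate(payloads)
    ]
```

Each chunk would then be passed as Entries=to_batch_entries(chunk) to the boto3 SQS client's send_message_batch(QueueUrl=..., Entries=...). That call can partially succeed, so the Failed list in its response is worth checking before the watermark is advanced.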

Postgres Discovery

When DISCOVERY_SOURCE_TYPE=postgres, discovery reads from:

  • POSTGRES_SCHEMA
  • POSTGRES_TABLE

Current source repository:

It fetches rows with:

  • matching test_date
  • matching subject_name
  • non-empty audio_url
  • non-null subject_created_at
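The four filters translate into a parameterized WHERE clause. The sketch below targets sqlite3 so it is self-contained; the assessment_records table, its columns, and the ordering are assumptions. Against Postgres the placeholders would be %s (psycopg style) and the table name would come from POSTGRES_SCHEMA and POSTGRES_TABLE.

```python
import sqlite3

# Hypothetical query mirroring the four discovery filters.
DISCOVERY_SQL = """
SELECT id, student_id, subject_name, audio_url, subject_created_at
FROM assessment_records
WHERE test_date = ?
  AND subject_name = ?
  AND audio_url IS NOT NULL AND audio_url <> ''   -- non-empty audio_url
  AND subject_created_at IS NOT NULL
ORDER BY subject_created_at, id                   -- deterministic order (assumption)
"""


def fetch_discoverable(conn: sqlite3.Connection, test_date: str, subject_name: str) -> list:
    """Return rows that pass every discovery filter."""
    return conn.execute(DISCOVERY_SQL, (test_date, subject_name)).fetchall()
```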

It converts each source row into a DiscoveryRecord:

Mongo Discovery

When DISCOVERY_SOURCE_TYPE=mongo, discovery reads from:

  • MONGO_SOURCE_COLLECTION

Repository:

It scans:

  • teacher_record.subjects
  • admin_record.subjects
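A sketch of how one source document might be flattened into per-subject records. The nested shape (a subjects list whose entries carry subject_id, subject_name and audio_url keys) and the teacher/admin record_type labels are assumptions made for illustration.

```python
from typing import Any, Dict, Iterator

# Assumed: (document key, record_type label) for each scanned location.
SUBJECT_PATHS = (("teacher_record", "teacher"), ("admin_record", "admin"))


def iter_subject_audio(doc: Dict[str, Any], subject_name: str) -> Iterator[Dict[str, Any]]:
    """Yield one flat record per matching subject entry that has an audio URL."""
    for record_key, record_type in SUBJECT_PATHS:
        record = doc.get(record_key) or {}
        for subject in record.get("subjects") or []:
            if subject.get("subject_name") != subject_name:
                continue
            audio_url = subject.get("audio_url")
            if not audio_url:
                continue  # nothing to download for this entry
            yield {
                "mongo_doc_id": str(doc.get("_id")),
                "record_type": record_type,
                "subject_id": subject.get("subject_id"),
                "subject_name": subject_name,
                "audio_s3_url": audio_url,
            }
```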

Stage 2: SQS Message Creation

Discovery builds one SQS message per normalized audio record.

The payload is created in:

Current message fields:

  • source_collection
  • mongo_doc_id
  • source_record_id
  • assessment_record_id (Postgres sources only)
  • test_date
  • record_type
  • subject_id
  • subject_name
  • audio_s3_url
  • source_updated_at
  • subject_created_at
  • student_id
  • udise
  • trace_id
  • discovered_at

These messages are pushed using send_message_batch to:

  • SQS_QUEUE_URL
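The payload construction can be pictured as copying the documented fields and stamping tracing metadata. In this sketch build_message and to_body are hypothetical helpers; generating trace_id with uuid4 and using an ISO-8601 UTC discovered_at are assumptions, and fields that do not apply to a source (such as assessment_record_id for Mongo) are simply left as None.

```python
import json
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

MESSAGE_FIELDS = (
    "source_collection", "mongo_doc_id", "source_record_id", "assessment_record_id",
    "test_date", "record_type", "subject_id", "subject_name", "audio_s3_url",
    "source_updated_at", "subject_created_at", "student_id", "udise",
)


def build_message(record: Dict[str, Any], now: Optional[datetime] = None) -> Dict[str, Any]:
    """Copy the documented fields from a normalized record and add tracing metadata."""
    payload = {field: record.get(field) for field in MESSAGE_FIELDS}
    payload["trace_id"] = record.get("trace_id") or uuid.uuid4().hex
    payload["discovered_at"] = (now or datetime.now(timezone.utc)).isoformat()
    return payload


def to_body(payload: Dict[str, Any]) -> str:
    """Serialize for SQS; default=str turns datetime values into strings."""
    return json.dumps(payload, default=str)
```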

Stage 3: Worker Startup

Entrypoint:

python3 -m app.workers.sqs_anomaly_worker

Module:

Worker service:

At startup the worker:

  • loads env settings
  • creates the SQS client
  • creates the source repository
  • creates the Mongo result lookup/destination
  • creates the CSV destination
  • loads model artifacts
  • starts long-polling SQS

Stage 4: SQS Receive and Parse

SQS receive/parsing is handled by:

The worker:

  • long-polls SQS
  • receives up to 10 messages per request
  • parses each message into a WorkItem
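A single long-poll request might look like the sketch below, written against any client that exposes the boto3 SQS receive_message method. WaitTimeSeconds=20 is the longest wait SQS allows and MaxNumberOfMessages=10 the largest batch; requesting ApproximateReceiveCount is an assumption, made because the CSV carries a receive_count column.

```python
from typing import Any, Dict, List

MAX_MESSAGES = 10   # SQS ReceiveMessage returns at most 10 messages per call
WAIT_SECONDS = 20   # 20 s is the maximum long-poll wait SQS allows


def receive_batch(sqs_client: Any, queue_url: str) -> List[Dict[str, Any]]:
    """One long-poll request; returns [] when the wait elapses with no messages."""
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=MAX_MESSAGES,
        WaitTimeSeconds=WAIT_SECONDS,
        AttributeNames=["ApproximateReceiveCount"],
    )
    # The "Messages" key is absent from the response when nothing arrived.
    return response.get("Messages", [])
```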

Important:

  • for Postgres, assessment_record_id / source_record_id is used as the work item id
  • malformed messages are logged and deleted
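Parsing can be sketched as a function that returns None for anything malformed, leaving the caller to log and delete it. The WorkItem fields, the REQUIRED field list, and the fallback to mongo_doc_id for Mongo messages are hypothetical; only the Postgres id rule comes from the description above.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class WorkItem:
    """Hypothetical shape; the real WorkItem may carry more fields."""
    item_id: str
    payload: Dict[str, Any]
    message_id: str
    receipt_handle: str
    receive_count: int


REQUIRED = ("audio_s3_url", "test_date", "subject_name")  # assumed minimum


def parse_message(message: Dict[str, Any]) -> Optional[WorkItem]:
    """Return a WorkItem, or None for a malformed message (caller logs and deletes it)."""
    try:
        payload = json.loads(message["Body"])
    except (KeyError, TypeError, json.JSONDecodeError):
        return None
    if not isinstance(payload, dict) or any(not payload.get(f) for f in REQUIRED):
        return None
    # Postgres messages are keyed by assessment_record_id / source_record_id;
    # falling back to mongo_doc_id for Mongo messages is an assumption.
    item_id = (payload.get("assessment_record_id")
               or payload.get("source_record_id")
               or payload.get("mongo_doc_id"))
    if not item_id:
        return None
    attrs = message.get("Attributes", {})
    return WorkItem(
        item_id=str(item_id),
        payload=payload,
        message_id=message.get("MessageId", ""),
        receipt_handle=message.get("ReceiptHandle", ""),
        receive_count=int(attrs.get("ApproximateReceiveCount", 1)),
    )
```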

Stage 5: Batch Assembly

The worker buffers WorkItems and flushes batches based on:

  • BATCH_SIZE
  • MAX_CONCURRENCY
  • max_batch_wait_seconds

This happens in:
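A size-or-age flush rule can be sketched with a small buffer class. BatchBuffer is hypothetical: it models BATCH_SIZE and max_batch_wait_seconds only, while MAX_CONCURRENCY would separately cap how many flushed batches run at once. The clock is injectable so the logic can be tested without sleeping.

```python
import time
from typing import Callable, List, Optional


class BatchBuffer:
    """Collects items and reports when a batch should be flushed (by size or by age)."""

    def __init__(self, batch_size: int, max_wait_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.batch_size = batch_size
        self.max_wait_seconds = max_wait_seconds
        self.clock = clock
        self.items: List[object] = []
        self.first_added_at: Optional[float] = None

    def add(self, item: object) -> None:
        if not self.items:
            self.first_added_at = self.clock()  # age is measured from the oldest item
        self.items.append(item)

    def should_flush(self) -> bool:
        if not self.items:
            return False
        if len(self.items) >= self.batch_size:
            return True
        return self.clock() - self.first_added_at >= self.max_wait_seconds

    def drain(self) -> List[object]:
        """Return up to batch_size items and keep the rest buffered."""
        batch, self.items = self.items[: self.batch_size], self.items[self.batch_size:]
        # Simplification: restart the age timer for any leftover items.
        self.first_added_at = self.clock() if self.items else None
        return batch
```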

Stage 6: Idempotency and Eligibility Checks

Before inference, the worker runs two checks on each batch.

1. Existing Result Check

For each batch item, the worker computes a deterministic result_document_id.

ID builder:

Current dedupe basis:

  • source_collection
  • student_id
  • subject_name
  • test_date
  • audio_s3_url

Those ids are checked in Mongo results by:

Matching ids contribute to:

  • existing_result_count
  • already_existing_count

2. Source Eligibility Check

The worker then verifies that the source row/document still exists.

Postgres path:

Mongo path:

Outcomes:

  • eligible_items
  • skipped_existing
  • skipped_ineligible
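The two checks combine into a three-way split, sketched below. Here the existing ids would come from one Mongo lookup such as {"_id": {"$in": ids}} and the eligible ids from the source repository; using result_document_id as _id and the item_id/result_document_id keys are assumptions.

```python
from typing import Iterable, List, Set, Tuple


def partition_batch(
    items: Iterable[dict],
    existing_ids: Set[str],
    eligible_source_ids: Set[str],
) -> Tuple[List[dict], List[dict], List[dict]]:
    """Split a batch into (eligible_items, skipped_existing, skipped_ineligible).

    existing_ids: result ids already present in Mongo results.
    eligible_source_ids: source ids whose row/document still exists.
    """
    eligible, skipped_existing, skipped_ineligible = [], [], []
    for item in items:
        if item["result_document_id"] in existing_ids:
            skipped_existing.append(item)      # idempotency check runs first
        elif item["item_id"] not in eligible_source_ids:
            skipped_ineligible.append(item)    # source row/document is gone
        else:
            eligible.append(item)
    return eligible, skipped_existing, skipped_ineligible
```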

Skipped items are acknowledged (deleted from SQS) and are also written to the CSV with their processing_status.

Stage 7: Audio Download and Inference

Inference engine:

For each eligible item:

  1. download audio from audio_s3_url
  2. decode waveform with librosa
  3. extract features
  4. scale features
  5. run model prediction

Current download hardening includes:

  • configurable total/connect/read timeout
  • retry for transient download errors
  • capped in-batch download concurrency
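The retry and concurrency hardening can be sketched with asyncio: a semaphore caps in-flight downloads and transient failures are retried with exponential backoff. fetch stands in for the real S3/HTTP download, TransientDownloadError is a hypothetical marker exception, and the parameters loosely mirror S3_MAX_RETRIES, S3_RETRY_BACKOFF_SECONDS and DOWNLOAD_MAX_CONCURRENCY. The timeout variables would be set on the HTTP client itself; their names suggest aiohttp.ClientTimeout(total=..., connect=..., sock_read=...), but that is an inference.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


class TransientDownloadError(Exception):
    """Hypothetical marker for retryable failures (timeouts, 5xx, connection resets)."""


async def download_with_retry(
    fetch: Callable[[str], Awaitable[bytes]],
    url: str,
    semaphore: asyncio.Semaphore,
    max_retries: int = 3,
    backoff_seconds: float = 0.5,
) -> bytes:
    """Retry transient errors with exponential backoff; hold a semaphore slot per attempt."""
    for attempt in range(max_retries + 1):
        try:
            async with semaphore:
                return await fetch(url)
        except TransientDownloadError:
            if attempt == max_retries:
                raise
            # Sleep outside the semaphore so a backing-off URL frees its slot.
            await asyncio.sleep(backoff_seconds * (2 ** attempt))
    raise ValueError("max_retries must be >= 0")


async def download_all(fetch, urls: List[str], max_concurrency: int = 4,
                       **retry_kwargs) -> Dict[str, object]:
    """Download a batch of distinct urls; each maps to bytes or the exception that ended it."""
    semaphore = asyncio.Semaphore(max_concurrency)
    results = await asyncio.gather(
        *(download_with_retry(fetch, u, semaphore, **retry_kwargs) for u in urls),
        return_exceptions=True,
    )
    return dict(zip(urls, results))
```

Returning exceptions per URL lets one bad download become a failed_pre_inference item without aborting the rest of the batch.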

Important logs:

  • inference_batch_started
  • item_failed_pre_inference
  • feature_build_completed
  • inference_batch_completed

Stage 8: Destination Writes

Mongo Results

Mongo result destination:

Completed inference rows are upserted into:

  • MONGO_RESULT_COLLECTION

Default:

  • anomaly_detection_results
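The upsert can be sketched against a pymongo-style collection. Keying on result_document_id as _id and stamping processed_at are assumptions. With pymongo, a large batch would more typically go through bulk_write([UpdateOne(filter, update, upsert=True), ...], ordered=False) in one round trip.

```python
from datetime import datetime, timezone
from typing import Any, Iterable


def upsert_results(collection: Any, rows: Iterable[dict]) -> int:
    """Upsert each completed row keyed by its deterministic id; returns rows written.

    `collection` is expected to behave like a pymongo Collection.
    """
    written = 0
    for row in rows:
        doc = dict(row, processed_at=row.get("processed_at") or datetime.now(timezone.utc))
        collection.update_one(
            {"_id": doc["result_document_id"]},  # assumed: result id doubles as _id
            {"$set": doc},
            upsert=True,  # insert on first sight, overwrite fields on reprocessing
        )
        written += 1
    return written
```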

CSV Output

CSV destination:

CSV output path comes from:

  • CSV_OUTPUT_PATH

The CSV records every item outcome, not only successful inferences, in its processing_status column:

  • completed
  • skipped_existing
  • skipped_ineligible
  • failed_pre_inference

Current CSV header:

result_document_id,dedupe_key,source_collection,mongo_doc_id,test_date,record_type,subject_id,subject_name,audio_s3_url,source_updated_at,subject_created_at,student_id,udise,processing_status,error,is_anomaly,anomaly_score,prediction_label,model_name,model_version,trace_id,message_id,receive_count,processed_at
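Appending rows with this header can be sketched with csv.DictWriter. append_csv_rows is hypothetical; writing the header only for a new or empty file and leaving inapplicable columns (for example, anomaly_score on a skipped row) blank are assumed behaviors.

```python
import csv
import os
from typing import Iterable

CSV_HEADER = (
    "result_document_id,dedupe_key,source_collection,mongo_doc_id,test_date,record_type,"
    "subject_id,subject_name,audio_s3_url,source_updated_at,subject_created_at,student_id,"
    "udise,processing_status,error,is_anomaly,anomaly_score,prediction_label,model_name,"
    "model_version,trace_id,message_id,receive_count,processed_at"
).split(",")


def append_csv_rows(path: str, rows: Iterable[dict]) -> int:
    """Append rows, writing the header only when the file is new or empty."""
    write_header = not os.path.exists(path) or os.path.getsize(path) == 0
    count = 0
    with open(path, "a", newline="", encoding="utf-8") as fh:
        # Missing columns become empty cells; unexpected keys are ignored.
        writer = csv.DictWriter(fh, fieldnames=CSV_HEADER, restval="", extrasaction="ignore")
        if write_header:
            writer.writeheader()
        for row in rows:
            writer.writerow(row)
            count += 1
    return count
```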

Stage 9: SQS Acknowledgement

The worker deletes SQS messages only after the relevant terminal handling:

  • skipped items: delete after skip decision
  • completed items: delete after Mongo write
  • malformed messages: delete immediately

Delete handling:

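Items that fail pre-inference are not deleted by the worker, so they stay on the queue: SQS makes them visible again once the visibility timeout expires, subject to any redrive policy (dead-letter queue and maxReceiveCount) configured on the queue.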
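The acknowledgement rule can be expressed as a filter that builds SQS DeleteMessageBatch entries (an Id plus the ReceiptHandle, at most 10 per call) for terminal outcomes only. The status strings mirror the CSV's processing_status values; "malformed" is a hypothetical extra label, and a completed item should only be passed in after its Mongo upsert has returned.

```python
from typing import Dict, List

# Outcomes after which a message is deleted; failed_pre_inference is left for SQS to redeliver.
TERMINAL_STATUSES = {"completed", "skipped_existing", "skipped_ineligible", "malformed"}


def build_delete_entries(outcomes: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """DeleteMessageBatch entries for terminal outcomes only (send in chunks of 10)."""
    return [
        {"Id": str(i), "ReceiptHandle": o["receipt_handle"]}
        for i, o in enumerate(outcomes)
        if o["status"] in TERMINAL_STATUSES
    ]
```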

Main Commands

1. Run Postgres Discovery

One-time bounded run:

PYTHONPATH=. DISCOVERY_SOURCE_TYPE=postgres python3 scripts/mongo_to_sqs_stream.py --mode one_time --test-date 2026-02-23 --batch-size 100 --max-records 100

Backfill run:

PYTHONPATH=. DISCOVERY_SOURCE_TYPE=postgres python3 scripts/mongo_to_sqs_stream.py --mode backfill --test-date 2026-02-23 --batch-size 1000

Incremental run:

PYTHONPATH=. DISCOVERY_SOURCE_TYPE=postgres python3 scripts/mongo_to_sqs_stream.py --mode incremental --test-date 2026-02-23

2. Run Mongo Discovery

PYTHONPATH=. DISCOVERY_SOURCE_TYPE=mongo python3 scripts/mongo_to_sqs_stream.py --mode backfill --test-date 2026-02-23 --batch-size 1000

3. Run the SQS Worker

PYTHONPATH=. python3 -m app.workers.sqs_anomaly_worker

4. Run Focused Tests

python3 -m pytest tests/test_postgres_worker_repository.py tests/test_csv_result_destination.py -q

Current Env Vars Used by This Flow

Core worker/discovery vars:

  • SQS_QUEUE_URL
  • MONGO_URI
  • MONGO_DB_NAME
  • MONGO_SOURCE_COLLECTION
  • MONGO_STATE_COLLECTION
  • MONGO_RESULT_COLLECTION
  • DISCOVERY_SOURCE_TYPE
  • POSTGRES_DSN
  • POSTGRES_SCHEMA
  • POSTGRES_TABLE
  • TEST_DATE
  • SUBJECT_NAME
  • BATCH_SIZE
  • MAX_CONCURRENCY
  • DISCOVERY_BATCH_SIZE
  • CSV_OUTPUT_PATH

Timeout / retry controls:

  • S3_TIMEOUT_SECONDS
  • S3_CONNECT_TIMEOUT_SECONDS
  • S3_SOCK_READ_TIMEOUT_SECONDS
  • S3_MAX_RETRIES
  • S3_RETRY_BACKOFF_SECONDS
  • DOWNLOAD_MAX_CONCURRENCY
  • INFERENCE_TIMEOUT_SECONDS

How to Read the Logs

Useful log meanings:

  • existing_results_checked

    • how many computed result ids already exist in Mongo results
  • eligible_keys_resolved

    • how many batch items are still valid for processing
  • batch_skipped

    • no eligible items remained; all were skipped
  • batch_processed

    • at least one item in the batch was processed successfully
  • item_failed_pre_inference

    • failed before model prediction, typically download or feature extraction
  • results_upserted

    • Mongo results written
  • results_csv_appended

    • CSV rows appended
  • throughput

    • per-minute ack rate and cumulative counts

Practical Interpretation

If you enqueue 10,000 records:

  • not all 10,000 necessarily become completed
  • some may be skipped_existing
  • some may be skipped_ineligible
  • some may be failed_pre_inference

All of these outcomes appear in the CSV, distinguished by the processing_status column.
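To see the breakdown for a run, the CSV can be tallied by processing_status. Because failed_pre_inference messages are redelivered, the same result_document_id can appear more than once, so the sketch also counts each id by its last recorded status. Both helpers are hypothetical and assume rows are appended in processing order.

```python
import csv
from collections import Counter


def count_outcomes(path: str) -> Counter:
    """Tally every processing_status row in the worker's CSV output."""
    with open(path, newline="", encoding="utf-8") as fh:
        return Counter(row["processing_status"] for row in csv.DictReader(fh))


def latest_status_counts(path: str) -> Counter:
    """Count each result_document_id once, by the last status written for it."""
    latest = {}
    with open(path, newline="", encoding="utf-8") as fh:
        for row in csv.DictReader(fh):
            latest[row["result_document_id"]] = row["processing_status"]
    return Counter(latest.values())
```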