Source to SQS to Destination Flow
Purpose
This document explains the current end-to-end pipeline used in this repository:
- Source discovery
- SQS enqueue
- SQS worker processing
- Destination writes
It also includes the commands used to run each stage locally.
Current Source Modes
The worker supports two source modes:
- mongo
- postgres
The active source mode is selected by:
DISCOVERY_SOURCE_TYPE=postgres
This is resolved in app/workers/repositories/source_repository_factory.py.
End-to-End Summary
Source rows/documents
-> discovery service
-> normalized discovery records
-> SQS messages
-> SQS worker
-> eligibility + idempotency checks
-> audio download
-> feature extraction
-> model inference
-> Mongo result upsert
-> CSV write
-> SQS ack
Stage 1: Source Discovery
Entrypoint:
python3 scripts/mongo_to_sqs_stream.py
Script:
Service:
What discovery does:
- loads settings from .env
- reads source records for the configured TEST_DATE
- filters by SUBJECT_NAME
- skips records whose result id already exists in Mongo results
- normalizes each record into a queue payload
- sends messages to SQS in batches of 10
- advances the watermark in anomaly_pipeline_state
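The batching step can be sketched as follows. This is a minimal illustration, not the code in mongo_to_sqs_stream.py. It assumes a boto3 SQS client, or any object with a compatible send_message_batch method, and the helper names chunked and send_in_batches are hypothetical. The batch size of 10 matches the SQS limit on entries per SendMessageBatch call.

```python
import json
from typing import Any, Iterator

SQS_MAX_BATCH_ENTRIES = 10  # SendMessageBatch accepts at most 10 entries per call


def chunked(items: list[Any], size: int) -> Iterator[list[Any]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send_in_batches(sqs_client: Any, queue_url: str, payloads: list[dict]) -> list[str]:
    """Send payloads with send_message_batch, 10 at a time.

    Returns the entry Ids that SQS reported as failed so the caller can
    log or resend them (hypothetical contract).
    """
    failed_ids: list[str] = []
    for batch_no, chunk in enumerate(chunked(payloads, SQS_MAX_BATCH_ENTRIES)):
        entries = [
            # Entry Ids only need to be unique within a single batch request.
            {"Id": f"{batch_no}-{i}", "MessageBody": json.dumps(p, default=str)}
            for i, p in enumerate(chunk)
        ]
        response = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=entries)
        failed_ids.extend(entry["Id"] for entry in response.get("Failed", []))
    return failed_ids
```

A batch call can succeed overall while individual entries fail, so checking the Failed list is what confirms that every record actually reached the queue.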
Postgres Discovery
When DISCOVERY_SOURCE_TYPE=postgres, discovery reads from:
POSTGRES_SCHEMA.POSTGRES_TABLE
Current source repository:
It fetches rows with:
- matching test_date
- matching subject_name
- non-empty audio_url
- non-null subject_created_at
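The four row filters map naturally onto one parameterized query. The sketch below is hypothetical. It assumes the columns are literally named test_date, subject_name, audio_url and subject_created_at, uses the %s placeholder style accepted by psycopg2 and psycopg 3, and adds an ORDER BY/LIMIT for paging that may not match how the repository actually pages or applies the watermark.

```python
def quote_ident(name: str) -> str:
    """Quote a Postgres identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_discovery_query(schema: str, table: str, test_date: str,
                          subject_name: str, limit: int) -> tuple[str, tuple]:
    """Return (sql, params) for one discovery page using %s placeholders."""
    sql = (
        f"SELECT * FROM {quote_ident(schema)}.{quote_ident(table)} "
        "WHERE test_date = %s "
        "AND subject_name = %s "
        # non-empty audio_url: excludes both NULL and ''
        "AND audio_url IS NOT NULL AND audio_url <> '' "
        "AND subject_created_at IS NOT NULL "
        # assumed ordering so repeated pages are deterministic
        "ORDER BY subject_created_at "
        "LIMIT %s"
    )
    return sql, (test_date, subject_name, limit)
```

In real code, psycopg's sql.Identifier is the library-provided way to quote schema and table names; quote_ident only keeps this example dependency-free. Values are always passed as parameters, never formatted into the SQL string.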
It converts each source row into a DiscoveryRecord:
Mongo Discovery
When DISCOVERY_SOURCE_TYPE=mongo, discovery reads from:
MONGO_SOURCE_COLLECTION
Repository:
It scans:
- teacher_record.subjects
- admin_record.subjects
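A sketch of how one source document might be flattened into per-subject records. The shape of each entry under teacher_record.subjects and admin_record.subjects (keys subject_name and audio_url) and the use of the parent field name as record_type are assumptions for illustration; iter_subject_entries is a hypothetical helper.

```python
from typing import Any, Iterator

# Parent fields scanned; also used (by assumption) as the record_type value.
SUBJECT_PARENTS = ("teacher_record", "admin_record")


def iter_subject_entries(doc: dict[str, Any], subject_name: str) -> Iterator[dict[str, Any]]:
    """Yield one flat record per matching subject that has an audio URL.

    Assumes each subject entry is a dict with "subject_name" and "audio_url"
    keys; the real field names may differ.
    """
    for record_type in SUBJECT_PARENTS:
        subjects = (doc.get(record_type) or {}).get("subjects") or []
        for subject in subjects:
            if subject.get("subject_name") != subject_name:
                continue
            if not subject.get("audio_url"):
                continue  # nothing to download, so nothing to enqueue
            yield {
                "mongo_doc_id": str(doc.get("_id")),
                "record_type": record_type,
                "subject_name": subject_name,
                "audio_s3_url": subject["audio_url"],
            }
```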
Stage 2: SQS Message Creation
Discovery builds one SQS message per normalized audio record.
The payload is created in:
Current message fields:
- source_collection
- mongo_doc_id
- source_record_id
- assessment_record_id (for Postgres)
- test_date
- record_type
- subject_id
- subject_name
- audio_s3_url
- source_updated_at
- subject_created_at
- student_id
- udise
- trace_id
- discovered_at
These messages are pushed using send_message_batch to:
SQS_QUEUE_URL
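To make the message shape concrete, here is a hypothetical DiscoveryRecord dataclass built from the field list above and serialized to one JSON message body per record. The field types, which fields are optional, and generating trace_id with uuid4 and discovered_at as a UTC ISO-8601 timestamp are assumptions; the repository's DiscoveryRecord may differ.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Optional


def _utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


@dataclass
class DiscoveryRecord:
    # Field names follow the message field list; types and defaults are assumed.
    source_collection: str
    test_date: str
    record_type: str
    subject_name: str
    audio_s3_url: str
    mongo_doc_id: Optional[str] = None
    source_record_id: Optional[str] = None
    assessment_record_id: Optional[str] = None  # populated for Postgres sources
    subject_id: Optional[str] = None
    source_updated_at: Optional[str] = None
    subject_created_at: Optional[str] = None
    student_id: Optional[str] = None
    udise: Optional[str] = None
    trace_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    discovered_at: str = field(default_factory=_utc_now_iso)


def to_message_body(record: DiscoveryRecord) -> str:
    """Serialize one record as the JSON body of one SQS message."""
    return json.dumps(asdict(record), sort_keys=True)
```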
Stage 3: Worker Startup
Entrypoint:
python3 -m app.workers.sqs_anomaly_worker
Module:
Worker service:
At startup the worker:
- loads env settings
- creates the SQS client
- creates the source repository
- creates the Mongo result lookup/destination
- creates the CSV destination
- loads model artifacts
- starts long-polling SQS
Stage 4: SQS Receive and Parse
SQS receive/parsing is handled by:
The worker:
- long-polls SQS
- receives up to 10 messages per request
- parses each message into a WorkItem
Important:
- for Postgres, assessment_record_id / source_record_id is used as the work item id
- malformed messages are logged and deleted
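A minimal sketch of the parse step, assuming JSON message bodies. WorkItem here is a hypothetical stand-in for the worker's own class, REQUIRED_FIELDS is an illustrative choice, the fallback order between assessment_record_id and source_record_id is assumed, and the Mongo branch (keying on mongo_doc_id) is an assumption. Returning None lets the caller log and delete the malformed message.

```python
import json
from dataclasses import dataclass
from typing import Any, Optional

# Hypothetical minimum a payload needs before it is worth processing.
REQUIRED_FIELDS = ("source_collection", "test_date", "subject_name", "audio_s3_url")


@dataclass(frozen=True)
class WorkItem:
    item_id: str
    payload: dict[str, Any]
    message_id: str
    receipt_handle: str  # needed later to delete (ack) the message
    receive_count: int


def parse_message(message: dict[str, Any], source_type: str) -> Optional[WorkItem]:
    """Turn one SQS message into a WorkItem, or return None if it is malformed."""
    try:
        payload = json.loads(message["Body"])
    except (KeyError, TypeError, json.JSONDecodeError):
        return None
    if not isinstance(payload, dict):
        return None
    if any(not payload.get(name) for name in REQUIRED_FIELDS):
        return None
    if source_type == "postgres":
        # assessment_record_id, falling back to source_record_id (order assumed)
        item_id = payload.get("assessment_record_id") or payload.get("source_record_id")
    else:
        # Mongo: assumed to key work items by the source document id
        item_id = payload.get("mongo_doc_id")
    if not item_id:
        return None
    attributes = message.get("Attributes", {})
    return WorkItem(
        item_id=str(item_id),
        payload=payload,
        message_id=message.get("MessageId", ""),
        receipt_handle=message.get("ReceiptHandle", ""),
        receive_count=int(attributes.get("ApproximateReceiveCount", "1")),
    )
```

ApproximateReceiveCount is only returned when the receive call requests it, for example receive_message(QueueUrl=..., MaxNumberOfMessages=10, WaitTimeSeconds=20, AttributeNames=["ApproximateReceiveCount"]); 20 seconds is the longest long-poll wait SQS allows.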
Stage 5: Batch Assembly
The worker buffers WorkItems and flushes batches based on:
- BATCH_SIZE
- MAX_CONCURRENCY
- max_batch_wait_seconds
This happens in:
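The size-or-age flush rule can be sketched as a small buffer. This is an assumption about the mechanics, not the worker's implementation. BatchBuffer is hypothetical, the injectable clock exists only to make it testable, and MAX_CONCURRENCY is left out because this document does not say how it interacts with batching (one plausible role is capping how many flushed batches run at once).

```python
import time
from typing import Any, Callable, Optional


class BatchBuffer:
    """Collect work items and flush on size or age, whichever comes first."""

    def __init__(self, batch_size: int, max_batch_wait_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.batch_size = batch_size
        self.max_wait = max_batch_wait_seconds
        self.clock = clock
        self.items: list[Any] = []
        self.first_added_at: Optional[float] = None

    def add(self, item: Any) -> None:
        if not self.items:
            self.first_added_at = self.clock()  # age is measured from the oldest item
        self.items.append(item)

    def should_flush(self) -> bool:
        if not self.items:
            return False
        if len(self.items) >= self.batch_size:
            return True
        return self.clock() - self.first_added_at >= self.max_wait

    def take(self) -> list[Any]:
        """Return up to batch_size items; any remainder stays buffered."""
        batch, self.items = self.items[:self.batch_size], self.items[self.batch_size:]
        # Simplification: the remainder's wait timer restarts now.
        self.first_added_at = self.clock() if self.items else None
        return batch
```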
Stage 6: Idempotency and Eligibility Checks
Before inference, the worker does two checks.
1. Existing Result Check
For each batch item, the worker computes a deterministic result_document_id.
ID builder:
Current dedupe basis:
- source_collection
- student_id
- subject_name
- test_date
- audio_s3_url
Those ids are checked in Mongo results by:
Matching ids contribute to:
- existing_result_count
- already_existing_count
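One way to build a deterministic result_document_id from the dedupe basis is to serialize the five fields in a fixed order and hash them. The SHA-256 choice, the whitespace stripping, and the function names are assumptions; the repository's ID builder may normalize or encode differently.

```python
import hashlib
import json
from typing import Any

DEDUPE_FIELDS = ("source_collection", "student_id", "subject_name", "test_date", "audio_s3_url")


def build_dedupe_key(payload: dict[str, Any]) -> str:
    """Serialize the dedupe fields in a fixed order.

    JSON encoding keeps field boundaries unambiguous, so ("a|b", "c") and
    ("a", "b|c") cannot collide the way a plain "|".join would.
    """
    values = [str(payload.get(name) or "").strip() for name in DEDUPE_FIELDS]
    return json.dumps(values, separators=(",", ":"))


def build_result_document_id(payload: dict[str, Any]) -> str:
    """Same inputs always give the same 64-character hex digest."""
    return hashlib.sha256(build_dedupe_key(payload).encode("utf-8")).hexdigest()
```

Because audio_s3_url is part of the key, a recording re-uploaded at a new URL for the same student, subject and date would get a new id and be processed again.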
2. Source Eligibility Check
The worker then verifies that the source row/document still exists.
Postgres path:
Mongo path:
Outcomes:
- eligible_items
- skipped_existing
- skipped_ineligible
Skipped items are acknowledged and now also written to CSV with a status.
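The three outcomes follow from running the two checks in order. The sketch below is hypothetical: partition_batch takes the id builder, the set of ids already found in Mongo results, and an eligibility predicate as parameters so it stays independent of the real repositories. In practice existing_ids could come from one lookup per batch, for example a pymongo find({"_id": {"$in": ids}}, {"_id": 1}) on the results collection, assuming results are keyed by _id.

```python
from typing import Any, Callable


def partition_batch(items: list[dict[str, Any]],
                    result_id_of: Callable[[dict[str, Any]], str],
                    existing_ids: set[str],
                    still_eligible: Callable[[dict[str, Any]], bool]) -> dict[str, list]:
    """Split a batch into the three outcome buckets, applying the checks in order."""
    buckets: dict[str, list] = {
        "eligible_items": [],
        "skipped_existing": [],
        "skipped_ineligible": [],
    }
    for item in items:
        if result_id_of(item) in existing_ids:
            buckets["skipped_existing"].append(item)    # check 1: result already stored
        elif not still_eligible(item):
            buckets["skipped_ineligible"].append(item)  # check 2: source row/document gone
        else:
            buckets["eligible_items"].append(item)
    return buckets
```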
Stage 7: Audio Download and Inference
Inference engine:
For each eligible item:
- download audio from audio_s3_url
- decode waveform with librosa
- extract features
- scale features
- run model prediction
Current download hardening includes:
- configurable total/connect/read timeout
- retry for transient download errors
- capped in-batch download concurrency
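The three hardening measures can be combined roughly as follows. This asyncio sketch assumes a caller-supplied fetch coroutine (for example one built on aiohttp with aiohttp.ClientTimeout(total=..., connect=..., sock_read=...), which would be a natural home for the S3_*_TIMEOUT_SECONDS values). The exponential backoff, the set of errors treated as transient, and the function names are assumptions rather than the worker's actual policy.

```python
import asyncio
from typing import Awaitable, Callable

# Assumed set of errors worth retrying.
TRANSIENT_ERRORS = (asyncio.TimeoutError, ConnectionError)


async def download_with_retry(fetch: Callable[[str], Awaitable[bytes]], url: str,
                              max_retries: int, backoff_seconds: float) -> bytes:
    """Call fetch(url), retrying transient errors with exponential backoff."""
    attempt = 0
    while True:
        try:
            return await fetch(url)
        except TRANSIENT_ERRORS:
            if attempt >= max_retries:
                raise  # retries exhausted: reported as a pre-inference failure
            await asyncio.sleep(backoff_seconds * (2 ** attempt))
            attempt += 1


async def download_batch(fetch: Callable[[str], Awaitable[bytes]], urls: list[str],
                         max_concurrency: int, max_retries: int,
                         backoff_seconds: float) -> list[object]:
    """Download all URLs with at most max_concurrency in flight.

    Results keep input order; a failed item yields its exception instead of bytes.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def one(url: str) -> bytes:
        async with semaphore:
            return await download_with_retry(fetch, url, max_retries, backoff_seconds)

    return await asyncio.gather(*(one(u) for u in urls), return_exceptions=True)
```

With return_exceptions=True one bad URL does not cancel the rest of the batch; its exception comes back in place of the bytes and can be recorded as failed_pre_inference.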
Important logs:
- inference_batch_started
- item_failed_pre_inference
- feature_build_completed
- inference_batch_completed
Stage 8: Destination Writes
Mongo Results
Mongo result destination:
Completed inference rows are upserted into:
MONGO_RESULT_COLLECTION
Default:
anomaly_detection_results
CSV Output
CSV destination:
CSV output path comes from:
CSV_OUTPUT_PATH
The CSV now records all item outcomes, not only successful inference:
- completed
- skipped_existing
- skipped_ineligible
- failed_pre_inference
Current CSV header:
result_document_id,dedupe_key,source_collection,mongo_doc_id,test_date,record_type,subject_id,subject_name,audio_s3_url,source_updated_at,subject_created_at,student_id,udise,processing_status,error,is_anomaly,anomaly_score,prediction_label,model_name,model_version,trace_id,message_id,receive_count,processed_at
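A minimal append-only writer for the header above, using the standard library csv module. The function name and the choice to write the header only when the file is new or empty are assumptions; columns a row does not set (for example scores on skipped rows) are written as empty strings.

```python
import csv
import os
from typing import Any

CSV_FIELDS = [
    "result_document_id", "dedupe_key", "source_collection", "mongo_doc_id", "test_date",
    "record_type", "subject_id", "subject_name", "audio_s3_url", "source_updated_at",
    "subject_created_at", "student_id", "udise", "processing_status", "error",
    "is_anomaly", "anomaly_score", "prediction_label", "model_name", "model_version",
    "trace_id", "message_id", "receive_count", "processed_at",
]


def append_rows(path: str, rows: list[dict[str, Any]]) -> None:
    """Append outcome rows, writing the header only when the file is new or empty."""
    write_header = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="", encoding="utf-8") as handle:
        # restval fills missing columns; extrasaction="ignore" drops unknown keys.
        writer = csv.DictWriter(handle, fieldnames=CSV_FIELDS, restval="",
                                extrasaction="ignore")
        if write_header:
            writer.writeheader()
        writer.writerows(rows)
```

Append mode is fine for a single worker process; several processes appending to the same CSV_OUTPUT_PATH would need a lock or separate files to avoid interleaved rows.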
Stage 9: SQS Acknowledgement
The worker deletes SQS messages only after the relevant terminal handling:
- skipped items: delete after skip decision
- completed items: delete after Mongo write
- malformed messages: delete immediately
Delete handling:
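Items that fail pre-inference are not deleted by the success path. Their messages become visible again after the queue's visibility timeout and are redelivered, subject to any redrive (dead-letter queue) policy configured on the queue.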
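The acknowledgement rules above reduce to a small decision function. The status name malformed and the function itself are hypothetical; the rules are the ones listed in this section.

```python
# Statuses whose messages are deleted as soon as the decision is made.
DELETE_ON_DECISION = frozenset({"skipped_existing", "skipped_ineligible", "malformed"})


def should_delete(status: str, mongo_write_ok: bool = False) -> bool:
    """Decide whether the SQS message behind one item can be deleted.

    - skipped and malformed items: delete immediately
    - completed items: delete only after the Mongo upsert succeeded
    - anything else (e.g. failed_pre_inference): keep, so SQS redelivers it
    """
    if status == "completed":
        return mongo_write_ok
    return status in DELETE_ON_DECISION
```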
Main Commands
1. Run Postgres Discovery
One-time bounded run:
PYTHONPATH=. DISCOVERY_SOURCE_TYPE=postgres python3 scripts/mongo_to_sqs_stream.py --mode one_time --test-date 2026-02-23 --batch-size 100 --max-records 100
Backfill run:
PYTHONPATH=. DISCOVERY_SOURCE_TYPE=postgres python3 scripts/mongo_to_sqs_stream.py --mode backfill --test-date 2026-02-23 --batch-size 1000
Incremental run:
PYTHONPATH=. DISCOVERY_SOURCE_TYPE=postgres python3 scripts/mongo_to_sqs_stream.py --mode incremental --test-date 2026-02-23
2. Run Mongo Discovery
PYTHONPATH=. DISCOVERY_SOURCE_TYPE=mongo python3 scripts/mongo_to_sqs_stream.py --mode backfill --test-date 2026-02-23 --batch-size 1000
3. Run the SQS Worker
PYTHONPATH=. python3 -m app.workers.sqs_anomaly_worker
4. Run Focused Tests
python3 -m pytest tests/test_postgres_worker_repository.py tests/test_csv_result_destination.py -q
Current Env Vars Used by This Flow
Core worker/discovery vars:
- SQS_QUEUE_URL
- MONGO_URI
- MONGO_DB_NAME
- MONGO_SOURCE_COLLECTION
- MONGO_STATE_COLLECTION
- MONGO_RESULT_COLLECTION
- DISCOVERY_SOURCE_TYPE
- POSTGRES_DSN
- POSTGRES_SCHEMA
- POSTGRES_TABLE
- TEST_DATE
- SUBJECT_NAME
- BATCH_SIZE
- MAX_CONCURRENCY
- DISCOVERY_BATCH_SIZE
- CSV_OUTPUT_PATH
Timeout / retry controls:
- S3_TIMEOUT_SECONDS
- S3_CONNECT_TIMEOUT_SECONDS
- S3_SOCK_READ_TIMEOUT_SECONDS
- S3_MAX_RETRIES
- S3_RETRY_BACKOFF_SECONDS
- DOWNLOAD_MAX_CONCURRENCY
- INFERENCE_TIMEOUT_SECONDS
How to Read the Logs
Useful log meanings:
- existing_results_checked: how many computed result ids already exist in Mongo results
- eligible_keys_resolved: how many batch items are still valid for processing
- batch_skipped: no eligible items remained; all were skipped
- batch_processed: some items were processed successfully
- item_failed_pre_inference: failed before model prediction, typically during download or feature extraction
- results_upserted: Mongo results written
- results_csv_appended: CSV rows appended
- throughput: per-minute ack rate and cumulative counts
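The throughput line reports a per-minute ack rate alongside cumulative counts. Below is a hypothetical tracker that computes both using a sliding 60-second window; the worker may instead count per fixed one-minute interval.

```python
import time
from collections import Counter, deque
from typing import Callable


class ThroughputTracker:
    """Cumulative outcome counts plus acks in the trailing 60 seconds."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self.clock = clock
        self.totals: Counter[str] = Counter()
        self.ack_times: deque[float] = deque()

    def record(self, status: str, acked: bool) -> None:
        self.totals[status] += 1
        if acked:
            self.ack_times.append(self.clock())

    def acks_last_minute(self) -> int:
        # Drop acks that fell out of the 60-second window, then count the rest.
        cutoff = self.clock() - 60.0
        while self.ack_times and self.ack_times[0] <= cutoff:
            self.ack_times.popleft()
        return len(self.ack_times)
```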
Practical Interpretation
If you enqueue 10,000 records:
- not all 10,000 necessarily become completed
- some may be skipped_existing
- some may be skipped_ineligible
- some may be failed_pre_inference
With the current CSV behavior, all of those outcomes are now visible in the CSV.
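To reconcile the CSV against the number of enqueued records, it helps to count statuses both per row and per item. The sketch assumes the CSV header shown in Stage 8, and summarize_outcomes is a hypothetical helper. Because failed_pre_inference messages stay on the queue and can be retried, one item may leave more than one row, so row counts alone can exceed the number of records enqueued.

```python
import csv
from collections import Counter


def summarize_outcomes(csv_path: str) -> tuple[Counter, Counter]:
    """Count processing_status over all rows and over the last row per item.

    A redelivered item can appear several times (e.g. failed_pre_inference,
    then completed); the second counter keeps only the last row seen for each
    result_document_id, in file order.
    """
    all_rows: Counter = Counter()
    latest: dict[str, str] = {}
    with open(csv_path, newline="", encoding="utf-8") as handle:
        for row in csv.DictReader(handle):
            status = row["processing_status"]
            all_rows[status] += 1
            latest[row["result_document_id"]] = status
    return all_rows, Counter(latest.values())
```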