-- Async ingest diagnostics without filesystem logs.
--
-- Replace the values in params before running. Every query declares its own
-- params CTE, so update each copy.
-- completion_size must match eventhub.batch.completion-size from application.yml.

-- 1) Import-run overview.
-- All data_package rows (any package_type) recorded for the import run.
with params as (
    select
        '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
        '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
        5000::integer as completion_size
)
select
    p.id,
    p.package_type,
    p.status,
    p.batch_no,
    p.event_family,
    p.extraction_code,
    p.extraction_source_kind,
    p.chunk_from,
    p.chunk_to,
    p.event_count,
    p.error_message,
    p.received_at,
    p.completed_at
from eventhub.data_package p
join params prm
    on prm.import_run_id = p.import_run_id
order by p.package_type, p.batch_no, p.received_at, p.package_key;

-- 2) Exact async-ingest status for one DB_EXTRACT package.
-- event_count on DB_EXTRACT is populated after executeBatch returns and before
-- async wait starts, so it represents extracted/mapped events while waiting.
with params as (
    select
        '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
        '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
        5000::integer as completion_size
),
-- The selected DB_EXTRACT package plus two keys rebuilt from its columns:
--   async_external_package_id = provider:sourceKind:extractionCode:RUN-<run>:CHUNK-<seq>
--   aggregate_package_key     = tenant/source/group/scope/org/children/chunk-window/
--                               family/date segments followed by that same tail.
-- NOTE(review): the concatenation must mirror how the application builds the key.
-- Segments without a coalesce make the whole key NULL when they are NULL, and a
-- NULL key matches no child batches.
extract_package as (
    select
        p.id,
        p.import_run_id,
        p.event_source_id,
        p.tenant_key,
        p.package_key,
        p.status,
        p.event_count as extracted_event_count,
        p.event_family,
        p.business_date,
        p.extraction_code,
        p.extraction_source_kind,
        p.source_group_type,
        p.source_group_entity_id,
        p.source_group_code,
        p.import_scope_type,
        p.root_source_org_entity_id,
        p.root_source_org_code,
        p.include_children,
        p.chunk_from,
        p.chunk_to,
        coalesce((p.metadata ->> 'chunkSequence')::integer, 1) as chunk_sequence,
        es.provider_key,
        es.source_kind,
        es.source_key,
        es.source_instance_key,
        es.provider_key
            || ':' || p.extraction_source_kind
            || ':' || p.extraction_code
            || ':RUN-' || p.import_run_id::text
            || ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text
            as async_external_package_id,
        p.tenant_key
            || ':' || es.provider_key
            || ':' || es.source_kind
            || ':' || es.source_key
            || ':' || coalesce(es.source_instance_key, 'default')
            || ':' || case
                          when p.source_group_type is null
                           and p.source_group_entity_id is null
                           and p.source_group_code is null
                              then 'NO_GROUP'
                          else coalesce(p.source_group_type, '')
                               || '|' || coalesce(p.source_group_entity_id, '')
                               || '|' || coalesce(p.source_group_code, '')
                      end
            || ':' || coalesce(p.import_scope_type, 'TENANT_ALL')
            || ':' || case
                          when p.root_source_org_entity_id is null
                           and p.root_source_org_code is null
                              then 'ALL'
                          else 'ORGANISATION|'
                               || coalesce(p.root_source_org_entity_id, '')
                               || '|' || coalesce(p.root_source_org_code, '')
                      end
            || ':' || case
                          when coalesce(p.include_children, false) then 'WITH_CHILDREN'
                          else 'NO_CHILDREN'
                      end
            || ':' || coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN')
            || ':' || coalesce(p.metadata ->> 'chunkOccurredTo', 'END')
            || ':' || p.event_family
            || ':' || coalesce(p.business_date::text, 'NO_DATE')
            || ':' || es.provider_key
            || ':' || p.extraction_source_kind
            || ':' || p.extraction_code
            || ':RUN-' || p.import_run_id::text
            || ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text
            as aggregate_package_key
    from eventhub.data_package p
    join eventhub.event_source es
        on es.id = p.event_source_id
    join params prm
        on prm.extraction_package_id = p.id
    where p.package_type = 'DB_EXTRACT'
),
-- Child CAMEL_BATCH packages: same event source and tenant, and a package_key
-- starting with "<aggregate_package_key>:CAMEL-". starts_with is an exact
-- prefix test. LIKE would treat the "_" in key segments such as TENANT_ALL or
-- NO_GROUP as a single-character wildcard.
camel_batches as (
    select
        c.id,
        c.package_key,
        c.status,
        c.received_at,
        c.completed_at,
        c.event_count as inserted_count,
        coalesce((c.metadata ->> 'eventCount')::integer, 0) as received_count,
        c.metadata ->> 'externalPackageId' as external_package_id,
        c.metadata ->> 'aggregatePackageKey' as aggregate_package_key,
        c.error_message
    from eventhub.data_package c
    join extract_package e
        on c.event_source_id = e.event_source_id
       and c.tenant_key = e.tenant_key
    where c.package_type = 'CAMEL_BATCH'
      and starts_with(c.package_key, e.aggregate_package_key || ':CAMEL-')
),
-- Status breakdown and event totals across all child batches.
camel_summary as (
    select
        count(*) as observed_camel_batches,
        count(*) filter (where status in ('IMPORTED', 'EMPTY')) as successful_camel_batches,
        count(*) filter (where status = 'FAILED') as failed_camel_batches,
        count(*) filter (where status = 'IMPORTING') as importing_camel_batches,
        coalesce(sum(received_count), 0) as received_events,
        coalesce(sum(inserted_count), 0) as inserted_events
    from camel_batches
)
select
    e.id as extraction_package_id,
    e.import_run_id,
    e.package_key as extraction_package_key,
    e.status as extraction_status,
    e.extracted_event_count,
    -- ceil(extracted / completion_size) via integer division (assumes event_count
    -- is an integer column). nullif turns a misconfigured completion_size of 0
    -- into NULL instead of a division-by-zero error.
    case
        when e.extracted_event_count <= 0 then 0
        else ((e.extracted_event_count - 1) / nullif(prm.completion_size, 0)) + 1
    end as expected_camel_batches,
    s.observed_camel_batches,
    s.successful_camel_batches,
    s.failed_camel_batches,
    s.importing_camel_batches,
    s.received_events,
    s.inserted_events,
    e.async_external_package_id,
    e.aggregate_package_key
from extract_package e
cross join camel_summary s
cross join params prm;

-- 3) Child CAMEL_BATCH rows for the selected DB_EXTRACT package.
with params as (
    select
        '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
        '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
        5000::integer as completion_size
),
-- The selected DB_EXTRACT package and the key prefix shared by its CAMEL_BATCH
-- children. The expression must stay identical to the one in query 2.
extract_package as (
    select
        p.id,
        p.event_source_id,
        p.tenant_key,
        p.tenant_key
            || ':' || es.provider_key
            || ':' || es.source_kind
            || ':' || es.source_key
            || ':' || coalesce(es.source_instance_key, 'default')
            || ':' || case
                          when p.source_group_type is null
                           and p.source_group_entity_id is null
                           and p.source_group_code is null
                              then 'NO_GROUP'
                          else coalesce(p.source_group_type, '')
                               || '|' || coalesce(p.source_group_entity_id, '')
                               || '|' || coalesce(p.source_group_code, '')
                      end
            || ':' || coalesce(p.import_scope_type, 'TENANT_ALL')
            || ':' || case
                          when p.root_source_org_entity_id is null
                           and p.root_source_org_code is null
                              then 'ALL'
                          else 'ORGANISATION|'
                               || coalesce(p.root_source_org_entity_id, '')
                               || '|' || coalesce(p.root_source_org_code, '')
                      end
            || ':' || case
                          when coalesce(p.include_children, false) then 'WITH_CHILDREN'
                          else 'NO_CHILDREN'
                      end
            || ':' || coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN')
            || ':' || coalesce(p.metadata ->> 'chunkOccurredTo', 'END')
            || ':' || p.event_family
            || ':' || coalesce(p.business_date::text, 'NO_DATE')
            || ':' || es.provider_key
            || ':' || p.extraction_source_kind
            || ':' || p.extraction_code
            || ':RUN-' || p.import_run_id::text
            || ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text
            as aggregate_package_key
    from eventhub.data_package p
    join eventhub.event_source es
        on es.id = p.event_source_id
    join params prm
        on prm.extraction_package_id = p.id
    where p.package_type = 'DB_EXTRACT'
)
select
    c.id,
    c.package_key,
    c.status,
    c.received_at,
    c.completed_at,
    coalesce((c.metadata ->> 'eventCount')::integer, 0) as received_count,
    c.event_count as inserted_count,
    c.metadata ->> 'externalPackageId' as external_package_id,
    c.metadata ->> 'aggregatePackageKey' as aggregate_package_key,
    c.error_message
from eventhub.data_package c
join extract_package e
    on c.event_source_id = e.event_source_id
   and c.tenant_key = e.tenant_key
where c.package_type = 'CAMEL_BATCH'
  -- Exact prefix match; LIKE would treat "_" in the key as a wildcard.
  and starts_with(c.package_key, e.aggregate_package_key || ':CAMEL-')
order by c.received_at, c.package_key;

-- 4) Recent CAMEL_BATCH rows for the run, independent of a single DB_EXTRACT package.
-- CAMEL_BATCH rows whose package_key or metadata.externalPackageId contains
-- "RUN-<import_run_id>", newest first. No row limit is applied.
with params as (
    select
        '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
        '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
        5000::integer as completion_size
)
select
    c.id,
    c.event_source_id,
    c.package_key,
    c.status,
    c.received_at,
    c.completed_at,
    c.event_count as inserted_count,
    c.metadata ->> 'eventFamily' as event_family,
    c.metadata ->> 'eventSource' as event_source,
    c.metadata ->> 'externalPackageId' as external_package_id,
    c.metadata ->> 'aggregatePackageKey' as aggregate_package_key
from eventhub.data_package c
cross join params prm
where c.package_type = 'CAMEL_BATCH'
  and (
      c.package_key like ('%RUN-' || prm.import_run_id::text || '%')
      or coalesce(c.metadata ->> 'externalPackageId', '')
             like ('%RUN-' || prm.import_run_id::text || '%')
  )
order by c.received_at desc, c.package_key desc;

-- 5) Per event type: extracted vs received by CAMEL vs imported into eventhub.event.
-- Uses the same params values as queries 1-4, so the whole script targets one run.
with params as (
    select
        '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
        '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
        5000::integer as completion_size
),
-- The selected DB_EXTRACT package, its metadata, and the key prefix shared by its
-- CAMEL_BATCH children. The expression must stay identical to the one in query 2.
extract_package as (
    select
        p.id,
        p.event_source_id,
        p.tenant_key,
        p.metadata,
        p.tenant_key
            || ':' || es.provider_key
            || ':' || es.source_kind
            || ':' || es.source_key
            || ':' || coalesce(es.source_instance_key, 'default')
            || ':' || case
                          when p.source_group_type is null
                           and p.source_group_entity_id is null
                           and p.source_group_code is null
                              then 'NO_GROUP'
                          else coalesce(p.source_group_type, '')
                               || '|' || coalesce(p.source_group_entity_id, '')
                               || '|' || coalesce(p.source_group_code, '')
                      end
            || ':' || coalesce(p.import_scope_type, 'TENANT_ALL')
            || ':' || case
                          when p.root_source_org_entity_id is null
                           and p.root_source_org_code is null
                              then 'ALL'
                          else 'ORGANISATION|'
                               || coalesce(p.root_source_org_entity_id, '')
                               || '|' || coalesce(p.root_source_org_code, '')
                      end
            || ':' || case
                          when coalesce(p.include_children, false) then 'WITH_CHILDREN'
                          else 'NO_CHILDREN'
                      end
            || ':' || coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN')
            || ':' || coalesce(p.metadata ->> 'chunkOccurredTo', 'END')
            || ':' || p.event_family
            || ':' || coalesce(p.business_date::text, 'NO_DATE')
            || ':' || es.provider_key
            || ':' || p.extraction_source_kind
            || ':' || p.extraction_code
            || ':RUN-' || p.import_run_id::text
            || ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text
            as aggregate_package_key
    from eventhub.data_package p
    join eventhub.event_source es
        on es.id = p.event_source_id
    join params prm
        on prm.extraction_package_id = p.id
    where p.package_type = 'DB_EXTRACT'
),
-- Child CAMEL_BATCH packages (only the columns used below). starts_with is an
-- exact prefix test; LIKE would treat "_" in the key as a wildcard.
camel_batches as (
    select
        c.id,
        c.metadata
    from eventhub.data_package c
    join extract_package e
        on c.event_source_id = e.event_source_id
       and c.tenant_key = e.tenant_key
    where c.package_type = 'CAMEL_BATCH'
      and starts_with(c.package_key, e.aggregate_package_key || ':CAMEL-')
),
-- Per-type counts stored on the DB_EXTRACT package (metadata.extractedEventTypeCounts).
-- NOTE(review): keys are expected to be "<event_domain>/<event_type>" so they
-- line up with imported_by_type.
extracted_by_type as (
    select
        kv.key as event_type_key,
        kv.value::integer as extracted_events
    from extract_package e
    cross join lateral jsonb_each_text(
        coalesce(e.metadata -> 'extractedEventTypeCounts', '{}'::jsonb)
    ) as kv
),
-- Per-type counts reported by each CAMEL batch (metadata.receivedEventTypeCounts), summed.
received_by_type as (
    select
        kv.key as event_type_key,
        sum(kv.value::integer) as received_events
    from camel_batches c
    cross join lateral jsonb_each_text(
        coalesce(c.metadata -> 'receivedEventTypeCounts', '{}'::jsonb)
    ) as kv
    group by kv.key
),
-- Rows actually written to eventhub.event by the child batches, per domain/type.
imported_by_type as (
    select
        ev.event_domain || '/' || ev.event_type as event_type_key,
        count(*) as imported_events
    from camel_batches c
    join eventhub.event ev
        on ev.data_package_id = c.id
    group by ev.event_domain || '/' || ev.event_type
)
select
    coalesce(x.event_type_key, r.event_type_key, i.event_type_key) as event_type_key,
    coalesce(x.extracted_events, 0) as extracted_events,
    coalesce(r.received_events, 0) as received_events,
    coalesce(i.imported_events, 0) as imported_events
from extracted_by_type x
full outer join received_by_type r
    on r.event_type_key = x.event_type_key
-- Match imports against whichever side of the first join produced the key.
full outer join imported_by_type i
    on i.event_type_key = coalesce(x.event_type_key, r.event_type_key)
order by event_type_key;