-- Async ingest diagnostics without filesystem logs.
--
-- Replace the values in params before running.
-- completion_size must match eventhub.batch.completion-size from application.yml.

-- 1) Import-run overview.
-- Lists every eventhub.data_package row whose import_run_id equals the one set
-- in params, regardless of package_type.
with params as (
    -- Only import_run_id is read by this statement; the other two values are
    -- carried along so the same params block can be reused in every section.
    select
        '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
        '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
        5000::integer as completion_size
)
select
    pkg.id,
    pkg.package_type,
    pkg.status,
    pkg.batch_no,
    pkg.event_family,
    pkg.extraction_code,
    pkg.extraction_source_kind,
    pkg.chunk_from,
    pkg.chunk_to,
    pkg.event_count,
    pkg.error_message,
    pkg.received_at,
    pkg.completed_at
from params as cfg
inner join eventhub.data_package as pkg
    on pkg.import_run_id = cfg.import_run_id
-- package_key is only a tie-breaker so the row order is stable between runs.
order by
    pkg.package_type,
    pkg.batch_no,
    pkg.received_at,
    pkg.package_key;

-- 2) Exact async-ingest status for one DB_EXTRACT package.
-- event_count on DB_EXTRACT is populated after executeBatch returns and before
-- async wait starts, so it represents extracted/mapped events while waiting.
--
-- Returns one row for the package named by params.extraction_package_id (no row
-- when that id is not a DB_EXTRACT package), showing the expected number of
-- CAMEL_BATCH children next to the counts actually found.
with params as (
    -- params.import_run_id is not read here; the run id used in the keys comes
    -- from the selected package row itself.
    select
        '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
        '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
        5000::integer as completion_size
),
-- The selected DB_EXTRACT package together with the two keys derived from it.
extract_package as (
    select
        p.id,
        p.import_run_id,
        p.event_source_id,
        p.tenant_key,
        p.package_key,
        p.status,
        p.event_count as extracted_event_count,
        seq.chunk_sequence,
        ext.async_external_package_id,
        -- NOTE(review): this layout presumably has to match the package_key
        -- prefix written for CAMEL_BATCH rows by the ingest code — confirm
        -- against the application before changing any segment.
        p.tenant_key || ':'
            || es.provider_key || ':' || es.source_kind || ':' || es.source_key || ':'
            || coalesce(es.source_instance_key, 'default') || ':'
            || case
                   when p.source_group_type is null
                        and p.source_group_entity_id is null
                        and p.source_group_code is null
                       then 'NO_GROUP'
                   else coalesce(p.source_group_type, '') || '|'
                        || coalesce(p.source_group_entity_id, '') || '|'
                        || coalesce(p.source_group_code, '')
               end || ':'
            || coalesce(p.import_scope_type, 'TENANT_ALL') || ':'
            || case
                   when p.root_source_org_entity_id is null
                        and p.root_source_org_code is null
                       then 'ALL'
                   else 'ORGANISATION|'
                        || coalesce(p.root_source_org_entity_id, '') || '|'
                        || coalesce(p.root_source_org_code, '')
               end || ':'
            || case when coalesce(p.include_children, false) then 'WITH_CHILDREN' else 'NO_CHILDREN' end || ':'
            || coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN') || ':'
            || coalesce(p.metadata ->> 'chunkOccurredTo', 'END') || ':'
            || p.event_family || ':'
            || coalesce(p.business_date::text, 'NO_DATE') || ':'
            || ext.async_external_package_id as aggregate_package_key
    from eventhub.data_package as p
    inner join eventhub.event_source as es
        on es.id = p.event_source_id
    inner join params as prm
        on prm.extraction_package_id = p.id
    -- Chunk number from metadata; defaults to 1 when the key is absent.
    cross join lateral (
        select coalesce((p.metadata ->> 'chunkSequence')::integer, 1) as chunk_sequence
    ) as seq
    -- Built once here because it is both an output column and the tail of
    -- aggregate_package_key.
    cross join lateral (
        select
            es.provider_key || ':' || p.extraction_source_kind || ':' || p.extraction_code
                || ':RUN-' || p.import_run_id::text
                || ':CHUNK-' || seq.chunk_sequence::text as async_external_package_id
    ) as ext
    where p.package_type = 'DB_EXTRACT'
),
-- CAMEL_BATCH rows whose package_key starts with '<aggregate_package_key>:CAMEL-'.
camel_batches as (
    select
        c.status,
        c.event_count as inserted_count,
        coalesce((c.metadata ->> 'eventCount')::integer, 0) as received_count
    from eventhub.data_package as c
    inner join extract_package as e
        on c.event_source_id = e.event_source_id
       and c.tenant_key = e.tenant_key
    where c.package_type = 'CAMEL_BATCH'
      -- Literal prefix comparison instead of LIKE: the key always contains '_'
      -- (NO_GROUP, TENANT_ALL, ...) and may contain '%' or '\', all of which
      -- LIKE would treat as pattern syntax rather than as plain characters.
      and left(c.package_key, length(e.aggregate_package_key || ':CAMEL-'))
          = e.aggregate_package_key || ':CAMEL-'
),
-- Always exactly one row, even when no child batch exists yet.
camel_summary as (
    select
        count(*) as observed_camel_batches,
        count(*) filter (where status in ('IMPORTED', 'EMPTY')) as successful_camel_batches,
        count(*) filter (where status = 'FAILED') as failed_camel_batches,
        count(*) filter (where status = 'IMPORTING') as importing_camel_batches,
        coalesce(sum(received_count), 0) as received_events,
        coalesce(sum(inserted_count), 0) as inserted_events
    from camel_batches
)
select
    e.id as extraction_package_id,
    e.import_run_id,
    e.package_key as extraction_package_key,
    e.status as extraction_status,
    e.extracted_event_count,
    -- Ceiling of extracted_event_count / completion_size (assumes event_count is
    -- an integer type so "/" truncates — TODO confirm). Stays NULL while
    -- event_count is still NULL.
    case
        when e.extracted_event_count <= 0 then 0
        else ((e.extracted_event_count - 1) / prm.completion_size) + 1
    end as expected_camel_batches,
    s.observed_camel_batches,
    s.successful_camel_batches,
    s.failed_camel_batches,
    s.importing_camel_batches,
    s.received_events,
    s.inserted_events,
    e.async_external_package_id,
    e.aggregate_package_key
from extract_package as e
cross join camel_summary as s
cross join params as prm;

-- 3) Child CAMEL_BATCH rows for the selected DB_EXTRACT package.
-- Lists each CAMEL_BATCH row whose package_key starts with the aggregate key
-- derived from the package named by params.extraction_package_id.
with params as (
    -- Only extraction_package_id is read by this statement.
    select
        '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
        '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
        5000::integer as completion_size
),
-- The selected DB_EXTRACT package and the key prefix shared by its children.
extract_package as (
    select
        p.id,
        p.event_source_id,
        p.tenant_key,
        -- NOTE(review): this layout presumably has to match the package_key
        -- prefix written for CAMEL_BATCH rows by the ingest code — confirm
        -- against the application before changing any segment.
        p.tenant_key || ':'
            || es.provider_key || ':' || es.source_kind || ':' || es.source_key || ':'
            || coalesce(es.source_instance_key, 'default') || ':'
            || case
                   when p.source_group_type is null
                        and p.source_group_entity_id is null
                        and p.source_group_code is null
                       then 'NO_GROUP'
                   else coalesce(p.source_group_type, '') || '|'
                        || coalesce(p.source_group_entity_id, '') || '|'
                        || coalesce(p.source_group_code, '')
               end || ':'
            || coalesce(p.import_scope_type, 'TENANT_ALL') || ':'
            || case
                   when p.root_source_org_entity_id is null
                        and p.root_source_org_code is null
                       then 'ALL'
                   else 'ORGANISATION|'
                        || coalesce(p.root_source_org_entity_id, '') || '|'
                        || coalesce(p.root_source_org_code, '')
               end || ':'
            || case when coalesce(p.include_children, false) then 'WITH_CHILDREN' else 'NO_CHILDREN' end || ':'
            || coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN') || ':'
            || coalesce(p.metadata ->> 'chunkOccurredTo', 'END') || ':'
            || p.event_family || ':'
            || coalesce(p.business_date::text, 'NO_DATE') || ':'
            || es.provider_key || ':' || p.extraction_source_kind || ':' || p.extraction_code
            || ':RUN-' || p.import_run_id::text
            -- Chunk number from metadata; defaults to 1 when the key is absent.
            || ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text as aggregate_package_key
    from eventhub.data_package as p
    inner join eventhub.event_source as es
        on es.id = p.event_source_id
    inner join params as prm
        on prm.extraction_package_id = p.id
    where p.package_type = 'DB_EXTRACT'
)
select
    c.id,
    c.package_key,
    c.status,
    c.received_at,
    c.completed_at,
    coalesce((c.metadata ->> 'eventCount')::integer, 0) as received_count,
    c.event_count as inserted_count,
    c.metadata ->> 'externalPackageId' as external_package_id,
    c.metadata ->> 'aggregatePackageKey' as aggregate_package_key,
    c.error_message
from eventhub.data_package as c
inner join extract_package as e
    on c.event_source_id = e.event_source_id
   and c.tenant_key = e.tenant_key
where c.package_type = 'CAMEL_BATCH'
  -- Literal prefix comparison instead of LIKE: the key always contains '_'
  -- (NO_GROUP, TENANT_ALL, ...) and may contain '%' or '\', all of which LIKE
  -- would treat as pattern syntax rather than as plain characters.
  and left(c.package_key, length(e.aggregate_package_key || ':CAMEL-'))
      = e.aggregate_package_key || ':CAMEL-'
order by c.received_at, c.package_key;

-- 4) Recent CAMEL_BATCH rows for the run, independent of a single DB_EXTRACT package.
-- A row qualifies when 'RUN-<import_run_id>' appears anywhere in its package_key
-- or in metadata.externalPackageId. Newest rows come first.
with params as (
    -- Only import_run_id is read by this statement.
    select
        '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
        '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
        5000::integer as completion_size
),
-- The LIKE pattern is built once; a uuid rendered as text has no LIKE wildcards.
run_marker as (
    select '%RUN-' || prm.import_run_id::text || '%' as like_pattern
    from params as prm
)
select
    batch.id,
    batch.event_source_id,
    batch.package_key,
    batch.status,
    batch.received_at,
    batch.completed_at,
    batch.event_count as inserted_count,
    batch.metadata ->> 'eventFamily' as event_family,
    batch.metadata ->> 'eventSource' as event_source,
    batch.metadata ->> 'externalPackageId' as external_package_id,
    batch.metadata ->> 'aggregatePackageKey' as aggregate_package_key
from eventhub.data_package as batch
cross join run_marker as marker
where batch.package_type = 'CAMEL_BATCH'
  and (
        batch.package_key like marker.like_pattern
     or coalesce(batch.metadata ->> 'externalPackageId', '') like marker.like_pattern
      )
order by
    batch.received_at desc,
    batch.package_key desc;

-- 5) Per event type: extracted vs received by CAMEL vs imported into eventhub.event.
-- One row per event type key found in any of the three sources; a count that is
-- missing for a key is reported as 0.
with params as (
    -- Only extraction_package_id is read by this statement.
    select
        '4d21ef91-c979-451b-9055-d574506843bf'::uuid as import_run_id,
        '20679175-b481-4439-9f66-9b7c7ed336fb'::uuid as extraction_package_id,
        5000::integer as completion_size
),
-- The selected DB_EXTRACT package, its metadata, and the key prefix shared by
-- its CAMEL_BATCH children.
extract_package as (
    select
        p.id,
        p.event_source_id,
        p.tenant_key,
        p.metadata,
        -- NOTE(review): this layout presumably has to match the package_key
        -- prefix written for CAMEL_BATCH rows by the ingest code — confirm
        -- against the application before changing any segment.
        p.tenant_key || ':'
            || es.provider_key || ':' || es.source_kind || ':' || es.source_key || ':'
            || coalesce(es.source_instance_key, 'default') || ':'
            || case
                   when p.source_group_type is null
                        and p.source_group_entity_id is null
                        and p.source_group_code is null
                       then 'NO_GROUP'
                   else coalesce(p.source_group_type, '') || '|'
                        || coalesce(p.source_group_entity_id, '') || '|'
                        || coalesce(p.source_group_code, '')
               end || ':'
            || coalesce(p.import_scope_type, 'TENANT_ALL') || ':'
            || case
                   when p.root_source_org_entity_id is null
                        and p.root_source_org_code is null
                       then 'ALL'
                   else 'ORGANISATION|'
                        || coalesce(p.root_source_org_entity_id, '') || '|'
                        || coalesce(p.root_source_org_code, '')
               end || ':'
            || case when coalesce(p.include_children, false) then 'WITH_CHILDREN' else 'NO_CHILDREN' end || ':'
            || coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN') || ':'
            || coalesce(p.metadata ->> 'chunkOccurredTo', 'END') || ':'
            || p.event_family || ':'
            || coalesce(p.business_date::text, 'NO_DATE') || ':'
            || es.provider_key || ':' || p.extraction_source_kind || ':' || p.extraction_code
            || ':RUN-' || p.import_run_id::text
            -- Chunk number from metadata; defaults to 1 when the key is absent.
            || ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text as aggregate_package_key
    from eventhub.data_package as p
    inner join eventhub.event_source as es
        on es.id = p.event_source_id
    inner join params as prm
        on prm.extraction_package_id = p.id
    where p.package_type = 'DB_EXTRACT'
),
-- CAMEL_BATCH children of the selected package; only the columns used below.
camel_batches as (
    select
        c.id,
        c.metadata
    from eventhub.data_package as c
    inner join extract_package as e
        on c.event_source_id = e.event_source_id
       and c.tenant_key = e.tenant_key
    where c.package_type = 'CAMEL_BATCH'
      -- Literal prefix comparison instead of LIKE: the key always contains '_'
      -- (NO_GROUP, TENANT_ALL, ...) and may contain '%' or '\', all of which
      -- LIKE would treat as pattern syntax rather than as plain characters.
      and left(c.package_key, length(e.aggregate_package_key || ':CAMEL-'))
          = e.aggregate_package_key || ':CAMEL-'
),
-- Counts recorded on the DB_EXTRACT package under metadata.extractedEventTypeCounts.
extracted_by_type as (
    select
        kv.key as event_type_key,
        kv.value::integer as extracted_events
    from extract_package as e
    cross join lateral jsonb_each_text(
        coalesce(e.metadata -> 'extractedEventTypeCounts', '{}'::jsonb)
    ) as kv
),
-- Counts recorded on each child batch under metadata.receivedEventTypeCounts,
-- summed across batches.
received_by_type as (
    select
        kv.key as event_type_key,
        sum((kv.value)::integer) as received_events
    from camel_batches as c
    cross join lateral jsonb_each_text(
        coalesce(c.metadata -> 'receivedEventTypeCounts', '{}'::jsonb)
    ) as kv
    group by kv.key
),
-- Rows actually present in eventhub.event for the child batches.
-- NOTE(review): the metadata keys above are presumably also in
-- '<event_domain>/<event_type>' form, otherwise the final join cannot line the
-- three sources up — confirm.
imported_by_type as (
    select
        ev.event_domain || '/' || ev.event_type as event_type_key,
        count(*) as imported_events
    from camel_batches as c
    inner join eventhub.event as ev
        on ev.data_package_id = c.id
    group by ev.event_domain || '/' || ev.event_type
)
select
    coalesce(x.event_type_key, r.event_type_key, i.event_type_key) as event_type_key,
    coalesce(x.extracted_events, 0) as extracted_events,
    coalesce(r.received_events, 0) as received_events,
    coalesce(i.imported_events, 0) as imported_events
from extracted_by_type as x
full outer join received_by_type as r
    on r.event_type_key = x.event_type_key
-- The coalesce lets a key that exists on only one of the first two sides still
-- pair with its imported count.
full outer join imported_by_type as i
    on i.event_type_key = coalesce(x.event_type_key, r.event_type_key)
order by event_type_key;