Harden async ingest tracking and diagnostics

This commit is contained in:
trifonovt 2026-05-04 11:57:51 +02:00
parent d0a8d44082
commit b8817e18d9
32 changed files with 1129 additions and 58 deletions

View File

@ -624,6 +624,10 @@ This allows a single EventHub import run to process many original tachograph pac
} }
``` ```
For tachograph JDBC extraction, `sourcePackageId` is the original `FileLog.ID`. It is stored directly on `eventhub.event.source_package_id` and also resolved to `source_package_entity_id` for joins to source master data.
`eventhub.event` is designed as a TimescaleDB hypertable partitioned by `occurred_at`. Source-record idempotency is therefore enforced through `eventhub.event_source_record(source_record_key_hash)` instead of a unique index on the hypertable, because Timescale unique indexes must include the partitioning column.
### Tachograph import endpoints ### Tachograph import endpoints
`POST /api/eventhub/acquisition/tachograph/imports/plan` returns the calculated event-family extraction plan and time chunks. `POST /api/eventhub/acquisition/tachograph/imports/plan` returns the calculated event-family extraction plan and time chunks.

View File

@ -0,0 +1,338 @@
-- Async ingest diagnostics without filesystem logs.
--
-- Replace the values in params before running.
-- completion_size must match eventhub.batch.completion-size from application.yml.
-- 1) Import-run overview.
with params as (
select
'667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
'54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
5000::integer as completion_size
)
select
p.id,
p.package_type,
p.status,
p.batch_no,
p.event_family,
p.extraction_code,
p.extraction_source_kind,
p.chunk_from,
p.chunk_to,
p.event_count,
p.error_message,
p.received_at,
p.completed_at
from eventhub.data_package p
join params prm on prm.import_run_id = p.import_run_id
order by p.package_type, p.batch_no, p.received_at, p.package_key;
-- 2) Exact async-ingest status for one DB_EXTRACT package.
-- event_count on DB_EXTRACT is populated after executeBatch returns and before
-- async wait starts, so it represents extracted/mapped events while waiting.
with params as (
select
'667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
'54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
5000::integer as completion_size
),
extract_package as (
select
p.id,
p.import_run_id,
p.event_source_id,
p.tenant_key,
p.package_key,
p.status,
p.event_count as extracted_event_count,
p.event_family,
p.business_date,
p.extraction_code,
p.extraction_source_kind,
p.source_group_type,
p.source_group_entity_id,
p.source_group_code,
p.import_scope_type,
p.root_source_org_entity_id,
p.root_source_org_code,
p.include_children,
p.chunk_from,
p.chunk_to,
coalesce((p.metadata ->> 'chunkSequence')::integer, 1) as chunk_sequence,
es.provider_key,
es.source_kind,
es.source_key,
es.source_instance_key,
es.provider_key || ':' || p.extraction_source_kind || ':' || p.extraction_code
|| ':RUN-' || p.import_run_id::text
|| ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text as async_external_package_id,
p.tenant_key || ':'
|| es.provider_key || ':' || es.source_kind || ':' || es.source_key || ':' || coalesce(es.source_instance_key, 'default') || ':'
|| case
when p.source_group_type is null and p.source_group_entity_id is null and p.source_group_code is null
then 'NO_GROUP'
else coalesce(p.source_group_type, '') || '|' || coalesce(p.source_group_entity_id, '') || '|' || coalesce(p.source_group_code, '')
end || ':'
|| coalesce(p.import_scope_type, 'TENANT_ALL') || ':'
|| case
when p.root_source_org_entity_id is null and p.root_source_org_code is null
then 'ALL'
else 'ORGANISATION|' || coalesce(p.root_source_org_entity_id, '') || '|' || coalesce(p.root_source_org_code, '')
end || ':'
|| case when coalesce(p.include_children, false) then 'WITH_CHILDREN' else 'NO_CHILDREN' end || ':'
|| coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN') || ':'
|| coalesce(p.metadata ->> 'chunkOccurredTo', 'END') || ':'
|| p.event_family || ':'
|| coalesce(p.business_date::text, 'NO_DATE') || ':'
|| es.provider_key || ':' || p.extraction_source_kind || ':' || p.extraction_code
|| ':RUN-' || p.import_run_id::text
|| ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text as aggregate_package_key
from eventhub.data_package p
join eventhub.event_source es
on es.id = p.event_source_id
join params prm
on prm.extraction_package_id = p.id
where p.package_type = 'DB_EXTRACT'
),
camel_batches as (
select
c.id,
c.package_key,
c.status,
c.received_at,
c.completed_at,
c.event_count as inserted_count,
coalesce((c.metadata ->> 'eventCount')::integer, 0) as received_count,
c.metadata ->> 'externalPackageId' as external_package_id,
c.metadata ->> 'aggregatePackageKey' as aggregate_package_key,
c.error_message
from eventhub.data_package c
join extract_package e
on c.event_source_id = e.event_source_id
and c.tenant_key = e.tenant_key
where c.package_type = 'CAMEL_BATCH'
and c.package_key like e.aggregate_package_key || ':CAMEL-%'
),
camel_summary as (
select
count(*) as observed_camel_batches,
count(*) filter (where status in ('IMPORTED', 'EMPTY')) as successful_camel_batches,
count(*) filter (where status = 'FAILED') as failed_camel_batches,
count(*) filter (where status = 'IMPORTING') as importing_camel_batches,
coalesce(sum(received_count), 0) as received_events,
coalesce(sum(inserted_count), 0) as inserted_events
from camel_batches
)
select
e.id as extraction_package_id,
e.import_run_id,
e.package_key as extraction_package_key,
e.status as extraction_status,
e.extracted_event_count,
case
when e.extracted_event_count <= 0 then 0
else ((e.extracted_event_count - 1) / prm.completion_size) + 1
end as expected_camel_batches,
s.observed_camel_batches,
s.successful_camel_batches,
s.failed_camel_batches,
s.importing_camel_batches,
s.received_events,
s.inserted_events,
e.async_external_package_id,
e.aggregate_package_key
from extract_package e
cross join camel_summary s
cross join params prm;
-- 3) Child CAMEL_BATCH rows for the selected DB_EXTRACT package.
with params as (
select
'667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
'54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
5000::integer as completion_size
),
extract_package as (
select
p.id,
p.event_source_id,
p.tenant_key,
p.tenant_key || ':'
|| es.provider_key || ':' || es.source_kind || ':' || es.source_key || ':' || coalesce(es.source_instance_key, 'default') || ':'
|| case
when p.source_group_type is null and p.source_group_entity_id is null and p.source_group_code is null
then 'NO_GROUP'
else coalesce(p.source_group_type, '') || '|' || coalesce(p.source_group_entity_id, '') || '|' || coalesce(p.source_group_code, '')
end || ':'
|| coalesce(p.import_scope_type, 'TENANT_ALL') || ':'
|| case
when p.root_source_org_entity_id is null and p.root_source_org_code is null
then 'ALL'
else 'ORGANISATION|' || coalesce(p.root_source_org_entity_id, '') || '|' || coalesce(p.root_source_org_code, '')
end || ':'
|| case when coalesce(p.include_children, false) then 'WITH_CHILDREN' else 'NO_CHILDREN' end || ':'
|| coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN') || ':'
|| coalesce(p.metadata ->> 'chunkOccurredTo', 'END') || ':'
|| p.event_family || ':'
|| coalesce(p.business_date::text, 'NO_DATE') || ':'
|| es.provider_key || ':' || p.extraction_source_kind || ':' || p.extraction_code
|| ':RUN-' || p.import_run_id::text
|| ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text as aggregate_package_key
from eventhub.data_package p
join eventhub.event_source es
on es.id = p.event_source_id
join params prm
on prm.extraction_package_id = p.id
where p.package_type = 'DB_EXTRACT'
)
select
c.id,
c.package_key,
c.status,
c.received_at,
c.completed_at,
coalesce((c.metadata ->> 'eventCount')::integer, 0) as received_count,
c.event_count as inserted_count,
c.metadata ->> 'externalPackageId' as external_package_id,
c.metadata ->> 'aggregatePackageKey' as aggregate_package_key,
c.error_message
from eventhub.data_package c
join extract_package e
on c.event_source_id = e.event_source_id
and c.tenant_key = e.tenant_key
where c.package_type = 'CAMEL_BATCH'
and c.package_key like e.aggregate_package_key || ':CAMEL-%'
order by c.received_at, c.package_key;
-- 4) Recent CAMEL_BATCH rows for the run, independent of a single DB_EXTRACT package.
with params as (
select
'667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id,
'54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id,
5000::integer as completion_size
)
select
c.id,
c.event_source_id,
c.package_key,
c.status,
c.received_at,
c.completed_at,
c.event_count as inserted_count,
c.metadata ->> 'eventFamily' as event_family,
c.metadata ->> 'eventSource' as event_source,
c.metadata ->> 'externalPackageId' as external_package_id,
c.metadata ->> 'aggregatePackageKey' as aggregate_package_key
from eventhub.data_package c
join params prm
on c.package_type = 'CAMEL_BATCH'
where c.package_key like '%RUN-' || prm.import_run_id::text || '%'
or coalesce(c.metadata ->> 'externalPackageId', '') like '%RUN-' || prm.import_run_id::text || '%'
order by c.received_at desc, c.package_key desc;
-- 5) Per event type: extracted vs received by CAMEL vs imported into eventhub.event.
with params as (
select
'4d21ef91-c979-451b-9055-d574506843bf'::uuid as import_run_id,
'20679175-b481-4439-9f66-9b7c7ed336fb'::uuid as extraction_package_id,
5000::integer as completion_size
),
extract_package as (
select
p.id,
p.import_run_id,
p.event_source_id,
p.tenant_key,
p.event_family,
p.business_date,
p.extraction_code,
p.extraction_source_kind,
p.source_group_type,
p.source_group_entity_id,
p.source_group_code,
p.import_scope_type,
p.root_source_org_entity_id,
p.root_source_org_code,
p.include_children,
p.metadata,
es.provider_key,
es.source_kind,
es.source_key,
es.source_instance_key,
p.tenant_key || ':'
|| es.provider_key || ':' || es.source_kind || ':' || es.source_key || ':' || coalesce(es.source_instance_key, 'default') || ':'
|| case
when p.source_group_type is null and p.source_group_entity_id is null and p.source_group_code is null
then 'NO_GROUP'
else coalesce(p.source_group_type, '') || '|' || coalesce(p.source_group_entity_id, '') || '|' || coalesce(p.source_group_code, '')
end || ':'
|| coalesce(p.import_scope_type, 'TENANT_ALL') || ':'
|| case
when p.root_source_org_entity_id is null and p.root_source_org_code is null
then 'ALL'
else 'ORGANISATION|' || coalesce(p.root_source_org_entity_id, '') || '|' || coalesce(p.root_source_org_code, '')
end || ':'
|| case when coalesce(p.include_children, false) then 'WITH_CHILDREN' else 'NO_CHILDREN' end || ':'
|| coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN') || ':'
|| coalesce(p.metadata ->> 'chunkOccurredTo', 'END') || ':'
|| p.event_family || ':'
|| coalesce(p.business_date::text, 'NO_DATE') || ':'
|| es.provider_key || ':' || p.extraction_source_kind || ':' || p.extraction_code
|| ':RUN-' || p.import_run_id::text
|| ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text as aggregate_package_key
from eventhub.data_package p
join eventhub.event_source es
on es.id = p.event_source_id
join params prm
on prm.extraction_package_id = p.id
where p.package_type = 'DB_EXTRACT'
),
camel_batches as (
select c.*
from eventhub.data_package c
join extract_package e
on c.event_source_id = e.event_source_id
and c.tenant_key = e.tenant_key
where c.package_type = 'CAMEL_BATCH'
and c.package_key like e.aggregate_package_key || ':CAMEL-%'
),
extracted_by_type as (
select
key as event_type_key,
value::integer as extracted_events
from extract_package e,
jsonb_each_text(coalesce(e.metadata -> 'extractedEventTypeCounts', '{}'::jsonb))
),
received_by_type as (
select
kv.key as event_type_key,
sum((kv.value)::integer) as received_events
from camel_batches c,
jsonb_each_text(coalesce(c.metadata -> 'receivedEventTypeCounts', '{}'::jsonb)) kv
group by kv.key
),
imported_by_type as (
select
e.event_domain || '/' || e.event_type as event_type_key,
count(*) as imported_events
from camel_batches c
join eventhub.event e
on e.data_package_id = c.id
group by e.event_domain || '/' || e.event_type
)
select
coalesce(x.event_type_key, r.event_type_key, i.event_type_key) as event_type_key,
coalesce(x.extracted_events, 0) as extracted_events,
coalesce(r.received_events, 0) as received_events,
coalesce(i.imported_events, 0) as imported_events
from extracted_by_type x
full outer join received_by_type r
on r.event_type_key = x.event_type_key
full outer join imported_by_type i
on i.event_type_key = coalesce(x.event_type_key, r.event_type_key)
order by event_type_key;

View File

@ -0,0 +1,459 @@
# Import Performance and Error Fixes
## Scope
This document summarizes the event-ingest, master-data, schema, locking, retry, and correctness fixes made in the current optimization round for the EventHub import pipeline.
It covers:
- event schema and migration fixes
- event import throughput fixes
- master-data import throughput fixes
- deadlock and transaction-visibility fixes
- connection-reset and retry handling
- async import cursor correctness fixes
- logging and operational visibility improvements
Main committed changes:
- `2e6e1aa` - `Optimize ingestion pipeline and reduce import contention`
- `bd3620b` - `Improve vehicle reference caching during ingest`
Additional related fixes are currently present in the workspace but may not yet be committed.
## 1. Schema and Migration Fixes
### 1.1 `event_detail` / hypertable ordering
Problem:
- executing `eventhub_schema_create.sql` on an empty database failed with `relation "eventhub.event_detail" does not exist`
Fix:
- create `eventhub.event_detail` before `create_hypertable(...)`
- add its foreign key after the hypertable conversion
Files:
- `src/main/resources/db/eventhub_schema_create.sql`
### 1.2 Explicit migrations for event hypertable and source-record support
Problem:
- the runtime schema evolution needed explicit migrations for hypertable conversion and `event_source_record`
Fix:
- add migration for `source_package_id` on `event`
- add migration for `event` hypertable conversion and FK recreation
- add migration to ensure `event_source_record` exists and is backfilled
Files:
- `src/main/resources/db/migration/V9__add_event_source_package_id.sql`
- `src/main/resources/db/migration/V10__make_event_hypertable.sql`
- `src/main/resources/db/migration/V11__ensure_event_source_record.sql`
## 2. Event Import Throughput Fixes
### 2.1 Replace per-event inserts with staged set-based writes
Problem:
- `EventRepository.batchInsert(...)` originally processed events one by one despite the batch API
- this caused one insert/query cycle per event and poor throughput
Fix:
- stage a whole ingest batch into `eventhub_event_import_stage`
- reserve `event_source_record` rows set-wise
- insert `eventhub.event` rows set-wise
- upsert `eventhub.event_detail` rows in batch
Files:
- `src/main/java/at/procon/eventhub/persistence/EventRepository.java`
### 2.2 Fix missing event rows when source records were reserved
Problem:
- after the set-based refactor, some runs created `event_source_record` rows without creating `event` rows
Cause:
- the insert statement reserved source records and then tried to re-read them through the base table in the same data-modifying CTE chain
Fix:
- use the `RETURNING` rows from the source-record reservation CTE directly
- also support already-existing source records that still miss the `event` row
Files:
- `src/main/java/at/procon/eventhub/persistence/EventRepository.java`
### 2.3 Stream extraction instead of materializing full result sets
Problem:
- extraction loaded full SQL chunks into memory before handing them to Camel
Fix:
- stream rows directly from JDBC to `direct:eventhub-normalized-input`
- keep only counters and watermark information
Files:
- `src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java`
- `src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java`
### 2.4 Increase batch size and enable parallel queue draining
Problem:
- the async ingest route drained too slowly with `1000`-event batches and a single consumer
Fix:
- raise Camel completion size from `1000` to `5000`
- enable `4` concurrent SEDA consumers
Files:
- `src/main/java/at/procon/eventhub/config/EventHubProperties.java`
- `src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java`
- `src/main/resources/application.yml`
### 2.5 Give each Camel flush its own package key
Problem:
- multiple flushes of the same extraction package reused the same `data_package` identity
- logs were misleading and `event_count` on the package row was overwritten by later flushes
Fix:
- derive a unique `packageKey` per completed Camel batch using the aggregate package key plus the Camel exchange id
- preserve both the aggregate key and the child key in metadata
Files:
- `src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java`
### 2.6 Improve batch-local entity and vehicle caching
Problem:
- after the bulk insert refactor, the main remaining hot path was still reference resolution
Fixes:
- cache entity ids in the batch by `entityType + sourceEntityId`
- cache vehicle resolutions inside a batch
- later extend vehicle caching to be range-aware for registration-based assignment lookups:
- direct vehicle identifiers cache without time sensitivity
- registration-based resolutions cache over assignment validity intervals
Files:
- `src/main/java/at/procon/eventhub/persistence/EventRepository.java`
- `src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java`
## 3. Master-Data Import Throughput Fixes
### 3.1 Set-based master entity and relation upserts
Problem:
- source master data was previously written row by row
Fix:
- stage master entities and relations into temporary tables
- run set-based `insert ... select ... on conflict do update`
Files:
- `src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java`
### 3.2 Stream and chunk master-data refresh
Problem:
- the refresh path loaded large source master-data result sets into memory
Fix:
- stream source rows
- flush in chunks of `5000`
Files:
- `src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java`
### 3.3 Bulk vehicle reconciliation from master data
Problem:
- reconciling vehicles and registrations from master data was done row by row
Fix:
- replace the loop with set-based SQL for vehicles, registrations, and projected assignments
Files:
- `src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java`
## 4. Deadlock and Contention Fixes
### 4.1 Remove unnecessary hot-row updates on vehicle and registration rows
Problem:
- event import updated `vehicle.updated_at` and `vehicle_registration.updated_at` even when no new information was being added
- this created deadlocks under parallel ingest
Fix:
- only update `vehicle` when missing `source_vehicle_entity_id` or `vin` can actually be filled
- only update `vehicle_registration` when missing source id, nation, or registration number can actually be filled
- stop using event import as a generic "touch row" path
Files:
- `src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java`
### 4.2 Make event-time source master entity resolution "find or create", not "update on conflict"
Problem:
- concurrent event batches could deadlock on `eventhub.source_master_entity` through `INSERT ... ON CONFLICT DO UPDATE`
Fix:
- first `SELECT id`
- if missing, `INSERT ... ON CONFLICT DO NOTHING RETURNING id`
- if another transaction won the race, select again
- do not update existing master entity rows during event ingest
Files:
- `src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java`
### 4.3 Fix race handling when `RETURNING` returns no row
Problem:
- if a concurrent transaction inserted the entity first, the resolver could still fail unexpectedly
Fix:
- allow the `RETURNING` path to yield `null`
- retry with a follow-up `SELECT`
Files:
- `src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java`
## 5. Transaction Visibility and Correctness Fixes
### 5.1 Remove outer transaction around full tachograph execution
Problem:
- master-data refresh logs showed completion, but master-data rows were not visible yet because the outer import method still held the transaction open
Fix:
- remove the outer transaction from `startAndExecuteImport(...)`
- keep chunk-level and package-level transactions independent
Files:
- `src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java`
### 5.2 Preserve the original ingest exception if package failure marking also fails
Problem:
- when ingest failed and `markFailed(...)` also failed because of a broken connection, the secondary bookkeeping error hid the real root cause
Fix:
- wrap `dataPackageRepository.markFailed(...)` in its own `try/catch`
- log the bookkeeping failure
- keep the original ingest exception and attach the bookkeeping failure as suppressed
Files:
- `src/main/java/at/procon/eventhub/service/EventHubIngestionService.java`
### 5.3 Do not advance import cursors before async ingest really finishes
Problem:
- extraction previously marked packages imported and advanced `import_cursor` before the async `CAMEL_BATCH` ingest was durably finished
- this could skip source data on the next run if async ingest later failed
Fix:
- add grouped child-batch status lookup on `data_package`
- make extraction package completion wait for all derived `CAMEL_BATCH` rows to reach terminal success
- fail the planned extraction package if child batches fail or time out
- only advance the cursor after the async ingest succeeded
- make "Completed import run" mean durable ingest completion instead of extraction completion
Files:
- `src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java`
- `src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java`
- `src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java`
## 6. Connection Reset and Retry Hardening
### 6.1 Retry transient DB failures in the Camel ingest route
Problem:
- long-running imports hit transient failures such as deadlocks and connection resets
Fix:
- add Camel redelivery with exponential backoff for:
- `CannotAcquireLockException`
- `PessimisticLockingFailureException`
- `DataAccessResourceFailureException`
- `TransientDataAccessException`
Files:
- `src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java`
### 6.2 Tune Hikari for shorter-lived and healthier pooled connections
Problem:
- `SQLSTATE 08006` / `Connection reset` events left broken pool entries behind
Fix:
- configure Hikari with explicit pool sizing and connection lifetime / keepalive settings:
- `maximum-pool-size: 16`
- `minimum-idle: 4`
- `connection-timeout: 30000`
- `validation-timeout: 5000`
- `idle-timeout: 300000`
- `keepalive-time: 120000`
- `max-lifetime: 540000`
Files:
- `src/main/resources/application.yml`
## 7. Observability Improvements
### 7.1 Master-data progress logging
Added logs for:
- refresh start
- per-section progress
- per-chunk counts
- `byType` breakdowns
- section completion
- reconciliation start and result
Files:
- `src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java`
### 7.2 Event extraction progress logging
Added logs for:
- extraction start
- progress every `5000` mapped events
- final mapped totals with `byType`
Files:
- `src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java`
### 7.3 Event ingest throughput logging
Added logs for:
- `receivedCount`
- `insertedCount`
- `elapsedMs`
- `receivedPerSecond`
- `byType`
Files:
- `src/main/java/at/procon/eventhub/service/EventHubIngestionService.java`
### 7.4 Async-ingest wait progress logging
Added logs for:
- number of expected child batches
- observed child batches
- successful / failed / importing child-batch counts while the import executor waits for durable completion
Files:
- `src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java`
## 8. Operational Notes
### Throughput effect seen during the optimization round
Observed progression during the work:
- roughly `30` events/sec before the later cache and blocking fixes
- roughly `300` rows/sec after the main contention and stuck-session cleanup work
This is a major improvement, but large historical backfills are still expensive.
### What remains expensive
The main remaining bottleneck is still reference resolution in the ingest hot path, especially:
- driver entity resolution
- source-package entity resolution
- vehicle / registration lookup and creation
The next major optimization step would be set-based pre-resolution of references per ingest batch instead of resolving them one event at a time.
### Safe rerun behavior
- event ingest remains idempotent through `event_source_record.source_record_key_hash`
- already imported events should generally be kept
- when historical cursor corruption existed, repair should target `import_cursor`, not wholesale deletion of imported events
## 9. Main Files Touched
- `src/main/java/at/procon/eventhub/persistence/EventRepository.java`
- `src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java`
- `src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java`
- `src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java`
- `src/main/java/at/procon/eventhub/service/EventHubIngestionService.java`
- `src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java`
- `src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java`
- `src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java`
- `src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java`
- `src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java`
- `src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java`
- `src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java`
- `src/main/java/at/procon/eventhub/config/EventHubProperties.java`
- `src/main/resources/application.yml`
- `src/main/resources/db/eventhub_schema_create.sql`
- `src/main/resources/db/migration/V9__add_event_source_package_id.sql`
- `src/main/resources/db/migration/V10__make_event_hypertable.sql`
- `src/main/resources/db/migration/V11__ensure_event_source_record.sql`

View File

@ -8,6 +8,7 @@ import at.procon.eventhub.dto.ImportScopeDto;
import at.procon.eventhub.service.EventHubEventSorter; import at.procon.eventhub.service.EventHubEventSorter;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
@ -50,6 +51,7 @@ public class EventHubBatchBuildProcessor implements Processor {
metadata.put("aggregatePackageKey", aggregatePackageKey); metadata.put("aggregatePackageKey", aggregatePackageKey);
metadata.put("camelExchangeId", exchange.getExchangeId()); metadata.put("camelExchangeId", exchange.getExchangeId());
metadata.put("eventCount", sortedEvents.size()); metadata.put("eventCount", sortedEvents.size());
metadata.put("receivedEventTypeCounts", eventTypeCounts(sortedEvents));
if (packageInfo != null) { if (packageInfo != null) {
metadata.put("tenantKey", packageInfo.tenantKey()); metadata.put("tenantKey", packageInfo.tenantKey());
metadata.put("eventSource", packageInfo.eventSource().stableKey()); metadata.put("eventSource", packageInfo.eventSource().stableKey());
@ -70,4 +72,14 @@ public class EventHubBatchBuildProcessor implements Processor {
metadata metadata
)); ));
} }
private Map<String, Integer> eventTypeCounts(List<EventHubEventDto> events) {
Map<String, Integer> counts = new LinkedHashMap<>();
for (EventHubEventDto event : events) {
String domain = event.eventDomain() == null ? "UNKNOWN_DOMAIN" : event.eventDomain().name();
String type = event.eventType() == null ? "UNKNOWN_EVENT" : event.eventType().name();
counts.merge(domain + "/" + type, 1, Integer::sum);
}
return counts;
}
} }

View File

@ -2,8 +2,13 @@ package at.procon.eventhub.camel;
import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.service.EventHubIngestionService; import at.procon.eventhub.service.EventHubIngestionService;
import org.springframework.dao.CannotAcquireLockException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.dao.PessimisticLockingFailureException;
import org.springframework.dao.TransientDataAccessException;
import org.apache.camel.builder.RouteBuilder; import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionSystemException;
@Component @Component
public class EventHubCommonIngestionRoute extends RouteBuilder { public class EventHubCommonIngestionRoute extends RouteBuilder {
@ -37,6 +42,19 @@ public class EventHubCommonIngestionRoute extends RouteBuilder {
public void configure() { public void configure() {
String batchInputUri = batchInputUri(); String batchInputUri = batchInputUri();
onException(
CannotAcquireLockException.class,
PessimisticLockingFailureException.class,
DataAccessResourceFailureException.class,
TransientDataAccessException.class,
TransactionSystemException.class
)
.maximumRedeliveries(5)
.redeliveryDelay(2000)
.backOffMultiplier(2.0)
.useExponentialBackOff()
.retryAttemptedLogLevel(org.apache.camel.LoggingLevel.WARN);
from("direct:eventhub-normalized-input") from("direct:eventhub-normalized-input")
.routeId("eventhub-normalized-input-route") .routeId("eventhub-normalized-input-route")
.process(validationProcessor) .process(validationProcessor)

View File

@ -29,6 +29,7 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
private static final Duration ASYNC_INGEST_AWAIT_TIMEOUT = Duration.ofHours(6); private static final Duration ASYNC_INGEST_AWAIT_TIMEOUT = Duration.ofHours(6);
private static final Duration ASYNC_INGEST_POLL_INTERVAL = Duration.ofSeconds(2); private static final Duration ASYNC_INGEST_POLL_INTERVAL = Duration.ofSeconds(2);
private static final Duration ASYNC_INGEST_FAILURE_GRACE_PERIOD = Duration.ofSeconds(90); private static final Duration ASYNC_INGEST_FAILURE_GRACE_PERIOD = Duration.ofSeconds(90);
private static final Duration ASYNC_INGEST_STALL_GRACE_PERIOD = Duration.ofSeconds(90);
private static final Duration ASYNC_INGEST_PROGRESS_LOG_INTERVAL = Duration.ofSeconds(30); private static final Duration ASYNC_INGEST_PROGRESS_LOG_INTERVAL = Duration.ofSeconds(30);
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
@ -186,6 +187,12 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
plannedPackage.planItem(), plannedPackage.planItem(),
plannedPackage.chunk() plannedPackage.chunk()
); );
dataPackageRepository.updateEventCount(plannedPackage.packageId(), result.eventsInserted());
if (!result.eventTypeCounts().isEmpty()) {
dataPackageRepository.mergeMetadata(plannedPackage.packageId(), Map.of(
"extractedEventTypeCounts", result.eventTypeCounts()
));
}
awaitAsyncIngestCompletion(importRunId, request, plannedPackage, result); awaitAsyncIngestCompletion(importRunId, request, plannedPackage, result);
results.add(result); results.add(result);
dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted()); dataPackageRepository.markImported(plannedPackage.packageId(), result.eventsInserted());
@ -231,6 +238,7 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
Instant deadline = Instant.now().plus(ASYNC_INGEST_AWAIT_TIMEOUT); Instant deadline = Instant.now().plus(ASYNC_INGEST_AWAIT_TIMEOUT);
Instant nextProgressLogAt = Instant.now().plus(ASYNC_INGEST_PROGRESS_LOG_INTERVAL); Instant nextProgressLogAt = Instant.now().plus(ASYNC_INGEST_PROGRESS_LOG_INTERVAL);
Instant failedStateObservedAt = null; Instant failedStateObservedAt = null;
Instant lastStateChangeAt = Instant.now();
CamelBatchGroupStatus previousState = null; CamelBatchGroupStatus previousState = null;
while (Instant.now().isBefore(deadline)) { while (Instant.now().isBefore(deadline)) {
@ -244,6 +252,9 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
} }
boolean stateChanged = previousState == null || !previousState.equals(state); boolean stateChanged = previousState == null || !previousState.equals(state);
if (stateChanged) {
lastStateChangeAt = Instant.now();
}
if (state.failedCount() > 0 && stateChanged) { if (state.failedCount() > 0 && stateChanged) {
failedStateObservedAt = Instant.now(); failedStateObservedAt = Instant.now();
} else if (state.failedCount() == 0) { } else if (state.failedCount() == 0) {
@ -263,6 +274,21 @@ public abstract class AbstractImportExecutionService<R extends ImportRunRequest,
); );
} }
if (state.totalCount() < expectedCamelBatches
&& state.importingCount() == 0
&& state.failedCount() == 0
&& Instant.now().isAfter(lastStateChangeAt.plus(ASYNC_INGEST_STALL_GRACE_PERIOD))) {
throw new IllegalStateException(
"Async EventHub ingest stalled for importRunId=" + importRunId
+ " aggregatePackageKey=" + aggregatePackageKey
+ " expectedCamelBatches=" + expectedCamelBatches
+ " observedCamelBatches=" + state.totalCount()
+ " successfulCamelBatches=" + state.successCount()
+ " importingCamelBatches=" + state.importingCount()
+ " failedCamelBatches=" + state.failedCount()
);
}
if (Instant.now().isAfter(nextProgressLogAt)) { if (Instant.now().isAfter(nextProgressLogAt)) {
log.info("Waiting for async EventHub ingest provider={} importRunId={} extractionPackageId={} aggregatePackageKey={} expectedCamelBatches={} observedCamelBatches={} successfulCamelBatches={} failedCamelBatches={} importingCamelBatches={}", log.info("Waiting for async EventHub ingest provider={} importRunId={} extractionPackageId={} aggregatePackageKey={} expectedCamelBatches={} observedCamelBatches={} successfulCamelBatches={} failedCamelBatches={} importingCamelBatches={}",
providerPackagePrefix(), providerPackagePrefix(),

View File

@ -1,6 +1,7 @@
package at.procon.eventhub.importing; package at.procon.eventhub.importing;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.Map;
public interface ExtractionBatchResult { public interface ExtractionBatchResult {
@ -15,4 +16,8 @@ public interface ExtractionBatchResult {
OffsetDateTime lastSourceRowUpdatedAt(); OffsetDateTime lastSourceRowUpdatedAt();
OffsetDateTime lastOccurredTo(); OffsetDateTime lastOccurredTo();
default Map<String, Integer> eventTypeCounts() {
return Map.of();
}
} }

View File

@ -34,6 +34,7 @@ public class DataPackageRepository {
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
} }
@Transactional(propagation = Propagation.REQUIRES_NEW)
public UUID createPackage( public UUID createPackage(
int eventSourceId, int eventSourceId,
String packageKey, String packageKey,
@ -43,24 +44,78 @@ public class DataPackageRepository {
OffsetDateTime occurredTo, OffsetDateTime occurredTo,
Map<String, Object> metadata Map<String, Object> metadata
) { ) {
return insertPackage( UUID id = UUID.randomUUID();
eventSourceId, SourceGroupRefDto sourceGroup = packageInfo == null ? null : packageInfo.sourceGroup();
null, ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope();
packageKey, SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation();
packageInfo,
packageType, return jdbcTemplate.query(
DataPackageStatus.IMPORTING, con -> {
occurredFrom, var ps = con.prepareStatement("""
occurredTo, insert into eventhub.data_package(
null, id, event_source_id, import_run_id, tenant_key, package_key, package_type, status,
null, source_group_type, source_group_entity_id, source_group_code, source_group_name,
null, import_scope_type, root_source_org_entity_id, root_source_org_code, root_source_org_name,
null, include_children, occurred_from, occurred_to,
null, event_family, business_date, external_package_id,
null, extraction_code, extraction_source_kind, entity_axis, batch_no, chunk_from, chunk_to,
null, source_package_kind, source_package_id, source_package_entity_id,
null, source_package_period_from, source_package_period_to, source_package_imported_at,
metadata received_at, event_count, metadata
) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), ?, ?::jsonb)
on conflict (tenant_key, event_source_id, package_key) do update
set status = excluded.status,
occurred_from = excluded.occurred_from,
occurred_to = excluded.occurred_to,
event_count = excluded.event_count,
metadata = excluded.metadata,
error_message = null,
completed_at = null
returning id
""");
ps.setObject(1, id);
ps.setInt(2, eventSourceId);
ps.setObject(3, null);
ps.setString(4, packageInfo == null ? "default" : packageInfo.tenantKey());
ps.setString(5, packageKey);
ps.setString(6, packageType.name());
ps.setString(7, DataPackageStatus.IMPORTING.name());
ps.setString(8, sourceGroup == null || sourceGroup.type() == null ? null : sourceGroup.type().name());
ps.setString(9, sourceGroup == null ? null : sourceGroup.sourceEntityId());
ps.setString(10, sourceGroup == null ? null : sourceGroup.code());
ps.setString(11, sourceGroup == null ? null : sourceGroup.name());
ps.setString(12, importScope == null || importScope.type() == null ? null : importScope.type().name());
ps.setString(13, rootOrg == null ? null : rootOrg.sourceEntityId());
ps.setString(14, rootOrg == null ? null : rootOrg.code());
ps.setString(15, rootOrg == null ? null : rootOrg.name());
ps.setBoolean(16, importScope != null && importScope.includeChildren());
ps.setObject(17, occurredFrom);
ps.setObject(18, occurredTo);
ps.setString(19, packageInfo == null ? null : packageInfo.eventFamily());
ps.setObject(20, packageInfo == null ? null : packageInfo.businessDate());
ps.setString(21, packageInfo == null ? packageKey : packageInfo.externalPackageId());
ps.setString(22, null);
ps.setString(23, null);
ps.setString(24, null);
ps.setObject(25, null);
ps.setObject(26, null);
ps.setObject(27, null);
ps.setString(28, null);
ps.setString(29, null);
ps.setString(30, null);
ps.setObject(31, null);
ps.setObject(32, null);
ps.setObject(33, null);
ps.setInt(34, 0);
ps.setString(35, toJson(metadata));
return ps;
},
rs -> {
if (!rs.next()) {
throw new IllegalStateException("Could not create or resolve data package " + packageKey);
}
return (UUID) rs.getObject(1);
}
); );
} }
@ -198,6 +253,31 @@ public class DataPackageRepository {
); );
} }
public void updateEventCount(UUID packageId, int eventCount) {
jdbcTemplate.update(
"""
update eventhub.data_package
set event_count = ?
where id = ?
""",
eventCount,
packageId
);
}
public void mergeMetadata(UUID packageId, Map<String, Object> metadata) {
jdbcTemplate.update(
"""
update eventhub.data_package
set metadata = coalesce(metadata, '{}'::jsonb) || ?::jsonb
where id = ?
""",
toJson(metadata),
packageId
);
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void markImported(UUID packageId, int insertedCount) { public void markImported(UUID packageId, int insertedCount) {
jdbcTemplate.update( jdbcTemplate.update(
""" """
@ -225,6 +305,44 @@ public class DataPackageRepository {
); );
} }
public CamelBatchGroupStatus findCamelBatchGroupStatus(int eventSourceId, String tenantKey, String aggregatePackageKey) {
return jdbcTemplate.query(
"""
select count(*) as total_count,
count(*) filter (where status in (?, ?)) as success_count,
count(*) filter (where status = ?) as failed_count,
count(*) filter (where status = ?) as importing_count,
coalesce(max(error_message) filter (where status = ?), '') as failed_message
from eventhub.data_package
where event_source_id = ?
and tenant_key = ?
and package_type = ?
and package_key like ?
""",
rs -> {
if (!rs.next()) {
return CamelBatchGroupStatus.empty();
}
return new CamelBatchGroupStatus(
rs.getInt("total_count"),
rs.getInt("success_count"),
rs.getInt("failed_count"),
rs.getInt("importing_count"),
rs.getString("failed_message")
);
},
DataPackageStatus.IMPORTED.name(),
DataPackageStatus.EMPTY.name(),
DataPackageStatus.FAILED.name(),
DataPackageStatus.IMPORTING.name(),
DataPackageStatus.FAILED.name(),
eventSourceId,
tenantKey,
DataPackageType.CAMEL_BATCH.name(),
aggregatePackageKey + ":CAMEL-%"
);
}
private String toJson(Map<String, Object> value) { private String toJson(Map<String, Object> value) {
try { try {
return objectMapper.writeValueAsString(normalizeMetadataMap(value)); return objectMapper.writeValueAsString(normalizeMetadataMap(value));
@ -291,4 +409,16 @@ public class DataPackageRepository {
} }
return value.toString(); return value.toString();
} }
public record CamelBatchGroupStatus(
int totalCount,
int successCount,
int failedCount,
int importingCount,
String failedMessage
) {
public static CamelBatchGroupStatus empty() {
return new CamelBatchGroupStatus(0, 0, 0, 0, "");
}
}
} }

View File

@ -320,10 +320,7 @@ public class SourceMasterDataRepository {
}, },
rs -> { rs -> {
if (!rs.next()) { if (!rs.next()) {
throw new IllegalStateException( return null;
"Could not resolve source master entity id for "
+ normalizedTenantKey + ":" + eventSourceId + ":" + normalizedEntityType + ":" + normalizedSourceEntityId
);
} }
return (UUID) rs.getObject(1); return (UUID) rs.getObject(1);
} }

View File

@ -524,13 +524,13 @@ public class VehicleIdentityRepository {
jdbcTemplate.update( jdbcTemplate.update(
""" """
update eventhub.vehicle update eventhub.vehicle
set source_vehicle_entity_id = coalesce(source_vehicle_entity_id, ?), set source_vehicle_entity_id = coalesce(source_vehicle_entity_id, cast(? as text)),
vin = coalesce(vin, ?), vin = coalesce(vin, cast(? as text)),
updated_at = now() updated_at = now()
where id = ? where id = ?
and ( and (
(source_vehicle_entity_id is null and ? is not null) (source_vehicle_entity_id is null and cast(? as text) is not null)
or (vin is null and ? is not null) or (vin is null and cast(? as text) is not null)
) )
""", """,
sourceVehicleEntityId, sourceVehicleEntityId,
@ -595,15 +595,15 @@ public class VehicleIdentityRepository {
jdbcTemplate.update( jdbcTemplate.update(
""" """
update eventhub.vehicle_registration update eventhub.vehicle_registration
set source_registration_entity_id = coalesce(source_registration_entity_id, ?), set source_registration_entity_id = coalesce(source_registration_entity_id, cast(? as text)),
nation = coalesce(?, nation), nation = coalesce(cast(? as text), nation),
registration_number = coalesce(?, registration_number), registration_number = coalesce(cast(? as text), registration_number),
updated_at = now() updated_at = now()
where id = ? where id = ?
and ( and (
(source_registration_entity_id is null and ? is not null) (source_registration_entity_id is null and cast(? as text) is not null)
or (nation is null and ? is not null) or (nation is null and cast(? as text) is not null)
or (registration_number is null and ? is not null) or (registration_number is null and cast(? as text) is not null)
) )
""", """,
sourceRegistrationEntityId, sourceRegistrationEntityId,

View File

@ -15,7 +15,8 @@ import java.util.UUID;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;
@Service @Service
public class EventHubIngestionService { public class EventHubIngestionService {
@ -27,26 +28,29 @@ public class EventHubIngestionService {
private final EventRepository eventRepository; private final EventRepository eventRepository;
private final EventHubEventValidator validator; private final EventHubEventValidator validator;
private final EventHubEventSorter sorter; private final EventHubEventSorter sorter;
private final TransactionTemplate transactionTemplate;
public EventHubIngestionService( public EventHubIngestionService(
EventSourceRepository eventSourceRepository, EventSourceRepository eventSourceRepository,
DataPackageRepository dataPackageRepository, DataPackageRepository dataPackageRepository,
EventRepository eventRepository, EventRepository eventRepository,
EventHubEventValidator validator, EventHubEventValidator validator,
EventHubEventSorter sorter EventHubEventSorter sorter,
PlatformTransactionManager transactionManager
) { ) {
this.eventSourceRepository = eventSourceRepository; this.eventSourceRepository = eventSourceRepository;
this.dataPackageRepository = dataPackageRepository; this.dataPackageRepository = dataPackageRepository;
this.eventRepository = eventRepository; this.eventRepository = eventRepository;
this.validator = validator; this.validator = validator;
this.sorter = sorter; this.sorter = sorter;
this.transactionTemplate = new TransactionTemplate(transactionManager);
} }
@Transactional
public EventHubPackageResult ingest(EventHubEventBatchDto batch) { public EventHubPackageResult ingest(EventHubEventBatchDto batch) {
if (batch == null || batch.events().isEmpty()) { if (batch == null || batch.events().isEmpty()) {
return new EventHubPackageResult(null, batch == null ? null : batch.packageKey(), 0, 0); return new EventHubPackageResult(null, batch == null ? null : batch.packageKey(), 0, 0);
} }
long startedAtNanos = System.nanoTime();
EventHubPackageRequest packageInfo = batch.packageInfo(); EventHubPackageRequest packageInfo = batch.packageInfo();
if (packageInfo == null) { if (packageInfo == null) {
@ -57,6 +61,7 @@ public class EventHubIngestionService {
} }
EventSourceDto eventSource = packageInfo.eventSource(); EventSourceDto eventSource = packageInfo.eventSource();
String tenantKey = packageInfo.tenantKey();
int eventSourceId = eventSourceRepository.resolveSourceId(packageInfo.tenantKey(), eventSource); int eventSourceId = eventSourceRepository.resolveSourceId(packageInfo.tenantKey(), eventSource);
List<EventHubEventDto> sortedEvents = sorter.sort(batch.events()); List<EventHubEventDto> sortedEvents = sorter.sort(batch.events());
sortedEvents.forEach(validator::validate); sortedEvents.forEach(validator::validate);
@ -72,13 +77,26 @@ public class EventHubIngestionService {
); );
try { try {
int insertedCount = eventRepository.batchInsert(packageId, packageInfo.tenantKey(), eventSourceId, sortedEvents); Integer insertedCount = transactionTemplate.execute(status ->
eventRepository.batchInsert(packageId, tenantKey, eventSourceId, sortedEvents)
);
if (insertedCount == null) {
throw new IllegalStateException("EventHub batch insert returned no result for package " + batch.packageKey());
}
dataPackageRepository.markImported(packageId, insertedCount); dataPackageRepository.markImported(packageId, insertedCount);
log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={} byType={}", long elapsedMs = Math.max(1L, (System.nanoTime() - startedAtNanos) / 1_000_000L);
packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount, eventTypeCounts(sortedEvents)); long receivedPerSecond = Math.max(1L, Math.round(sortedEvents.size() * 1000.0 / elapsedMs));
log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={} elapsedMs={} receivedPerSecond={} byType={}",
packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount, elapsedMs, receivedPerSecond, eventTypeCounts(sortedEvents));
return new EventHubPackageResult(packageId, batch.packageKey(), sortedEvents.size(), insertedCount); return new EventHubPackageResult(packageId, batch.packageKey(), sortedEvents.size(), insertedCount);
} catch (RuntimeException ex) { } catch (RuntimeException ex) {
try {
dataPackageRepository.markFailed(packageId, ex.getMessage()); dataPackageRepository.markFailed(packageId, ex.getMessage());
} catch (RuntimeException markFailedEx) {
ex.addSuppressed(markFailedEx);
log.warn("Failed to mark EventHub acquisition package as failed packageId={} packageKey={}",
packageId, batch.packageKey(), markFailedEx);
}
throw ex; throw ex;
} }
} }

View File

@ -2,6 +2,7 @@ package at.procon.eventhub.tachograph.dto;
import at.procon.eventhub.importing.ExtractionBatchResult; import at.procon.eventhub.importing.ExtractionBatchResult;
import java.time.OffsetDateTime; import java.time.OffsetDateTime;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
public record TachographExtractionBatchResultDto( public record TachographExtractionBatchResultDto(
@ -16,6 +17,11 @@ public record TachographExtractionBatchResultDto(
OffsetDateTime lastSourcePackageImportedAt, OffsetDateTime lastSourcePackageImportedAt,
String lastSourcePackageId, String lastSourcePackageId,
OffsetDateTime lastSourceRowUpdatedAt, OffsetDateTime lastSourceRowUpdatedAt,
OffsetDateTime lastOccurredTo OffsetDateTime lastOccurredTo,
Map<String, Integer> eventTypeCounts
) implements ExtractionBatchResult { ) implements ExtractionBatchResult {
public TachographExtractionBatchResultDto {
eventTypeCounts = eventTypeCounts == null ? Map.of() : Map.copyOf(eventTypeCounts);
}
} }

View File

@ -84,7 +84,8 @@ public class JdbcTachographExtractionBatchExecutor
lastSourcePackageImportedAt(stats, cursor), lastSourcePackageImportedAt(stats, cursor),
lastSourcePackageId(stats, cursor), lastSourcePackageId(stats, cursor),
null, null,
chunk.occurredTo() chunk.occurredTo(),
stats.eventTypeCounts()
); );
} }

View File

@ -42,7 +42,8 @@ public class NoopTachographExtractionBatchExecutor
null, null,
null, null,
null, null,
null null,
java.util.Map.of()
); );
} }

View File

@ -1,5 +1,6 @@
package at.procon.eventhub.tachograph.service; package at.procon.eventhub.tachograph.service;
import at.procon.eventhub.config.EventHubProperties;
import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.dto.EventSourceDto;
import at.procon.eventhub.importing.AbstractImportExecutionService; import at.procon.eventhub.importing.AbstractImportExecutionService;
import at.procon.eventhub.importing.ImportPlanDto; import at.procon.eventhub.importing.ImportPlanDto;
@ -32,10 +33,11 @@ public class TachographImportExecutionService
ImportRunRepository importRunRepository, ImportRunRepository importRunRepository,
DataPackageRepository dataPackageRepository, DataPackageRepository dataPackageRepository,
ImportCursorRepository importCursorRepository, ImportCursorRepository importCursorRepository,
EventHubProperties eventHubProperties,
TachographMasterDataRefreshService masterDataRefreshService, TachographMasterDataRefreshService masterDataRefreshService,
TachographExtractionBatchExecutor extractionBatchExecutor TachographExtractionBatchExecutor extractionBatchExecutor
) { ) {
super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository); super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository, eventHubProperties);
this.planService = planService; this.planService = planService;
this.masterDataRefreshService = masterDataRefreshService; this.masterDataRefreshService = masterDataRefreshService;
this.extractionBatchExecutor = extractionBatchExecutor; this.extractionBatchExecutor = extractionBatchExecutor;

View File

@ -5,12 +5,29 @@ spring:
url: jdbc:postgresql://localhost:5432/eventhub url: jdbc:postgresql://localhost:5432/eventhub
username: postgres username: postgres
password: P54!pcd#Wi password: P54!pcd#Wi
hikari:
maximum-pool-size: 16
minimum-idle: 4
connection-timeout: 30000
validation-timeout: 5000
idle-timeout: 60000
keepalive-time: 30000
max-lifetime: 90000
flyway: flyway:
enabled: true enabled: true
default-schema: eventhub default-schema: eventhub
schemas: eventhub schemas: eventhub
create-schemas: true create-schemas: true
logging:
file:
name: ${EVENTHUB_LOG_FILE:logs/eventhub-ingestion-service.log}
logback:
rollingpolicy:
max-file-size: ${EVENTHUB_LOG_MAX_FILE_SIZE:50MB}
max-history: ${EVENTHUB_LOG_MAX_HISTORY:14}
total-size-cap: ${EVENTHUB_LOG_TOTAL_SIZE_CAP:1GB}
server: server:
port: 8085 port: 8085

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<springProperty scope="context" name="logFile" source="logging.file.name" defaultValue="logs/eventhub-ingestion-service.log"/>
<springProperty scope="context" name="maxFileSize" source="logging.logback.rollingpolicy.max-file-size" defaultValue="50MB"/>
<springProperty scope="context" name="maxHistory" source="logging.logback.rollingpolicy.max-history" defaultValue="14"/>
<springProperty scope="context" name="totalSizeCap" source="logging.logback.rollingpolicy.total-size-cap" defaultValue="1GB"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX} %-5level ${PID:-unknown} --- [%thread] %logger{39} : %msg%n%ex</pattern>
<charset>UTF-8</charset>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${logFile}</file>
<append>true</append>
<immediateFlush>true</immediateFlush>
<encoder>
<pattern>%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX} %-5level ${PID:-unknown} --- [%thread] %logger{39} : %msg%n%ex</pattern>
<charset>UTF-8</charset>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
<fileNamePattern>${logFile}.%d{yyyy-MM-dd}.%i.gz</fileNamePattern>
<maxFileSize>${maxFileSize}</maxFileSize>
<maxHistory>${maxHistory}</maxHistory>
<totalSizeCap>${totalSizeCap}</totalSizeCap>
</rollingPolicy>
</appender>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</root>
</configuration>

View File

@ -69,7 +69,7 @@ Base as (
ca.card_filelog_id, ca.card_filelog_id,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, ca.ID_FileLog, ca.cda_filelog_id, ca.card_filelog_id, ca.ID) as source_package_id_raw, coalesce(fl.ID, ca.ID_FileLog, ca.cda_filelog_id, ca.card_filelog_id) as source_package_id_raw,
coalesce(fl.ID_Card, ca.card_id) as source_package_entity_id_raw, coalesce(fl.ID_Card, ca.card_id) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, ca.RecordDate) as source_package_period_from, coalesce(fl.DownloadFrom, ca.RecordDate) as source_package_period_from,
coalesce(fl.DownloadTo, ca.RecordDateTo, dateadd(day, 1, ca.RecordDate)) as source_package_period_to, coalesce(fl.DownloadTo, ca.RecordDateTo, dateadd(day, 1, ca.RecordDate)) as source_package_period_to,

View File

@ -27,7 +27,7 @@ Base as (
vehicleNation.AlphaCode as vehicle_registration_nation, vehicleNation.AlphaCode as vehicle_registration_nation,
v.VRN as vehicle_registration_number, v.VRN as vehicle_registration_number,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, border.ID_FileLog, border.ID) as source_package_id_raw, coalesce(fl.ID, border.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_Card, border.ID_Card) as source_package_entity_id_raw, coalesce(fl.ID_Card, border.ID_Card) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, border.Timestamp) as source_package_period_from, coalesce(fl.DownloadFrom, border.Timestamp) as source_package_period_from,
coalesce(fl.DownloadTo, border.Timestamp) as source_package_period_to, coalesce(fl.DownloadTo, border.Timestamp) as source_package_period_to,

View File

@ -26,7 +26,7 @@ Base as (
vehicleNation.AlphaCode as vehicle_registration_nation, vehicleNation.AlphaCode as vehicle_registration_nation,
v.VRN as vehicle_registration_number, v.VRN as vehicle_registration_number,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, lu.ID_FileLog, lu.ID) as source_package_id_raw, coalesce(fl.ID, lu.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_Card, lu.ID_Card) as source_package_entity_id_raw, coalesce(fl.ID_Card, lu.ID_Card) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, lu.Timestamp) as source_package_period_from, coalesce(fl.DownloadFrom, lu.Timestamp) as source_package_period_from,
coalesce(fl.DownloadTo, lu.Timestamp) as source_package_period_to, coalesce(fl.DownloadTo, lu.Timestamp) as source_package_period_to,

View File

@ -28,7 +28,7 @@ Base as (
vehicleNation.AlphaCode as vehicle_registration_nation, vehicleNation.AlphaCode as vehicle_registration_nation,
v.VRN as vehicle_registration_number, v.VRN as vehicle_registration_number,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, place.ID_FileLog, place.ID) as source_package_id_raw, coalesce(fl.ID, place.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_Card, place.ID_Card) as source_package_entity_id_raw, coalesce(fl.ID_Card, place.ID_Card) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, place.EntryTime) as source_package_period_from, coalesce(fl.DownloadFrom, place.EntryTime) as source_package_period_from,
coalesce(fl.DownloadTo, place.EntryTime) as source_package_period_to, coalesce(fl.DownloadTo, place.EntryTime) as source_package_period_to,

View File

@ -25,7 +25,7 @@ Base as (
vehicleNation.AlphaCode as vehicle_registration_nation, vehicleNation.AlphaCode as vehicle_registration_nation,
v.VRN as vehicle_registration_number, v.VRN as vehicle_registration_number,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, pos.ID_FileLog, pos.ID) as source_package_id_raw, coalesce(fl.ID, pos.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_Card, pos.ID_Card) as source_package_entity_id_raw, coalesce(fl.ID_Card, pos.ID_Card) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, pos.Timestamp) as source_package_period_from, coalesce(fl.DownloadFrom, pos.Timestamp) as source_package_period_from,
coalesce(fl.DownloadTo, pos.Timestamp) as source_package_period_to, coalesce(fl.DownloadTo, pos.Timestamp) as source_package_period_to,

View File

@ -23,7 +23,7 @@ Base as (
vehicleNation.AlphaCode as vehicle_registration_nation, vehicleNation.AlphaCode as vehicle_registration_nation,
v.VRN as vehicle_registration_number, v.VRN as vehicle_registration_number,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, cond.ID_FileLog, cond.ID) as source_package_id_raw, coalesce(fl.ID, cond.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_Card, cond.ID_Card) as source_package_entity_id_raw, coalesce(fl.ID_Card, cond.ID_Card) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, cond.EntryTime) as source_package_period_from, coalesce(fl.DownloadFrom, cond.EntryTime) as source_package_period_from,
coalesce(fl.DownloadTo, cond.EntryTime) as source_package_period_to, coalesce(fl.DownloadTo, cond.EntryTime) as source_package_period_to,

View File

@ -65,7 +65,7 @@ Base as (
evt.occurred_at, evt.occurred_at,
evt.odometer_m, evt.odometer_m,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, used.ID_FileLog, used.ID) as source_package_id_raw, coalesce(fl.ID, used.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_Card, used.ID_Card) as source_package_entity_id_raw, coalesce(fl.ID_Card, used.ID_Card) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, used.FirstUse) as source_package_period_from, coalesce(fl.DownloadFrom, used.FirstUse) as source_package_period_from,
coalesce(fl.DownloadTo, used.LastUse, used.FirstUse) as source_package_period_to, coalesce(fl.DownloadTo, used.LastUse, used.FirstUse) as source_package_period_to,

View File

@ -56,7 +56,7 @@ Base as (
evt.occurred_at, evt.occurred_at,
evt.odometer_m, evt.odometer_m,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, cycle.ID_FileLog, cycle.vui_filelog_id, cycle.ID) as source_package_id_raw, coalesce(fl.ID, cycle.ID_FileLog, cycle.vui_filelog_id) as source_package_id_raw,
coalesce(fl.ID_VehicleIdentification, cycle.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.ID_VehicleIdentification, cycle.ID_VehicleIdentification) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, cycle.BeginTime) as source_package_period_from, coalesce(fl.DownloadFrom, cycle.BeginTime) as source_package_period_from,
coalesce(fl.DownloadTo, cycle.EndTime, cycle.BeginTime) as source_package_period_to, coalesce(fl.DownloadTo, cycle.EndTime, cycle.BeginTime) as source_package_period_to,

View File

@ -28,7 +28,7 @@ Base as (
vi.ID as vehicle_identification_id, vi.ID as vehicle_identification_id,
vi.VIN as vehicle_vin, vi.VIN as vehicle_vin,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, speeding.ID_FileLog, vui.ID_FileLog, speeding.ID) as source_package_id_raw, coalesce(fl.ID, speeding.ID_FileLog, vui.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, speeding.BeginTime) as source_package_period_from, coalesce(fl.DownloadFrom, speeding.BeginTime) as source_package_period_from,
coalesce(fl.DownloadTo, speeding.EndTime, speeding.BeginTime) as source_package_period_to, coalesce(fl.DownloadTo, speeding.EndTime, speeding.BeginTime) as source_package_period_to,

View File

@ -102,7 +102,7 @@ Base as (
va.vui_filelog_id, va.vui_filelog_id,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, va.ID_FileLog, va.vda_filelog_id, va.vui_filelog_id, va.ID) as source_package_id_raw, coalesce(fl.ID, va.ID_FileLog, va.vda_filelog_id, va.vui_filelog_id) as source_package_id_raw,
coalesce(fl.ID_VehicleIdentification, va.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.ID_VehicleIdentification, va.ID_VehicleIdentification) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, va.RecordDate) as source_package_period_from, coalesce(fl.DownloadFrom, va.RecordDate) as source_package_period_from,
coalesce(fl.DownloadTo, dateadd(day, 1, va.RecordDate)) as source_package_period_to, coalesce(fl.DownloadTo, dateadd(day, 1, va.RecordDate)) as source_package_period_to,

View File

@ -27,7 +27,7 @@ Base as (
vi.ID as vehicle_identification_id, vi.ID as vehicle_identification_id,
vi.VIN as vehicle_vin, vi.VIN as vehicle_vin,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, border.ID_FileLog, vui.ID_FileLog, border.ID) as source_package_id_raw, coalesce(fl.ID, border.ID_FileLog, vui.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, border.Timestamp) as source_package_period_from, coalesce(fl.DownloadFrom, border.Timestamp) as source_package_period_from,
coalesce(fl.DownloadTo, border.Timestamp) as source_package_period_to, coalesce(fl.DownloadTo, border.Timestamp) as source_package_period_to,

View File

@ -26,7 +26,7 @@ Base as (
vi.ID as vehicle_identification_id, vi.ID as vehicle_identification_id,
vi.VIN as vehicle_vin, vi.VIN as vehicle_vin,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, lu.ID_FileLog, vui.ID_FileLog, lu.ID) as source_package_id_raw, coalesce(fl.ID, lu.ID_FileLog, vui.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, lu.Timestamp) as source_package_period_from, coalesce(fl.DownloadFrom, lu.Timestamp) as source_package_period_from,
coalesce(fl.DownloadTo, lu.Timestamp) as source_package_period_to, coalesce(fl.DownloadTo, lu.Timestamp) as source_package_period_to,

View File

@ -27,7 +27,7 @@ Base as (
vi.ID as vehicle_identification_id, vi.ID as vehicle_identification_id,
vi.VIN as vehicle_vin, vi.VIN as vehicle_vin,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, place.ID_FileLog, vui.ID_FileLog, place.ID) as source_package_id_raw, coalesce(fl.ID, place.ID_FileLog, vui.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, place.EntryTime) as source_package_period_from, coalesce(fl.DownloadFrom, place.EntryTime) as source_package_period_from,
coalesce(fl.DownloadTo, place.EntryTime) as source_package_period_to, coalesce(fl.DownloadTo, place.EntryTime) as source_package_period_to,

View File

@ -25,7 +25,7 @@ Base as (
vi.ID as vehicle_identification_id, vi.ID as vehicle_identification_id,
vi.VIN as vehicle_vin, vi.VIN as vehicle_vin,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, pos.ID_FileLog, vui.ID_FileLog, pos.ID) as source_package_id_raw, coalesce(fl.ID, pos.ID_FileLog, vui.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, pos.Timestamp) as source_package_period_from, coalesce(fl.DownloadFrom, pos.Timestamp) as source_package_period_from,
coalesce(fl.DownloadTo, pos.Timestamp) as source_package_period_to, coalesce(fl.DownloadTo, pos.Timestamp) as source_package_period_to,

View File

@ -19,7 +19,7 @@ Base as (
vi.ID as vehicle_identification_id, vi.ID as vehicle_identification_id,
vi.VIN as vehicle_vin, vi.VIN as vehicle_vin,
coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at,
coalesce(fl.ID, cond.ID_FileLog, vui.ID_FileLog, cond.ID) as source_package_id_raw, coalesce(fl.ID, cond.ID_FileLog, vui.ID_FileLog) as source_package_id_raw,
coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw,
coalesce(fl.DownloadFrom, cond.EntryTime) as source_package_period_from, coalesce(fl.DownloadFrom, cond.EntryTime) as source_package_period_from,
coalesce(fl.DownloadTo, cond.EntryTime) as source_package_period_to, coalesce(fl.DownloadTo, cond.EntryTime) as source_package_period_to,