From b8817e18d9bcc0e2b107c3f3ccfe2fa4e04140f7 Mon Sep 17 00:00:00 2001 From: trifonovt <87468028+TihomirTrifonov@users.noreply.github.com> Date: Mon, 4 May 2026 11:57:51 +0200 Subject: [PATCH] Harden async ingest tracking and diagnostics --- README.md | 4 + docs/db/async_ingest_diagnostics.sql | 338 +++++++++++++ docs/import-performance-and-error-fixes.md | 459 ++++++++++++++++++ .../camel/EventHubBatchBuildProcessor.java | 12 + .../camel/EventHubCommonIngestionRoute.java | 18 + .../AbstractImportExecutionService.java | 26 + .../importing/ExtractionBatchResult.java | 5 + .../persistence/DataPackageRepository.java | 166 ++++++- .../SourceMasterDataRepository.java | 5 +- .../VehicleIdentityRepository.java | 20 +- .../service/EventHubIngestionService.java | 32 +- .../TachographExtractionBatchResultDto.java | 8 +- ...JdbcTachographExtractionBatchExecutor.java | 3 +- ...NoopTachographExtractionBatchExecutor.java | 3 +- .../TachographImportExecutionService.java | 4 +- src/main/resources/application.yml | 17 + src/main/resources/logback-spring.xml | 37 ++ .../sql/tachograph/card-activity.sql | 2 +- .../sql/tachograph/card-border-crossing.sql | 2 +- .../sql/tachograph/card-load-unload.sql | 2 +- .../resources/sql/tachograph/card-place.sql | 2 +- .../sql/tachograph/card-position.sql | 2 +- .../tachograph/card-specific-condition.sql | 2 +- .../sql/tachograph/card-vehicles-used.sql | 2 +- .../resources/sql/tachograph/iw-cycle.sql | 2 +- .../sql/tachograph/speeding-events.sql | 2 +- .../resources/sql/tachograph/vu-activity.sql | 2 +- .../sql/tachograph/vu-border-crossing.sql | 2 +- .../sql/tachograph/vu-load-unload.sql | 2 +- .../resources/sql/tachograph/vu-place.sql | 2 +- .../resources/sql/tachograph/vu-position.sql | 2 +- .../sql/tachograph/vu-specific-condition.sql | 2 +- 32 files changed, 1129 insertions(+), 58 deletions(-) create mode 100644 docs/db/async_ingest_diagnostics.sql create mode 100644 docs/import-performance-and-error-fixes.md create mode 100644 src/main/resources/logback-spring.xml diff --git a/README.md b/README.md index 457998f..faaacd5 100644 --- a/README.md +++ b/README.md @@ -624,6 +624,10 @@ This allows a single EventHub import run to process many original tachograph pac } ``` +For tachograph JDBC extraction, `sourcePackageId` is the original `FileLog.ID`. It is stored directly on `eventhub.event.source_package_id` and also resolved to `source_package_entity_id` for joins to source master data. + +`eventhub.event` is designed as a TimescaleDB hypertable partitioned by `occurred_at`. Source-record idempotency is therefore enforced through `eventhub.event_source_record(source_record_key_hash)` instead of a unique index on the hypertable, because Timescale unique indexes must include the partitioning column. + ### Tachograph import endpoints `POST /api/eventhub/acquisition/tachograph/imports/plan` returns the calculated event-family extraction plan and time chunks. diff --git a/docs/db/async_ingest_diagnostics.sql b/docs/db/async_ingest_diagnostics.sql new file mode 100644 index 0000000..b0d2c87 --- /dev/null +++ b/docs/db/async_ingest_diagnostics.sql @@ -0,0 +1,338 @@ +-- Async ingest diagnostics without filesystem logs. +-- +-- Replace the values in params before running. +-- completion_size must match eventhub.batch.completion-size from application.yml. + +-- 1) Import-run overview. +with params as ( + select + '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id, + '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id, + 5000::integer as completion_size +) +select + p.id, + p.package_type, + p.status, + p.batch_no, + p.event_family, + p.extraction_code, + p.extraction_source_kind, + p.chunk_from, + p.chunk_to, + p.event_count, + p.error_message, + p.received_at, + p.completed_at +from eventhub.data_package p +join params prm on prm.import_run_id = p.import_run_id +order by p.package_type, p.batch_no, p.received_at, p.package_key; + + +-- 2) Exact async-ingest status for one DB_EXTRACT package. +-- event_count on DB_EXTRACT is populated after executeBatch returns and before +-- async wait starts, so it represents extracted/mapped events while waiting. +with params as ( + select + '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id, + '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id, + 5000::integer as completion_size +), +extract_package as ( + select + p.id, + p.import_run_id, + p.event_source_id, + p.tenant_key, + p.package_key, + p.status, + p.event_count as extracted_event_count, + p.event_family, + p.business_date, + p.extraction_code, + p.extraction_source_kind, + p.source_group_type, + p.source_group_entity_id, + p.source_group_code, + p.import_scope_type, + p.root_source_org_entity_id, + p.root_source_org_code, + p.include_children, + p.chunk_from, + p.chunk_to, + coalesce((p.metadata ->> 'chunkSequence')::integer, 1) as chunk_sequence, + es.provider_key, + es.source_kind, + es.source_key, + es.source_instance_key, + es.provider_key || ':' || p.extraction_source_kind || ':' || p.extraction_code + || ':RUN-' || p.import_run_id::text + || ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text as async_external_package_id, + p.tenant_key || ':' + || es.provider_key || ':' || es.source_kind || ':' || es.source_key || ':' || coalesce(es.source_instance_key, 'default') || ':' + || case + when p.source_group_type is null and p.source_group_entity_id is null and p.source_group_code is null + then 'NO_GROUP' + else coalesce(p.source_group_type, '') || '|' || coalesce(p.source_group_entity_id, '') || '|' || coalesce(p.source_group_code, '') + end || ':' + || coalesce(p.import_scope_type, 'TENANT_ALL') || ':' + || case + when p.root_source_org_entity_id is null and p.root_source_org_code is null + then 'ALL' + else 'ORGANISATION|' || coalesce(p.root_source_org_entity_id, '') || '|' || coalesce(p.root_source_org_code, '') + end || ':' + || case when coalesce(p.include_children, false) then 'WITH_CHILDREN' else 'NO_CHILDREN' end || ':' + || coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN') || ':' + || coalesce(p.metadata ->> 'chunkOccurredTo', 'END') || ':' + || p.event_family || ':' + || coalesce(p.business_date::text, 'NO_DATE') || ':' + || es.provider_key || ':' || p.extraction_source_kind || ':' || p.extraction_code + || ':RUN-' || p.import_run_id::text + || ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text as aggregate_package_key + from eventhub.data_package p + join eventhub.event_source es + on es.id = p.event_source_id + join params prm + on prm.extraction_package_id = p.id + where p.package_type = 'DB_EXTRACT' +), +camel_batches as ( + select + c.id, + c.package_key, + c.status, + c.received_at, + c.completed_at, + c.event_count as inserted_count, + coalesce((c.metadata ->> 'eventCount')::integer, 0) as received_count, + c.metadata ->> 'externalPackageId' as external_package_id, + c.metadata ->> 'aggregatePackageKey' as aggregate_package_key, + c.error_message + from eventhub.data_package c + join extract_package e + on c.event_source_id = e.event_source_id + and c.tenant_key = e.tenant_key + where c.package_type = 'CAMEL_BATCH' + and c.package_key like e.aggregate_package_key || ':CAMEL-%' +), +camel_summary as ( + select + count(*) as observed_camel_batches, + count(*) filter (where status in ('IMPORTED', 'EMPTY')) as successful_camel_batches, + count(*) filter (where status = 'FAILED') as failed_camel_batches, + count(*) filter (where status = 'IMPORTING') as importing_camel_batches, + coalesce(sum(received_count), 0) as received_events, + coalesce(sum(inserted_count), 0) as inserted_events + from camel_batches +) +select + e.id as extraction_package_id, + e.import_run_id, + e.package_key as extraction_package_key, + e.status as extraction_status, + e.extracted_event_count, + case + when e.extracted_event_count <= 0 then 0 + else ((e.extracted_event_count - 1) / prm.completion_size) + 1 + end as expected_camel_batches, + s.observed_camel_batches, + s.successful_camel_batches, + s.failed_camel_batches, + s.importing_camel_batches, + s.received_events, + s.inserted_events, + e.async_external_package_id, + e.aggregate_package_key +from extract_package e +cross join camel_summary s +cross join params prm; + + +-- 3) Child CAMEL_BATCH rows for the selected DB_EXTRACT package. +with params as ( + select + '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id, + '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id, + 5000::integer as completion_size +), +extract_package as ( + select + p.id, + p.event_source_id, + p.tenant_key, + p.tenant_key || ':' + || es.provider_key || ':' || es.source_kind || ':' || es.source_key || ':' || coalesce(es.source_instance_key, 'default') || ':' + || case + when p.source_group_type is null and p.source_group_entity_id is null and p.source_group_code is null + then 'NO_GROUP' + else coalesce(p.source_group_type, '') || '|' || coalesce(p.source_group_entity_id, '') || '|' || coalesce(p.source_group_code, '') + end || ':' + || coalesce(p.import_scope_type, 'TENANT_ALL') || ':' + || case + when p.root_source_org_entity_id is null and p.root_source_org_code is null + then 'ALL' + else 'ORGANISATION|' || coalesce(p.root_source_org_entity_id, '') || '|' || coalesce(p.root_source_org_code, '') + end || ':' + || case when coalesce(p.include_children, false) then 'WITH_CHILDREN' else 'NO_CHILDREN' end || ':' + || coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN') || ':' + || coalesce(p.metadata ->> 'chunkOccurredTo', 'END') || ':' + || p.event_family || ':' + || coalesce(p.business_date::text, 'NO_DATE') || ':' + || es.provider_key || ':' || p.extraction_source_kind || ':' || p.extraction_code + || ':RUN-' || p.import_run_id::text + || ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text as aggregate_package_key + from eventhub.data_package p + join eventhub.event_source es + on es.id = p.event_source_id + join params prm + on prm.extraction_package_id = p.id + where p.package_type = 'DB_EXTRACT' +) +select + c.id, + c.package_key, + c.status, + c.received_at, + c.completed_at, + coalesce((c.metadata ->> 'eventCount')::integer, 0) as received_count, + c.event_count as inserted_count, + c.metadata ->> 'externalPackageId' as external_package_id, + c.metadata ->> 'aggregatePackageKey' as aggregate_package_key, + c.error_message +from eventhub.data_package c +join extract_package e + on c.event_source_id = e.event_source_id + and c.tenant_key = e.tenant_key +where c.package_type = 'CAMEL_BATCH' + and c.package_key like e.aggregate_package_key || ':CAMEL-%' +order by c.received_at, c.package_key; + + +-- 4) Recent CAMEL_BATCH rows for the run, independent of a single DB_EXTRACT package. +with params as ( + select + '667b191a-4ef4-49d4-9837-05a7737b4fbb'::uuid as import_run_id, + '54ba1294-1061-4b3a-84b6-5a604aa651e6'::uuid as extraction_package_id, + 5000::integer as completion_size +) +select + c.id, + c.event_source_id, + c.package_key, + c.status, + c.received_at, + c.completed_at, + c.event_count as inserted_count, + c.metadata ->> 'eventFamily' as event_family, + c.metadata ->> 'eventSource' as event_source, + c.metadata ->> 'externalPackageId' as external_package_id, + c.metadata ->> 'aggregatePackageKey' as aggregate_package_key +from eventhub.data_package c +join params prm + on c.package_type = 'CAMEL_BATCH' +where c.package_key like '%RUN-' || prm.import_run_id::text || '%' + or coalesce(c.metadata ->> 'externalPackageId', '') like '%RUN-' || prm.import_run_id::text || '%' +order by c.received_at desc, c.package_key desc; + + +-- 5) Per event type: extracted vs received by CAMEL vs imported into eventhub.event. +with params as ( + select + '4d21ef91-c979-451b-9055-d574506843bf'::uuid as import_run_id, + '20679175-b481-4439-9f66-9b7c7ed336fb'::uuid as extraction_package_id, + 5000::integer as completion_size +), +extract_package as ( + select + p.id, + p.import_run_id, + p.event_source_id, + p.tenant_key, + p.event_family, + p.business_date, + p.extraction_code, + p.extraction_source_kind, + p.source_group_type, + p.source_group_entity_id, + p.source_group_code, + p.import_scope_type, + p.root_source_org_entity_id, + p.root_source_org_code, + p.include_children, + p.metadata, + es.provider_key, + es.source_kind, + es.source_key, + es.source_instance_key, + p.tenant_key || ':' + || es.provider_key || ':' || es.source_kind || ':' || es.source_key || ':' || coalesce(es.source_instance_key, 'default') || ':' + || case + when p.source_group_type is null and p.source_group_entity_id is null and p.source_group_code is null + then 'NO_GROUP' + else coalesce(p.source_group_type, '') || '|' || coalesce(p.source_group_entity_id, '') || '|' || coalesce(p.source_group_code, '') + end || ':' + || coalesce(p.import_scope_type, 'TENANT_ALL') || ':' + || case + when p.root_source_org_entity_id is null and p.root_source_org_code is null + then 'ALL' + else 'ORGANISATION|' || coalesce(p.root_source_org_entity_id, '') || '|' || coalesce(p.root_source_org_code, '') + end || ':' + || case when coalesce(p.include_children, false) then 'WITH_CHILDREN' else 'NO_CHILDREN' end || ':' + || coalesce(p.metadata ->> 'chunkOccurredFrom', 'BEGIN') || ':' + || coalesce(p.metadata ->> 'chunkOccurredTo', 'END') || ':' + || p.event_family || ':' + || coalesce(p.business_date::text, 'NO_DATE') || ':' + || es.provider_key || ':' || p.extraction_source_kind || ':' || p.extraction_code + || ':RUN-' || p.import_run_id::text + || ':CHUNK-' || coalesce((p.metadata ->> 'chunkSequence')::integer, 1)::text as aggregate_package_key + from eventhub.data_package p + join eventhub.event_source es + on es.id = p.event_source_id + join params prm + on prm.extraction_package_id = p.id + where p.package_type = 'DB_EXTRACT' +), +camel_batches as ( + select c.* + from eventhub.data_package c + join extract_package e + on c.event_source_id = e.event_source_id + and c.tenant_key = e.tenant_key + where c.package_type = 'CAMEL_BATCH' + and c.package_key like e.aggregate_package_key || ':CAMEL-%' +), +extracted_by_type as ( + select + key as event_type_key, + value::integer as extracted_events + from extract_package e, + jsonb_each_text(coalesce(e.metadata -> 'extractedEventTypeCounts', '{}'::jsonb)) +), +received_by_type as ( + select + kv.key as event_type_key, + sum((kv.value)::integer) as received_events + from camel_batches c, + jsonb_each_text(coalesce(c.metadata -> 'receivedEventTypeCounts', '{}'::jsonb)) kv + group by kv.key +), +imported_by_type as ( + select + e.event_domain || '/' || e.event_type as event_type_key, + count(*) as imported_events + from camel_batches c + join eventhub.event e + on e.data_package_id = c.id + group by e.event_domain || '/' || e.event_type +) +select + coalesce(x.event_type_key, r.event_type_key, i.event_type_key) as event_type_key, + coalesce(x.extracted_events, 0) as extracted_events, + coalesce(r.received_events, 0) as received_events, + coalesce(i.imported_events, 0) as imported_events +from extracted_by_type x +full outer join received_by_type r + on r.event_type_key = x.event_type_key +full outer join imported_by_type i + on i.event_type_key = coalesce(x.event_type_key, r.event_type_key) +order by event_type_key; diff --git a/docs/import-performance-and-error-fixes.md b/docs/import-performance-and-error-fixes.md new file mode 100644 index 0000000..c1bc17b --- /dev/null +++ b/docs/import-performance-and-error-fixes.md @@ -0,0 +1,459 @@ +# Import Performance and Error Fixes + +## Scope + +This document summarizes the event-ingest, master-data, schema, locking, retry, and correctness fixes made in the current optimization round for the EventHub import pipeline. + +It covers: + +- event schema and migration fixes +- event import throughput fixes +- master-data import throughput fixes +- deadlock and transaction-visibility fixes +- connection-reset and retry handling +- async import cursor correctness fixes +- logging and operational visibility improvements + +Main committed changes: + +- `2e6e1aa` - `Optimize ingestion pipeline and reduce import contention` +- `bd3620b` - `Improve vehicle reference caching during ingest` + +Additional related fixes are currently present in the workspace but may not yet be committed. + +## 1. Schema and Migration Fixes + +### 1.1 `event_detail` / hypertable ordering + +Problem: + +- executing `eventhub_schema_create.sql` on an empty database failed with `relation "eventhub.event_detail" does not exist` + +Fix: + +- create `eventhub.event_detail` before `create_hypertable(...)` +- add its foreign key after the hypertable conversion + +Files: + +- `src/main/resources/db/eventhub_schema_create.sql` + +### 1.2 Explicit migrations for event hypertable and source-record support + +Problem: + +- the runtime schema evolution needed explicit migrations for hypertable conversion and `event_source_record` + +Fix: + +- add migration for `source_package_id` on `event` +- add migration for `event` hypertable conversion and FK recreation +- add migration to ensure `event_source_record` exists and is backfilled + +Files: + +- `src/main/resources/db/migration/V9__add_event_source_package_id.sql` +- `src/main/resources/db/migration/V10__make_event_hypertable.sql` +- `src/main/resources/db/migration/V11__ensure_event_source_record.sql` + +## 2. Event Import Throughput Fixes + +### 2.1 Replace per-event inserts with staged set-based writes + +Problem: + +- `EventRepository.batchInsert(...)` originally processed events one by one despite the batch API +- this caused one insert/query cycle per event and poor throughput + +Fix: + +- stage a whole ingest batch into `eventhub_event_import_stage` +- reserve `event_source_record` rows set-wise +- insert `eventhub.event` rows set-wise +- upsert `eventhub.event_detail` rows in batch + +Files: + +- `src/main/java/at/procon/eventhub/persistence/EventRepository.java` + +### 2.2 Fix missing event rows when source records were reserved + +Problem: + +- after the set-based refactor, some runs created `event_source_record` rows without creating `event` rows + +Cause: + +- the insert statement reserved source records and then tried to re-read them through the base table in the same data-modifying CTE chain + +Fix: + +- use the `RETURNING` rows from the source-record reservation CTE directly +- also support already-existing source records that still miss the `event` row + +Files: + +- `src/main/java/at/procon/eventhub/persistence/EventRepository.java` + +### 2.3 Stream extraction instead of materializing full result sets + +Problem: + +- extraction loaded full SQL chunks into memory before handing them to Camel + +Fix: + +- stream rows directly from JDBC to `direct:eventhub-normalized-input` +- keep only counters and watermark information + +Files: + +- `src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java` +- `src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java` + +### 2.4 Increase batch size and enable parallel queue draining + +Problem: + +- the async ingest route drained too slowly with `1000`-event batches and a single consumer + +Fix: + +- raise Camel completion size from `1000` to `5000` +- enable `4` concurrent SEDA consumers + +Files: + +- `src/main/java/at/procon/eventhub/config/EventHubProperties.java` +- `src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java` +- `src/main/resources/application.yml` + +### 2.5 Give each Camel flush its own package key + +Problem: + +- multiple flushes of the same extraction package reused the same `data_package` identity +- logs were misleading and `event_count` on the package row was overwritten by later flushes + +Fix: + +- derive a unique `packageKey` per completed Camel batch using the aggregate package key plus the Camel exchange id +- preserve both the aggregate key and the child key in metadata + +Files: + +- `src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java` + +### 2.6 Improve batch-local entity and vehicle caching + +Problem: + +- after the bulk insert refactor, the main remaining hot path was still reference resolution + +Fixes: + +- cache entity ids in the batch by `entityType + sourceEntityId` +- cache vehicle resolutions inside a batch +- later extend vehicle caching to be range-aware for registration-based assignment lookups: + - direct vehicle identifiers cache without time sensitivity + - registration-based resolutions cache over assignment validity intervals + +Files: + +- `src/main/java/at/procon/eventhub/persistence/EventRepository.java` +- `src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java` + +## 3. Master-Data Import Throughput Fixes + +### 3.1 Set-based master entity and relation upserts + +Problem: + +- source master data was previously written row by row + +Fix: + +- stage master entities and relations into temporary tables +- run set-based `insert ... select ... on conflict do update` + +Files: + +- `src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java` + +### 3.2 Stream and chunk master-data refresh + +Problem: + +- the refresh path loaded large source master-data result sets into memory + +Fix: + +- stream source rows +- flush in chunks of `5000` + +Files: + +- `src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java` + +### 3.3 Bulk vehicle reconciliation from master data + +Problem: + +- reconciling vehicles and registrations from master data was done row by row + +Fix: + +- replace the loop with set-based SQL for vehicles, registrations, and projected assignments + +Files: + +- `src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java` + +## 4. Deadlock and Contention Fixes + +### 4.1 Remove unnecessary hot-row updates on vehicle and registration rows + +Problem: + +- event import updated `vehicle.updated_at` and `vehicle_registration.updated_at` even when no new information was being added +- this created deadlocks under parallel ingest + +Fix: + +- only update `vehicle` when missing `source_vehicle_entity_id` or `vin` can actually be filled +- only update `vehicle_registration` when missing source id, nation, or registration number can actually be filled +- stop using event import as a generic "touch row" path + +Files: + +- `src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java` + +### 4.2 Make event-time source master entity resolution "find or create", not "update on conflict" + +Problem: + +- concurrent event batches could deadlock on `eventhub.source_master_entity` through `INSERT ... ON CONFLICT DO UPDATE` + +Fix: + +- first `SELECT id` +- if missing, `INSERT ... ON CONFLICT DO NOTHING RETURNING id` +- if another transaction won the race, select again +- do not update existing master entity rows during event ingest + +Files: + +- `src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java` + +### 4.3 Fix race handling when `RETURNING` returns no row + +Problem: + +- if a concurrent transaction inserted the entity first, the resolver could still fail unexpectedly + +Fix: + +- allow the `RETURNING` path to yield `null` +- retry with a follow-up `SELECT` + +Files: + +- `src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java` + +## 5. Transaction Visibility and Correctness Fixes + +### 5.1 Remove outer transaction around full tachograph execution + +Problem: + +- master-data refresh logs showed completion, but master-data rows were not visible yet because the outer import method still held the transaction open + +Fix: + +- remove the outer transaction from `startAndExecuteImport(...)` +- keep chunk-level and package-level transactions independent + +Files: + +- `src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java` + +### 5.2 Preserve the original ingest exception if package failure marking also fails + +Problem: + +- when ingest failed and `markFailed(...)` also failed because of a broken connection, the secondary bookkeeping error hid the real root cause + +Fix: + +- wrap `dataPackageRepository.markFailed(...)` in its own `try/catch` +- log the bookkeeping failure +- keep the original ingest exception and attach the bookkeeping failure as suppressed + +Files: + +- `src/main/java/at/procon/eventhub/service/EventHubIngestionService.java` + +### 5.3 Do not advance import cursors before async ingest really finishes + +Problem: + +- extraction previously marked packages imported and advanced `import_cursor` before the async `CAMEL_BATCH` ingest was durably finished +- this could skip source data on the next run if async ingest later failed + +Fix: + +- add grouped child-batch status lookup on `data_package` +- make extraction package completion wait for all derived `CAMEL_BATCH` rows to reach terminal success +- fail the planned extraction package if child batches fail or time out +- only advance the cursor after the async ingest succeeded +- make "Completed import run" mean durable ingest completion instead of extraction completion + +Files: + +- `src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java` +- `src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java` +- `src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java` + +## 6. Connection Reset and Retry Hardening + +### 6.1 Retry transient DB failures in the Camel ingest route + +Problem: + +- long-running imports hit transient failures such as deadlocks and connection resets + +Fix: + +- add Camel redelivery with exponential backoff for: + - `CannotAcquireLockException` + - `PessimisticLockingFailureException` + - `DataAccessResourceFailureException` + - `TransientDataAccessException` + +Files: + +- `src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java` + +### 6.2 Tune Hikari for shorter-lived and healthier pooled connections + +Problem: + +- `SQLSTATE 08006` / `Connection reset` events left broken pool entries behind + +Fix: + +- configure Hikari with explicit pool sizing and connection lifetime / keepalive settings: + - `maximum-pool-size: 16` + - `minimum-idle: 4` + - `connection-timeout: 30000` + - `validation-timeout: 5000` + - `idle-timeout: 300000` + - `keepalive-time: 120000` + - `max-lifetime: 540000` + +Files: + +- `src/main/resources/application.yml` + +## 7. Observability Improvements + +### 7.1 Master-data progress logging + +Added logs for: + +- refresh start +- per-section progress +- per-chunk counts +- `byType` breakdowns +- section completion +- reconciliation start and result + +Files: + +- `src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java` + +### 7.2 Event extraction progress logging + +Added logs for: + +- extraction start +- progress every `5000` mapped events +- final mapped totals with `byType` + +Files: + +- `src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java` + +### 7.3 Event ingest throughput logging + +Added logs for: + +- `receivedCount` +- `insertedCount` +- `elapsedMs` +- `receivedPerSecond` +- `byType` + +Files: + +- `src/main/java/at/procon/eventhub/service/EventHubIngestionService.java` + +### 7.4 Async-ingest wait progress logging + +Added logs for: + +- number of expected child batches +- observed child batches +- successful / failed / importing child-batch counts while the import executor waits for durable completion + +Files: + +- `src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java` + +## 8. Operational Notes + +### Throughput effect seen during the optimization round + +Observed progression during the work: + +- roughly `30` events/sec before the later cache and blocking fixes +- roughly `300` rows/sec after the main contention and stuck-session cleanup work + +This is a major improvement, but large historical backfills are still expensive. + +### What remains expensive + +The main remaining bottleneck is still reference resolution in the ingest hot path, especially: + +- driver entity resolution +- source-package entity resolution +- vehicle / registration lookup and creation + +The next major optimization step would be set-based pre-resolution of references per ingest batch instead of resolving them one event at a time. + +### Safe rerun behavior + +- event ingest remains idempotent through `event_source_record.source_record_key_hash` +- already imported events should generally be kept +- when historical cursor corruption existed, repair should target `import_cursor`, not wholesale deletion of imported events + +## 9. Main Files Touched + +- `src/main/java/at/procon/eventhub/persistence/EventRepository.java` +- `src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java` +- `src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java` +- `src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java` +- `src/main/java/at/procon/eventhub/service/EventHubIngestionService.java` +- `src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java` +- `src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java` +- `src/main/java/at/procon/eventhub/importing/extraction/AbstractJdbcExtractionBatchExecutor.java` +- `src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java` +- `src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java` +- `src/main/java/at/procon/eventhub/tachograph/service/TachographMasterDataRefreshService.java` +- `src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java` +- `src/main/java/at/procon/eventhub/config/EventHubProperties.java` +- `src/main/resources/application.yml` +- `src/main/resources/db/eventhub_schema_create.sql` +- `src/main/resources/db/migration/V9__add_event_source_package_id.sql` +- `src/main/resources/db/migration/V10__make_event_hypertable.sql` +- `src/main/resources/db/migration/V11__ensure_event_source_record.sql` diff --git a/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java b/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java index 662a6ec..eada4f6 100644 --- a/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java +++ b/src/main/java/at/procon/eventhub/camel/EventHubBatchBuildProcessor.java @@ -8,6 +8,7 @@ import at.procon.eventhub.dto.ImportScopeDto; import at.procon.eventhub.service.EventHubEventSorter; import java.time.OffsetDateTime; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.apache.camel.Exchange; @@ -50,6 +51,7 @@ public class EventHubBatchBuildProcessor implements Processor { metadata.put("aggregatePackageKey", aggregatePackageKey); metadata.put("camelExchangeId", exchange.getExchangeId()); metadata.put("eventCount", sortedEvents.size()); + metadata.put("receivedEventTypeCounts", eventTypeCounts(sortedEvents)); if (packageInfo != null) { metadata.put("tenantKey", packageInfo.tenantKey()); metadata.put("eventSource", packageInfo.eventSource().stableKey()); @@ -70,4 +72,14 @@ public class EventHubBatchBuildProcessor implements Processor { metadata )); } + + private Map eventTypeCounts(List events) { + Map counts = new LinkedHashMap<>(); + for (EventHubEventDto event : events) { + String domain = event.eventDomain() == null ? "UNKNOWN_DOMAIN" : event.eventDomain().name(); + String type = event.eventType() == null ? "UNKNOWN_EVENT" : event.eventType().name(); + counts.merge(domain + "/" + type, 1, Integer::sum); + } + return counts; + } } diff --git a/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java b/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java index 20b7404..326acd9 100644 --- a/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java +++ b/src/main/java/at/procon/eventhub/camel/EventHubCommonIngestionRoute.java @@ -2,8 +2,13 @@ package at.procon.eventhub.camel; import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.service.EventHubIngestionService; +import org.springframework.dao.CannotAcquireLockException; +import org.springframework.dao.DataAccessResourceFailureException; +import org.springframework.dao.PessimisticLockingFailureException; +import org.springframework.dao.TransientDataAccessException; import org.apache.camel.builder.RouteBuilder; import org.springframework.stereotype.Component; +import org.springframework.transaction.TransactionSystemException; @Component public class EventHubCommonIngestionRoute extends RouteBuilder { @@ -37,6 +42,19 @@ public class EventHubCommonIngestionRoute extends RouteBuilder { public void configure() { String batchInputUri = batchInputUri(); + onException( + CannotAcquireLockException.class, + PessimisticLockingFailureException.class, + DataAccessResourceFailureException.class, + TransientDataAccessException.class, + TransactionSystemException.class + ) + .maximumRedeliveries(5) + .redeliveryDelay(2000) + .backOffMultiplier(2.0) + .useExponentialBackOff() + .retryAttemptedLogLevel(org.apache.camel.LoggingLevel.WARN); + from("direct:eventhub-normalized-input") .routeId("eventhub-normalized-input-route") .process(validationProcessor) diff --git a/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java b/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java index ed5444a..077424f 100644 --- a/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java +++ b/src/main/java/at/procon/eventhub/importing/AbstractImportExecutionService.java @@ -29,6 +29,7 @@ public abstract class AbstractImportExecutionService 0 && stateChanged) { failedStateObservedAt = Instant.now(); } else if (state.failedCount() == 0) { @@ -263,6 +274,21 @@ public abstract class AbstractImportExecutionService eventTypeCounts() { + return Map.of(); + } } diff --git a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java index 3d29287..49c774a 100644 --- a/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/DataPackageRepository.java @@ -34,6 +34,7 @@ public class DataPackageRepository { this.objectMapper = objectMapper; } + @Transactional(propagation = Propagation.REQUIRES_NEW) public UUID createPackage( int eventSourceId, String packageKey, @@ -43,24 +44,78 @@ public class DataPackageRepository { OffsetDateTime occurredTo, Map metadata ) { - return insertPackage( - eventSourceId, - null, - packageKey, - packageInfo, - packageType, - DataPackageStatus.IMPORTING, - occurredFrom, - occurredTo, - null, - null, - null, - null, - null, - null, - null, - null, - metadata + UUID id = UUID.randomUUID(); + SourceGroupRefDto sourceGroup = packageInfo == null ? null : packageInfo.sourceGroup(); + ImportScopeDto importScope = packageInfo == null ? null : packageInfo.importScope(); + SourceGroupRefDto rootOrg = importScope == null ? null : importScope.rootSourceOrganisation(); + + return jdbcTemplate.query( + con -> { + var ps = con.prepareStatement(""" + insert into eventhub.data_package( + id, event_source_id, import_run_id, tenant_key, package_key, package_type, status, + source_group_type, source_group_entity_id, source_group_code, source_group_name, + import_scope_type, root_source_org_entity_id, root_source_org_code, root_source_org_name, + include_children, occurred_from, occurred_to, + event_family, business_date, external_package_id, + extraction_code, extraction_source_kind, entity_axis, batch_no, chunk_from, chunk_to, + source_package_kind, source_package_id, source_package_entity_id, + source_package_period_from, source_package_period_to, source_package_imported_at, + received_at, event_count, metadata + ) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, now(), ?, ?::jsonb) + on conflict (tenant_key, event_source_id, package_key) do update + set status = excluded.status, + occurred_from = excluded.occurred_from, + occurred_to = excluded.occurred_to, + event_count = excluded.event_count, + metadata = excluded.metadata, + error_message = null, + completed_at = null + returning id + """); + ps.setObject(1, id); + ps.setInt(2, eventSourceId); + ps.setObject(3, null); + ps.setString(4, packageInfo == null ? "default" : packageInfo.tenantKey()); + ps.setString(5, packageKey); + ps.setString(6, packageType.name()); + ps.setString(7, DataPackageStatus.IMPORTING.name()); + ps.setString(8, sourceGroup == null || sourceGroup.type() == null ? null : sourceGroup.type().name()); + ps.setString(9, sourceGroup == null ? null : sourceGroup.sourceEntityId()); + ps.setString(10, sourceGroup == null ? null : sourceGroup.code()); + ps.setString(11, sourceGroup == null ? null : sourceGroup.name()); + ps.setString(12, importScope == null || importScope.type() == null ? null : importScope.type().name()); + ps.setString(13, rootOrg == null ? null : rootOrg.sourceEntityId()); + ps.setString(14, rootOrg == null ? null : rootOrg.code()); + ps.setString(15, rootOrg == null ? null : rootOrg.name()); + ps.setBoolean(16, importScope != null && importScope.includeChildren()); + ps.setObject(17, occurredFrom); + ps.setObject(18, occurredTo); + ps.setString(19, packageInfo == null ? null : packageInfo.eventFamily()); + ps.setObject(20, packageInfo == null ? null : packageInfo.businessDate()); + ps.setString(21, packageInfo == null ? packageKey : packageInfo.externalPackageId()); + ps.setString(22, null); + ps.setString(23, null); + ps.setString(24, null); + ps.setObject(25, null); + ps.setObject(26, null); + ps.setObject(27, null); + ps.setString(28, null); + ps.setString(29, null); + ps.setString(30, null); + ps.setObject(31, null); + ps.setObject(32, null); + ps.setObject(33, null); + ps.setInt(34, 0); + ps.setString(35, toJson(metadata)); + return ps; + }, + rs -> { + if (!rs.next()) { + throw new IllegalStateException("Could not create or resolve data package " + packageKey); + } + return (UUID) rs.getObject(1); + } ); } @@ -198,6 +253,31 @@ public class DataPackageRepository { ); } + public void updateEventCount(UUID packageId, int eventCount) { + jdbcTemplate.update( + """ + update eventhub.data_package + set event_count = ? + where id = ? + """, + eventCount, + packageId + ); + } + + public void mergeMetadata(UUID packageId, Map metadata) { + jdbcTemplate.update( + """ + update eventhub.data_package + set metadata = coalesce(metadata, '{}'::jsonb) || ?::jsonb + where id = ? + """, + toJson(metadata), + packageId + ); + } + + @Transactional(propagation = Propagation.REQUIRES_NEW) public void markImported(UUID packageId, int insertedCount) { jdbcTemplate.update( """ @@ -225,6 +305,44 @@ public class DataPackageRepository { ); } + public CamelBatchGroupStatus findCamelBatchGroupStatus(int eventSourceId, String tenantKey, String aggregatePackageKey) { + return jdbcTemplate.query( + """ + select count(*) as total_count, + count(*) filter (where status in (?, ?)) as success_count, + count(*) filter (where status = ?) as failed_count, + count(*) filter (where status = ?) as importing_count, + coalesce(max(error_message) filter (where status = ?), '') as failed_message + from eventhub.data_package + where event_source_id = ? + and tenant_key = ? + and package_type = ? + and package_key like ? + """, + rs -> { + if (!rs.next()) { + return CamelBatchGroupStatus.empty(); + } + return new CamelBatchGroupStatus( + rs.getInt("total_count"), + rs.getInt("success_count"), + rs.getInt("failed_count"), + rs.getInt("importing_count"), + rs.getString("failed_message") + ); + }, + DataPackageStatus.IMPORTED.name(), + DataPackageStatus.EMPTY.name(), + DataPackageStatus.FAILED.name(), + DataPackageStatus.IMPORTING.name(), + DataPackageStatus.FAILED.name(), + eventSourceId, + tenantKey, + DataPackageType.CAMEL_BATCH.name(), + aggregatePackageKey + ":CAMEL-%" + ); + } + private String toJson(Map value) { try { return objectMapper.writeValueAsString(normalizeMetadataMap(value)); @@ -291,4 +409,16 @@ public class DataPackageRepository { } return value.toString(); } + + public record CamelBatchGroupStatus( + int totalCount, + int successCount, + int failedCount, + int importingCount, + String failedMessage + ) { + public static CamelBatchGroupStatus empty() { + return new CamelBatchGroupStatus(0, 0, 0, 0, ""); + } + } } diff --git a/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java b/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java index 045198a..ce3f83b 100644 --- a/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/SourceMasterDataRepository.java @@ -320,10 +320,7 @@ public class SourceMasterDataRepository { }, rs -> { if (!rs.next()) { - throw new IllegalStateException( - "Could not resolve source master entity id for " - + normalizedTenantKey + ":" + eventSourceId + ":" + normalizedEntityType + ":" + normalizedSourceEntityId - ); + return null; } return (UUID) rs.getObject(1); } diff --git a/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java b/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java index 671e0f0..ab38373 100644 --- a/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java +++ b/src/main/java/at/procon/eventhub/persistence/VehicleIdentityRepository.java @@ -524,13 +524,13 @@ public class VehicleIdentityRepository { jdbcTemplate.update( """ update eventhub.vehicle - set source_vehicle_entity_id = coalesce(source_vehicle_entity_id, ?), - vin = coalesce(vin, ?), + set source_vehicle_entity_id = coalesce(source_vehicle_entity_id, cast(? as text)), + vin = coalesce(vin, cast(? as text)), updated_at = now() where id = ? and ( - (source_vehicle_entity_id is null and ? is not null) - or (vin is null and ? is not null) + (source_vehicle_entity_id is null and cast(? as text) is not null) + or (vin is null and cast(? as text) is not null) ) """, sourceVehicleEntityId, @@ -595,15 +595,15 @@ public class VehicleIdentityRepository { jdbcTemplate.update( """ update eventhub.vehicle_registration - set source_registration_entity_id = coalesce(source_registration_entity_id, ?), - nation = coalesce(?, nation), - registration_number = coalesce(?, registration_number), + set source_registration_entity_id = coalesce(source_registration_entity_id, cast(? as text)), + nation = coalesce(cast(? as text), nation), + registration_number = coalesce(cast(? as text), registration_number), updated_at = now() where id = ? and ( - (source_registration_entity_id is null and ? is not null) - or (nation is null and ? is not null) - or (registration_number is null and ? is not null) + (source_registration_entity_id is null and cast(? as text) is not null) + or (nation is null and cast(? as text) is not null) + or (registration_number is null and cast(? as text) is not null) ) """, sourceRegistrationEntityId, diff --git a/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java index 56eabec..d17aa1b 100644 --- a/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java +++ b/src/main/java/at/procon/eventhub/service/EventHubIngestionService.java @@ -15,7 +15,8 @@ import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.support.TransactionTemplate; @Service public class EventHubIngestionService { @@ -27,26 +28,29 @@ public class EventHubIngestionService { private final EventRepository eventRepository; private final EventHubEventValidator validator; private final EventHubEventSorter sorter; + private final TransactionTemplate transactionTemplate; public EventHubIngestionService( EventSourceRepository eventSourceRepository, DataPackageRepository dataPackageRepository, EventRepository eventRepository, EventHubEventValidator validator, - EventHubEventSorter sorter + EventHubEventSorter sorter, + PlatformTransactionManager transactionManager ) { this.eventSourceRepository = eventSourceRepository; this.dataPackageRepository = dataPackageRepository; this.eventRepository = eventRepository; this.validator = validator; this.sorter = sorter; + this.transactionTemplate = new TransactionTemplate(transactionManager); } - @Transactional public EventHubPackageResult ingest(EventHubEventBatchDto batch) { if (batch == null || batch.events().isEmpty()) { return new EventHubPackageResult(null, batch == null ? null : batch.packageKey(), 0, 0); } + long startedAtNanos = System.nanoTime(); EventHubPackageRequest packageInfo = batch.packageInfo(); if (packageInfo == null) { @@ -57,6 +61,7 @@ public class EventHubIngestionService { } EventSourceDto eventSource = packageInfo.eventSource(); + String tenantKey = packageInfo.tenantKey(); int eventSourceId = eventSourceRepository.resolveSourceId(packageInfo.tenantKey(), eventSource); List sortedEvents = sorter.sort(batch.events()); sortedEvents.forEach(validator::validate); @@ -72,13 +77,26 @@ public class EventHubIngestionService { ); try { - int insertedCount = eventRepository.batchInsert(packageId, packageInfo.tenantKey(), eventSourceId, sortedEvents); + Integer insertedCount = transactionTemplate.execute(status -> + eventRepository.batchInsert(packageId, tenantKey, eventSourceId, sortedEvents) + ); + if (insertedCount == null) { + throw new IllegalStateException("EventHub batch insert returned no result for package " + batch.packageKey()); + } dataPackageRepository.markImported(packageId, insertedCount); - log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={} byType={}", - packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount, eventTypeCounts(sortedEvents)); + long elapsedMs = Math.max(1L, (System.nanoTime() - startedAtNanos) / 1_000_000L); + long receivedPerSecond = Math.max(1L, Math.round(sortedEvents.size() * 1000.0 / elapsedMs)); + log.info("Imported EventHub acquisition package packageId={} packageKey={} source={} receivedCount={} insertedCount={} elapsedMs={} receivedPerSecond={} byType={}", + packageId, batch.packageKey(), eventSource.stableKey(), sortedEvents.size(), insertedCount, elapsedMs, receivedPerSecond, eventTypeCounts(sortedEvents)); return new EventHubPackageResult(packageId, batch.packageKey(), sortedEvents.size(), insertedCount); } catch (RuntimeException ex) { - dataPackageRepository.markFailed(packageId, ex.getMessage()); + try { + dataPackageRepository.markFailed(packageId, ex.getMessage()); + } catch (RuntimeException markFailedEx) { + ex.addSuppressed(markFailedEx); + log.warn("Failed to mark EventHub acquisition package as failed packageId={} packageKey={}", + packageId, batch.packageKey(), markFailedEx); + } throw ex; } } diff --git a/src/main/java/at/procon/eventhub/tachograph/dto/TachographExtractionBatchResultDto.java b/src/main/java/at/procon/eventhub/tachograph/dto/TachographExtractionBatchResultDto.java index 29557c9..d9cbb4e 100644 --- a/src/main/java/at/procon/eventhub/tachograph/dto/TachographExtractionBatchResultDto.java +++ b/src/main/java/at/procon/eventhub/tachograph/dto/TachographExtractionBatchResultDto.java @@ -2,6 +2,7 @@ package at.procon.eventhub.tachograph.dto; import at.procon.eventhub.importing.ExtractionBatchResult; import java.time.OffsetDateTime; +import java.util.Map; import java.util.UUID; public record TachographExtractionBatchResultDto( @@ -16,6 +17,11 @@ public record TachographExtractionBatchResultDto( OffsetDateTime lastSourcePackageImportedAt, String lastSourcePackageId, OffsetDateTime lastSourceRowUpdatedAt, - OffsetDateTime lastOccurredTo + OffsetDateTime lastOccurredTo, + Map eventTypeCounts ) implements ExtractionBatchResult { + + public TachographExtractionBatchResultDto { + eventTypeCounts = eventTypeCounts == null ? Map.of() : Map.copyOf(eventTypeCounts); + } } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java index 175bc62..1cc0ac9 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/JdbcTachographExtractionBatchExecutor.java @@ -84,7 +84,8 @@ public class JdbcTachographExtractionBatchExecutor lastSourcePackageImportedAt(stats, cursor), lastSourcePackageId(stats, cursor), null, - chunk.occurredTo() + chunk.occurredTo(), + stats.eventTypeCounts() ); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java b/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java index fb4a161..8364158 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/NoopTachographExtractionBatchExecutor.java @@ -42,7 +42,8 @@ public class NoopTachographExtractionBatchExecutor null, null, null, - null + null, + java.util.Map.of() ); } diff --git a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java index 216ebad..42d417a 100644 --- a/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java +++ b/src/main/java/at/procon/eventhub/tachograph/service/TachographImportExecutionService.java @@ -1,5 +1,6 @@ package at.procon.eventhub.tachograph.service; +import at.procon.eventhub.config.EventHubProperties; import at.procon.eventhub.dto.EventSourceDto; import at.procon.eventhub.importing.AbstractImportExecutionService; import at.procon.eventhub.importing.ImportPlanDto; @@ -32,10 +33,11 @@ public class TachographImportExecutionService ImportRunRepository importRunRepository, DataPackageRepository dataPackageRepository, ImportCursorRepository importCursorRepository, + EventHubProperties eventHubProperties, TachographMasterDataRefreshService masterDataRefreshService, TachographExtractionBatchExecutor extractionBatchExecutor ) { - super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository); + super(eventSourceRepository, importRunRepository, dataPackageRepository, importCursorRepository, eventHubProperties); this.planService = planService; this.masterDataRefreshService = masterDataRefreshService; this.extractionBatchExecutor = extractionBatchExecutor; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 07f0b5f..2f52350 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -5,12 +5,29 @@ spring: url: jdbc:postgresql://localhost:5432/eventhub username: postgres password: P54!pcd#Wi + hikari: + maximum-pool-size: 16 + minimum-idle: 4 + connection-timeout: 30000 + validation-timeout: 5000 + idle-timeout: 60000 + keepalive-time: 30000 + max-lifetime: 90000 flyway: enabled: true default-schema: eventhub schemas: eventhub create-schemas: true +logging: + file: + name: ${EVENTHUB_LOG_FILE:logs/eventhub-ingestion-service.log} + logback: + rollingpolicy: + max-file-size: ${EVENTHUB_LOG_MAX_FILE_SIZE:50MB} + max-history: ${EVENTHUB_LOG_MAX_HISTORY:14} + total-size-cap: ${EVENTHUB_LOG_TOTAL_SIZE_CAP:1GB} + server: port: 8085 diff --git a/src/main/resources/logback-spring.xml b/src/main/resources/logback-spring.xml new file mode 100644 index 0000000..6ad8ceb --- /dev/null +++ b/src/main/resources/logback-spring.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX} %-5level ${PID:-unknown} --- [%thread] %logger{39} : %msg%n%ex + UTF-8 + + + + + ${logFile} + true + true + + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX} %-5level ${PID:-unknown} --- [%thread] %logger{39} : %msg%n%ex + UTF-8 + + + ${logFile}.%d{yyyy-MM-dd}.%i.gz + ${maxFileSize} + ${maxHistory} + ${totalSizeCap} + + + + + + + + + diff --git a/src/main/resources/sql/tachograph/card-activity.sql b/src/main/resources/sql/tachograph/card-activity.sql index 7a01a53..313c1b8 100644 --- a/src/main/resources/sql/tachograph/card-activity.sql +++ b/src/main/resources/sql/tachograph/card-activity.sql @@ -69,7 +69,7 @@ Base as ( ca.card_filelog_id, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, ca.ID_FileLog, ca.cda_filelog_id, ca.card_filelog_id, ca.ID) as source_package_id_raw, + coalesce(fl.ID, ca.ID_FileLog, ca.cda_filelog_id, ca.card_filelog_id) as source_package_id_raw, coalesce(fl.ID_Card, ca.card_id) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, ca.RecordDate) as source_package_period_from, coalesce(fl.DownloadTo, ca.RecordDateTo, dateadd(day, 1, ca.RecordDate)) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/card-border-crossing.sql b/src/main/resources/sql/tachograph/card-border-crossing.sql index 2c27c1b..781b940 100644 --- a/src/main/resources/sql/tachograph/card-border-crossing.sql +++ b/src/main/resources/sql/tachograph/card-border-crossing.sql @@ -27,7 +27,7 @@ Base as ( vehicleNation.AlphaCode as vehicle_registration_nation, v.VRN as vehicle_registration_number, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, border.ID_FileLog, border.ID) as source_package_id_raw, + coalesce(fl.ID, border.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_Card, border.ID_Card) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, border.Timestamp) as source_package_period_from, coalesce(fl.DownloadTo, border.Timestamp) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/card-load-unload.sql b/src/main/resources/sql/tachograph/card-load-unload.sql index ec837e4..4d25fc5 100644 --- a/src/main/resources/sql/tachograph/card-load-unload.sql +++ b/src/main/resources/sql/tachograph/card-load-unload.sql @@ -26,7 +26,7 @@ Base as ( vehicleNation.AlphaCode as vehicle_registration_nation, v.VRN as vehicle_registration_number, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, lu.ID_FileLog, lu.ID) as source_package_id_raw, + coalesce(fl.ID, lu.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_Card, lu.ID_Card) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, lu.Timestamp) as source_package_period_from, coalesce(fl.DownloadTo, lu.Timestamp) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/card-place.sql b/src/main/resources/sql/tachograph/card-place.sql index 9c37432..3bded03 100644 --- a/src/main/resources/sql/tachograph/card-place.sql +++ b/src/main/resources/sql/tachograph/card-place.sql @@ -28,7 +28,7 @@ Base as ( vehicleNation.AlphaCode as vehicle_registration_nation, v.VRN as vehicle_registration_number, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, place.ID_FileLog, place.ID) as source_package_id_raw, + coalesce(fl.ID, place.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_Card, place.ID_Card) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, place.EntryTime) as source_package_period_from, coalesce(fl.DownloadTo, place.EntryTime) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/card-position.sql b/src/main/resources/sql/tachograph/card-position.sql index a1fec8e..f5797cf 100644 --- a/src/main/resources/sql/tachograph/card-position.sql +++ b/src/main/resources/sql/tachograph/card-position.sql @@ -25,7 +25,7 @@ Base as ( vehicleNation.AlphaCode as vehicle_registration_nation, v.VRN as vehicle_registration_number, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, pos.ID_FileLog, pos.ID) as source_package_id_raw, + coalesce(fl.ID, pos.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_Card, pos.ID_Card) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, pos.Timestamp) as source_package_period_from, coalesce(fl.DownloadTo, pos.Timestamp) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/card-specific-condition.sql b/src/main/resources/sql/tachograph/card-specific-condition.sql index b2590be..7dd677f 100644 --- a/src/main/resources/sql/tachograph/card-specific-condition.sql +++ b/src/main/resources/sql/tachograph/card-specific-condition.sql @@ -23,7 +23,7 @@ Base as ( vehicleNation.AlphaCode as vehicle_registration_nation, v.VRN as vehicle_registration_number, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, cond.ID_FileLog, cond.ID) as source_package_id_raw, + coalesce(fl.ID, cond.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_Card, cond.ID_Card) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, cond.EntryTime) as source_package_period_from, coalesce(fl.DownloadTo, cond.EntryTime) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/card-vehicles-used.sql b/src/main/resources/sql/tachograph/card-vehicles-used.sql index 8e650f4..819e884 100644 --- a/src/main/resources/sql/tachograph/card-vehicles-used.sql +++ b/src/main/resources/sql/tachograph/card-vehicles-used.sql @@ -65,7 +65,7 @@ Base as ( evt.occurred_at, evt.odometer_m, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, used.ID_FileLog, used.ID) as source_package_id_raw, + coalesce(fl.ID, used.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_Card, used.ID_Card) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, used.FirstUse) as source_package_period_from, coalesce(fl.DownloadTo, used.LastUse, used.FirstUse) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/iw-cycle.sql b/src/main/resources/sql/tachograph/iw-cycle.sql index 96c5a39..5c3862d 100644 --- a/src/main/resources/sql/tachograph/iw-cycle.sql +++ b/src/main/resources/sql/tachograph/iw-cycle.sql @@ -56,7 +56,7 @@ Base as ( evt.occurred_at, evt.odometer_m, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, cycle.ID_FileLog, cycle.vui_filelog_id, cycle.ID) as source_package_id_raw, + coalesce(fl.ID, cycle.ID_FileLog, cycle.vui_filelog_id) as source_package_id_raw, coalesce(fl.ID_VehicleIdentification, cycle.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, cycle.BeginTime) as source_package_period_from, coalesce(fl.DownloadTo, cycle.EndTime, cycle.BeginTime) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/speeding-events.sql b/src/main/resources/sql/tachograph/speeding-events.sql index b4a8b44..1cce29f 100644 --- a/src/main/resources/sql/tachograph/speeding-events.sql +++ b/src/main/resources/sql/tachograph/speeding-events.sql @@ -28,7 +28,7 @@ Base as ( vi.ID as vehicle_identification_id, vi.VIN as vehicle_vin, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, speeding.ID_FileLog, vui.ID_FileLog, speeding.ID) as source_package_id_raw, + coalesce(fl.ID, speeding.ID_FileLog, vui.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, speeding.BeginTime) as source_package_period_from, coalesce(fl.DownloadTo, speeding.EndTime, speeding.BeginTime) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/vu-activity.sql b/src/main/resources/sql/tachograph/vu-activity.sql index 35a5933..c0b10ae 100644 --- a/src/main/resources/sql/tachograph/vu-activity.sql +++ b/src/main/resources/sql/tachograph/vu-activity.sql @@ -102,7 +102,7 @@ Base as ( va.vui_filelog_id, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, va.ID_FileLog, va.vda_filelog_id, va.vui_filelog_id, va.ID) as source_package_id_raw, + coalesce(fl.ID, va.ID_FileLog, va.vda_filelog_id, va.vui_filelog_id) as source_package_id_raw, coalesce(fl.ID_VehicleIdentification, va.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, va.RecordDate) as source_package_period_from, coalesce(fl.DownloadTo, dateadd(day, 1, va.RecordDate)) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/vu-border-crossing.sql b/src/main/resources/sql/tachograph/vu-border-crossing.sql index 584768c..8c579df 100644 --- a/src/main/resources/sql/tachograph/vu-border-crossing.sql +++ b/src/main/resources/sql/tachograph/vu-border-crossing.sql @@ -27,7 +27,7 @@ Base as ( vi.ID as vehicle_identification_id, vi.VIN as vehicle_vin, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, border.ID_FileLog, vui.ID_FileLog, border.ID) as source_package_id_raw, + coalesce(fl.ID, border.ID_FileLog, vui.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, border.Timestamp) as source_package_period_from, coalesce(fl.DownloadTo, border.Timestamp) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/vu-load-unload.sql b/src/main/resources/sql/tachograph/vu-load-unload.sql index 0716b65..90fc3a3 100644 --- a/src/main/resources/sql/tachograph/vu-load-unload.sql +++ b/src/main/resources/sql/tachograph/vu-load-unload.sql @@ -26,7 +26,7 @@ Base as ( vi.ID as vehicle_identification_id, vi.VIN as vehicle_vin, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, lu.ID_FileLog, vui.ID_FileLog, lu.ID) as source_package_id_raw, + coalesce(fl.ID, lu.ID_FileLog, vui.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, lu.Timestamp) as source_package_period_from, coalesce(fl.DownloadTo, lu.Timestamp) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/vu-place.sql b/src/main/resources/sql/tachograph/vu-place.sql index f24e0d5..b33f7e8 100644 --- a/src/main/resources/sql/tachograph/vu-place.sql +++ b/src/main/resources/sql/tachograph/vu-place.sql @@ -27,7 +27,7 @@ Base as ( vi.ID as vehicle_identification_id, vi.VIN as vehicle_vin, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, place.ID_FileLog, vui.ID_FileLog, place.ID) as source_package_id_raw, + coalesce(fl.ID, place.ID_FileLog, vui.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, place.EntryTime) as source_package_period_from, coalesce(fl.DownloadTo, place.EntryTime) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/vu-position.sql b/src/main/resources/sql/tachograph/vu-position.sql index 1077be8..95153c8 100644 --- a/src/main/resources/sql/tachograph/vu-position.sql +++ b/src/main/resources/sql/tachograph/vu-position.sql @@ -25,7 +25,7 @@ Base as ( vi.ID as vehicle_identification_id, vi.VIN as vehicle_vin, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, pos.ID_FileLog, vui.ID_FileLog, pos.ID) as source_package_id_raw, + coalesce(fl.ID, pos.ID_FileLog, vui.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, pos.Timestamp) as source_package_period_from, coalesce(fl.DownloadTo, pos.Timestamp) as source_package_period_to, diff --git a/src/main/resources/sql/tachograph/vu-specific-condition.sql b/src/main/resources/sql/tachograph/vu-specific-condition.sql index 6031660..bfe22ae 100644 --- a/src/main/resources/sql/tachograph/vu-specific-condition.sql +++ b/src/main/resources/sql/tachograph/vu-specific-condition.sql @@ -19,7 +19,7 @@ Base as ( vi.ID as vehicle_identification_id, vi.VIN as vehicle_vin, coalesce(fl.DownloadDate, fl.OriginalDownloadDate, fl.TStamp, fl.CreationDate) as received_partner_at, - coalesce(fl.ID, cond.ID_FileLog, vui.ID_FileLog, cond.ID) as source_package_id_raw, + coalesce(fl.ID, cond.ID_FileLog, vui.ID_FileLog) as source_package_id_raw, coalesce(fl.ID_VehicleIdentification, vui.ID_VehicleIdentification) as source_package_entity_id_raw, coalesce(fl.DownloadFrom, cond.EntryTime) as source_package_period_from, coalesce(fl.DownloadTo, cond.EntryTime) as source_package_period_to,