# Driver Home / NotHome Classification + Trip Segmentation — Business Rules

```text
# ════════════════════════════════════════════════════════════
# Source: Tachograph M_ (Vehicle Unit, GNSS positions)
#       + C_ (Driver Card, activities / insertion + removal)
# ════════════════════════════════════════════════════════════

# ───────────────────── Constants ─────────────────────
CONST NDI_LONG         = 7.5h   # Threshold for a "long" NDI
CONST NDI_VERY_LONG    = 24h    # Rests longer than this are always HOME (rule C)
CONST CARD_REMOVAL_PCT = 0.80   # Card must be out for > 80% of the NDI duration (rule B)
CONST VISIT_SHARE      = 0.25   # A home cluster must hold > 25% of the long NDIs
CONST DBSCAN_EPS       = 150m   # Geographic metric (Haversine / PostGIS)
CONST DBSCAN_MIN_PTS   = 3

ENUM HomeStatus { HOME, NOT_HOME }

# ───────────────────── Data Model ─────────────────────
STRUCT DI:                      # Driving interval
    driverId
    vehicleId
    start; end
    posStart; posEnd            # (x,y) from M_, nullable
    gnssTrace[]                 # (ts, x, y) supporting points from M_

STRUCT NDI:                     # Non-driving interval (rest)
    driverId
    vehicleStart; vehicleEnd    # Vehicle of the previous / next DI
    start; end                  # = previous.end / next.start
    pos     = null              # Assigned (x,y), nullable
    cardOut = null              # Interval [removal, insertion], nullable
    cluster = null              # DBSCAN cluster ID (including NOISE);
                                # stays null for NDIs that are not clustered
    status  = null              # HOME | NOT_HOME

# Duration of any interval that has start / end (DI, NDI, cardOut).
FUNCTION dur(iv) = iv.end - iv.start

# ═══════════ 1. Derive NDIs from consecutive DIs ═══════════
# Builds one NDI for every gap between two consecutive driving intervals
# of the same driver.
#   driverId : driver the DIs belong to
#   dis      : that driver's DIs, sorted chronologically
#   returns  : list of NDIs (empty when there are fewer than two DIs)
FUNCTION buildNDIs(driverId, dis):
    out = []
    FOR i IN 1 .. len(dis)-1:
        prev = dis[i-1]; next = dis[i]
        out += NDI {
            driverId     : driverId
            vehicleStart : prev.vehicleId
            vehicleEnd   : next.vehicleId
            start        : prev.end
            end          : next.start
            pos          : assignPos(prev, next)
            # from C_ events
            # NOTE(review): no driverId is passed here — confirm that the
            # C_ lookup is already scoped to this driver's card.
            cardOut      : cardRemovalInterval(prev.end, next.start)
        }
    RETURN out

# The vehicle is stationary during the NDI -> assign the last known position:
# end of the previous DI, otherwise start of the next DI.
# Returns null when neither position is known.
FUNCTION assignPos(prev, next):
    RETURN prev.posEnd ?? next.posStart     # nullable

# ═══════════ 2. Location Statistics + DBSCAN ═══════════
# Only NDIs > 7.5h with a known position are included in the statistics.
# Clusters them across ALL drivers and writes the label into n.cluster.
#   allNDIs : every NDI of every driver
#   returns : the long, positioned NDIs (including those labelled NOISE)
# NOTE(review): run() later reads n.cluster through allNDIs, so the elements
# of `longs` must be the same objects as in allNDIs (reference semantics).
FUNCTION clusterLongNDIs(allNDIs):
    longs = [n IN allNDIs WHERE dur(n) > NDI_LONG AND n.pos != null]
    labels = DBSCAN(points = [n.pos FOR n IN longs],
                    eps    = DBSCAN_EPS,
                    minPts = DBSCAN_MIN_PTS,
                    metric = HAVERSINE)
    FOR n, lbl IN zip(longs, labels):
        n.cluster = lbl
    RETURN longs                            # including the NOISE label

# ═══════════ 3. Determine Home Locations ═══════════
# Derives the home-location clusters from the clustered long NDIs.
#   longs   : output of clusterLongNDIs (long NDIs with a known position)
#   returns : (companyHome, driverHome)
#             companyHome : Set of cluster IDs
#             driverHome  : driverId -> Set of cluster IDs; every driver that
#                           appears in `longs` gets an entry (possibly empty)
FUNCTION determineHomeLocations(longs):
    # 3a Company home locations (depots): clusters containing > 25% of ALL
    #    long NDIs passed in. Written as a multiplication (like rule B in
    #    classify) so the comparison cannot be truncated by integer division.
    totalCompany = count(longs)             # Denominator = all long NDIs in `longs`
    companyHome = { c FOR (c, visits) IN groupByCluster(longs)
                    WHERE c != NOISE
                      AND count(visits) > VISIT_SHARE * totalCompany }

    # 3b Driver home locations (private): for each driver, clusters containing
    #    > 25% of HIS/HER long NDIs, excluding clusters that overlap
    #    with the company clusters
    driverHome = {}                         # driverId -> Set
    FOR (driverId, dnNDIs) IN groupByDriver(longs):
        driverHome[driverId] = {}           # Create the set before adding to it,
                                            # so drivers without a private home
                                            # still have an (empty) entry
        totalDriver = count(dnNDIs)
        FOR (c, visits) IN groupByCluster(dnNDIs):
            IF c != NOISE
               AND count(visits) > VISIT_SHARE * totalDriver
               AND c NOT IN companyHome:    # Remove overlap with company locations
                driverHome[driverId] += c

    RETURN (companyHome, driverHome)

# ═══════════ 4. Home / NotHome Classification ═══════════
# Classifies one NDI. Rules are evaluated in order A..E; the first match wins.
#   n           : the NDI (n.cluster must already be set for long, positioned NDIs)
#   companyHome : Set of depot cluster IDs
#   driverHome  : driverId -> Set of private home cluster IDs
#   returns     : HomeStatus
FUNCTION classify(n, companyHome, driverHome):
    # A Card inserted into another vehicle at the end of the NDI
    IF n.vehicleStart != n.vehicleEnd:
        RETURN HOME

    # B Card removed for > 80% of the NDI
    IF n.cardOut != null AND dur(n.cardOut) > CARD_REMOVAL_PCT * dur(n):
        RETURN HOME

    # C Rest > 24h
    IF dur(n) > NDI_VERY_LONG:
        RETURN HOME

    # D No position known
    IF n.pos == null:
        RETURN (dur(n) > NDI_LONG) ? HOME : NOT_HOME

    # E Position known + long NDI -> decide using home-location clusters
    IF dur(n) > NDI_LONG:
        # Missing driver entry is treated as "no private home" instead of
        # failing on the lookup.
        privateHome = driverHome[n.driverId] ?? {}
        IF n.cluster IN companyHome OR n.cluster IN privateHome:
            RETURN HOME                     # Depot / private home
        ELSE:
            RETURN NOT_HOME                 # Overnight stay in the vehicle

    # Short rest near/at the vehicle
    RETURN NOT_HOME

# ═══════════ 5. Border Crossings -> Trip Segments ═══════════
STRUCT TripSegment:
    driverId; vehicleId
    start; end
    countryFrom; countryTo
    posFrom; posTo                          # (x,y), nullable

# BorderCrossing = explicit tachograph event (Smart Tachograph v2)
#                  OR country change in the reverse-geocoded GNSS trace
#
# Splits a driver's driving history at every country change found in the
# GNSS trace and appends one final segment for the remainder.
#   driverId : driver the DIs belong to
#   dis      : that driver's DIs, sorted chronologically
#   returns  : list of TripSegment (empty when there are no DIs)
# Each crossing segment carries the vehicleId of the DI in which the
# crossing was detected.
FUNCTION buildTripSegments(driverId, dis):
    IF len(dis) == 0:                       # Nothing to segment; avoids dis[0]
        RETURN []

    segs     = []
    lastDi   = dis[len(dis)-1]
    segStart = dis[0].start
    posFrom  = dis[0].posStart              # nullable
    # posStart is nullable -> only geocode a known position
    country  = (posFrom != null) ? countryOf(posFrom) : null   # PostGIS / Nominatim

    FOR di IN dis:
        FOR p IN di.gnssTrace:              # (ts, x, y) from M_
            pos = (p.x, p.y)                # Keep positions as (x,y) everywhere
            c = countryOf(pos)

            # NOTE(review): assumes countryOf returns null when a point cannot
            # be resolved — confirm. Such points must not count as a crossing.
            IF c == null:
                CONTINUE

            # Starting country was unknown: adopt the first resolved one
            # without emitting a crossing from "unknown".
            IF country == null:
                country = c
                IF posFrom == null:
                    posFrom = pos
                CONTINUE

            IF c != country:                # -> Border crossing
                segs += TripSegment {
                    driverId    : driverId
                    vehicleId   : di.vehicleId
                    start       : segStart
                    end         : p.ts
                    countryFrom : country
                    countryTo   : c
                    posFrom     : posFrom
                    posTo       : pos
                }
                segStart = p.ts; posFrom = pos; country = c

    # Final segment (last section without another crossing)
    segs += TripSegment {
        driverId    : driverId
        vehicleId   : lastDi.vehicleId
        start       : segStart
        end         : lastDi.end
        countryFrom : country
        countryTo   : country
        posFrom     : posFrom
        posTo       : lastDi.posEnd
    }
    RETURN segs

# ═══════════ Orchestration ═══════════
# End-to-end pipeline: parse the tachograph files, build NDIs and trip
# segments per driver, cluster the long NDIs, then classify every NDI.
#   files_M, files_C : tachograph vehicle-unit / driver-card files
#   returns          : (allNDIs with status set, segments)
FUNCTION run(files_M, files_C):
    acts = parseTacho(files_M, files_C)     # DIs + card events

    allNDIs  = []
    segments = []
    FOR (driverId, dis) IN groupDIsByDriver(acts):
        dis = sortByTime(dis)
        allNDIs  += buildNDIs(driverId, dis)
        segments += buildTripSegments(driverId, dis)

    # Two passes: first cluster, then classify
    longs = clusterLongNDIs(allNDIs)
    (companyHome, driverHome) = determineHomeLocations(longs)

    FOR n IN allNDIs:
        n.status = classify(n, companyHome, driverHome)

    RETURN (allNDIs, segments)
```