Stream processing
The stream processing layer deploys ten independent Flink applications on Amazon Managed Service for Apache Flink for real-time telemetry processing, two of which handle FleetWise Edge integration. Each application consumes from a dedicated Kafka topic and writes to its own DynamoDB table or downstream topic, ensuring separation of concerns and independent scaling.
Flink applications
Ten applications process different aspects of telemetry data. All share a single JAR artifact with a UniversalProcessor entry point that routes to the correct processor class based on application configuration.
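The routing pattern described above can be sketched as a simple registry keyed by a runtime property. This is an illustrative sketch only: the property name, registry contents, and method names are assumptions, and the real entry point reads the Flink application's runtime properties rather than `main` arguments.

```java
import java.util.Map;

// Hypothetical sketch of the UniversalProcessor dispatch pattern: one JAR, one
// entry point, with the concrete processor chosen by configuration.
public class UniversalProcessor {

    // Registry mapping a configured processor name to the job to run.
    // The names and jobs here are placeholders for illustration.
    private static final Map<String, Runnable> PROCESSORS = Map.of(
            "TripProcessor", () -> System.out.println("running trip pipeline"),
            "SafetyProcessor", () -> System.out.println("running safety pipeline"));

    // Resolve the processor for a given runtime property value.
    static Runnable resolve(String name) {
        Runnable job = PROCESSORS.get(name);
        if (job == null) {
            throw new IllegalArgumentException("Unknown processor: " + name);
        }
        return job;
    }

    public static void main(String[] args) {
        // In the real application this value comes from the Flink
        // application's runtime properties, not command-line args.
        resolve(args.length > 0 ? args[0] : "TripProcessor").run();
    }
}
```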
| Application | Kafka Topic | DynamoDB Table | Purpose |
|---|---|---|---|
| SimulatorPreprocessor | `cms-telemetry-raw` | — | Decodes gzip+base64 telemetry from the simulator, outputs clean JSON to `cms-telemetry-preprocessed` |
| EventDrivenTelemetryProcessor | `cms-telemetry-preprocessed` | — | Routes messages to domain-specific topics and writes Last Known State to Redis |
| TelemetryProcessor | `cms-telemetry-preprocessed` | telemetry table | Writes telemetry records, tags with tripId |
| TripProcessor | `cms-telemetry-preprocessed` | trips table | Trip lifecycle (create/update/complete) |
| SafetyProcessor | `cms-telemetry-preprocessed` | `cms-dev-storage-safety-events` | Safety event detection |
| MaintenanceProcessor | `cms-telemetry-preprocessed` | maintenance alerts table | Maintenance alert generation |
| FWTelemetryProcessor | | — | Decodes FWE protobuf, maps CAN signals to standard format, outputs to `cms-telemetry-preprocessed` |
| CampaignSyncProcessor | `fw-checkin` | `cms-dev-campaigns` | Resolves campaigns, pushes decoder manifests and collection schemes to FWE agents via IoT Core MQTT |
| GeofenceProcessor | `cms-telemetry-preprocessed` | `cms-dev-storage-safety-events` | Evaluates vehicle positions against active geofences, generates boundary crossing events |
| OEMTelemetryProcessor | `cms-telemetry-oem` | — | Transforms OEM-specific telemetry to standard format using S3-hosted transform manifests, outputs to `cms-telemetry-raw` |
FleetWise telemetry processor
The FWTelemetryProcessor decodes protobuf-encoded telemetry uploaded by FleetWise Edge agents. It reads the decoder manifest from DynamoDB to map CAN signal IDs to human-readable signal names (for example, signal ID 1 maps to Vehicle.Speed), converts units to the standard format, and outputs JSON records to the cms-telemetry-preprocessed Kafka topic. This enables the same downstream processors (TripProcessor, SafetyProcessor, MaintenanceProcessor) to process FleetWise telemetry without modification.
The processor maps all 66 signals defined in the decoder manifest, including speed, engine temperature, tire pressure (all four wheels), battery voltage, GPS coordinates, and ignition status.
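The ID-to-name mapping step can be sketched as a lookup against the decoder manifest. This is a minimal sketch: the entry for signal ID 1 (Vehicle.Speed) comes from the text, the name shown for ignition signal ID 4 is an assumption, and the production manifest holds all 66 signals and is loaded from DynamoDB rather than hard-coded.

```java
import java.util.Map;

// Illustrative sketch of the signal-ID-to-name mapping performed by the
// FWTelemetryProcessor when decoding FleetWise Edge protobuf telemetry.
public class SignalMapper {

    // Two sample entries; the real manifest defines 66 signals in DynamoDB.
    private static final Map<Integer, String> DECODER_MANIFEST = Map.of(
            1, "Vehicle.Speed",
            4, "Vehicle.IgnitionStatus"); // name for ID 4 is an assumption

    // Translate a raw CAN signal ID into its human-readable name.
    static String signalName(int canSignalId) {
        String name = DECODER_MANIFEST.get(canSignalId);
        if (name == null) {
            throw new IllegalArgumentException("Signal not in manifest: " + canSignalId);
        }
        return name;
    }
}
```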
Campaign sync processor
The CampaignSyncProcessor manages the FleetWise Edge agent lifecycle:
1. Agent publishes a checkin protobuf to `cms/fleetwise/vehicles/{vin}/checkins`
2. IoT Rule routes the checkin to the `fw-checkin` Kafka topic
3. CampaignSyncProcessor consumes the checkin and extracts the vehicle VIN
4. Processor queries the `cms-dev-campaigns` DynamoDB table for active campaigns targeting the vehicle
5. Processor generates protobuf decoder manifest and collection scheme messages
6. Processor publishes the schemes to `cms/fleetwise/vehicles/{vin}/collection_schemes_and_decoder_manifests` via IoT Core MQTT
7. Agent receives the schemes and begins collecting the specified signals

Campaign sync status is tracked per vehicle:

- PENDING — Campaign has been pushed to the agent but the agent has not yet confirmed receipt
- HEALTHY — Agent confirmed receipt of the collection scheme via the `document_sync_ids` in its checkin
When all campaigns for a vehicle are suspended, the processor pushes an empty CollectionSchemes protobuf to clear the agent’s active collection.
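The per-vehicle status rule above reduces to a set-membership check: a campaign stays PENDING until its sync ID appears among the `document_sync_ids` reported in the agent's checkin. The method and parameter names in this sketch are assumptions for illustration.

```java
import java.util.Set;

// Minimal sketch of the campaign sync status decision described above.
public class CampaignSyncStatus {

    // HEALTHY once the agent's checkin acknowledges the campaign's sync ID;
    // PENDING otherwise.
    static String statusFor(String campaignSyncId, Set<String> checkinDocumentSyncIds) {
        return checkinDocumentSyncIds.contains(campaignSyncId) ? "HEALTHY" : "PENDING";
    }
}
```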
Simulator preprocessor
The SimulatorPreprocessor decodes gzip-compressed, base64-encoded telemetry messages published by the MQTT Direct simulator. The simulator compresses telemetry payloads to reduce IoT Core message costs and bandwidth. This processor reads from the cms-telemetry-raw Kafka topic, base64-decodes and gunzips each message, validates the resulting JSON, and writes clean JSON records to the cms-telemetry-preprocessed topic.
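The decode step uses only standard base64 and gzip, so it can be sketched with the JDK alone. This is a sketch of the transformation, not the actual processor code; class and method names are assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Sketch of the gzip+base64 decode performed by the SimulatorPreprocessor.
public class TelemetryDecoder {

    // base64-decode the payload, then gunzip it back to the original JSON.
    static String decode(String base64Gzip) {
        byte[] compressed = Base64.getDecoder().decode(base64Gzip);
        try (GZIPInputStream gunzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return new String(gunzip.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Inverse helper, matching what the simulator does before publishing.
    static String encode(String json) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buf)) {
            gzip.write(json.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Base64.getEncoder().encodeToString(buf.toByteArray());
    }
}
```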
Pipeline:
1. Simulator publishes gzip+base64 telemetry to IoT Core
2. IoT Rule routes to the MSK `cms-telemetry-raw` topic
3. SimulatorPreprocessor decodes and decompresses
4. Clean JSON written to `cms-telemetry-preprocessed`
5. Downstream processors (EventDrivenTelemetryProcessor, TripProcessor, SafetyProcessor, MaintenanceProcessor) consume from `cms-telemetry-preprocessed`
This design means both MQTT Direct and FleetWise Edge telemetry converge on the same cms-telemetry-preprocessed topic, enabling a single set of downstream processors for both modes.
Geofence processor
The GeofenceProcessor evaluates vehicle positions against active geofences stored in DynamoDB. It reads telemetry from the cms-telemetry-preprocessed Kafka topic, extracts latitude and longitude from each message, and checks whether the vehicle is inside or outside each active geofence.
How it works:
1. Reads telemetry messages from `cms-telemetry-preprocessed`
2. Extracts vehicle ID, latitude, and longitude
3. Queries the `cms-dev-storage-geofences` DynamoDB table for active geofences (vehicle-specific and global)
4. Calculates distance from vehicle position to geofence center using the Haversine formula
5. Compares distance against the geofence radius
6. On boundary crossing (enter or exit), generates a safety event and writes to the `cms-dev-storage-safety-events` DynamoDB table
7. Deduplicates events: only fires once per boundary crossing direction (enter or exit), preventing repeated alerts while a vehicle remains inside or outside a geofence
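The Haversine distance-versus-radius check from steps 4 and 5 can be sketched as a pure function. This is an illustrative sketch, not the processor's actual code; the Earth radius constant and method names are assumptions.

```java
// Sketch of the Haversine distance check used for CIRCLE geofences: compute
// the great-circle distance from vehicle to geofence center and compare it to
// the geofence radius.
public class GeofenceCheck {

    private static final double EARTH_RADIUS_KM = 6371.0;

    // Great-circle distance between two lat/lon points, in kilometers.
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return EARTH_RADIUS_KM * 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
    }

    // True when the vehicle is inside the circular geofence.
    static boolean inside(double vLat, double vLon, double cLat, double cLon, double radiusKm) {
        return haversineKm(vLat, vLon, cLat, cLon) <= radiusKm;
    }
}
```

Comparing the previous inside/outside result with the current one yields the enter/exit boundary-crossing events described above.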
Geofence types:
- CIRCLE — Defined by center coordinates (latitude, longitude) and radius in kilometers
- Vehicle-specific — Geofences assigned to a specific vehicle ID
- Global — Geofences with `vehicleId=ALL` that apply to every vehicle
OEM telemetry processor
The OEMTelemetryProcessor transforms telemetry from third-party OEM APIs into the standard signal catalog format. This enables the guidance to ingest data from any OEM without modifying the core processing pipeline.
How it works:
1. Reads raw OEM telemetry from the `cms-telemetry-oem` Kafka topic
2. Extracts the `oem_source` field to identify which OEM sent the data
3. Loads the corresponding transform manifest from S3 (cached in memory per OEM)
4. Applies field mappings, unit conversions, and data type transformations defined in the manifest
5. Outputs standard CMS-format JSON to the `cms-telemetry-raw` topic
Transform manifest capabilities:
- Field mapping — Maps OEM field paths to signal catalog names (for example, `vehicle_data.speed_kmh` → `speed`)
- Unit conversion — Multiplier, formula, or lookup table conversions (for example, km/h to mph, Celsius to Fahrenheit)
- Conditional mapping — Apply mappings only when conditions are met (for example, EV-only signals)
- Validation — Required signal checks and range validation
Transform manifests are stored in S3 with versioning enabled. Customers create their own manifests by copying the provided template and customizing field mappings for their OEM data format.
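The unit-conversion capability can be sketched as a table of named conversion functions. The two conversions shown (km/h to mph, Celsius to Fahrenheit) are the examples from the text; the conversion keys and class layout are assumptions, and the real processor reads these from the S3 manifest rather than hard-coding them.

```java
import java.util.Map;
import java.util.function.DoubleUnaryOperator;

// Illustrative sketch of manifest-driven unit conversion: each mapped field
// can carry a multiplier or formula conversion.
public class UnitConversion {

    private static final Map<String, DoubleUnaryOperator> CONVERSIONS = Map.of(
            "kmh_to_mph", v -> v * 0.621371,                      // multiplier conversion
            "celsius_to_fahrenheit", v -> v * 9.0 / 5.0 + 32.0);  // formula conversion

    // Apply a named conversion to a raw OEM value.
    static double convert(String conversion, double value) {
        DoubleUnaryOperator op = CONVERSIONS.get(conversion);
        if (op == null) {
            throw new IllegalArgumentException("Unknown conversion: " + conversion);
        }
        return op.applyAsDouble(value);
    }
}
```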
Trip detection application
The TripProcessor uses Flink’s KeyedProcessFunction with per-vehicle keyed state to detect trip boundaries through ignition signal transitions. This stateful approach eliminates DynamoDB reads during normal operation.
Transition-based detection:
| Previous Ignition | Current Ignition | Action |
|---|---|---|
| OFF (or null) | ON | Start trip: Generate tripId, PutItem to DynamoDB |
| ON | ON | Update: Accumulate route/metrics in Flink state, flush every 5 messages |
| ON | OFF | End trip: UpdateItem with COMPLETED status |
| OFF | OFF | Ignore |
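The transition table above reduces to a pure function of the previous and current ignition state. This sketch is illustrative; the enum and method names are assumptions, and in the real processor the previous state lives in per-vehicle Flink ValueState.

```java
// Sketch of the ignition-transition decision from the table above.
public class TripTransition {

    enum Action { START_TRIP, UPDATE, END_TRIP, IGNORE }

    // prevOn may be null on the very first message seen for a vehicle,
    // which is treated the same as OFF.
    static Action decide(Boolean prevOn, boolean currentOn) {
        boolean wasOn = Boolean.TRUE.equals(prevOn);
        if (!wasOn && currentOn) return Action.START_TRIP;
        if (wasOn && currentOn) return Action.UPDATE;
        if (wasOn && !currentOn) return Action.END_TRIP;
        return Action.IGNORE;
    }
}
```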
State per vehicle (Flink ValueState):
- `tripId` — generated from `vehicleId + timestamp` at trip start
- `vehicleId`, `driverId` — from telemetry payload
- `ignitionOn` — previous ignition state for transition detection
- `routeBuffer` — GPS points accumulated in memory
- `maxSpeed`, `totalDistance`, `telemetryCount` — running metrics
DynamoDB write optimization:
The stateful design reduces DynamoDB operations significantly compared to a stateless approach:
| Operation | Stateless (per message) | Stateful (v2) |
|---|---|---|
| DynamoDB reads | 2-3 (GetItem + GSI query) | 0 |
| DynamoDB writes | 1 PutItem (full record) | 1 UpdateItem every 5 messages |
| 20-message trip total | ~60 reads + 20 writes | 0 reads + 5 writes |
Mid-trip updates use UpdateItem (not PutItem) to append route points and update metrics without overwriting the full record. This eliminates race conditions from concurrent writes.
Single trip ownership:
Only the TripProcessor writes to the trips table. The TelemetryProcessor tags telemetry records with tripId for querying but does not write to the trips table. This prevents data clobbering between processors.
No simulator dependencies:
The TripProcessor does not require engineEvent strings or simulator-provided tripId. It generates trip identifiers from vehicleId + timestamp when not provided, making it compatible with both the MQTT simulator and CAN bus data from FleetWise Edge Agent. In FWE mode, the TripProcessor detects trip boundaries from the ignition signal (ignitionOn: true/false) decoded by the FWTelemetryProcessor from CAN signal ID 4.
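The tripId fallback rule can be sketched as follows. The `trip-` prefix and separator are assumptions for illustration; the source only states that the identifier is derived from `vehicleId + timestamp` when none is provided.

```java
// Sketch of the tripId generation rule: keep a provided tripId when present,
// otherwise derive one from the vehicle ID and message timestamp.
public class TripIdGenerator {

    static String tripId(String provided, String vehicleId, long epochMillis) {
        if (provided != null && !provided.isEmpty()) {
            return provided; // keep a simulator-provided tripId if present
        }
        return "trip-" + vehicleId + "-" + epochMillis; // format is an assumption
    }
}
```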
Safety event application
Identifies unsafe driving behaviors from telemetry signals.
Detection rules:
- Hard braking: deceleration > 0.4g
- Rapid acceleration: acceleration > 0.35g
- Harsh cornering: lateral acceleration > 0.4g
- Speed violation: speed > threshold
- AEB/ABS/ESC activation events
- Seatbelt violations, phone usage detection
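The acceleration-based rules above are plain threshold checks (values in g). This sketch covers only those three rules; the class, method, and event names are assumptions, while the thresholds are the ones listed.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the acceleration-based safety detection rules listed above.
public class SafetyRules {

    // Evaluate one telemetry sample and return any triggered event types.
    static List<String> detect(double decelG, double accelG, double lateralG) {
        List<String> events = new ArrayList<>();
        if (decelG > 0.4) events.add("HARD_BRAKING");
        if (accelG > 0.35) events.add("RAPID_ACCELERATION");
        if (lateralG > 0.4) events.add("HARSH_CORNERING");
        return events;
    }
}
```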
Output:
- Write to DynamoDB safety events table
- Update driver safety score (integrated with trip metrics)
Maintenance alert application
Generates predictive maintenance alerts from vehicle health signals.
Monitoring:
- Engine temperature > 240°F (critical)
- Tire pressure < 28 PSI or > 35 PSI
- Battery voltage < 12.0V
- Oil pressure < 20 PSI
- Brake wear, filter life, oil life thresholds
- DTC (Diagnostic Trouble Code) detection
Output:
- Write to DynamoDB maintenance alerts table
- Deduplicated per trip (one alert per type per trip)
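The per-trip deduplication rule can be sketched as a seen-set keyed by (tripId, alertType). In the real processor this state would live in keyed Flink state; the plain in-memory set and the names here are assumptions for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of per-trip alert deduplication: each alert type fires at most once
// per (tripId, alertType) pair.
public class AlertDeduplicator {

    private final Set<String> seen = new HashSet<>();

    // Returns true only the first time a given alert type is raised for a trip.
    boolean shouldEmit(String tripId, String alertType) {
        return seen.add(tripId + "|" + alertType);
    }
}
```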
Flink configuration
Runtime:
- Flink version: 1.18.1
- Parallelism: 1 (development) or 2-4 (production)
- Checkpointing: Every 60 seconds, exactly-once semantics
- State backend: Managed by Amazon Managed Service for Apache Flink
Resources:
-
KPUs: 1 per application (development)
-
Memory: 4 GB per KPU
-
vCPUs: 1 per KPU
Deployment:
All ten applications share a single JAR (cms-telemetry-processor-1.0.0.zip) stored in S3. The UniversalProcessor entry point routes to the correct processor class based on the Flink application’s runtime properties.
Kafka resilience:
All processors use a shared `KafkaConfig.withReconnect()` utility that applies reconnect backoff, connection keepalive, and session timeout settings to prevent stale SSL connections to MSK. Key properties include `reconnect.backoff.ms=1000`, `reconnect.backoff.max.ms=10000`, `connections.max.idle.ms=540000`, and `metadata.max.age.ms=300000`.
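A utility applying the listed properties might look like the sketch below. The real utility's signature may differ; only the property keys and values come from the text.

```java
import java.util.Properties;

// Sketch of a shared helper that applies the reconnect and keepalive settings
// listed above to a Kafka client Properties object.
public class KafkaConfig {

    static Properties withReconnect(Properties props) {
        props.setProperty("reconnect.backoff.ms", "1000");       // initial backoff
        props.setProperty("reconnect.backoff.max.ms", "10000");  // backoff ceiling
        props.setProperty("connections.max.idle.ms", "540000");  // keep connections alive
        props.setProperty("metadata.max.age.ms", "300000");      // refresh metadata
        return props;
    }
}
```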
CloudWatch alarms:
The FlinkStack creates CloudWatch alarms for processor health monitoring:
- Downtime alarms — Fire when a processor has more than 1 minute of downtime in a 5-minute window. Applied to all critical processors.
- Idle processing alarms — Fire when the FWTelemetryProcessor or TripProcessor processes 0 records in a 10-minute window, indicating a pipeline stall.