Lineage observation hub

Apache Hop provides an in-engine lineage observation hub for recording metadata-level observations (pipeline context, file I/O, HTTP traffic, structural hints) and delivering them asynchronously to pluggable lineage sinks. The design keeps third-party integrations (OpenLineage, Datadog, custom exporters) out of the engine: sinks are separate plugins that map Hop’s neutral events to their own formats.

The public API lives under the Java package org.apache.hop.lineage in the hop-engine module.

Goals

  • Central entry point — Producers call LineageHub.getInstance().emit(…) instead of talking to each backend directly.

  • Safe defaults — The hub is off until HOP_LINEAGE_ENABLED is set to Y.

  • Non-blocking pipelines — Events are queued and dispatched on a dedicated worker thread; sinks receive them in batches, with per-sink error isolation.

  • Extensibility — New backends ship as ILineageSink implementations registered via @LineageSinkPlugin.

Package layout

  • org.apache.hop.lineage: LineageVariables, the Hop variables (@Variable) that describe hub configuration.

  • org.apache.hop.lineage.context: LineageContext, LineageSubjectType, LineagePortableFilename. Immutable correlation data: subject type, log channel, pipeline/workflow/transform/action names, hopFilename as set on the engine/meta, an optional hopFilenamePortableKey of the form ${PROJECT_HOME}/… when one can be derived, and optional attributes. The run identity of a single execution is the log channel id on the context.

  • org.apache.hop.lineage.model: LineageEvent, LineageEventKind, LineagePayload and the concrete payloads (structural, file I/O, HTTP, run lifecycle).

  • org.apache.hop.lineage.hub: LineageHub, LineageConfiguration. Queue, batching, flush, shutdown.

  • org.apache.hop.lineage.spi: ILineageSink, the plugin SPI.

  • org.apache.hop.lineage.plugin: LineageSinkPlugin, LineageSinkPluginType. The plugin type registered in HopEnvironment.

  • org.apache.hop.lineage.xp: Extension points that emit RUN_LIFECYCLE events and call LineageHub.flushQuietly() on pipeline/workflow/server lifecycle (see Run lifecycle (extension points)).
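The package list says hopFilenamePortableKey is only set when a ${PROJECT_HOME}/… form can be derived. The sketch below shows one plausible derivation under a simple assumption: the key is built by stripping the PROJECT_HOME value as a path prefix. PortableKeySketch and its Map-based variables are hypothetical stand-ins, not LineagePortableFilename's actual logic, which may also have to normalize VFS URIs.

```java
import java.util.Map;

/** Hypothetical illustration of deriving a ${PROJECT_HOME}-relative key; not Hop's code. */
final class PortableKeySketch {
  private PortableKeySketch() {}

  /**
   * Returns "${PROJECT_HOME}/relative/path" when filename lies under the PROJECT_HOME
   * value in vars, or null when no portable key can be derived.
   */
  static String portableKey(String filename, Map<String, String> vars) {
    String home = vars.get("PROJECT_HOME");
    if (filename == null || home == null || home.isEmpty()) {
      return null; // nothing to normalize against
    }
    // Compare with a trailing separator so "/work/proj-old" does not match "/work/proj".
    String prefix = home.endsWith("/") ? home : home + "/";
    if (!filename.startsWith(prefix)) {
      return null; // file lives outside the project
    }
    return "${PROJECT_HOME}/" + filename.substring(prefix.length());
  }
}
```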

Event model

Each observation is a LineageEvent:

  • Event id — Unique id (UUID by default).

  • Timestamp — Milliseconds since epoch.

  • LineageEventKind — One of RUN_LIFECYCLE, TRANSFORM_SCHEMA, FILE_IO, HTTP_IO.

  • LineageContext — Who/what produced the observation (see above).

  • LineagePayload — Optional typed payload; may be null if the kind and context are enough.

Payload fields use only plain Java types (strings, longs, enums), so sinks can serialize them without dragging engine objects across boundaries:

  • FileIoLineagePayload: operation (READ, WRITE, COPY, MOVE, DELETE), source/target URIs, byte counts, success flag, message, and an optional FileIoContentSchema. The schema lists the tabular columns actually read or written (including JSONPath, YAML path, and XPath locators) plus optional structureRoots, a FileIoSchemaNode tree merged from those paths to describe nested file layouts. Text File Output and JSON Output attach the file's column layout, not the full transform output row, when only a subset of fields is written.

  • HttpLineagePayload: CLIENT or SERVER side, method, URL, status, request/response bytes, duration, success flag, message.

  • RunLifecycleLineagePayload: lifecycle phase (STARTED, FINISHED, and the other phases listed under Run lifecycle) plus optional detail text.

  • TransformSchemaLineagePayload: INPUT or OUTPUT side plus a list of fields (name, Hop type name, length, precision) describing the transform's boundary schema; emitted when a transform finishes. Fields usually come from runtime row metadata on the transform or its row sets. If input was never observed (for example, an Abort transform whose getRow() immediately returns null), the input fields are derived via PipelineMeta#getPrevTransformFields. If output was never written (for example, Detect empty stream receiving a non-empty input stream), the output fields are derived via PipelineMeta#getTransformFields. Events derived from the graph set the context attribute transformSchemaSource to DESIGN_GRAPH.
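The TRANSFORM_SCHEMA fallback rule above (prefer runtime row metadata, else derive from the design graph and tag the event) can be sketched with simplified stand-in types. Field, Snapshot, and the Supplier that stands in for PipelineMeta#getPrevTransformFields or #getTransformFields are hypothetical; only the attribute name transformSchemaSource and the value DESIGN_GRAPH come from the text.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch of choosing observed vs design-graph boundary fields. */
final class SchemaFallbackSketch {
  private SchemaFallbackSketch() {}

  /** Simplified stand-in for one field of a TransformSchemaLineagePayload. */
  record Field(String name, String hopType, int length, int precision) {}

  /** Fields to report plus the context attributes to attach to the event. */
  record Snapshot(List<Field> fields, Map<String, String> attributes) {}

  /**
   * @param observed fields seen at runtime on the transform or its row sets (may be empty)
   * @param fromDesignGraph lazily derives fields from the pipeline graph; only consulted
   *     when nothing was observed
   */
  static Snapshot resolve(List<Field> observed, Supplier<List<Field>> fromDesignGraph) {
    Map<String, String> attributes = new HashMap<>();
    if (observed != null && !observed.isEmpty()) {
      return new Snapshot(List.copyOf(observed), attributes); // runtime metadata wins
    }
    List<Field> derived = fromDesignGraph.get();
    if (!derived.isEmpty()) {
      attributes.put("transformSchemaSource", "DESIGN_GRAPH"); // mark graph-derived events
    }
    // An empty field list here means the caller has nothing to emit.
    return new Snapshot(List.copyOf(derived), attributes);
  }
}
```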

Convenience factory: LineageEvent.of(kind, context, payload) assigns id and current time.
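To make the factory's contribution concrete, here is a stand-in record with the same shape as the event model: a UUID id and the current epoch milliseconds are filled in, and the payload may be null. EventSketch is hypothetical; the real LineageEvent's fields and accessors may differ.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in mirroring the described shape of LineageEvent; not Hop's class. */
record EventSketch(String id, long timestampMillis, EventSketch.Kind kind, Object context, Object payload) {

  /** The four kinds listed in the event model. */
  enum Kind { RUN_LIFECYCLE, TRANSFORM_SCHEMA, FILE_IO, HTTP_IO }

  EventSketch {
    Objects.requireNonNull(id, "id");
    Objects.requireNonNull(kind, "kind");
    // payload may be null when kind and context carry enough information
  }

  /** Mirrors LineageEvent.of(kind, context, payload): random UUID id plus current epoch millis. */
  static EventSketch of(Kind kind, Object context, Object payload) {
    return new EventSketch(
        UUID.randomUUID().toString(), System.currentTimeMillis(), kind, context, payload);
  }
}
```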

Hub behavior

LineageHub is a singleton (getInstance()). When lineage is disabled, emit returns immediately.

When enabled:

  1. The first emit starts a daemon worker thread (Hop-LineageHub-Dispatcher).

  2. Events are placed on a bounded queue. If the queue is full, the event is dropped and an error is logged (drops are counted).

  3. The worker builds batches up to HOP_LINEAGE_BATCH_MAX, waiting up to HOP_LINEAGE_BATCH_LINGER_MS for additional events before sending a partial batch.

  4. Each batch is passed to every loaded sink via ILineageSink.accept(List<LineageEvent>). If one sink throws, others still receive the same batch; failures are logged.
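The four dispatch steps map naturally onto java.util.concurrent: a bounded ArrayBlockingQueue whose offer() fails instead of blocking, a worker that waits for a first event and then polls until the batch is full or the linger time expires, and a loop that isolates each sink's failure. This is an illustrative sketch, not LineageHub's source; events are reduced to strings, sinks to Consumer<List<String>>, and logging to System.err.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical sketch of bounded-queue batching with per-sink isolation; not Hop's code. */
final class DispatcherSketch {
  private final BlockingQueue<String> queue;
  private final int batchMax;
  private final long lingerMs;
  private final List<Consumer<List<String>>> sinks;
  final AtomicLong dropped = new AtomicLong();

  DispatcherSketch(int capacity, int batchMax, long lingerMs, List<Consumer<List<String>>> sinks) {
    this.queue = new ArrayBlockingQueue<>(capacity);
    this.batchMax = batchMax;
    this.lingerMs = lingerMs;
    this.sinks = List.copyOf(sinks);
  }

  /** Non-blocking: a full queue drops the event and counts the drop. */
  void emit(String event) {
    if (!queue.offer(event)) {
      dropped.incrementAndGet();
      System.err.println("lineage queue full, dropped event");
    }
  }

  /** One worker iteration: wait for a first event, then linger to fill up to batchMax. */
  List<String> nextBatch() throws InterruptedException {
    List<String> batch = new ArrayList<>(batchMax);
    batch.add(queue.take()); // block until there is something to send
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(lingerMs);
    while (batch.size() < batchMax) {
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0) break;
      String next = queue.poll(remaining, TimeUnit.NANOSECONDS);
      if (next == null) break; // linger expired: send a partial batch
      batch.add(next);
    }
    return batch;
  }

  /** Every sink gets the same batch; one failing sink does not affect the others. */
  void dispatch(List<String> batch) {
    List<String> view = List.copyOf(batch);
    for (Consumer<List<String>> sink : sinks) {
      try {
        sink.accept(view);
      } catch (RuntimeException e) {
        System.err.println("lineage sink failed: " + e);
      }
    }
  }
}
```

In a hub, nextBatch() and dispatch() would run in a loop on the dedicated worker thread; the sketch exposes them separately so each step can be exercised directly.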

flush() blocks until all queued events (and any coalesced flush markers) have been processed. flushQuietly() performs the same flush but catches and logs errors instead of propagating them.
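One common way to implement a blocking flush on a single-consumer queue is a marker: flush() enqueues a latch, and the worker releases it once it reaches the marker, which guarantees that every earlier event was processed. The sketch also coalesces concurrent flushes by letting a caller reuse a pending marker, but only while no event has been queued after it. Whether LineageHub coalesces markers exactly this way is an assumption; FlushSketch and its members are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of flush-by-marker with coalescing; not LineageHub's code. */
final class FlushSketch {
  /** Queue item: either an event (marker == null) or a flush marker. */
  private record Item(String event, CountDownLatch marker) {}

  private final LinkedBlockingQueue<Item> queue = new LinkedBlockingQueue<>();
  private final List<String> processed = new CopyOnWriteArrayList<>();
  private CountDownLatch pendingMarker; // guarded by this

  FlushSketch() {
    Thread worker = new Thread(this::runWorker, "flush-sketch-worker");
    worker.setDaemon(true);
    worker.start();
  }

  synchronized void emit(String event) {
    queue.add(new Item(event, null));
    pendingMarker = null; // a later flush must also wait for this event
  }

  /** Blocks until everything queued before this call has been processed. */
  boolean flush(long timeoutMs) throws InterruptedException {
    CountDownLatch marker;
    synchronized (this) {
      if (pendingMarker == null) {
        pendingMarker = new CountDownLatch(1);
        queue.add(new Item(null, pendingMarker));
      }
      marker = pendingMarker; // coalesce with a marker that already covers all events
    }
    return marker.await(timeoutMs, TimeUnit.MILLISECONDS);
  }

  List<String> processed() {
    return List.copyOf(processed);
  }

  private void runWorker() {
    try {
      while (true) {
        Item item = queue.take();
        if (item.marker() == null) {
          processed.add(item.event()); // stand-in for dispatching to sinks
          continue;
        }
        synchronized (this) {
          if (pendingMarker == item.marker()) pendingMarker = null;
        }
        item.marker().countDown(); // all earlier events are done
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // exit the worker loop
    }
  }
}
```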

shutdown() stops the worker and calls shutdown() on sinks. It is invoked from HopEnvironment.shutdown() and HopEnvironment.reset().

After PluginRegistry.init(), HopEnvironment calls LineageHub.getInstance().environmentReady() so sink lists can be rebuilt when the environment is re-initialized.

The queue is allocated lazily on the first emit from the resolved configuration, so HOP_LINEAGE_QUEUE_CAPACITY applies to the singleton as well as to isolated test hubs. The capacity is fixed for the lifetime of the queue; to change it, restart the JVM (or call shutdown() and emit again in tests).
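The lazy, fixed-capacity behavior can be illustrated with a queue that is created on the first emit from a configuration lookup and discarded by shutdown(). LazyQueueSketch and the IntSupplier standing in for resolving HOP_LINEAGE_QUEUE_CAPACITY are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.IntSupplier;

/** Hypothetical sketch of lazy, fixed-capacity queue allocation; not LineageHub's code. */
final class LazyQueueSketch {
  private final IntSupplier capacitySource; // stands in for resolving HOP_LINEAGE_QUEUE_CAPACITY
  private BlockingQueue<Object> queue; // guarded by this; null until the first emit

  LazyQueueSketch(IntSupplier capacitySource) {
    this.capacitySource = capacitySource;
  }

  /** Returns false when the event is dropped because the queue is full. */
  synchronized boolean emit(Object event) {
    if (queue == null) {
      // Capacity is read once here and stays fixed for this queue's lifetime.
      queue = new ArrayBlockingQueue<>(capacitySource.getAsInt());
    }
    return queue.offer(event);
  }

  /** Discards the queue; the next emit allocates a fresh one from current configuration. */
  synchronized void shutdown() {
    queue = null;
  }

  /** Total capacity of the live queue, or -1 if none has been allocated yet. */
  synchronized int capacity() {
    return queue == null ? -1 : queue.size() + queue.remainingCapacity();
  }
}
```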

Run lifecycle (extension points)

The org.apache.hop.lineage.xp package registers extension points that emit RUN_LIFECYCLE events (via LineageRunLifecycleEmitter) and flush the hub where needed:

  • PipelineStart (LineageHubPipelineStartXp): phase STARTED.

  • PipelineFinish (LineageHubPipelineFinishXp): phase FINISHING (transforms stopped, cleanup).

  • PipelineCompleted (LineageHubPipelineCompletedXp): phase FINISHED or FAILED (from the pipeline error count), then flushQuietly().

  • WorkflowStart (LineageHubWorkflowStartXp): phase STARTED.

  • WorkflowFinish (LineageHubWorkflowFinishXp): phase FINISHED or FAILED (from workflow/result errors), then flushQuietly().

  • HopServerShutdown (LineageHubServerShutdownFlushXp): flush only.

  • TransformBeforeStart (LineageHubTransformStartXp): transform phase STARTED (local pipeline engine; transform log channel plus pipeline correlation attributes).

  • TransformFinished (LineageHubTransformFinishXp): transform phase FINISHED or FAILED (from the transform error count), followed by TRANSFORM_SCHEMA events for the observed input/output row shapes when they are non-empty.

  • WorkflowBeforeActionExecution (LineageHubWorkflowActionBeforeXp): action STARTED (workflow log channel plus action name; the template action's log channel id is added as an attribute when present).

  • WorkflowAfterActionExecution (LineageHubWorkflowActionAfterXp): action FINISHED or FAILED from the action Result; uses the executed action's log channel id when available (WorkflowExecutionExtension.actionLogChannelId / actionExecutionResult).
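Several of these extension points share one rule: a terminal FINISHED or FAILED phase chosen from an error count, followed by a flush for pipelines and workflows. The sketch records the local pipeline sequence in order. It assumes that any error count above zero means FAILED; LifecycleSketch is a hypothetical stand-in for what LineageRunLifecycleEmitter and the XP classes do, not their code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the run-lifecycle phase sequence for a local pipeline. */
final class LifecycleSketch {
  enum Phase { STARTED, FINISHING, FINISHED, FAILED }

  /** Assumed terminal rule: any error means FAILED. */
  static Phase terminalPhase(long errors) {
    return errors > 0 ? Phase.FAILED : Phase.FINISHED;
  }

  final List<String> log = new ArrayList<>(); // records emits and flushes in order

  void onPipelineStart() {
    log.add("RUN_LIFECYCLE " + Phase.STARTED);
  }

  void onPipelineFinish() {
    log.add("RUN_LIFECYCLE " + Phase.FINISHING);
  }

  void onPipelineCompleted(long errors) {
    log.add("RUN_LIFECYCLE " + terminalPhase(errors));
    log.add("flushQuietly"); // flush after the terminal event
  }
}
```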

The remote (RemotePipelineEngine) and Beam (BeamPipelineEngine) engines also fire the PipelineStart and PipelineFinish extension points, so their run lifecycle events line up with those of the local pipeline engine.

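Beam and other engines that do not use the classic Hop thread-per-transform model typically do not fire TransformBeforeStart / TransformFinished the same way as the local pipeline engine, so transform-level lifecycle and TRANSFORM_SCHEMA events from those engines may be missing or incomplete.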

Configuration

Variables are defined on org.apache.hop.lineage.LineageVariables and appear in the variable registry (ENGINE scope) like other Hop settings.

  • HOP_LINEAGE_ENABLED (default N): set to Y to enable the hub.

  • HOP_LINEAGE_QUEUE_CAPACITY (default 10000): bounded queue capacity. Read on the first emit; the queue keeps this size for its lifetime.

  • HOP_LINEAGE_BATCH_MAX (default 100): maximum number of events per batch (every sink receives the same batch).

  • HOP_LINEAGE_BATCH_LINGER_MS (default 250): how long, in milliseconds, the worker waits for more events before sending a partial batch.

  • HOP_LINEAGE_SINK_IDS (default empty): comma-separated, case-insensitive list of sink plugin ids to load. When empty, all discovered sinks are loaded.

LineageConfiguration.resolve() reads these variables from the active IVariables, typically after Variables.initializeFrom has run so that system properties and Hop configuration apply.
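As an illustration of resolving the variables above, the sketch parses the five settings from a Map standing in for IVariables and applies the documented defaults and sink-id rules. ConfigSketch is hypothetical, and several details are assumptions rather than documented behavior: Y is matched case-insensitively, sink ids are trimmed, and missing or unparsable numbers fall back to the defaults.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical sketch of resolving the HOP_LINEAGE_* variables; not LineageConfiguration. */
record ConfigSketch(boolean enabled, int queueCapacity, int batchMax, long lingerMs, Set<String> sinkIds) {

  static ConfigSketch resolve(Map<String, String> vars) {
    return new ConfigSketch(
        "Y".equalsIgnoreCase(vars.getOrDefault("HOP_LINEAGE_ENABLED", "N").trim()),
        (int) number(vars, "HOP_LINEAGE_QUEUE_CAPACITY", 10_000),
        (int) number(vars, "HOP_LINEAGE_BATCH_MAX", 100),
        number(vars, "HOP_LINEAGE_BATCH_LINGER_MS", 250),
        // Comma-separated ids, normalized to lower case for case-insensitive matching.
        Arrays.stream(vars.getOrDefault("HOP_LINEAGE_SINK_IDS", "").split(","))
            .map(s -> s.trim().toLowerCase(Locale.ROOT))
            .filter(s -> !s.isEmpty())
            .collect(Collectors.toUnmodifiableSet()));
  }

  /** An empty id set means "load every discovered sink". */
  boolean loads(String sinkPluginId) {
    return sinkIds.isEmpty() || sinkIds.contains(sinkPluginId.toLowerCase(Locale.ROOT));
  }

  private static long number(Map<String, String> vars, String name, long fallback) {
    String raw = vars.get(name);
    if (raw == null || raw.isBlank()) return fallback;
    try {
      return Long.parseLong(raw.trim());
    } catch (NumberFormatException e) {
      return fallback; // assumption: unparsable values fall back to the default
    }
  }
}
```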

LineageSink plugins

Sinks implement org.apache.hop.lineage.spi.ILineageSink:

  • init(IVariables variables, ILogChannel log) — Optional; called once before the first batch.

  • accept(List<LineageEvent> events) — Receives a non-empty batch; should not hold references to the list after returning.

  • shutdown() — Optional cleanup.

The implementation class is annotated with @LineageSinkPlugin (id, optional name, description), mirroring other Hop plugin types (see Plugin Development).

The plugin type LineageSinkPluginType is registered in HopEnvironment.getStandardPluginTypes() alongside transforms, actions, execution info locations, etc. Plugins are discovered from the classpath like other native/engine plugins.

No sink implementations ship with the engine. Integrators implement ILineageSink, annotate the class with @LineageSinkPlugin(id = "…"), and package it as a Hop plugin. With no sinks loaded, the hub remains a no-op even when HOP_LINEAGE_ENABLED=Y.
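Compiling against the real ILineageSink and @LineageSinkPlugin requires hop-engine, so the sketch below mirrors the three-method contract with stand-in types to show the shape of a minimal sink. SinkSketch, SinkPluginSketch, and CountingSinkSketch are hypothetical; events are plain strings and variables a Map, and init() and shutdown() are modeled as default methods because the text calls them optional, which may not match the real interface.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Stand-in for @LineageSinkPlugin so the sketch compiles without hop-engine. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SinkPluginSketch {
  String id();
  String name() default "";
  String description() default "";
}

/** Stand-in for ILineageSink: events are plain strings, variables a Map. */
interface SinkSketch {
  default void init(Map<String, String> variables) {}
  void accept(List<String> events);
  default void shutdown() {}
}

/** Hypothetical sink that only counts events and keeps no reference to the batch list. */
@SinkPluginSketch(id = "counting-sketch", name = "Counting sink (sketch)")
final class CountingSinkSketch implements SinkSketch {
  private final AtomicLong seen = new AtomicLong();
  private volatile boolean open;

  @Override
  public void init(Map<String, String> variables) {
    open = true;
  }

  @Override
  public void accept(List<String> events) {
    if (!open) throw new IllegalStateException("init() not called");
    seen.addAndGet(events.size()); // read what is needed; do not store the list
  }

  @Override
  public void shutdown() {
    open = false;
  }

  long seen() {
    return seen.get();
  }
}
```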

Emitting events from code

Producers (pipeline/workflow hooks, VFS usage, HTTP client/server filters, transforms) should build a LineageContext (often via LineageContext.builder()), optionally attach a payload, then emit:

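```java
import org.apache.hop.core.variables.IVariables;
import org.apache.hop.lineage.context.LineageContext;
import org.apache.hop.lineage.context.LineagePortableFilename;
import org.apache.hop.lineage.context.LineageSubjectType;
import org.apache.hop.lineage.hub.LineageHub;
import org.apache.hop.lineage.model.FileIoLineagePayload;
import org.apache.hop.lineage.model.FileIoOperation; // package assumed; adjust if it lives elsewhere
import org.apache.hop.lineage.model.LineageEvent;
import org.apache.hop.lineage.model.LineageEventKind;

// vars: the running pipeline or workflow as IVariables; it must resolve PROJECT_HOME
// for portableKey() to produce a ${PROJECT_HOME}/... key.
// filename, sourceUri, targetUri and bytes describe the copy being reported.
LineageEvent event =
    LineageEvent.of(
        LineageEventKind.FILE_IO,
        LineageContext.builder()
            .subjectType(LineageSubjectType.ACTION)
            .hopFilename(filename)
            .hopFilenamePortableKey(
                LineagePortableFilename.portableKey(filename, vars))
            .build(),
        new FileIoLineagePayload(
            FileIoOperation.COPY, sourceUri, targetUri, bytes, true, null));
LineageHub.getInstance().emit(event);
```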

Keep call sites thin: one object construction plus emit. Heavy work belongs in sinks or async code inside the sink implementation.
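If a sink's export is slow (a network call, for example), one way to follow that advice is to copy the batch in accept() and hand it to a private single-thread executor, draining it on shutdown. AsyncExportSketch and its export callback are hypothetical, and the class does not implement the real ILineageSink.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sink-side handoff: accept() copies and enqueues, a private thread exports. */
final class AsyncExportSketch {
  private final ExecutorService exporter =
      Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "lineage-export-sketch");
        t.setDaemon(true);
        return t;
      });
  private final Consumer<List<String>> export; // the slow work, e.g. an HTTP call

  AsyncExportSketch(Consumer<List<String>> export) {
    this.export = export;
  }

  /** Returns quickly; the copy means the caller's list is not referenced afterwards. */
  void accept(List<String> events) {
    List<String> copy = List.copyOf(events);
    exporter.execute(() -> {
      try {
        export.accept(copy);
      } catch (RuntimeException e) {
        System.err.println("export failed: " + e); // keep the exporter thread alive
      }
    });
  }

  /** Stops taking work and waits briefly for already submitted exports. */
  void shutdown() {
    exporter.shutdown();
    try {
      if (!exporter.awaitTermination(5, TimeUnit.SECONDS)) exporter.shutdownNow();
    } catch (InterruptedException e) {
      exporter.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }
}
```

Copying in accept() also satisfies the rule that a sink must not hold the batch list after returning. A production sink would likely bound its own backlog as well, since the executor from newSingleThreadExecutor uses an unbounded work queue that can grow without limit if the backend stalls.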