Azure Event Hubs Listener

Description

The Azure Event Hubs Listener transform listens indefinitely to an Event Hub on the Microsoft Azure cloud platform.

Supported Engines

  • Hop Engine: Supported

  • Spark: Maybe Supported

  • Flink: Maybe Supported

  • Dataflow: Maybe Supported

Connection options

  • Event Hubs namespace: the name of your Event Hubs namespace

  • Event Hubs instance name: the name of the Event Hub, the instance itself.

  • Event Hub connection string: Typically the same as the SAS Key Connection String

  • SAS Policy key name: the name of the policy in the "Shared Access Policies" section of the Event Hubs Namespace in the Azure dashboard. Because this transform reads events, this needs to be a policy with the "Listen" claim enabled.

  • SAS Key connection string: You can use the value in the policy labeled "Connection string–primary key"

  • Batch size: the number of messages (events) to receive in one batch with each call to Azure.

  • Message field: the field containing the message to use as the events. Please note you can use a transform like JSON Output or "Java Script" to assemble the message.

  • Consumer Group name: If you didn’t create a specific group, just use $Default

  • Storage Container name: the name of your storage container

  • Storage Connection String: the key1 or key2 Connection string from the Access keys section of your storage account.
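
For orientation, the connection options above map onto the standard Azure connection string layout. The sketch below assembles such a string from its parts. The function name and sample values are hypothetical, and the `servicebus.windows.net` suffix assumes the public Azure cloud. In practice you would normally copy the complete string from the Azure portal instead of building it yourself.

```python
def build_connection_string(namespace, policy_name, policy_key, event_hub=None):
    """Assemble an Event Hubs connection string from its parts.

    Assumption: the public Azure cloud domain suffix is used; sovereign
    clouds use a different suffix.
    """
    parts = [
        f"Endpoint=sb://{namespace}.servicebus.windows.net/",
        f"SharedAccessKeyName={policy_name}",
        f"SharedAccessKey={policy_key}",
    ]
    # EntityPath is only present when the string is scoped to one Event Hub.
    if event_hub:
        parts.append(f"EntityPath={event_hub}")
    return ";".join(parts)


if __name__ == "__main__":
    # Hypothetical values, for illustration only.
    print(build_connection_string("my-namespace", "my-policy", "<key>", "my-hub"))
```

A string copied from a policy at the namespace level has no `EntityPath` part, which is why the Event Hubs instance name is configured separately.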

Tuning options

  • Batch size: The number of events to get at once from the event hub and to (optionally) pass to a batching pipeline (see below)

  • Prefetch size: (optional) the number of events to pre-fetch

Output fields

  • Message output field: the name of the output field containing the message (event)

  • Partition ID field: the name of the output field for the partition ID

  • Offset field name: the name of the output field for the event offset

  • Sequence number field name: the name of the output field for the event sequence number

  • Host (owner) field name: the name of the output field for the event host

  • Enqueued time field name: the name of the output field for the time the event was queued.

Batch processing options

To make sure that all records are processed safely before the event stream is checkpointed, all the logic that needs to run and all the updates that need to happen must complete first. To achieve this, all the rows read in one batch are passed to the specified batch pipeline, which runs single-threaded.

  • Batch pipeline: the batch pipeline file name

  • pipeline input transform: the name of the "Injector" or "Get records from Stream" transform to send the events to in the batch pipeline.

  • pipeline output transform: (optional) the name of the transform to read from in the batch pipeline and to use as output of this Listener transform.

  • Maximum wait time (ms): (optional but highly recommended!) The maximum time to wait before the batch pipeline is executed, even if the batch does not yet contain the specified number of records. This prevents records from sitting in the batch pipeline input for long periods when no new events are arriving from the hub.
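
The interaction between the batch size and the maximum wait time can be illustrated with a small sketch. This is not the transform's actual implementation: the `BatchBuffer` class, its method names, and the choice to start the timer when the first row of a batch arrives are all assumptions made for illustration.

```python
import time


class BatchBuffer:
    """Collects rows and hands them off as a batch (illustrative only)."""

    def __init__(self, batch_size, max_wait_ms, on_flush, clock=time.monotonic):
        self.batch_size = batch_size
        self.max_wait_ms = max_wait_ms
        self.on_flush = on_flush      # stands in for running the batch pipeline
        self.clock = clock            # injectable so the logic can be tested
        self.rows = []
        self.first_row_at = None

    def add(self, row):
        # Assumption: the wait timer starts with the first row of a batch.
        if not self.rows:
            self.first_row_at = self.clock()
        self.rows.append(row)
        if len(self.rows) >= self.batch_size:
            self.flush()

    def tick(self):
        # Called periodically: flush a partial batch once the wait time is up.
        if self.rows and (self.clock() - self.first_row_at) * 1000 >= self.max_wait_ms:
            self.flush()

    def flush(self):
        batch, self.rows = self.rows, []
        self.first_row_at = None
        self.on_flush(batch)
```

With a batch size of 3 and a maximum wait time of 500 ms, two rows that arrive and are followed by silence are still handed to the batch pipeline after half a second. Without the wait time they would stay in the buffer until a third event arrives.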

Important notes

The batch pipeline starts with an Injector transform. It receives the following fields:

  • message (String)

  • partitionId (String)

  • offset (String)

  • sequenceNumber (Integer)

  • host (String)

  • enqueuedTime (Timestamp)
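
As a quick reference, the row layout above can be written down as a typed record. The class name `InjectorRow` is hypothetical, and mapping the Integer type to `int` and the Timestamp type to `datetime` is an assumption made for this sketch. The field names and their order follow the list above.

```python
from dataclasses import dataclass, fields
from datetime import datetime


@dataclass
class InjectorRow:
    """One row as received by the Injector transform (illustrative only)."""

    message: str
    partitionId: str
    offset: str
    sequenceNumber: int
    host: str
    enqueuedTime: datetime


# Field names in the order the batch pipeline receives them.
FIELD_NAMES = [f.name for f in fields(InjectorRow)]
```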