Beam Kafka Consume

Description

The Beam Kafka Consume transform consumes records from a Kafka cluster using the Beam execution engine.

Limitations

The main limitation of this transform is that it currently only supports String keys, and String or Avro Record values.

Options

Transform name: The name of the transform; this name has to be unique in a single pipeline.

Bootstrap servers: A comma-separated list of hosts that act as Kafka brokers in a "bootstrap" Kafka cluster.

Topics to consume: A comma-separated list of topics to consume.

Group ID: The ID of the consumer group this Kafka consumer belongs to.

The name of the key output field: The name of the output field that will contain the record key.

The name of the message output field: The name of the output field that will contain the record message (value).

The message type (default is String): The type of message to read from Kafka: String or Avro Record.

Schema registry URL (Avro): The schema registry URL, needed when you are consuming Avro Record messages.

Schema registry subject (Avro): The name of the subject (schema name) in the schema registry, used to determine which Avro schema the consumed values follow.

Use processing time: Use the time at which the record is processed by Beam as the record timestamp.

Use log append time: Use the time at which the record was appended by the broker as the record timestamp.

Use create time: Use the time at which the producer record was created as the record timestamp.

Restrict read to committed messages: Restricts reading to committed records only.

Allow offsets to be committed back: Allows offsets to be committed back to Kafka to mark records as consumed.

Configuration options: A list of additional configuration parameters. Each entry consists of a Parameter (the configuration parameter name), a Value (the parameter value) and a Type (the value data type).
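To illustrate how these options fit together, here is a minimal sketch of roughly equivalent Beam KafkaIO code for the default String message type. It is a hypothetical example: the broker addresses, topic names and group ID are made-up placeholders, and the pipeline the transform actually generates may differ in detail.

```java
import java.util.Arrays;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BeamKafkaConsumeSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline.apply("Beam Kafka Consume",
        KafkaIO.<String, String>read()                                   // String keys, String values
            .withBootstrapServers("broker1:9092,broker2:9092")           // Bootstrap servers (placeholder)
            .withTopics(Arrays.asList("topicA", "topicB"))               // Topics to consume (placeholder)
            .withKeyDeserializer(StringDeserializer.class)               // key output field
            .withValueDeserializer(StringDeserializer.class)             // message output field
            .withConsumerConfigUpdates(Map.of("group.id", "my-group"))   // Group ID and configuration options
            .withLogAppendTime()                                         // "Use log append time"
            .withReadCommitted()                                         // "Restrict read to committed messages"
            .commitOffsetsInFinalize());                                 // "Allow offsets to be committed back"

    pipeline.run().waitUntilFinish();
  }
}
```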

Avro and Schema registry

To consume Avro Record values from Kafka you need a few extra options. The schemas of Avro values are not sent to Kafka itself but to a schema registry, so you need to have one available. The options below, with example values, show what is needed to make this work against a Confluent Cloud Kafka instance. Various parts of the software stack need authentication, hence the bit of redundancy. We recommend that you put these options in variables in your environment configuration file.

auto.offset.reset: earliest (or latest)

sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET";

security.protocol: SASL_SSL

basic.auth.credentials.source: USER_INFO

basic.auth.user.info: CLUSTER_API_KEY:CLUSTER_API_SECRET

schema.registry.basic.auth.user.info: SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET

sasl.mechanism: PLAIN

client.dns.lookup: use_all_dns_ips

acks: ALL
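As a rough illustration of how these options end up in a Beam read, the sketch below mirrors the table as consumer configuration updates and uses Beam's Confluent schema registry support to deserialize Avro Record values. The endpoint, topic, subject and credential values are hypothetical placeholders; in Hop you would enter these options as rows in the transform's Configuration options table and resolve the secrets from variables.

```java
import java.util.Arrays;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConfluentCloudAvroConsumeSketch {

  // Hypothetical placeholders; in practice these values come from variables in
  // your environment configuration file, never from literals in a pipeline.
  static final String CLUSTER_API_KEY = "my-cluster-api-key";
  static final String CLUSTER_API_SECRET = "my-cluster-api-secret";
  static final String SCHEMA_REGISTRY_AUTH = "SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET";

  static KafkaIO.Read<String, GenericRecord> confluentCloudRead() {
    // The same options as in the table above; in Hop these are entered as rows
    // in the "Configuration options" table of the transform.
    Map<String, Object> config = Map.of(
        "auto.offset.reset", "earliest",
        "security.protocol", "SASL_SSL",
        "sasl.mechanism", "PLAIN",
        "sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + CLUSTER_API_KEY + "\" password=\"" + CLUSTER_API_SECRET + "\";",
        "basic.auth.credentials.source", "USER_INFO",
        "basic.auth.user.info", CLUSTER_API_KEY + ":" + CLUSTER_API_SECRET,
        "schema.registry.basic.auth.user.info", SCHEMA_REGISTRY_AUTH,
        "client.dns.lookup", "use_all_dns_ips",
        "acks", "ALL");

    return KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers("pkc-00000.region.provider.confluent.cloud:9092")  // hypothetical endpoint
        .withTopics(Arrays.asList("my-avro-topic"))                              // hypothetical topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(
            // Schema registry URL (Avro) and schema registry subject (Avro)
            ConfluentSchemaRegistryDeserializerProvider.of(
                "https://psrc-00000.region.provider.confluent.cloud", "my-avro-topic-value"))
        .withConsumerConfigUpdates(config);
  }
}
```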