Beam Kafka Consume
Description

The Beam Kafka Consume transform consumes records from a Kafka cluster using the Beam execution engine.
Limitations
The main limitation of this transform is that it currently only supports String keys, and either String or Avro Record values.
Options
Option | Description |
---|---|
Transform name | Name of the transform; this name has to be unique in a single pipeline. |
Bootstrap servers | A comma-separated list of hosts which are Kafka brokers in a "bootstrap" Kafka cluster. |
Topics to consume | A comma-separated list of topics to consume. |
Group ID | Specifies the ID of a consumer group a Kafka consumer belongs to. |
The name of the key output field | The result key field. |
The name of the message output field | The result message field. |
The message type (default is String) | The type of message to get from Kafka. |
Schema registry URL (Avro) | The schema registry URL, in case you're consuming Avro Record messages. |
Schema registry subject (Avro) | The name of the subject (schema name) in the schema registry. This lets the transform know which kind of Avro values are being consumed. |
Use processing time | Timestamp records with the time they are processed by Beam. |
Use log append time | Timestamp records with the time they are appended by the broker. |
Use create time | Timestamp records with the time the producer record was created. |
Restrict read to committed messages | Restricts reading to committed records only. |
Allow offsets to be committed back | Allows offsets to be committed back to Kafka to mark them as consumed. |
Configuration options | A list of configuration parameters. |
Parameter | A configuration parameter. |
Value | The parameter value. |
Type | The value data type. |
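For context, the sketch below shows roughly how these options map onto Apache Beam's Java KafkaIO, which the Beam engine uses for reading from Kafka. This is a minimal illustration, not the transform's actual implementation; the broker addresses, topic names and group ID are placeholder values.

```java
import java.util.List;
import java.util.Map;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumeSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KafkaRecord<String, String>> records =
        pipeline.apply(
            "Read from Kafka",
            KafkaIO.<String, String>read()
                // "Bootstrap servers": comma-separated list of brokers (placeholders)
                .withBootstrapServers("broker1:9092,broker2:9092")
                // "Topics to consume" (placeholders)
                .withTopics(List.of("topic-a", "topic-b"))
                // String keys and String values (the default message type)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // "Group ID" plus any extra configuration options
                .withConsumerConfigUpdates(Map.of("group.id", "my-group"))
                // "Use log append time": timestamp records with the broker's append time
                .withLogAppendTime()
                // "Restrict read to committed messages"
                .withReadCommitted()
                // "Allow offsets to be committed back"
                .commitOffsetsInFinalize());

    pipeline.run().waitUntilFinish();
  }
}
```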
Avro and Schema registry
To consume Avro Record values from a Kafka server, you need a few extra options. The schema of the Avro values is not sent to Kafka itself but to a schema registry, so you need to have one available. The options below make this work on a Confluent Cloud Kafka instance. Various parts of the software stack need authentication, hence the bit of redundancy. We recommend that you put these options in variables in your environment configuration file.
Option | Example |
---|---|
auto.offset.reset | earliest (or latest) |
sasl.jaas.config | org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET"; |
security.protocol | SASL_SSL |
basic.auth.credentials.source | USER_INFO |
basic.auth.user.info | CLUSTER_API_KEY:CLUSTER_API_SECRET |
schema.registry.basic.auth.user.info | SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET |
sasl.mechanism | PLAIN |
client.dns.lookup | use_all_dns_ips |
acks | ALL |
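As an illustration only, the sketch below shows how options like these could be wired together in a Beam Java pipeline that reads Avro values through Beam's ConfluentSchemaRegistryDeserializerProvider. The cluster and registry hostnames, topic, subject and group ID are placeholders, and the API key/secret strings stand for the variables recommended above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaAvroConsumeSketch {

  static KafkaIO.Read<String, GenericRecord> avroRead() {
    // Consumer and security options from the table above
    Map<String, Object> config = new HashMap<>();
    config.put("group.id", "my-group"); // placeholder
    config.put("auto.offset.reset", "earliest");
    config.put("security.protocol", "SASL_SSL");
    config.put("sasl.mechanism", "PLAIN");
    config.put(
        "sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";");
    config.put("client.dns.lookup", "use_all_dns_ips");
    // Schema registry authentication
    config.put("basic.auth.credentials.source", "USER_INFO");
    config.put("basic.auth.user.info", "CLUSTER_API_KEY:CLUSTER_API_SECRET");
    config.put(
        "schema.registry.basic.auth.user.info",
        "SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET");

    return KafkaIO.<String, GenericRecord>read()
        // Placeholder Confluent Cloud broker and topic
        .withBootstrapServers("pkc-xxxxx.region.provider.confluent.cloud:9092")
        .withTopics(List.of("my-avro-topic"))
        .withKeyDeserializer(StringDeserializer.class)
        // Avro values: the schema is fetched from the registry for the given subject
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of(
                "https://psrc-xxxxx.region.provider.confluent.cloud", "my-avro-topic-value"))
        .withConsumerConfigUpdates(config);
  }
}
```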