Getting started with Apache Beam

What is Apache Beam?

Apache Beam is a unified programming model that allows you to implement batch and streaming data processing jobs that run on any supported execution engine. Popular execution engines include Apache Spark, Apache Flink and Google Cloud Dataflow.

How does it work?

Apache Beam allows you to write programs in a variety of programming languages, such as Java, Python and Go, against a standard Beam API. These programs define data pipelines which can then be executed by Beam runners on the various execution engines.
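To make this concrete, here is a minimal Java sketch (not code from the Hop code base; Create and Count are just illustrative transforms). The same program can be submitted to different engines simply by passing a different --runner option on the command line, for example DirectRunner, SparkRunner, FlinkRunner or DataflowRunner.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;

public class MinimalBeamPipeline {
  public static void main(String[] args) {
    // The runner (and its options) is picked up from the command line arguments,
    // e.g. --runner=FlinkRunner or --runner=DataflowRunner.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);

    // A trivial pipeline: create a few elements and count them.
    pipeline
        .apply(Create.of("apache", "beam", "hop"))
        .apply(Count.globally());

    pipeline.run().waitUntilFinish();
  }
}
```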

How is Hop using Beam?

Hop uses the Beam API to create Beam pipelines based on your visually designed Hop pipelines. The terminology of Hop and Beam is aligned: a Hop pipeline and a Beam pipeline refer to the same concept. Hop provides four standard ways to execute a pipeline you designed: on Spark, Flink, Dataflow or the Direct runner.

Each of these Beam pipeline engine plugins has its own documentation.

What software versions are supported?

Hop version   Beam version   Spark version        Flink version
2.9.0         2.56.0         3.4.X (scala 2.12)   1.17.x
2.8.0         2.50.0         3.4.X (scala 2.12)   1.16.x
2.7.0         2.50.0         3.4.X (scala 2.12)   1.16.x
2.6.0         2.50.0         3.4.X (scala 2.12)   1.16.x
2.5.0         2.48.0         3.4.X (scala 2.12)   1.16.x
2.4.0         2.46.0         3.3.X (scala 2.12)   1.15.x
2.3.0         2.43.0         3.3.X (scala 2.12)   1.15.x
2.2.0         2.43.0         3.3.X (scala 2.12)   1.15.x
2.1.0         2.41.0         3.3.0 (scala 2.12)   1.15.2
2.0.0         2.38.0         3.1.3 (scala 2.12)   1.14.4 (scala 2.11)
1.2.0         2.35.0         3.1.2 (scala 2.12)   1.13 (scala 2.11)
1.1.0         2.35.0         3.1.2                1.13
1.0.0         2.32.0         2.4.8                1.11

How are my pipelines executed?

An Apache Hop pipeline is just metadata. The various Beam pipeline engine plugins walk through this metadata one transform at a time and decide what to do with each transform based on the Hop transform handler that is provided. In general, the transforms are split into the different types described below.

It is important to remember that Beam pipelines try to solve every action in an 'embarrassingly parallel' way. This means that every transform can, and usually will, run in more than one copy. On large clusters you should expect many copies of the same code to run at any given time.

Beam specific transforms

There are a number of Beam-specific transforms available which only work on the provided Beam pipeline execution engines. For example: Beam Input, which reads text file data from one or more files, or Beam BigQuery Output, which writes data to BigQuery.

You can find these transforms in the Big Data category. Their names all start with Beam to make it easy to recognize them.

Here is an example of a simple pipeline which reads files from a folder (on gs://), filters out data from California, removes and renames a few fields, and writes the data back to another set of files:

Beam input-process-output-sample
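As a rough illustration of what such a pipeline boils down to in Beam terms, here is a hedged Java sketch, not the code Hop actually generates: the bucket name, file pattern and semicolon-separated layout are assumptions for the example, and the field removal and renaming steps are skipped. It reads text files from a gs:// folder, drops the rows for California and writes the rest back out.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Filter;

public class ReadFilterWriteSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        // Hypothetical bucket and file pattern.
        .apply(TextIO.read().from("gs://my-bucket/input/*.csv"))
        // Keep only the lines that do not contain the (assumed) state code field ";CA;".
        .apply(Filter.by(line -> !line.contains(";CA;")))
        // Write the remaining rows back to another set of files.
        .apply(TextIO.write().to("gs://my-bucket/output/filtered"));

    pipeline.run().waitUntilFinish();
  }
}
```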

Universal transforms

There are a few transforms which are translated into Beam variations:

  • Memory Group By: This transform allows you to aggregate data across large data volumes. When using the Beam engines it uses org.apache.beam.sdk.transforms.GroupByKey (a minimal sketch of this mapping follows after this list).

  • Merge Join: You can join 2 data sources with this transform. The main difference is that in the Beam engines the input data doesn’t need to be sorted. The Beam class used to perform this is: org.apache.beam.sdk.extensions.joinlibrary.Join.

  • Generate Rows: This transform is used to generate (empty/static) rows of data. It can be either a fixed number, or it can generate rows indefinitely. When using the Beam engines it uses org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource or org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource.
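As a rough idea of the first mapping, here is a minimal sketch, not the code Hop actually generates, of aggregating values per key with org.apache.beam.sdk.transforms.GroupByKey. The (state, sales) pairs are made-up example data standing in for Hop rows.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class GroupByKeySketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Hypothetical input: (state, sales) key-value pairs.
    PCollection<KV<String, Integer>> sales =
        pipeline.apply(Create.of(KV.of("CA", 10), KV.of("NY", 20), KV.of("CA", 5)));

    // Group all values per key, then sum them: the same shape of work a
    // Memory Group By performs when it runs on a Beam engine.
    sales
        .apply(GroupByKey.create())
        .apply(MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
            .via(kv -> {
              int sum = 0;
              for (Integer value : kv.getValue()) {
                sum += value;
              }
              return KV.of(kv.getKey(), sum);
            }));

    pipeline.run().waitUntilFinish();
  }
}
```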

Unsupported transforms

A few transforms are simply not supported because we haven’t found a good way to implement them on Beam yet.

The Denormaliser transform technically works correctly on Apache Beam in release 1.1.0 and later. Even so, you need to consider that the aggregation of the key-value pairs in that transform (in the general case) only happens on a subset of the rows. That is because in a Beam pipeline the order in which rows arrive is lost: they are continuously re-shuffled to maximize parallelism. This is different from the behavior of the "Local" Hop pipeline engine.

To get around this issue you can apply a Memory Group By transform across the whole dataset to grab the first non-null value of every field you de-normalised. This will produce the correct result.

All others

All other transforms are supported. They are wrapped in a bit of code that makes the exact same code that runs on the local Hop pipeline engine work in a Beam pipeline. There are a few special cases to mention, though.


Info transforms

Some transforms, like Stream Lookup, read data from other transforms. In the Beam API this is handled with side inputs and is as such fully supported.
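For illustration, here is a minimal sketch of a Beam side input with assumed example data, not Hop's actual implementation: the lookup data becomes a PCollectionView that the DoFn processing the main stream can read, much like an info stream feeding a Stream Lookup.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Hypothetical lookup data: country code to country name, exposed as a side input.
    PCollectionView<Map<String, String>> lookup =
        pipeline
            .apply("Lookup data", Create.of(KV.of("BE", "Belgium"), KV.of("US", "United States")))
            .apply(View.asMap());

    // Main stream: country codes to enrich.
    PCollection<String> main = pipeline.apply("Main data", Create.of("BE", "US", "BE"));

    // The DoFn reads the side input while processing the main stream.
    main.apply(
        ParDo.of(
                new DoFn<String, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    Map<String, String> map = c.sideInput(lookup);
                    c.output(c.element() + " -> " + map.get(c.element()));
                  }
                })
            .withSideInputs(lookup));

    pipeline.run().waitUntilFinish();
  }
}
```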

Target transforms

Sometimes you want to target specific transforms, as in Switch / Case or Filter Rows. This is fully supported as well and handled through the Beam API's additional outputs.
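Here is a minimal sketch of Beam's additional outputs, using a made-up even/odd filter rather than Hop's generated code: a single ParDo writes to a main output and to an extra tagged output, and each resulting branch can then be routed to its own target.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class TargetTransformsSketch {
  // Tags identify the two targets of the hypothetical filter.
  static final TupleTag<Integer> EVEN = new TupleTag<Integer>() {};
  static final TupleTag<Integer> ODD = new TupleTag<Integer>() {};

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollectionTuple branches =
        pipeline
            .apply(Create.of(1, 2, 3, 4, 5))
            .apply(
                ParDo.of(
                        new DoFn<Integer, Integer>() {
                          @ProcessElement
                          public void processElement(ProcessContext c) {
                            if (c.element() % 2 == 0) {
                              c.output(c.element());      // main output: even numbers
                            } else {
                              c.output(ODD, c.element()); // additional output: odd numbers
                            }
                          }
                        })
                    .withOutputTags(EVEN, TupleTagList.of(ODD)));

    // Each branch can now be sent to a different downstream "target transform".
    branches.get(EVEN);
    branches.get(ODD);

    pipeline.run().waitUntilFinish();
  }
}
```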

Non-Beam input transforms

When you’re reading data using a non-Beam transform (see Beam-specific transforms above) we need to make sure that this transform is executed in exactly one thread. Otherwise, you might read your XML or JSON document many times because of the inherently parallel nature of the various engines. This is handled by doing a Group By over a single value. In, for example, your Dataflow pipeline you’ll see the sequence Create.Values → WithKeys → GroupByKey → Values → Flatten.Iterables → ParDo …, and all of this is done just to make sure we only ever execute the transform once.

Non-Beam input transforms on Dataflow
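The following is a rough sketch of that single-key trick, with assumed example data and a constant String key instead of the Void key mentioned later; it is not the code Hop generates. Keying every element with the same value and grouping forces all elements through one grouped bundle, so the wrapped transform effectively runs once.

```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;

public class SingleThreadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Hypothetical stand-in for the file names an input transform would read.
    List<String> files = Arrays.asList("a.xml", "b.xml", "c.xml");

    PCollection<String> input =
        pipeline
            .apply(Create.of(files))          // Create.Values
            .apply(WithKeys.of("single"))     // WithKeys: the same constant key for every element
            .apply(GroupByKey.create())       // GroupByKey: all elements end up in one group
            .apply(Values.create())           // Values: drop the dummy key
            .apply(Flatten.iterables());      // Flatten.Iterables: back to individual elements

    // The subsequent ParDo now sees all elements from one grouped bundle,
    // so the wrapped (non-Beam) transform runs only once.
    input.apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output("read: " + c.element());
      }
    }));

    pipeline.run().waitUntilFinish();
  }
}
```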

Non-Beam output transforms

The insistence of a Beam pipeline on running work in an embarrassingly parallel way can trip you up on the output side. In general, it is NOT possible to limit the number of copies of a particular transform. What Hop tries to do is perform a series of operations (GroupByKey over a single Void key → Values() → Flatten()) to try and force a single thread. However, this does not work on all runners; Flink, for example, insists on running even this in parallel.

You can set the number of copies string of a transform to include SINGLE_BEAM (click on the transform and select Number of copies in the Hop GUI).

A better way to deal with this problem is NOT to insist on creating a single file. You can include the unique ID of a transform in the filename with the variable ${Internal.Transform.ID}. This allows the various transform copies to write unique files to the same folder.

Row batching with non-Beam transforms

A lot of target databases like to receive rows in batches. If you use a transform like Table Output or Neo4j Output you might therefore see that performance is not that great. This is because the Beam programming model is designed to stream rows of data through a pipeline in bundles, while the Hop API only knows about a single record at a time. For these transforms you can include BATCH in the number of copies string of the transform (click on the transform and select Number of copies in the Hop GUI). For these flagged transforms you can then specify two parameters in your Beam pipeline run configuration. These determine how long rows are held back before they are forced into the transforms in question:

  • Streaming Hop transform flush interval: how long rows are kept and batched up. If you care about latency, make this lower (500 or less). If you have a long-running batching pipeline, make it higher (10000 or more, perhaps).

  • Hop streaming transforms buffer size: how many rows are batched up. Consider making this the same as the batch size you use in your transform metadata (e.g. Table Output, Neo4j Cypher, …).

Please note that these are maximum values. When the end of a bundle is reached in a pipeline, the rows are always forced into the transform code and as such pushed to the target system. To give you an idea of how many times a batching buffer is flushed to the underlying transform code (and as such to, for example, a remote database) we added a Flushes metric. You will see it in the metrics view in the Hop GUI while executing.

Beam Flushes Metrics

Fat jars?

A fat jar is often used to package up all the code you need for a particular project. The Spark, Flink and Dataflow execution engines like it because it massively simplifies the Java classpath when executing pipelines. Apache Hop allows you to create a fat jar in the Hop GUI with the Tools/Generate a Hop fat jar… menu or by using the following command:

sh hop-config.sh -fj /path/to/fat.jar

The path to this fat jar can then be referenced in the various Beam run configurations. Note that the currently installed version of Hop and all its plugins are used to build the fat jar. If you install or remove plugins, or update Hop itself, remember to generate a new fat jar or to update it.

Beam File definitions

The Beam Input and Beam Output transforms expect you to define the layout of the file(s) being read or written.

Beam File Definition example

Current limitations

There are some specific advantages to using engines like Spark, Flink and Dataflow. However, they come with some limitations as well:

  • Previewing data is not available (yet). Because of the distributed nature of execution we don’t have a great way to acquire preview data.

  • Unit testing: not available, for reasons similar to previewing and debugging. To test your Beam pipelines, pick up the data after a pipeline has finished and compare it to a golden data set in another pipeline that runs with the local Hop pipeline engine.

  • Debugging or pausing a pipeline is not supported.