Getting started with Apache Beam
What is Apache Beam?
Apache Beam is an advanced unified programming model that allows you to implement batch and streaming data processing jobs that run on any supported execution engine. Popular execution engines include Apache Spark, Apache Flink and Google Cloud Platform Dataflow.
How is Hop using Beam?
Hop uses the Beam API to create Beam pipelines from your visually designed Hop pipelines. The terminology of Hop and Beam is aligned because the concepts mean the same thing. Hop provides 4 standard ways to execute a pipeline that you designed: on Spark, on Flink, on Dataflow or on the Direct runner.
Here is the documentation for the relevant plugins:
What software versions are supported?
Hop version | Beam version | Spark version | Flink version |
---|---|---|---|
2.9.0 | 2.56.0 | 3.4.X (scala 2.12) | 1.17.x |
2.8.0 | 2.50.0 | 3.4.X (scala 2.12) | 1.16.x |
2.7.0 | 2.50.0 | 3.4.X (scala 2.12) | 1.16.x |
2.6.0 | 2.50.0 | 3.4.X (scala 2.12) | 1.16.x |
2.5.0 | 2.48.0 | 3.4.X (scala 2.12) | 1.16.x |
2.4.0 | 2.46.0 | 3.3.X (scala 2.12) | 1.15.x |
2.3.0 | 2.43.0 | 3.3.X (scala 2.12) | 1.15.x |
2.2.0 | 2.43.0 | 3.3.X (scala 2.12) | 1.15.x |
2.1.0 | 2.41.0 | 3.3.0 (scala 2.12) | 1.15.2 |
2.0.0 | 2.38.0 | 3.1.3 (scala 2.12) | 1.14.4 (scala 2.11) |
1.2.0 | 2.35.0 | 3.1.2 (scala 2.12) | 1.13 (scala 2.11) |
1.1.0 | 2.35.0 | 3.1.2 | 1.13 |
1.0.0 | 2.32.0 | 2.4.8 | 1.11 |
How are my pipelines executed?
An Apache Hop pipeline is just metadata. The various Beam pipeline engine plugins look at this metadata one transform at a time and decide what to do with each transform based on the Hop transform handler that is provided. In general, the transforms fall into the different types described below.
It is important to remember that Beam pipelines try to solve every action in an 'embarrassingly parallel' way. This means that every transform can, and usually will, run in more than 1 copy. On large clusters you should expect a lot of copies of the same code to run at any given time.
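To make the idea of multiple copies concrete, here is a minimal plain-Python sketch. It is not Beam or Hop code: the function names, the round-robin split and the thread pool are assumptions chosen only for illustration. It shows rows being dealt out to several independent copies of the same transform code, and that the combined output no longer has the original row order.

```python
from concurrent.futures import ThreadPoolExecutor


def to_upper(row):
    """Hypothetical per-row transform: it needs no state shared between rows."""
    return {**row, "state": row["state"].upper()}


def run_in_copies(rows, transform, copies):
    """Run `transform` over `rows` using `copies` independent workers."""
    # Deal the rows out round-robin into one bundle per copy (an assumption;
    # a real runner decides for itself how to split the work).
    bundles = [rows[i::copies] for i in range(copies)]
    with ThreadPoolExecutor(max_workers=copies) as pool:
        # Every bundle is processed by its own copy of the same code.
        results = list(pool.map(lambda bundle: [transform(r) for r in bundle], bundles))
    # Concatenating the bundles does not restore the original row order.
    return [row for bundle in results for row in bundle]


rows = [{"id": i, "state": "ca"} for i in range(6)]
out = run_in_copies(rows, to_upper, copies=3)
print([r["id"] for r in out])  # prints [0, 3, 1, 4, 2, 5]
```

The practical consequence is that transform logic should not depend on seeing all rows, or on seeing them in a particular order.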
Beam specific transforms
There are a number of Beam specific transforms available which only work on the provided Beam pipeline execution engines. For example, Beam Input reads text file data from one or more files and Beam BigQuery Output writes data to BigQuery.
You can find these transforms in the Big Data category and their names all start with Beam to make it easy to recognize them.
Here is an example of a simple pipeline which reads files in a folder (on gs://), filters out data from California, removes and renames a few fields and writes the data back to another set of files:
Universal transforms
There are a few transforms which are translated into Beam variations:
- Memory Group By: This transform allows you to aggregate data across large data volumes. When using the Beam engines it uses org.apache.beam.sdk.transforms.GroupByKey.
- Merge Join: You can join 2 data sources with this transform. The main difference is that in the Beam engines the input data doesn’t need to be sorted. The Beam class used to perform this is org.apache.beam.sdk.extensions.joinlibrary.Join.
- Generate Rows: This transform is used to generate (empty/static) rows of data. It can be either a fixed number, or it can generate rows indefinitely. When using the Beam engines it uses org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource or org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource.
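The following plain-Python sketch illustrates why grouping and joining by key do not need sorted input. It is only an analogy for what a group-by-key and a key-based join compute, not the implementation of the Beam classes named above, and the function names are made up for this example.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Collect all values that share a key, whatever order the pairs arrive in."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def inner_join(left, right):
    """Join two lists of (key, value) pairs on key without sorting either side."""
    right_groups = group_by_key(right)
    return [
        (key, left_value, right_value)
        for key, left_value in left
        for right_value in right_groups.get(key, [])
    ]


# Neither input is sorted by key.
print(group_by_key([("b", 1), ("a", 2), ("b", 3)]))  # prints {'b': [1, 3], 'a': [2]}
print(inner_join([("x", 1), ("y", 2)], [("y", "Y"), ("x", "X")]))
```

Because rows are matched by key rather than by position, the sort transforms that a merge join normally requires on the local engine are not needed for this purpose.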
Unsupported transforms
A few transforms are simply not supported because we haven’t found a good way to do this on Beam yet:
- Group By: Use the Memory Group By instead.
The Denormaliser transform works technically correctly on Apache Beam in release 1.1.0 and later. Even so, you need to consider that the aggregation of the key-value pairs in that transform (in the general case) only happens on a subset of the rows. That is because in a Beam pipeline the order in which rows arrive is lost: rows are continuously re-shuffled to maximize parallelism. This is different from the behavior of the "Local" Hop pipeline engine.
To get around this issue you can apply a Memory Group By transform across the whole dataset to grab the first non-null value of every field you de-normalised. This will produce the correct result.
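As a rough illustration of that workaround, the plain-Python sketch below merges partially de-normalised rows by taking the first non-null value of every field per key. The field names (id, name, city) and the function name are hypothetical; in Hop you would configure a Memory Group By transform instead of writing code.

```python
def first_non_null_per_key(rows, key_field, value_fields):
    """Merge partial rows per key, keeping the first non-null value of each field."""
    merged = {}
    for row in rows:
        target = merged.setdefault(row[key_field], {f: None for f in value_fields})
        for field in value_fields:
            if target[field] is None and row.get(field) is not None:
                target[field] = row[field]
    return [{key_field: key, **values} for key, values in merged.items()]


# Each copy of the Denormaliser only saw a subset of the rows for a key,
# so every output row is only partially filled in.
partial_rows = [
    {"id": 1, "name": "Ann", "city": None},
    {"id": 2, "name": None, "city": "Ghent"},
    {"id": 1, "name": None, "city": "Paris"},
    {"id": 2, "name": "Bob", "city": None},
]
for row in first_non_null_per_key(partial_rows, "id", ["name", "city"]):
    print(row)
```

This only gives a meaningful result when each field has at most one non-null value per key, which is the case when the partial rows come from de-normalising the same key-value pairs.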
All others
All other transforms are simply supported. They are wrapped in a bit of code to make the exact same code that runs on the Hop local pipeline engine work in a Beam pipeline. There are a few things to mention though.
Special case | Solution |
---|---|
Info transforms | Some transforms like |
Target transforms | Sometimes you want to target specific transforms like in |
Non-Beam input transforms | When you’re reading data using a non-beam transform (see |
Non-Beam Output transforms | The insistence of a Beam pipeline to run work in an embarrassingly parallel way can trip you up on the output side. In general, it is NOT possible to limit the number of copies of a particular transform. What Hop tries to do is perform a series of operations to try and force a single thread. However, this does not work on all runners. Flink for example insists on doing even this in parallel: You can set the number of copies string of a transform to include A better way to deal with this problem is NOT to insist on creating a single file. You can include the unique ID of a transform in the filename with variable |
Row batching with non-Beam transforms | A lot of target databases like to receive rows in batches of records. So if you have a transform like for example Streaming Hop transform flush interval: how long in time are rows kept and batched up? If you care about latency make this lower (500 or lower). If you have a long-running batching pipeline, make it higher (10000 or higher perhaps). Hop streaming transforms buffer size: how many rows are being batched? Consider making it the same as the batching size you use in your transform metadata (e.g. Please note that these are maximum values. If the end of a bundle is reached in a pipeline rows are always forced to the transform code and as such pushed to the target system. To get an idea of how many times a batching buffer is flushed to the underlying transform code (and as such to for example a remote database) we added a |
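The interplay between the buffer size, the flush interval and the end of a bundle can be sketched in plain Python as follows. This is an illustration of the described behaviour only, not Hop's implementation: the class, its parameters and the flush_count counter are hypothetical names, and for simplicity the age of the buffer is only checked when a row arrives rather than on a timer.

```python
import time


class BatchingBuffer:
    """Buffer rows and hand them to `sink` in batches (illustrative only)."""

    def __init__(self, sink, buffer_size, flush_interval_ms, clock=time.monotonic):
        self.sink = sink                      # callable that receives a list of rows
        self.buffer_size = buffer_size        # maximum number of rows per batch
        self.flush_interval_s = flush_interval_ms / 1000.0
        self.clock = clock                    # injectable so the sketch is testable
        self.rows = []
        self.last_flush = clock()
        self.flush_count = 0                  # hypothetical counter of flushes

    def add(self, row):
        self.rows.append(row)
        too_many = len(self.rows) >= self.buffer_size
        too_old = self.clock() - self.last_flush >= self.flush_interval_s
        if too_many or too_old:
            self.flush()

    def finish_bundle(self):
        # At the end of a bundle the remaining rows are always pushed out,
        # which is why both settings are maximum values.
        self.flush()

    def flush(self):
        if self.rows:
            self.sink(list(self.rows))
            self.rows.clear()
            self.flush_count += 1
        self.last_flush = self.clock()


batches = []
buffer = BatchingBuffer(batches.append, buffer_size=3, flush_interval_ms=10000)
for i in range(7):
    buffer.add(i)
buffer.finish_bundle()
print(len(batches), "batches sent")
```

With small bundles the buffer is flushed well before it is full, so the number of flushes is a better indicator of the load on the target system than the configured buffer size.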
Fat jars?
A fat jar is often used to package up all the code you need for a particular project. The Spark, Flink and Dataflow execution engines like it since it massively simplifies the Java classpath when executing pipelines. Apache Hop allows you to create a fat jar in the Hop GUI with the Tools/Generate a Hop fat jar… menu or using the following command:
sh hop-config.sh -fj /path/to/fat.jar
The path to this fat jar can then be referenced in the various Beam runtime configurations. Note that the current version of Hop and all its plugins are used to build the fat jar. If you install or remove plugins or update Hop itself, remember to generate a new fat jar or to update the existing one.
Beam File definitions
The Beam Input and Beam Output transforms expect you to define the layout of the file(s) being read or written.
Current limitations
There are some specific advantages to using engines like Spark, Flink and Dataflow. However, they come with some limitations as well:
- Previewing data is not available (yet). Because of the distributed nature of execution we don’t have a great way to acquire preview data.
- Unit testing: not available, for reasons similar to those for previewing or debugging. To test your Beam pipelines, pick up the data after a pipeline is done and compare it to a golden data set in another pipeline running with a "Local Hop" pipeline engine.
- Debugging or pausing a pipeline is not supported.
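When comparing pipeline output to a golden data set, keep in mind that a Beam pipeline does not preserve row order, so the comparison should ignore order but still respect duplicates. The plain-Python sketch below shows that idea; the function names are made up for this example, it assumes all field values are hashable, and in Hop the comparison itself would be built as a pipeline.

```python
from collections import Counter


def as_multiset(rows):
    """Represent rows as a multiset so order is ignored but duplicates count."""
    return Counter(tuple(sorted(row.items())) for row in rows)


def matches_golden(actual_rows, golden_rows):
    """Return True when both data sets contain exactly the same rows."""
    return as_multiset(actual_rows) == as_multiset(golden_rows)


golden = [{"id": 1, "state": "NY"}, {"id": 2, "state": "TX"}]
actual = [{"state": "TX", "id": 2}, {"id": 1, "state": "NY"}]  # same rows, other order
print(matches_golden(actual, golden))  # prints True
```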