Running a Hop pipeline using the Flink Kubernetes Operator
Prerequisites
Please have a look at the Getting started with Beam guide to get a basic understanding of the Beam integration in Hop. These are the files we need to make use of the Flink Kubernetes Operator in the example below:
-
A Hop fat jar : generate once for the version of Hop you want to use (use the Hop GUI
Tools
menu or usehop-conf.sh -fj
. -
Hop metadata export: a JSON file containing the metadata of your project (use the Hop GUI
Tools
menu or usehop-conf.sh -xm
-
A Hop pipeline: A .hpl file (XML) which contains the pipeline metadata
Flink Kubernetes Operator
The Flink operator wants to make it easy for you to start a new deployment of a Flink cluster on Kubernetes. To install it on your own K8s setup you can follow the Quick Start guide.
Example setup
We’ll be running our setup on Amazon Web Services' Elastic Kubernetes Service (EKS). Our Apache Hop fat jar as well as our pipeline and metadata JSON file are all stored in a folder on S3.
The goal in our setup is to generate sample data with an unbound (never ending) pipeline and then to send that data to a Kinesis stream:
The Flink Deployment
In the Flink deployment file below we’ll do the following things different from a standard setup:
-
Run with Java 11 since we’re using Apache Hop >= 2.0.0
-
Run with Java 17 since we’re using Apache Hop >= 2.10.0
-
Create an ephemeral volume called
hop-resources
which is mounted by all images in folder/hop
-
Before any Flink container starts we run an initContainer to copy our Hop files (fat jar, pipeline, metadata) to the
hop-resources
volume. Container agiledigital/s3-artifact-fetcher is used to synchronize an S3 folder with our ephemeral volume shared by the nodes/containers. -
We specify the Hop main Beam class and pass in the pipeline and metadata JSON filenames (now found locally on the container) along with the name of the Flink Hop pipeline run configuration as arguments.
Consider the following file called flink-deployment.yml
:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
namespace: default
name: hop-flink
spec:
image: flink:1.15-java11
flinkVersion: v1_15
flinkConfiguration:
taskmanager.numberOfTaskSlots: "8"
taskmanager.memory.jvm-metaspace.size: "512m"
serviceAccount: flink
podTemplate:
apiVersion: v1
kind: Pod
metadata:
name: flink-pod-template
spec:
serviceAccount: flink
containers:
#
# Make an ephemeral volume available to the main flink container
#
- name: flink-main-container
volumeMounts:
- mountPath: /hop
name: hop-resources
initContainers:
#
# Copy a folder from s3:// to an ephemeral volume
# Put a Hop fat jar in it as well as Hop metadata (JSON)
# and the pipeline to run.
#
- name: fetcher
image: agiledigital/s3-artifact-fetcher
env:
- name: SOURCE_URL
value: "s3://<source-folder-of-hop-fat-jar-and-metadata>"
- name: ARTIFACT_DIR
value: "/hop/"
- name: RUNNER_USER
value: root
- name: AWS_ACCESS_KEY_ID
value: <your-aws-access-key>
- name: AWS_SECRET_ACCESS_KEY
value: "<your-aws-secret-key>"
- name: AWS_DEFAULT_REGION
value: <your-aws-region>
volumeMounts:
- mountPath: /hop
name: hop-resources
volumes:
- name: hop-resources
emptyDir: {}
jobManager:
replicas: 1
resource:
memory: "2g"
cpu: 1
taskManager:
resource:
memory: "2g"
cpu: 2
job:
jarURI: local:///hop/<your-hop-fat-jar>
parallelism: 4
upgradeMode: stateless
entryClass: org.apache.hop.beam.run.MainBeam
args:
- /hop/<your-hop-pipeline-hpl>
- /hop/hop-metadata.json
- Flink
Tip: you can provide a 4th argument (in the args list) after the run configuration name: the name of the environment configuration file to use.
Please note that the parallelism
argument is mandatory in the spec.job
section. This parameter is ignored however as our pipeline is executed using Apache Beam. The Flink runner has its own parallelism setting. You can configure this in the pipeline run configuration (called Flink
above). In this run configuration you can specify Flink master [auto]
.
Execution
Just as is shown in the Flink operator documentation we can now create this Flink deployment by running the following command.
kubectl create -f flink-deployment.yml
You can then see the various pods move through their stages of initialisation:
kubectl get pods
Obviously the first thing that will happen is that we want a Flink job manager. Before starting we’re initializing the hop-resources
volume by copying data from S3 with the s3-artifact-fetcher
. This leads to an init
state:
hop-flink-85b8fc7d5f-fkxvq 0/1 Init:0/1 0 2s
After a while we’ll start the job manager itself and we’ll end up with this pod status:
hop-flink-85b8fc7d5f-fkxvq 1/1 Running 0 45s
Once the job manager is ready the job itself will start and (in our case) a task manager will get started as well:
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-kubernetes-operator-6976569cb-68c6g 1/1 Running 0 2d16h
hop-flink-85b8fc7d5f-fkxvq 1/1 Running 0 95s
hop-flink-taskmanager-1-1 1/1 Running 0 21s
Monitoring and logging
To see what’s going on we can do a port forward from the job manager:
kubectl port-forward pod/hop-flink-85b8fc7d5f-fkxvq 8081
Forwarding from 127.0.0.1:8081 -> 8081
Forwarding from [::1]:8081 -> 8081
Handling connection for 8081
...
You can now browse to localhost:8081 to have a look at the Flink job.
To see the detailed logging on the job manager you can run kubectl logs
. The logging from the Hop pipeline execution can be found in between the Flink job manager logs:
kubectl logs -f pod/hop-flink-85b8fc7d5f-fkxvq
...
Argument 1 : Pipeline filename (.hpl) : /hop/<your-hop-pipeline>.hpl
Argument 2 : Metadata filename (.json) : /hop/hop-metadata.json
Argument 3 : Pipeline run configuration : Flink
>>>>>> Initializing Hop...
...
>>>>>> Loading pipeline metadata
>>>>>> Building Apache Beam Pipeline...
>>>>>> Found Beam Input transform plugin class loader
>>>>>> Pipeline executing starting...
2022/06/27 13:37:27 - General - Created Apache Beam pipeline with name 'synthetic-data-to-kinesis'
2022/06/27 13:37:28 - General - Handled transform (ROW GENERATOR) : oo rows
2022/06/27 13:37:28 - General - Handled generic transform (TRANSFORM) : sysdate, gets data from 1 previous transform(s), targets=0, infos=0
2022/06/27 13:37:28 - General - Handled generic transform (TRANSFORM) : Random values, gets data from 1 previous transform(s), targets=0, infos=0
2022/06/27 13:37:28 - General - Handled generic transform (TRANSFORM) : JSON output, gets data from 1 previous transform(s), targets=0, infos=0
2022/06/27 13:37:28 - General - Handled transform (KINESIS OUTPUT) : Beam Kinesis Produce, gets data from JSON output
2022/06/27 13:37:28 - General - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Flink'
...