Running a Hop pipeline using the Flink Kubernetes Operator

Prerequisites

Read the Getting started with Beam guide first to get a basic understanding of the Beam integration in Hop. The example below needs the following files to make use of the Flink Kubernetes Operator:

  • A Hop fat jar: generate it once for the version of Hop you want to use (use the Hop GUI Tools menu or hop-conf.sh -fj).

  • Hop metadata export: a JSON file containing the metadata of your project (use the Hop GUI Tools menu or hop-conf.sh -xm).

  • A Hop pipeline: A .hpl file (XML) which contains the pipeline metadata

Note: Hop is capable of reading/writing files directly to s3:// buckets (or gs:// or azure://).
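The files listed above can also be produced and uploaded from the command line. The following is only a sketch under a few assumptions: that -fj and -xm each take the target filename as their value, that the AWS CLI is installed and configured, and that hop-conf.sh already knows which project to export (you may need to pass additional options for that). The function name, the HOP_CONF variable and the file names hop-fat.jar and hop-metadata.json are hypothetical.

```shell
# Hypothetical helper: build the Hop artifacts and copy them to one S3 folder.
# Usage: publish_hop_artifacts <work-dir> <pipeline.hpl> <s3-folder-url>
# Pass the S3 folder URL without a trailing slash.
publish_hop_artifacts() {
  work_dir="$1"
  pipeline="$2"
  s3_folder="$3"
  # HOP_CONF is an assumed override for the location of hop-conf.sh.
  hop_conf="${HOP_CONF:-./hop-conf.sh}"

  # Generate the fat jar (assumes -fj takes the target filename).
  "$hop_conf" -fj "$work_dir/hop-fat.jar" || return 1
  # Export the project metadata to JSON (assumes -xm takes the target filename).
  "$hop_conf" -xm "$work_dir/hop-metadata.json" || return 1

  # Upload the jar, the metadata and the pipeline to the same S3 folder.
  for f in "$work_dir/hop-fat.jar" "$work_dir/hop-metadata.json" "$pipeline"; do
    aws s3 cp "$f" "$s3_folder/" || return 1
  done
}
```

A call could look like publish_hop_artifacts /tmp/hop ./my-pipeline.hpl s3://my-bucket/hop, where the bucket and paths are placeholders for your own.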

The Flink Kubernetes Operator makes it easy to start a new deployment of a Flink cluster on Kubernetes. To install it on your own Kubernetes setup, follow the operator's Quick Start guide.
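Before continuing, it can help to confirm that the operator is actually installed. The sketch below checks for the custom resource definition behind the FlinkDeployment kind; the CRD name flinkdeployments.flink.apache.org is an assumption derived from the kind and API group used in the deployment file, and the function name is hypothetical.

```shell
# Hypothetical helper: succeeds when the FlinkDeployment CRD is registered.
# Assumes the CRD is named flinkdeployments.flink.apache.org.
flink_operator_installed() {
  kubectl get crd flinkdeployments.flink.apache.org >/dev/null 2>&1
}
```

For example: flink_operator_installed && echo "operator CRD found".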

Example setup

We’ll be running our setup on Amazon Web Services' Elastic Kubernetes Service (EKS). Our Apache Hop fat jar as well as our pipeline and metadata JSON file are all stored in a folder on S3.

The goal of our setup is to generate sample data with an unbounded (never-ending) pipeline and to send that data to a Kinesis stream:

Sending test data to a Kinesis stream

The Flink deployment file below differs from a standard setup in the following ways:

  • Run with Java 11 since we’re using Apache Hop >= 2.0.0

  • Create an ephemeral volume called hop-resources, which is mounted by all containers at /hop

  • Before any Flink container starts we run an initContainer to copy our Hop files (fat jar, pipeline, metadata) to the hop-resources volume. Container agiledigital/s3-artifact-fetcher is used to synchronize an S3 folder with our ephemeral volume shared by the nodes/containers.

  • We specify the Hop main Beam class and pass in the pipeline and metadata JSON filenames (now found locally on the container) along with the name of the Flink Hop pipeline run configuration as arguments.

Consider the following file called flink-deployment.yml:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  namespace: default
  name: hop-flink
spec:
  image: flink:1.15-java11
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "8"
    taskmanager.memory.jvm-metaspace.size: "512m"
  serviceAccount: flink
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: flink-pod-template
    spec:
      serviceAccount: flink
      containers:
        #
        # Make an ephemeral volume available to the main flink container
        #
        - name: flink-main-container
          volumeMounts:
            - mountPath: /hop
              name: hop-resources
      initContainers:
        #
        # Copy a folder from s3:// to an ephemeral volume
        # Put a Hop fat jar in it as well as Hop metadata (JSON)
        # and the pipeline to run.
        #
        - name: fetcher
          image: agiledigital/s3-artifact-fetcher
          env:
            - name: SOURCE_URL
              value: "s3://<source-folder-of-hop-fat-jar-and-metadata>"
            - name: ARTIFACT_DIR
              value: "/hop/"
            - name: RUNNER_USER
              value: root
            - name: AWS_ACCESS_KEY_ID
              value: <your-aws-access-key>
            - name: AWS_SECRET_ACCESS_KEY
              value: "<your-aws-secret-key>"
            - name: AWS_DEFAULT_REGION
              value: <your-aws-region>
          volumeMounts:
            - mountPath: /hop
              name: hop-resources
      volumes:
        - name: hop-resources
          emptyDir: {}
  jobManager:
    replicas: 1
    resource:
      memory: "2g"
      cpu: 1
  taskManager:
    resource:
      memory: "2g"
      cpu: 2
  job:
    jarURI: local:///hop/<your-hop-fat-jar>
    parallelism: 4
    upgradeMode: stateless
    entryClass: org.apache.hop.beam.run.MainBeam
    args:
      - /hop/<your-hop-pipeline-hpl>
      - /hop/hop-metadata.json
      - Flink

Tip: you can provide a 4th argument (in the args list) after the run configuration name: the name of the environment configuration file to use.

Note that the parallelism setting is mandatory in the spec.job section. It is ignored, however, because our pipeline is executed using Apache Beam and the Beam Flink runner has its own parallelism setting. You can configure that setting in the pipeline run configuration (called Flink above). In the same run configuration you can specify [auto] as the Flink master.

Execution

As shown in the Flink operator documentation, we can now create the Flink deployment by running the following command:

kubectl create -f flink-deployment.yml

You can then watch the various pods move through their initialization stages:

kubectl get pods

The first thing that is needed is a Flink job manager. Before it starts, the init container initializes the hop-resources volume by copying data from S3 with the s3-artifact-fetcher. During this step the pod is in an Init state:

hop-flink-85b8fc7d5f-fkxvq                  0/1     Init:0/1   0          2s
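If the pod stays in the Init state, the logs of the init container are usually the first place to look. A minimal sketch, relying on the container name fetcher from the deployment file; the function name is hypothetical.

```shell
# Hypothetical helper: print the logs of the "fetcher" init container.
# Usage: fetcher_logs pod/<job-manager-pod-name>
fetcher_logs() {
  kubectl logs "$1" -c fetcher
}
```

For the pod shown above this would be fetcher_logs pod/hop-flink-85b8fc7d5f-fkxvq.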

After a while the job manager itself starts and the pod status changes to:

hop-flink-85b8fc7d5f-fkxvq                  1/1     Running   0          45s

Once the job manager is ready, the job itself starts and (in our case) a task manager is started as well:

$ kubectl get pods
NAME                                        READY   STATUS    RESTARTS   AGE
flink-kubernetes-operator-6976569cb-68c6g   1/1     Running   0          2d16h
hop-flink-85b8fc7d5f-fkxvq                  1/1     Running   0          95s
hop-flink-taskmanager-1-1                   1/1     Running   0          21s
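Instead of repeatedly listing pods, you could poll the FlinkDeployment resource until the job is reported as running. The sketch below assumes that the operator publishes the job state under .status.jobStatus.state and that a running job has the state RUNNING; check both against the operator version you use. The function name and its defaults are hypothetical.

```shell
# Hypothetical helper: poll until the FlinkDeployment reports a RUNNING job.
# Usage: wait_for_flink_job <deployment-name> [max-attempts] [sleep-seconds]
wait_for_flink_job() {
  name="$1"
  attempts="${2:-60}"
  pause="${3:-5}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # Assumption: the job state is exposed as .status.jobStatus.state.
    state=$(kubectl get flinkdeployment "$name" \
      -o jsonpath='{.status.jobStatus.state}' 2>/dev/null)
    if [ "$state" = "RUNNING" ]; then
      echo "job is running"
      return 0
    fi
    i=$((i + 1))
    sleep "$pause"
  done
  echo "gave up after $attempts attempts (last state: ${state:-unknown})" >&2
  return 1
}
```

With the deployment from this guide the call would be wait_for_flink_job hop-flink.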

Monitoring and logging

To see what is going on, we can forward a local port to the job manager:

kubectl port-forward pod/hop-flink-85b8fc7d5f-fkxvq 8081
Forwarding from 127.0.0.1:8081 -> 8081
Forwarding from [::1]:8081 -> 8081
Handling connection for 8081
...

You can now browse to localhost:8081 to have a look at the Flink job.
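While the port forward is active, the same information is available from the Flink REST API, which is convenient for scripts. The sketch below calls the /jobs/overview endpoint through the forwarded port; the function name is hypothetical and curl is assumed to be installed.

```shell
# Hypothetical helper: list the jobs known to the Flink REST API as JSON.
# Usage: flink_jobs_overview [local-port]   (defaults to 8081)
flink_jobs_overview() {
  curl -s "http://localhost:${1:-8081}/jobs/overview"
}
```

To avoid looking up the pod name, forwarding to the REST service may also work, for example kubectl port-forward svc/hop-flink-rest 8081. The service name hop-flink-rest is an assumption based on the deployment name; verify it with kubectl get svc.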

To see the detailed logging on the job manager you can run kubectl logs. The logging from the Hop pipeline execution is interleaved with the Flink job manager logs:

kubectl logs -f pod/hop-flink-85b8fc7d5f-fkxvq
...
Argument 1 : Pipeline filename (.hpl)   : /hop/<your-hop-pipeline>.hpl
Argument 2 : Metadata filename (.json)  : /hop/hop-metadata.json
Argument 3 : Pipeline run configuration : Flink
>>>>>> Initializing Hop...
...
>>>>>> Loading pipeline metadata
>>>>>> Building Apache Beam Pipeline...
>>>>>> Found Beam Input transform plugin class loader
>>>>>> Pipeline executing starting...
2022/06/27 13:37:27 - General - Created Apache Beam pipeline with name 'synthetic-data-to-kinesis'
2022/06/27 13:37:28 - General - Handled transform (ROW GENERATOR) : oo rows
2022/06/27 13:37:28 - General - Handled generic transform (TRANSFORM) : sysdate, gets data from 1 previous transform(s), targets=0, infos=0
2022/06/27 13:37:28 - General - Handled generic transform (TRANSFORM) : Random values, gets data from 1 previous transform(s), targets=0, infos=0
2022/06/27 13:37:28 - General - Handled generic transform (TRANSFORM) : JSON output, gets data from 1 previous transform(s), targets=0, infos=0
2022/06/27 13:37:28 - General - Handled transform (KINESIS OUTPUT) : Beam Kinesis Produce, gets data from JSON output
2022/06/27 13:37:28 - General - Executing this pipeline using the Beam Pipeline Engine with run configuration 'Flink'
...
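Because the Hop output is mixed in with the Flink logs, a simple filter can make it easier to read. The patterns below are derived only from the sample output above (lines starting with ">>>>>> " or "Argument N : ", and lines containing " - General - "), so treat them as an assumption and adjust them to your own logging; the function name is hypothetical.

```shell
# Hypothetical helper: keep only the Hop-looking lines from a log on stdin.
# The patterns are taken from the sample output and may need adjusting.
hop_log_lines() {
  grep -E '^>>>>>> |^Argument [0-9]+ : | - General - '
}
```

Usage: kubectl logs pod/hop-flink-85b8fc7d5f-fkxvq | hop_log_lines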

Termination

To shut down the job along with the cluster we can simply delete the Flink deployment:

kubectl delete -f flink-deployment.yml
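If the deployment file is not at hand, the resource can also be deleted by name. A minimal sketch, assuming the resource name hop-flink from the deployment file and the default namespace; the function name is hypothetical.

```shell
# Hypothetical helper: delete a FlinkDeployment by name, then list the pods
# so you can check that the job manager and task manager pods are going away.
# Usage: delete_flink_deployment <deployment-name>
delete_flink_deployment() {
  kubectl delete flinkdeployment "$1" || return 1
  kubectl get pods
}
```

For this guide: delete_flink_deployment hop-flink.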