Run workflows and pipelines from Apache Airflow
Introduction
Apache Airflow is an open-source workflow management platform for data engineering pipelines.
Airflow uses directed acyclic graphs (DAGs) to manage workflow orchestration. Tasks and dependencies are defined in Python and then Airflow manages the scheduling and execution. DAGs can be run either on a defined schedule (e.g. hourly or daily) or based on external event triggers (e.g. a file appearing in an AWS S3 bucket). DAGs can often be written in one Python file.
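As a point of reference, a scheduled DAG definition can be as small as the sketch below; the dag_id, the daily schedule and the echo task are purely illustrative and not part of the Hop sample that follows.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Minimal illustrative DAG: a single task that runs once a day.
with DAG('daily-example',
         start_date=datetime(2022, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:

    hello = BashOperator(
        task_id='hello',
        bash_command='echo "hello from Airflow"'
    )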
Apache Hop workflows and pipelines can be run from Airflow through the DockerOperator. Alternatively, the BashOperator can be used to call hop-run directly on the Airflow worker.
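As a minimal sketch of the BashOperator alternative, the task below calls hop-run.sh from a Hop installation on the Airflow worker. The installation path (/opt/hop), the project and environment names and the pipeline path are assumptions; they need to match a project and environment that are already registered in that Hop installation.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG('sample-pipeline-bash',
         start_date=datetime(2022, 1, 1),
         schedule_interval=None,
         catchup=False) as dag:

    # hop-run.sh is assumed to live in /opt/hop on the Airflow worker.
    # The single quotes keep the shell from expanding ${PROJECT_HOME},
    # so Hop resolves it against the project itself.
    hop_run = BashOperator(
        task_id='hop-run-sample-pipeline',
        bash_command="/opt/hop/hop-run.sh "
                     "--project hop-airflow-sample "
                     "--environment env-hop-airflow-sample "
                     "--file '${PROJECT_HOME}/etl/sample-pipeline.hpl' "
                     "--runconfig local "
                     "--level Basic"
    )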
Sample DAG
The Airflow DockerOperator starts a Docker container for each task run; with the Apache Hop Docker image, that container runs your workflow or pipeline.
Check the Docker docs for more information on how to run Apache Hop workflows and pipelines with Docker, and Projects and environments for more information and best practices on setting up your project.
In the example below, we'll run a sample pipeline. The project and environment are provided to the container as mounted volumes (LOCAL_PATH_TO_PROJECT_FOLDER and LOCAL_PATH_TO_ENV_FOLDER).
Since your Airflow workflows will probably do more than just run a pipeline (e.g. perform a git clone or git pull first), two DummyOperators (start and end) were added to the sample.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
from airflow.operators.dummy_operator import DummyOperator
from docker.types import Mount

default_args = {
    'owner': 'airflow',
    'description': 'sample-pipeline',
    'depends_on_past': False,
    'start_date': datetime(2022, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('sample-pipeline',
         default_args=default_args,
         schedule_interval=None,
         catchup=False,
         is_paused_upon_creation=False) as dag:

    start_dag = DummyOperator(
        task_id='start_dag'
    )

    end_dag = DummyOperator(
        task_id='end_dag'
    )

    hop = DockerOperator(
        task_id='sample-pipeline',
        # use the Apache Hop Docker image. Add your tag here in the default apache/hop:<TAG> syntax
        image='apache/hop',
        api_version='auto',
        auto_remove=True,
        environment={
            'HOP_RUN_PARAMETERS': 'INPUT_DIR=<YOUR_INPUT_PATH>',
            'HOP_LOG_LEVEL': 'Basic',
            'HOP_FILE_PATH': '${PROJECT_HOME}/etl/sample-pipeline.hpl',
            'HOP_PROJECT_DIRECTORY': '/project',
            'HOP_PROJECT_NAME': 'hop-airflow-sample',
            'HOP_ENVIRONMENT_NAME': 'env-hop-airflow-sample.json',
            'HOP_ENVIRONMENT_CONFIG_FILE_NAME_PATHS': '/project-config/env-hop-airflow-sample.json',
            'HOP_RUN_CONFIG': 'local'
        },
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
        # the local project and environment folders are mounted into the container
        mounts=[Mount(source='<LOCAL_PATH_TO_PROJECT_FOLDER>', target='/project', type='bind'),
                Mount(source='<LOCAL_PATH_TO_ENV_FOLDER>', target='/project-config', type='bind')],
        force_pull=False
    )

    start_dag >> hop >> end_dag
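As mentioned above, a real-world DAG will often refresh the project before running the pipeline. A sketch of such a task, assuming the mounted project folder is a git clone that is also accessible from the Airflow worker, could look like this (using the BashOperator imported at the top of the sample):

    # Hypothetical extra task: pull the latest project version before the pipeline runs.
    # <LOCAL_PATH_TO_PROJECT_FOLDER> is the same path that is mounted into the container.
    git_pull = BashOperator(
        task_id='git_pull_project',
        bash_command='cd <LOCAL_PATH_TO_PROJECT_FOLDER> && git pull'
    )

    start_dag >> git_pull >> hop >> end_dag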
After you deploy this DAG to your Airflow dags folder (e.g. as hop-airflow-sample.py), it will be picked up by Apache Airflow and be ready to run.
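Since the sample DAG is created with schedule_interval=None, Airflow will not run it on a schedule; trigger it from the Airflow UI or, assuming the Airflow 2 CLI, with a command along the lines of:

airflow dags trigger sample-pipeline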
Check the Airflow logs for the sample-pipeline task to see the full Hop logs of the pipeline execution.