I am a relatively new user to Python and Airflow and am having a very difficult time getting spark-submit
to run in an Airflow task. My goal is to get the following DAG task to run successfully
from datetime import datetime, timedelta from airflow import DAG from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator from airflow.operators.bash_operator import BashOperator default_args = { 'owner': 'matthew', 'start_date': datetime(2019, 7, 8) } dag = DAG('CustomCreate_test2', default_args=default_args, schedule_interval=timedelta(days=1)) t3 = BashOperator( task_id='run_test', bash_command='spark-submit --class CLASSPATH.CustomCreate ~/IdeaProjects/custom-create-job/build/libs/custom-create.jar', dag=dag )
I know the problem lies with Airflow and not with the bash because when I run the command spark-submit --class CLASSPATH.CustomCreate ~/IdeaProjects/custom-create-job/build/libs/custom-create.jar
in the terminal it runs successfully.
I have been getting the following error from the Airflow logs
... [2019-08-28 15:55:34,750] {bash_operator.py:132} INFO - Command exited with return code 1 [2019-08-28 15:55:34,764] {taskinstance.py:1047} ERROR - Bash command failed Traceback (most recent call last): File "/Users/matcordo2/.virtualenv/airflow/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 922, in _run_raw_task result = task_copy.execute(context=context) File "/Users/matcordo2/.virtualenv/airflow/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 136, in execute raise AirflowException("Bash command failed") airflow.exceptions.AirflowException: Bash command failed ...
I have also tried working with the SparkSubmitOperator(...)
but have had no successful runs using it, I have only ever ended up with error logs like the following
... [2019-08-28 15:54:49,749] {logging_mixin.py:95} INFO - [[34m2019-08-28 15:54:49,749[0m] {[34mspark_submit_hook.py:[0m427} INFO[0m - at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)[0m [2019-08-28 15:54:49,803] {taskinstance.py:1047} ERROR - Cannot execute: ['spark-submit', '--master', 'yarn', '--num-executors', '2', '--total-executor-cores', '1', '--executor-cores', '1', '--executor-memory', '2g', '--driver-memory', '1g', '--name', 'CustomCreate', '--class', 'CLASSPATH.CustomCreate', '--verbose', '--queue', 'root.default', '--deploy-mode', 'cluster', '~/IdeaProjects/custom-create-job/build/libs/custom-create.jar']. Error code is: 1. ...
Is there something I have to do using SparkSubmitOperator(...)
before I can run the spark-submit ...
command in a BashOperator(...)
task?
Is there a way to run my spark-submit
command directly from the SparkSubmitOperator(...)
task?
Is there anything that I have to do to spark_default
in the Admin->Connections page of Airflow?
Is there anything that must be set in the Admin->Users page of Airflow? Is there anything that must be set to allow Airflow to run spark or run a jar file created by a specific user? If so, what/how?
Advertisement
Answer
I found a workaround that solved this problem.
Create a new ssh connection (or edit the default) like the one below in the Airflow Admin->Connection page Airflow SSH Connection Example
Below is a text version if you cannot see the image
Conn ID: ssh_connection
Conn Type: SSH
Host: HOST IP ADDRESS
Username: HOST USERNAME
Password: HOST PASSWORD
Port:
Extra: {“key_file”: “/PATH TO HOME DIR/airflow/.ssh/id_rsa”, “allow_host_key_change”: “true”, “no_host_key_check”: “true”}
Then make the proper adjustments to your python script
from airflow import DAG from airflow.contrib.operators.ssh_operator import SSHOperator from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'matthew', 'start_date': datetime(2019, 8, 28) } dag = DAG('custom-create', default_args=default_args, schedule_interval=timedelta(days=1), params={'project_source': '~/IdeaProjects/custom-create-job', 'spark_submit': '/usr/local/bin/spark-submit', 'classpath': 'CLASSPATH.CustomCreate', 'jar_file': 'build/libs/custom-create.jar'} ) templated_bash_command = """ echo 'HOSTNAME: $HOSTNAME' #To check that you are properly connected to the host cd {{ params.project_source }} {{ params.spark_submit }} --class {{ classpath }} {{ jar_file }} """ t1 = SSHOperator( task_id="SSH_task", ssh_conn_id='ssh_connection', command=templated_bash_command, dag=dag )
I hope this solution helps other people who may be running into a similar problem like I was.