
Trying to run the sample KubernetesPodOperator DAG gives me:

[2020-05-25 20:00:40,475] {__init__.py:51} INFO - Using executor LocalExecutor
[2020-05-25 20:00:40,475] {dagbag.py:396} INFO - Filling up the DagBag from /usr/local/airflow/dags/kubernetes_example.py
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 37, in <module>
    args.func(args)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 75, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 523, in run
    dag = get_dag(args)
  File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 149, in get_dag
    'parse.'.format(args.dag_id))
airflow.exceptions.AirflowException: dag_id could not be found: kubernetes_example. Either the dag did not exist or it failed to parse.

This is the code I am using:

from airflow import DAG
from datetime import timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=60)
}

dag = DAG(
    'kubernetes_example',
    default_args=default_args,
    schedule_interval=timedelta(minutes=60))

start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='airflow',
                                image="python:3.6.10",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                env_vars={'EXAMPLE_VAR': '/example/value'},
                                in_cluster=True,
                                get_logs=True,
                                dag=dag)

# This task is expected to fail: ubuntu:18.04 does not ship a python binary.
failing = KubernetesPodOperator(namespace='airflow',
                                image="ubuntu:18.04",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag)

passing.set_upstream(start)
failing.set_upstream(start)

I just took it from the sample DAG in the docs. Has anyone stumbled upon this issue?

Thanks!

3 Comments
  • Can you post your code? I am pretty sure your DAG definition is wrong and is missing the dag_id. Commented May 25, 2020 at 20:26
  • Thanks for the comment, I added the DAG code. Commented May 25, 2020 at 20:33
  • Thanks, check my answer below Commented May 25, 2020 at 20:44

1 Answer


You need to give the DAG a name (its dag_id):

dag = DAG(
    dag_id='kubernetes_example', 
    default_args=default_args, 
    schedule_interval=timedelta(minutes=60)
)

Also, your task_id should use _ rather than -, e.g. task_id="failing_task".
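It may also help to see why a parse failure surfaces as "dag_id could not be found". This is not Airflow code; collect_dags below is a hypothetical helper that only sketches the idea behind DagBag: it executes the file and keeps module-level objects carrying a dag_id, and any exception raised while executing the file is swallowed, so a broken file simply contributes no DAGs at all.

```python
import types


def collect_dags(path):
    """Loose sketch of DagBag's behaviour: execute the file and collect
    module-level objects that expose a .dag_id attribute.  Exceptions
    raised by the file are swallowed, so a file that fails to import
    yields an empty bag -- and the CLI then complains that the dag_id
    "did not exist or it failed to parse"."""
    module = types.ModuleType("candidate_dag_file")
    try:
        with open(path) as f:
            exec(compile(f.read(), path, "exec"), module.__dict__)
    except Exception:
        return {}  # swallowed, roughly like DagBag logging and moving on
    return {obj.dag_id: obj
            for obj in vars(module).values()
            if not isinstance(obj, type) and hasattr(obj, "dag_id")}
```

So if `airflow list_dags` does not show `kubernetes_example`, the file almost certainly raised on import; running it directly with `python kubernetes_example.py` will surface the real error instead of hiding it.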


5 Comments

Thanks for the assistance, but that's not the problem. I changed the DAG according to your comment and it did not solve the issue. BTW, you can see in the tutorial (airflow.apache.org/docs/stable/tutorial.html) that it is not a must. I tried it anyway and it didn't work out for me. Still, I appreciate the help.
Sorry, my bad; my suggestion about the task_id was also unnecessary (I am just sticking too closely to our internal guidelines). But your code generates the DAG correctly on my machine. I wonder if your problem is simply an old version of Airflow on your machine.
I am using the latest version, 1.10.10. Maybe my deployment has flaws. Are you using a Helm chart? Could you please share your deployment/chart (only if possible)? Thanks anyway mucio!
I am using an internal multi-tenant Airflow. I am also wondering if the dag your Airflow is trying to load is the actual file you are modifying or an older copy
The thing that really drives me crazy is that it outputs ``` INFO - Using executor LocalExecutor```. I tried changing the env var, but it didn't work. Maybe you have an idea?
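For reference, in Airflow 1.10 the executor reported in that log line comes from the [core] section of airflow.cfg, and any AIRFLOW__CORE__EXECUTOR environment variable overrides the file. A sketch of both ways to set it (KubernetesExecutor is just an example value; whether the PodOperator needs it depends on your setup, since KubernetesPodOperator itself runs under any executor):

```shell
# Option 1: in airflow.cfg
#   [core]
#   executor = KubernetesExecutor

# Option 2: environment variable override (Airflow's AIRFLOW__SECTION__KEY convention)
export AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
```

Note that the variable must be set in the environment of the process that prints the log line (scheduler/webserver/worker), not just in your shell.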
