
I am new to Airflow and am using Airflow 2.6.1 on macOS with an M1 chip. I have a simple ETL process that uses a GET request to ingest JSON data, transforms it, and then saves it to a database. I can do this without fuss using other methods.

I wanted to see how easy it is to do in Airflow and got stuck on the API call. I read the blogs and documentation and understood 'PythonOperator' and 'SimpleHttpOperator', but I haven't been able to make a successful GET call using Airflow yet. The DAG gets stuck on the API request and times out.

I have attached log files for reference.

Example 1: Using SimpleHttpOperator

from datetime import datetime
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

dag = DAG(dag_id = 'weatherapi',
          #default_args = default_args,
          start_date = datetime(2023, 6, 17),
          schedule = None,
          catchup = False,
          )

t1 = SimpleHttpOperator(task_id = 'weather_api_dag',
                        http_conn_id = 'weather_api',
                        endpoint = '/direct',
                        # For a GET, query parameters go in `data` (the hook
                        # sends them as the query string), not in `headers`
                        data = {
                                 'q' : 'Rugby',
                                 'appid' : 'APP_TOKEN'
                                 },
                        method = 'GET',
                        response_filter = lambda response: response.json()['nested']['property'],
                        log_response = True,
                        dag = dag)

t1
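As a sanity check outside Airflow, it can help to confirm what URL the operator should end up calling (connection host + endpoint + query string). A minimal sketch using `requests`' `PreparedRequest`, with the same placeholder `APP_TOKEN`, showing that `q` and `appid` belong in the query string rather than in the request headers:

```python
from requests.models import PreparedRequest

# Build the URL the GET request should resolve to:
# host from the connection, plus the endpoint, plus query parameters.
req = PreparedRequest()
req.prepare_url(
    "http://api.openweathermap.org/geo/1.0/direct",
    {"q": "Rugby", "appid": "APP_TOKEN"},  # placeholder token
)
print(req.url)
# http://api.openweathermap.org/geo/1.0/direct?q=Rugby&appid=APP_TOKEN
```

Pasting the printed URL into a browser or `curl` is a quick way to verify the endpoint and token work at all before involving Airflow.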

Please note that I used the Airflow UI to create an HTTP connection with the host name

http://api.openweathermap.org/geo/1.0

and named it 'weather_api', and passed the APP_TOKEN as well.
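For reference, the same connection can also be created from the CLI instead of the UI; a sketch, assuming the connection id and host described above (token placeholder as before):

```shell
# Create the HTTP connection referenced by http_conn_id='weather_api'.
# The base path can live in the host; the operator appends the endpoint.
airflow connections add weather_api \
    --conn-type http \
    --conn-host 'http://api.openweathermap.org/geo/1.0'
```

Running `airflow connections get weather_api` afterwards is a quick way to confirm the host was stored as intended.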

SimpleHttpOperator Log:

AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T21:32:39.994216+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T21:32:39.994216+00:00'
[2023-06-17T22:32:44.069+0100] {http.py:123} INFO - Calling HTTP method
[2023-06-17T22:32:44.073+0100] {base.py:73} INFO - Using connection ID 'weather_api' for task execution.
[2023-06-17T22:37:36.114+0100] {local_task_job_runner.py:291} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2023-06-17T22:37:36.122+0100] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 173. PIDs of all processes in the group: [173]
[2023-06-17T22:37:36.122+0100] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 173

Example 2: Using PythonOperator

import requests
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract_data():

    print('Starting GET request')
    # Define the API endpoint URL
    url = "https://jsonplaceholder.typicode.com/users"

    # Send a GET request to the API
    response = requests.get(url)

    # Parse the JSON data from the response
    data = response.json()

    return data


dag = DAG(dag_id = "api_check",
          #default_args = default_args,
          start_date = datetime(2023, 6, 16),
          schedule = None,
          catchup = False,
          tags = ["example"],)


t1 = PythonOperator(task_id = 'test',
                    python_callable = extract_data,
                    dag = dag)

t1

In the PythonOperator example, Airflow logs “Starting GET request” but then gets stuck on the request call.

I'd appreciate any help and guidance on this. The logs don't give much info; I manually marked the task as 'failed' to stop it.

PythonOperator Log:

[2023-06-17T18:39:20.702+0100] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='api_check' AIRFLOW_CTX_TASK_ID='test' AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T17:39:17.671023+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T17:39:17.671023+00:00'
[2023-06-17T18:39:20.703+0100] {logging_mixin.py:149} INFO - Starting GET request
[2023-06-17T18:44:42.297+0100] {local_task_job_runner.py:291} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2023-06-17T18:44:42.302+0100] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 91889. PIDs of all processes in the group: [91889]
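One thing worth trying: `requests.get` without a timeout can block indefinitely if something in the environment (proxy settings, DNS resolution, a firewall) swallows the connection, which matches the symptom of the task hanging until it is killed. A sketch of the same callable from Example 2 with an explicit timeout and error reporting (the timeout values are illustrative, not required):

```python
import requests

def extract_data():
    print('Starting GET request')
    url = "https://jsonplaceholder.typicode.com/users"
    try:
        # (connect timeout, read timeout): fail fast instead of hanging
        response = requests.get(url, timeout=(5, 30))
        response.raise_for_status()
    except requests.exceptions.RequestException as exc:
        # Surfaces the real cause (DNS failure, proxy, TLS, ...) in the task log
        print(f"Request failed: {exc!r}")
        raise
    return response.json()
```

If this version raises quickly instead of hanging, the exception message in the task log should point at the underlying network issue.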
  • I ran it and it works fine. It might be something in your Airflow installation and not in the code. Check for other logs under $AIRFLOW_HOME/logs. How did you install Airflow, pip or Docker? Commented Jun 18, 2023 at 6:46
  • Both options worked fine for you? pip, using a virtual env. Will try Docker. Thanks for your reply. Commented Jun 18, 2023 at 7:15
  • I tested only the PythonOperator. Commented Jun 18, 2023 at 7:36
