I am new to Airflow and am running Airflow 2.6.1 on macOS with an M1 chip. I have a simple ETL process that makes a GET request to ingest JSON data, transforms it, and saves it to a database. Outside Airflow this works without fuss.
I wanted to see how easy it would be to do in Airflow, but I'm stuck on the API call. I've read blog posts and the documentation and understand 'PythonOperator' and 'SimpleHttpOperator', but I haven't managed a successful GET call from Airflow yet: the DAG gets stuck on the API request and times out.
I have attached log files for reference.
Example 1: Using SimpleHttpOperator
import requests
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow import settings
from airflow.models import Connection
dag = DAG(
    dag_id='weatherapi',
    # default_args=default_args,
    start_date=datetime(2023, 6, 17),
    schedule=None,
    catchup=False,
)
t1 = SimpleHttpOperator(
    task_id='weather_api_dag',
    http_conn_id='weather_api',
    endpoint='/direct',
    headers={
        'q': 'Rugby',
        'appid': 'APP_TOKEN',
    },
    method='GET',
    response_filter=lambda response: response.json()['nested']['property'],
    log_response=True,
    dag=dag,
)

t1
Please note that I used the Airflow UI to create an HTTP connection with host
http://api.openweathermap.org/geo/1.0
and named it 'weather_api' (matching the http_conn_id above), and passed the APP_TOKEN as well.
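One thing I'm unsure about while writing this up: the OpenWeatherMap geocoding endpoint takes `q` and `appid` as query parameters, but my operator passes them as headers. The full URL I expect to be called would look like the one built below (`build_geocoding_url` is just an illustrative helper for this question, not part of my DAG):

```python
from urllib.parse import urlencode

def build_geocoding_url(base, city, token):
    # q and appid belong in the query string, not in the request headers
    return f"{base}/direct?{urlencode({'q': city, 'appid': token})}"

print(build_geocoding_url("http://api.openweathermap.org/geo/1.0", "Rugby", "APP_TOKEN"))
# http://api.openweathermap.org/geo/1.0/direct?q=Rugby&appid=APP_TOKEN
```

Even so, wrong parameters should give a 4xx response rather than a hang, so I don't think this alone explains the timeout.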
SimpleHTTPOperator Log:
AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T21:32:39.994216+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T21:32:39.994216+00:00'
[2023-06-17T22:32:44.069+0100] {http.py:123} INFO - Calling HTTP method
[2023-06-17T22:32:44.073+0100] {base.py:73} INFO - Using connection ID 'weather_api' for task execution.
[2023-06-17T22:37:36.114+0100] {local_task_job_runner.py:291} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2023-06-17T22:37:36.122+0100] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 173. PIDs of all processes in the group: [173]
[2023-06-17T22:37:36.122+0100] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 173
Example 2: Using PythonOperator
import requests
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

def extract_data():
    print('Starting GET request')
    # Define the API endpoint URL
    url = "https://jsonplaceholder.typicode.com/users"
    # Send a GET request to the API
    response = requests.get(url)
    # Parse the JSON data from the response
    data = response.json()
    return data
dag = DAG(
    dag_id="api_check",
    # default_args=default_args,
    start_date=datetime(2023, 6, 16),
    schedule=None,
    catchup=False,
    tags=["example"],
)

t1 = PythonOperator(
    task_id='test',
    python_callable=extract_data,
    dag=dag,
)

t1
In the PythonOperator example, Airflow logs "Starting GET request" but then gets stuck on the request call itself.
I'd appreciate any help and guidance on this. The logs don't give much info; I eventually marked the task as failed myself.
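For comparison, the same request completes fine for me when run as a plain script outside Airflow. Here is a standalone version of extract_data (a sketch, with an explicit timeout added so that a stalled connection would raise requests.exceptions.Timeout instead of blocking until the scheduler sends SIGTERM):

```python
import requests

def fetch_users(url="https://jsonplaceholder.typicode.com/users", timeout=10):
    # timeout makes a hung connection fail fast with an exception
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()

if __name__ == "__main__":
    users = fetch_users()
    print(f"Fetched {len(users)} users")
```

Run directly from my terminal this returns immediately, which makes me suspect something about the Airflow worker's environment (networking?) rather than the endpoint.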
PythonOperator Log:
[2023-06-17T18:39:20.702+0100] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='api_check' AIRFLOW_CTX_TASK_ID='test' AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T17:39:17.671023+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T17:39:17.671023+00:00'
[2023-06-17T18:39:20.703+0100] {logging_mixin.py:149} INFO - Starting GET request
[2023-06-17T18:44:42.297+0100] {local_task_job_runner.py:291} WARNING - State of this instance has been externally set to failed. Terminating instance.
[2023-06-17T18:44:42.302+0100] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 91889. PIDs of all processes in the group: [91889]