
I'm trying to figure out how to reference a SQL file in another path in the same bucket as the DAG. The DAG lives under dags/ in the bucket. The include works when I put the script in that path or a child of it, but I want it outside the DAG path. Any ideas?

import os
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "Test-Processing/")
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "project-id")
BUCKET_1 = os.environ.get("GCP_GCS_BUCKET_1", "bucket-name")

default_args = {}  # placeholder; the actual default_args are not shown here

with DAG(
    dag_id="dag_id",
    default_args=default_args,
    schedule_interval="@daily",
    start_date=days_ago(1),
    catchup=False,
) as dag:
    insert_job_operator = BigQueryInsertJobOperator(
        task_id="insert_job_operator",
        configuration={
            "query": {
                # Works when script.sql sits in dags/ (or a subfolder),
                # fails when the path climbs out of it with '../'
                "query": "{% include '../Scripts/script.sql' %}",
                "useLegacySql": False,
            }
        },
    )
2 Comments
  • This is running in Cloud Composer, right? Commented Jun 18, 2022 at 16:12
  • Right, with the newest versions of Airflow and Python Commented Jun 19, 2022 at 17:04

1 Answer


You may define the path to the data folder via the template_searchpath argument in the DAG definition.

Note that this works only with folders Cloud Composer maps from Cloud Storage onto the workers' local filesystem; in particular, gs://<bucket>/data is mounted at /home/airflow/gcs/data.
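Why the relative include fails in the first place: Airflow renders templated fields with Jinja, and Jinja's FileSystemLoader resolves {% include %} against its search path only; it rejects '..' segments outright, so '../Scripts/script.sql' can never resolve from dags/. A minimal sketch of that behavior (the search path shown is Composer's mapped data folder; jinja2 ships with Airflow):

from jinja2 import Environment, FileSystemLoader, TemplateNotFound

# Includes are resolved against the loader's search path, never relative
# to the DAG file, and parent-directory segments are refused.
env = Environment(loader=FileSystemLoader(["/home/airflow/gcs/data"]))

try:
    env.from_string("{% include '../Scripts/script.sql' %}").render()
except TemplateNotFound as exc:
    print(f"rejected: {exc}")

# A file placed inside the search path resolves normally:
# env.from_string("{% include '20220621_script.sql' %}").render()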

See working code below:

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "<your-project>")

with DAG(
    dag_id="dag_id",
    schedule_interval="@daily",
    start_date=datetime(2022, 6, 22),
    catchup=False,
    # /home/airflow/gcs/data is where Composer mounts gs://<bucket>/data
    template_searchpath="/home/airflow/gcs/data/",
) as dag:
    insert_job_operator = BigQueryInsertJobOperator(
        task_id="insert_job_operator",
        configuration={
            "query": {
                # Resolved against template_searchpath at render time
                "query": "{% include '20220621_script.sql' %}",
                "useLegacySql": False,
            }
        },
    )
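To stage the script, upload it to the bucket's data/ folder, e.g. gsutil cp 20220621_script.sql gs://<bucket-name>/data/ (the bucket name is a placeholder). Composer mounts that folder on the workers, so a quick sanity check, assuming the mapped path above, could look like:

from pathlib import Path

# Hypothetical check run on a Composer worker: files uploaded to
# gs://<bucket>/data/ surface under /home/airflow/gcs/data.
sql_file = Path("/home/airflow/gcs/data/20220621_script.sql")
if sql_file.exists():
    print(sql_file.read_text())
else:
    print("script not found - upload it to the bucket's data/ folder")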


Logs:

[2022-06-22, 01:52:50 UTC] {bigquery.py:2247} INFO - Executing: {'query': {'query': 'SELECT name, CAST (id AS string) as id, address FROM `your-project.your-dataset.your-table`;', 'useLegacySql': False}}
[2022-06-22, 01:52:50 UTC] {credentials_provider.py:324} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-06-22, 01:52:50 UTC] {bigquery.py:1560} INFO - Inserting job airflow_dag_id_insert_job_operator_2022_06_22T01_52_43_313003_00_00_d90ca3cd2e59501bd34d36064cc75c88
[2022-06-22, 01:52:51 UTC] {taskinstance.py:1279} INFO - Marking task as SUCCESS. dag_id=dag_id, task_id=insert_job_operator, execution_date=20220622T015243, start_date=20220622T015249, end_date=20220622T015251
[2022-06-22, 01:52:51 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-06-22, 01:52:51 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check

The job also shows up under Project History in BigQuery.


2 Comments

  • Oh, that's interesting. I'll try that. Thanks for the info!
  • Glad my answer is useful to you. You may refer to this SO post: What should I do when someone answers my question?
