A viable option is to set the path to the Data folder in the template_searchpath argument of the DAG definition.
Note that this only works with folders in the Cloud Storage bucket that are mapped to the workers' local filesystem; in Cloud Composer, the environment bucket's data/ folder is mapped to /home/airflow/gcs/data/.
See working code below:
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(
    dag_id='dag_id',
    schedule_interval="@daily",
    start_date=datetime(2022, 6, 22),
    catchup=False,
    # Directory that Jinja searches when resolving {% include %};
    # here it is the mapped data/ folder of the Composer environment bucket.
    template_searchpath='/home/airflow/gcs/data/',
) as dag:

    insert_job_operator = BigQueryInsertJobOperator(
        task_id='insert_job_operator',
        configuration={
            "query": {
                # The file is read from template_searchpath and rendered
                # into the job configuration when the task runs.
                "query": "{% include '20220621_script.sql' %}",
                "useLegacySql": False,
            }
        },
    )

    insert_job_operator
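Here 20220621_script.sql sits in the environment bucket's data/ folder, and its contents are the single SELECT statement that appears fully rendered in the "Executing:" line of the logs below. If your scripts are spread across several mapped directories, template_searchpath also accepts a list of paths. A minimal sketch (the sql/ subfolder under the DAGs folder is a hypothetical layout, not part of the example above):

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id='dag_with_multiple_searchpaths',  # hypothetical DAG id for this sketch
    schedule_interval="@daily",
    start_date=datetime(2022, 6, 22),
    catchup=False,
    # template_searchpath takes a single path or a list of paths; Jinja
    # looks in each one when resolving {% include %}.
    template_searchpath=[
        '/home/airflow/gcs/data/',      # mapped data/ folder, as in the example above
        '/home/airflow/gcs/dags/sql/',  # hypothetical sql/ subfolder of the mapped dags/ folder
    ],
) as dag:
    pass  # define BigQueryInsertJobOperator tasks here as in the example above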


Logs:
[2022-06-22, 01:52:50 UTC] {bigquery.py:2247} INFO - Executing: {'query': {'query': 'SELECT name, CAST (id AS string) as id, address FROM `your-project.your-dataset.your-table`;', 'useLegacySql': False}}
[2022-06-22, 01:52:50 UTC] {credentials_provider.py:324} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-06-22, 01:52:50 UTC] {bigquery.py:1560} INFO - Inserting job airflow_dag_id_insert_job_operator_2022_06_22T01_52_43_313003_00_00_d90ca3cd2e59501bd34d36064cc75c88
[2022-06-22, 01:52:51 UTC] {taskinstance.py:1279} INFO - Marking task as SUCCESS. dag_id=dag_id, task_id=insert_job_operator, execution_date=20220622T015243, start_date=20220622T015249, end_date=20220622T015251
[2022-06-22, 01:52:51 UTC] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-06-22, 01:52:51 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
Project History in BigQuery:
