
I have some SQL scripts to be triggered by the Airflow BigQueryOperator. One of the SQL files is common to all tasks, as in the situation below:

SQL files to run: common.sql, abc.sql, xyz.sql

  • Task1 - common.sql + abc.sql
  • Task2 - common.sql + xyz.sql

To have one task run two SQL files, I read both files into a single string and pass the merged string to the task, so both run in one go.

The code looks like this:

# Airflow 1.x import path
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with open('common.sql', 'r') as sqlfile:
    common_sql = sqlfile.read()

with open('abc.sql', 'r') as sqlfile:
    abc_sql = sqlfile.read()

# At this point, sql_script has all the SQL from common.sql and abc.sql.
sql_script = common_sql + '\n' + abc_sql

BigQueryOperator(task_id='task1', sql=sql_script)

This serves my purpose, but is there a more elegant way to do it?


1 Answer


Instead of reading each file manually, you can create a list containing your file names, which helps when you have many files. For each file, read its contents, join them with common_array, and store a key-value pair for each merged common + SQL file.

For testing, test1.sql, test2.sql, test3.sql, and common.sql each contain a single-line query.

import json

# Read the common SQL once; it will be prepended to every other file.
with open('common.sql', 'r') as sqlfile:
    common_array = sqlfile.readlines()

file_arr = ['test1.sql', 'test2.sql', 'test3.sql']
sql_dict = {}
for data in file_arr:
    with open(data, 'r') as sqlfile:
        key = data.split('.')[0]  # file name without extension, e.g. 'test1'
        value = ''.join(common_array) + '\n' + sqlfile.read().rstrip()
        sql_dict[key] = value

print(json.dumps(sql_dict, sort_keys=False, indent=2))

print("\nDictionary value for sql_dict['test1']: \n" + sql_dict['test1'])

Output:

{
  "test1": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable` LIMIT 10",
  "test2": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_2` LIMIT 10",
  "test3": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_3` LIMIT 10"
}

Dictionary value for sql_dict['test1']:
SELECT * FROM `my-project.my_dataset.common` LIMIT 10

SELECT * FROM `my-project.my_dataset.myTable` LIMIT 10

You can then pass the values in the dictionary to BigQueryOperator.

BigQueryOperator(task_id='task1', sql=sql_dict['test1'])
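
Since each dictionary entry corresponds to one task, you can also generate the operators in a loop. A minimal sketch, assuming the Airflow 1.x import path and a dag object defined elsewhere:

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# One task per dictionary entry; the task_id is derived from the key.
# Assumes `dag` is a DAG instance defined elsewhere in the file.
for key, merged_sql in sql_dict.items():
    BigQueryOperator(
        task_id='task_{}'.format(key),
        sql=merged_sql,
        use_legacy_sql=False,  # the sample queries use standard SQL
        dag=dag,
    )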

2 Comments

Ricco, thanks for answering the question. Your solution is similar: it also reads the script contents into a string and assigns it to the BigQueryOperator parameter. I was hoping for something like assigning a file path, but with multiple files instead of just one. For example: BigQueryOperator(task_id='task1', sql=['common.sql', 'abc.sql']) (I tried this but it won't work). If I could assign file paths, the code would look more concise.
@BrianMo Yes, it is quite similar; my solution is intended to remove the repeated lines that join the common file with each SQL file. I checked the source code of BigQueryOperator, and as of now it only accepts strings, so it is not yet possible to pass a list of SQL files. Your workaround is the only option for now.
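
Following up on the comment, a small helper function can get close to the multi-file call syntax. A minimal sketch; the helper name read_sql_files is hypothetical, not part of Airflow:

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

def read_sql_files(*paths):
    # Hypothetical helper, not part of Airflow: read each SQL file
    # and join the contents with a blank line between scripts.
    parts = []
    for path in paths:
        with open(path, 'r') as sqlfile:
            parts.append(sqlfile.read().rstrip())
    return '\n\n'.join(parts)

# Call sites stay concise, close to the list syntax asked about above:
BigQueryOperator(task_id='task1', sql=read_sql_files('common.sql', 'abc.sql'))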
