
Can I insert data into different BigQuery datasets according to data I process in a previous Dataflow step?


I am creating a Dataflow pipeline that reads from a Pub/Sub subscription and writes to a BigQuery table. It is defined as follows:

def run(argv=None, save_main_session=True):
    options: PipelineOptions = PipelineOptions(
        project='project-id',
        runner='DataflowRunner',
        region='region',
        streaming=True,
        setup_file='dataflow/setup.py',
        autoscaling_algorithm='THROUGHPUT_BASED',
        job_name='telemetry-processing'
    )

    with beam.Pipeline(options=options) as p:
        status = (
            p
            | 'Get Status PubSub' >> beam.io.ReadFromPubSub(
                subscription='projects/project-id/subscriptions/subscription-id',
                with_attributes=True))

        status_records = (
            status
            | 'Proto to Dict' >> beam.Map(
                lambda x: convert_proto_to_dict(x, nozzle_status_proto.NozzleStatus)))

        status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery(
            'project-id:dataset-id.table-id')

        bytes_status = status | 'Get Bytes Result' >> beam.ParDo(GetBytes())
        bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery(
            'project-id:dataset-id.backup-table-id')

It is working exactly as expected for the given input and output.
What I want is, based on a particular attribute of my PubsubMessage, to decide which dataset the message should be written to. So the part I need to change is this one:

status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:dataset-id.table-id')

I already tried to extract the needed data and use it like this:

status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery('project-id:{data-from-previous-step}.table-id')

But we can't pull data out of a PCollection directly at pipeline construction time, so this doesn't work.

I tried to override WriteToBigQuery as in this post (How can I write to Big Query using a runtime value provider in Apache Beam?), but I got no error and nothing was inserted.

I do not see how to achieve this.
Do you know where I should start?
Do I have to create n pipelines for n datasets?


1 Answer


The "table" parameter of WriteToBigQuery can be a function from element to the table it should be written to. For example:

status_records | 'Write' >> beam.io.WriteToBigQuery(
  lambda e: 'dataset1.invalid_records' if is_invalid(e) else 'dataset2.good_records')
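
Applied to the pipeline in the question, a minimal sketch could look like the one below. It assumes the routing value arrives as a Pub/Sub message attribute (the attribute key 'target_dataset', the ProtoToRoutedDict DoFn, and the status_table helper are hypothetical names, not part of the original code) and is carried along as a field of each record, so the callable passed to WriteToBigQuery can build the table spec per element:

import apache_beam as beam


class ProtoToRoutedDict(beam.DoFn):
    """Hypothetical variant of the 'Proto to Dict' step that also keeps
    the Pub/Sub attribute used to pick the destination dataset."""
    def process(self, msg):
        record = convert_proto_to_dict(msg, nozzle_status_proto.NozzleStatus)
        # 'target_dataset' is an assumed attribute name on the PubsubMessage;
        # fall back to the original dataset when the attribute is missing.
        record['target_dataset'] = msg.attributes.get('target_dataset', 'dataset-id')
        yield record


def status_table(record):
    # WriteToBigQuery calls this once per element to resolve its destination.
    return 'project-id:{}.table-id'.format(record['target_dataset'])


status_records = status | 'Proto to Routed Dict' >> beam.ParDo(ProtoToRoutedDict())

status_records | 'Write status to BQ' >> beam.io.WriteToBigQuery(
    table=status_table,
    # As in the original snippet no schema is passed, so the destination
    # tables are assumed to already exist. Note that 'target_dataset' is
    # written as an ordinary column; strip it beforehand if the tables do
    # not have such a column.
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

With this approach a single pipeline serves all datasets; there is no need to create one pipeline per dataset.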

