I need to add error handling to my Dataflow pipeline for multiple inserts to Spanner with the same primary key. An older message may arrive after the current message, and I do not want it to overwrite the saved values. Therefore I will create my mutation as an insert, so that an error is thrown when a duplicate insert is attempted.

I have seen several examples of try blocks within DoFns that write to a side output to log any errors. This is a very nice solution, but I need to apply error handling to the step that writes to Spanner, which does not contain a DoFn:

    spannerBranchTuples2.get(spannerOutput2)
        .apply("Create Spanner Mutation", ParDo.of(createSpannerMutation))
        .apply("Write Spanner Records", SpannerIO.write()
            .withInstanceId(options.getSpannerInstanceId())
            .withDatabaseId(options.getSpannerDatabaseId())
            .grouped());

I have not found any documentation showing how to apply error handling to this step, nor a way to rewrite it as a DoFn. Any suggestions on how to apply error handling here? Thanks.

1 Answer

There is an interesting pattern for this in the Dataflow documentation.

The idea is to run a DoFn before sending your results to the write transform, catch failures there, and route the failing elements to a separate dead-letter output. It would look something like this:

    final TupleTag<Output> successTag = new TupleTag<Output>() {};
    final TupleTag<Input> deadLetterTag = new TupleTag<Input>() {};
    PCollection<Input> input = /* … */;
    PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          c.output(process(c.element()));
        } catch (Exception e) {
          // LOG is assumed to be an SLF4J Logger; the trailing exception is logged with its stack trace.
          LOG.error("Failed to process input {} -- adding to dead letter file",
              c.element(), e);
          // Emit the failing element to the dead-letter output instead of failing the bundle.
          c.output(deadLetterTag, c.element());
        }
      }
    }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
    
    outputTuple.get(deadLetterTag)
      .apply(/* Write to a file or table or anything */);

    outputTuple.get(successTag)
      .apply(/* Write to Spanner or any other sink */);
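
The routing decision in the snippet above does not depend on Beam, so it can be sketched and unit-tested in plain Java. The following is only an illustration under stated assumptions: `DeadLetterSketch`, `Routed`, and `route` are hypothetical names, the two lists stand in for the `successTag` and `deadLetterTag` outputs, and `Integer::parseInt` stands in for `process()`. Note that this pattern catches failures raised while processing an element; it does not intercept errors raised inside the write transform itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch of dead-letter routing; no Beam types are involved.
final class DeadLetterSketch {

  /** Holds the two outputs, mirroring successTag and deadLetterTag. */
  static final class Routed<I, O> {
    final List<O> success = new ArrayList<>();
    final List<I> deadLetter = new ArrayList<>();
  }

  /** Applies process to each input; inputs that throw go to deadLetter. */
  static <I, O> Routed<I, O> route(List<I> inputs, Function<I, O> process) {
    Routed<I, O> routed = new Routed<>();
    for (I input : inputs) {
      try {
        routed.success.add(process.apply(input));
      } catch (RuntimeException e) {
        // Keep the original input so it can be inspected or replayed later.
        routed.deadLetter.add(input);
      }
    }
    return routed;
  }

  public static void main(String[] args) {
    // Integer::parseInt stands in for process(); "oops" fails to parse.
    Routed<String, Integer> routed =
        route(Arrays.asList("1", "oops", "3"), Integer::parseInt);
    System.out.println("success=" + routed.success);
    System.out.println("deadLetter=" + routed.deadLetter);
  }
}
```

Keeping the original input (rather than the exception alone) in the dead-letter output is what makes it possible to write the failed records to a file or table and reprocess them later.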

Let me know if this is useful!
