
I am trying to execute a pipeline using Apache Beam, but I get an error when I try to add some output tags:

import com.google.cloud.Tuple;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Type;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * The Transformer.
 */
class Transformer {
    private static final Logger LOG = LoggerFactory.getLogger(Transformer.class);
    final static TupleTag<Map<String, String>> successfulTransformation = new TupleTag<>();
    final static TupleTag<Tuple<String, String>> failedTransformation = new TupleTag<>();

    /**
     * The entry point of the application.
     *
     * @param args the input arguments
     */
    public static void main(String... args) {
        TransformerOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(TransformerOptions.class);

        Pipeline p = Pipeline.create(options);

        p.apply("Input", PubsubIO
                .readMessagesWithAttributes()
                .withIdAttribute("id")
                .fromTopic(options.getTopicName()))
                .apply(Window.<PubsubMessage>into(FixedWindows
                        .of(Duration.standardSeconds(60))))
                .apply("Transform",
                        ParDo.of(new JsonTransformer())
                                .withOutputTags(successfulTransformation,
                                        TupleTagList.of(failedTransformation)));

        p.run().waitUntilFinish();
    }

    /**
     * Deserialize the input and convert it to a key-value pairs map.
     */
    static class JsonTransformer extends DoFn<PubsubMessage, Map<String, String>> {

        /**
         * Process each element.
         *
         * @param c the processing context
         */
        @ProcessElement
        public void processElement(ProcessContext c) {
            String messagePayload = new String(c.element().getPayload());
            try {
                Type type = new TypeToken<Map<String, String>>() {
                }.getType();
                Gson gson = new Gson();
                Map<String, String> map = gson.fromJson(messagePayload, type);
                c.output(map);
            } catch (Exception e) {
                LOG.error("Failed to process input {} -- adding to dead letter file", c.element(), e);
                String attributes = c.element()
                        .getAttributeMap()
                        .entrySet().stream().map((entry) ->
                                String.format("%s -> %s\n", entry.getKey(), entry.getValue()))
                        .collect(Collectors.joining());
                c.output(failedTransformation, Tuple.of(attributes, messagePayload));
            }

        }
    }
}

The error shown is:

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Transform.out1 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V. Building a Coder using a registered CoderProvider failed. See suppressed exceptions for detailed failures. Using the default output Coder from the producing PTransform failed: Unable to provide a Coder for V. Building a Coder using a registered CoderProvider failed.

I tried different ways to fix the issue, but I think I just do not understand what the problem is. I know that these lines cause the error:

.withOutputTags(successfulTransformation,TupleTagList.of(failedTransformation))

but I do not understand which part of it is the problem, which part needs a specific Coder, or what "V" refers to in the error ("Unable to provide a Coder for V").

Why is the error happening? I also looked at Apache Beam's docs, but they do not seem to explain this usage, and I did not understand much from the section about coders.

Thanks

2 Answers


First, I would suggest changing:

final static TupleTag<Map<String, String>> successfulTransformation = 
    new TupleTag<>();
final static TupleTag<Tuple<String, String>> failedTransformation = 
    new TupleTag<>();

into this:

final static TupleTag<Map<String, String>> successfulTransformation = 
    new TupleTag<Map<String, String>>() {};
final static TupleTag<Tuple<String, String>> failedTransformation = 
    new TupleTag<Tuple<String, String>>() {};

That should help the coder inference determine the type of the side output. Also, have you properly registered a CoderProvider for Tuple?
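A minimal plain-Java sketch of why the anonymous subclass matters, and of what the V in the error message is. Beam's TupleTag declares its type parameter as V, and, roughly speaking, Beam resolves V against the tag's runtime class via reflection. The hypothetical Tag<V> class below imitates that idea with getGenericSuperclass(); it illustrates the reflection trick and is not Beam's actual implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

public class TypeCaptureDemo {

    /** Hypothetical stand-in for Beam's TupleTag<V>; it carries no data. */
    static class Tag<V> {
        /** Returns the type argument recorded for V, or V itself if it was erased. */
        Type capturedType() {
            Type superType = getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                // Anonymous subclass: its class file records "extends Tag<String>".
                return ((ParameterizedType) superType).getActualTypeArguments()[0];
            }
            // Plain instance: the superclass is Object, so only the variable V is known.
            return Tag.class.getTypeParameters()[0];
        }
    }

    public static void main(String[] args) {
        Tag<String> diamond = new Tag<>();            // String is erased at runtime
        Tag<String> anonymous = new Tag<String>() {}; // anonymous subclass keeps String

        System.out.println(diamond.capturedType().getTypeName());   // prints V
        System.out.println(anonymous.capturedType().getTypeName()); // prints java.lang.String
    }
}
```

With new TupleTag<>(), Beam is in the same position as the diamond case here: all it can see is the type variable V, so the CoderRegistry has nothing concrete to look up, which matches the "Unable to provide a Coder for V" message.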


5 Comments

The change does not appear to do anything at this stage. And no, I have not registered a CoderProvider for Tuple, because it's not clear to me how I am supposed to do it. Could you give more info?
I replaced Tuple with my own class that implements Serializable, and your answer solved my issue. Why? What is the difference between what I did and what you suggested? Also, do you have any idea why Google Cloud's Tuple does not implement Serializable and cannot be subclassed?
I'm not familiar with Google Cloud's Tuple. Can you link to where it came from? Generally, using Serializable is going to be less efficient than using a custom coder, or something like Avro because of Java serialization. See the documentation on registering Coders.
The reason why has to do with Java generics and erasure. Within the SDK we need to figure out the types in order to infer which Coder to use. new TupleTag<>() has no type information after erasure, which makes this impossible. new TupleTag<String>() {} actually creates an anonymous subclass of TupleTag<String>; because that subclass has no generic parameters of its own, the concrete type argument String is recorded in its class metadata. This allows us to play tricks with reflection and determine that the actual type is String, which then lets us look up Coder<String>.
Anyway, Google Cloud's Tuple is very simple: googlecloudplatform.github.io/google-cloud-java/0.25.0/apidocs/…
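On the question of how to register a coder for your own type: a minimal sketch, assuming you replace com.google.cloud.Tuple with a hypothetical StringPair class (StringPair, StringPairCoder and registerCoder are illustrative names, not part of Beam or the original code). The coder extends Beam's CustomCoder, writes both strings with StringUtf8Coder (which length-prefixes each value when used this way), and is registered through the pipeline's CoderRegistry with registerCoderForClass. Unlike relying on Serializable, this avoids Java serialization.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

/** Hypothetical replacement for com.google.cloud.Tuple<String, String>. */
public class StringPair {
    final String attributes;
    final String payload;

    public StringPair(String attributes, String payload) {
        this.attributes = attributes;
        this.payload = payload;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StringPair)) {
            return false;
        }
        StringPair other = (StringPair) o;
        return Objects.equals(attributes, other.attributes)
                && Objects.equals(payload, other.payload);
    }

    @Override
    public int hashCode() {
        return Objects.hash(attributes, payload);
    }

    /** Writes both fields back to back; each one is length-prefixed by StringUtf8Coder. */
    public static class StringPairCoder extends CustomCoder<StringPair> {
        private static final StringUtf8Coder STRING = StringUtf8Coder.of();

        @Override
        public void encode(StringPair value, OutputStream out) throws IOException {
            // Null fields are rejected by StringUtf8Coder with a CoderException.
            STRING.encode(value.attributes, out);
            STRING.encode(value.payload, out);
        }

        @Override
        public StringPair decode(InputStream in) throws IOException {
            // Read the fields in the same order they were written.
            String attributes = STRING.decode(in);
            String payload = STRING.decode(in);
            return new StringPair(attributes, payload);
        }
    }

    /** Lets coder inference pick StringPairCoder for any PCollection<StringPair>. */
    public static void registerCoder(Pipeline pipeline) {
        pipeline.getCoderRegistry().registerCoderForClass(StringPair.class, new StringPairCoder());
    }
}
```

In the question's pipeline this would mean calling StringPair.registerCoder(p) right after Pipeline.create(options) and declaring the tag as new TupleTag<StringPair>() {}. Alternatively, skip the registry and call setCoder(new StringPair.StringPairCoder()) on the PCollection returned by PCollectionTuple.get(failedTransformation). For a plain pair of strings, Beam's own KV<String, String>, which Beam encodes with KvCoder, would also work without any custom class.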

Thanks to @Ben Chambers' answer, the Kotlin equivalent is:

val successTag = object : TupleTag<MyObj>() {}
val deadLetterTag = object : TupleTag<String>() {}
