
I have a simple pipeline in the Dataflow 2.1 SDK that reads data from Pub/Sub and then applies a DoFn to it.

PCollection<MyClass> e = streamData.apply("ToE", ParDo.of(new MyDoFNClass()));

I get the following error from this pipeline:

java.lang.IllegalStateException: Unable to return a default Coder for ToEvents/ParMultiDo(MyDoFNClass).out0 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for com.X.X.model.MyClass.

MyClass, the element type of the output PCollection, is defined as follows:

@DefaultCoder(AvroCoder.class)

public class MyClass{

    public long id;
    public HashMap<String,HashSet<String>> a;

    @SerializedName("a")
    public Integer Id;
    @SerializedName("ae")
    public String ae;
}
  • 1) What does MyDoFNClass look like? 2) Alternatively, have you tried specifying a coder manually using .setCoder() as the message suggests? Commented Dec 8, 2017 at 0:56

2 Answers


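Found the solution: I just needed to add implements Serializable to MyClass. That makes the type eligible for Beam's SerializableCoder, which the CoderRegistry can infer for any class that implements java.io.Serializable.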

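import java.io.Serializable;

@DefaultCoder(AvroCoder.class)
// Implementing Serializable makes the class eligible for Beam's SerializableCoder.
public class MyClass implements Serializable {

    public long id;
    public HashMap<String, HashSet<String>> a;

    @SerializedName("a")
    public Integer Id;
    @SerializedName("ae")
    public String ae;
}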
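The fix works because SerializableCoder encodes elements with standard Java serialization. The JDK-only sketch below uses a simplified MyClass (the Gson and Beam annotations are dropped so it runs without those libraries; the class name SerializableRoundTrip and its encode/decode helpers are hypothetical) to show that the fields survive an ObjectOutputStream/ObjectInputStream round trip, which is roughly what happens when a PCollection of this type is encoded with SerializableCoder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;

public class SerializableRoundTrip {

    // Simplified copy of MyClass: annotations omitted, Serializable kept.
    static class MyClass implements Serializable {
        private static final long serialVersionUID = 1L;
        public long id;
        public HashMap<String, HashSet<String>> a;
        public Integer Id;
        public String ae;
    }

    // Encode with standard Java serialization (the mechanism SerializableCoder builds on).
    static byte[] encode(MyClass value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Decode the bytes back into a new MyClass instance.
    static MyClass decode(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (MyClass) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MyClass original = new MyClass();
        original.id = 42L;
        original.a = new HashMap<>();
        original.a.computeIfAbsent("k", k -> new HashSet<>()).add("v");
        original.Id = 7;
        original.ae = "event";

        MyClass copy = decode(encode(original));
        System.out.println(copy.id + " " + copy.a + " " + copy.Id + " " + copy.ae); // prints "42 {k=[v]} 7 event"
    }
}
```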

1 Comment

To clarify: for this error, add implements Serializable to the class of the objects you're storing in the PCollection, i.e., to any class that describes an entity being serialized/deserialized.
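The requirement also applies transitively: Java serialization, and therefore SerializableCoder, fails with NotSerializableException if any non-transient field holds an object whose class is not Serializable. MyClass is fine because HashMap, HashSet, String, and Integer are all Serializable. A minimal JDK-only sketch, using hypothetical classes Details, BadEvent, and GoodEvent:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NestedSerializableCheck {

    // Hypothetical nested type that does NOT implement Serializable.
    static class Details {
        String note = "x";
    }

    // Hypothetical nested type that does implement Serializable.
    static class SerializableDetails implements Serializable {
        private static final long serialVersionUID = 1L;
        String note = "x";
    }

    // The outer class is Serializable, but one of its fields is not.
    static class BadEvent implements Serializable {
        private static final long serialVersionUID = 1L;
        long id = 1L;
        Details details = new Details();
    }

    // Every field of this class is Serializable.
    static class GoodEvent implements Serializable {
        private static final long serialVersionUID = 1L;
        long id = 1L;
        SerializableDetails details = new SerializableDetails();
    }

    // Returns true if Java serialization of the whole object graph succeeds.
    static boolean canSerialize(Object value) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("GoodEvent: " + canSerialize(new GoodEvent())); // true
        System.out.println("BadEvent: " + canSerialize(new BadEvent()));   // false
    }
}
```

In practice this means that implementing Serializable on the top-level element class is not enough if it contains your own nested types; those need it too (or must be marked transient).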

Below is some documentation about coders from the Beam Programming Guide:

The Beam SDKs require a coder for every PCollection in your pipeline. In most cases, the Beam SDK is able to automatically infer a Coder for a PCollection based on its element type or the transform that produces it, however, in some cases the pipeline author will need to specify a Coder explicitly, or develop a Coder for their custom type.

Each Pipeline object has a CoderRegistry object, which maps language types to the default coder the pipeline should use for those types. You can use the CoderRegistry yourself to look up the default coder for a given type, or to register a new default coder for a given type.

See the following link for the default coders registered by the Beam SDK: https://beam.apache.org/documentation/programming-guide/#default-coders-and-the-coderregistry

If the type you are using in a PCollection is not covered by any default coder, you may have to provide a custom coder for it. For example, the implementations of PubsubIO.write() and PubsubIO.read() use custom coders such as PubsubMessagePayloadOnlyCoder.

Suppose you are converting a string into a PubsubMessage. You can supply this coder to your PCollection:

PCollection<PubsubMessage> pubsubMessagePCollection = pCollectionTuple.get(accountId);
pubsubMessagePCollection.setCoder(PubsubMessagePayloadOnlyCoder.of());
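If no built-in coder fits, a custom coder for MyClass boils down to writing its fields to an OutputStream and reading them back in the same order. The sketch below uses only the JDK so it runs standalone; its static encode(MyClass, OutputStream) and decode(InputStream) methods mirror the shape of the encode/decode methods on Beam 2.x's Coder, but the class MyClassCodecSketch and its byte layout are hypothetical. In a pipeline, this logic would live in a subclass of org.apache.beam.sdk.coders.CustomCoder<MyClass> and be attached with .setCoder(), as in the PubsubMessage example above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

public class MyClassCodecSketch {

    // Simplified MyClass: annotations and Serializable omitted on purpose,
    // since a hand-written coder does not rely on Java serialization.
    static class MyClass {
        public long id;
        public HashMap<String, HashSet<String>> a;
        public Integer Id;
        public String ae;
    }

    // Same shape as Coder#encode(T, OutputStream). writeUTF limits each string to 65535 encoded bytes.
    static void encode(MyClass value, OutputStream outStream) throws IOException {
        DataOutputStream out = new DataOutputStream(outStream);
        out.writeLong(value.id);
        out.writeBoolean(value.Id != null);   // presence flag for the nullable Integer
        if (value.Id != null) {
            out.writeInt(value.Id);
        }
        out.writeBoolean(value.ae != null);   // presence flag for the nullable String
        if (value.ae != null) {
            out.writeUTF(value.ae);
        }
        if (value.a == null) {
            out.writeInt(-1);                  // -1 marks a null map
        } else {
            out.writeInt(value.a.size());
            for (Map.Entry<String, HashSet<String>> entry : value.a.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeInt(entry.getValue().size());
                for (String element : entry.getValue()) {
                    out.writeUTF(element);
                }
            }
        }
        out.flush();                           // flush, but do not close the caller's stream
    }

    // Same shape as Coder#decode(InputStream); reads fields in exactly the order they were written.
    static MyClass decode(InputStream inStream) throws IOException {
        DataInputStream in = new DataInputStream(inStream);
        MyClass value = new MyClass();
        value.id = in.readLong();
        value.Id = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
        value.ae = in.readBoolean() ? in.readUTF() : null;
        int mapSize = in.readInt();
        if (mapSize >= 0) {
            value.a = new HashMap<>();
            for (int i = 0; i < mapSize; i++) {
                String key = in.readUTF();
                int setSize = in.readInt();
                HashSet<String> set = new HashSet<>();
                for (int j = 0; j < setSize; j++) {
                    set.add(in.readUTF());
                }
                value.a.put(key, set);
            }
        }
        return value;
    }

    public static void main(String[] args) throws IOException {
        MyClass original = new MyClass();
        original.id = 42L;
        original.a = new HashMap<>();
        original.a.computeIfAbsent("k", k -> new HashSet<>()).add("v");
        original.ae = "event";                 // Id is left null on purpose

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        encode(original, bytes);
        MyClass copy = decode(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(copy.id + " " + copy.a + " " + copy.Id + " " + copy.ae); // prints "42 {k=[v]} null event"
    }
}
```

One design caveat: HashMap and HashSet iteration order is not guaranteed, so this encoding is not deterministic and should not be relied on for GroupByKey keys. Inside Beam you could also compose built-in coders such as VarLongCoder, StringUtf8Coder, NullableCoder, MapCoder, and SetCoder instead of hand-writing the byte layout.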
