With the Apache Beam SDK, a registered coder doesn't work.

I would like to use a SimpleFunction with BigQuery's TableSchema, but the function needs to be serialized. I added a TableSchemaCoder to the CoderRegistry, but it doesn't seem to be used.

How can I solve it?

// Coder

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

public class TableSchemaCoder extends AtomicCoder<TableSchema> {
    public static class FieldSchema {
        private String name;
        private String type;
        private String mode;

        public FieldSchema(String name, String type, String mode) {
            this.name = name;
            this.type = type;
            this.mode = mode;
        }

        // Public getters are required so that JSONObject's bean constructor
        // can pick up the fields.
        public String getName() { return name; }
        public String getType() { return type; }
        public String getMode() { return mode; }
    }

    private final StringUtf8Coder stringCoder = StringUtf8Coder.of();

    @Override
    public TableSchema decode(InputStream inStream) throws IOException {
        // Rebuild the TableSchema from the JSON array written by encode().
        JSONArray fields = new JSONArray(stringCoder.decode(inStream));
        List<TableFieldSchema> fieldSchemas = new ArrayList<>();
        for (int i = 0; i < fields.length(); i++) {
            JSONObject field = fields.getJSONObject(i);
            fieldSchemas.add(new TableFieldSchema()
                    .setName(field.getString("name"))
                    .setType(field.getString("type"))
                    .setMode(field.optString("mode", null)));
        }
        return new TableSchema().setFields(fieldSchemas);
    }

    @Override
    public void encode(TableSchema value, OutputStream outStream) throws IOException {
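        // Serialize the schema as a JSON array of {name, type, mode} objects.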
        List<JSONObject> fields = new ArrayList<>();
        for (TableFieldSchema s : value.getFields()) {
            fields.add(new JSONObject(new FieldSchema(s.getName(), s.getType(), s.getMode())));
        }
        String json = new JSONArray(fields).toString();
        stringCoder.encode(json, outStream);
    }
}



// Pipeline

// ...

CoderRegistry cr = pipeline.getCoderRegistry();
cr.registerCoderForClass(TableSchema.class, new TableSchemaCoder());

// ...

TableSchema schema = getSchema();
pipeline.apply(MapElements.via(new RowParser(schema)));

Error message:

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.MapElements$1@7ac2e39b
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
        at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
        at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
        at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
        at org.apache.beam.sdk.transforms.MapElements.expand(MapElements.java:118)
        at org.apache.beam.sdk.transforms.MapElements.expand(MapElements.java:30)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:284)

Caused by: java.io.NotSerializableException: com.google.api.services.bigquery.model.TableSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
        ... 9 more

1 Answer

You didn't share the code for RowParser, but I am guessing it has a TableSchema as a field. A Coder is only used for encoding data within your pipeline. Functions such as the RowParser are serialized with Java serialization, which doesn't use the registered coders.

Depending on how you are generating the table schema, you have a few options:

  1. Have the RowParser store the schema as a string or in some other serializable format. It can have a transient field for the actual TableSchema object, and initialize that field (if it is null) from the serializable format; see the sketch after this list.

  2. Implement Java serialization hooks for serializing RowParser that avoid serializing the TableSchema. This would probably be similar to the above.

  3. Compute the schema the first time the RowParser is used.
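
For example, option 1 might look like the sketch below. This is a minimal sketch, not your actual RowParser: the String-to-TableRow signature and the parsing stub are assumptions, and it uses the BigQueryHelpers utility from Beam's GCP IO module (org.apache.beam.sdk.io.gcp.bigquery) to convert the schema to and from JSON.

// RowParser (sketch)

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.transforms.SimpleFunction;

public class RowParser extends SimpleFunction<String, TableRow> {
    // JSON form of the schema; a plain String, so Java serialization works.
    private final String schemaJson;

    // Rebuilt on demand on each worker; transient fields are skipped by
    // Java serialization, so the non-serializable TableSchema never travels.
    private transient TableSchema schema;

    public RowParser(TableSchema schema) {
        this.schemaJson = BigQueryHelpers.toJsonString(schema);
    }

    private TableSchema schema() {
        if (schema == null) {
            schema = BigQueryHelpers.fromJsonString(schemaJson, TableSchema.class);
        }
        return schema;
    }

    @Override
    public TableRow apply(String input) {
        TableSchema s = schema();
        // ... parse `input` into a TableRow using `s` (application-specific) ...
        return new TableRow();
    }
}

Since only the String field is serialized along with the function, MapElements.via(new RowParser(schema)) no longer fails on the non-serializable TableSchema.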

2 Comments

Check out this pattern that BigQueryIO itself uses to store a TableSchema in a closure: github.com/apache/beam/blob/master/sdks/java/io/…
Thank you for the nice answer! I didn't know that a Coder is only used for encoding data within the pipeline. Now I understand. I'll try it.
