In my Flink app, I found this log:

Field EnrichmentInfo#groupIds will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.

The class looks something like this:

import java.util.List;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
public class EnrichmentInfo {
  private List<String> groupIds;
  ...
}

Is there any best practice I am not following to avoid serialization with Kryo?

2 Answers

2

You've encountered a common issue. To resolve it, add this class:

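import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Tells Flink how to build type information for a List<E> field.
public class ListInfoFactory<E> extends TypeInfoFactory<List<E>> {

    @Override
    @SuppressWarnings("unchecked")
    public TypeInformation<List<E>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
        // "E" is the name of List's type variable.
        TypeInformation<?> elementType = genericParameters.get("E");
        if (elementType == null) {
            throw new InvalidTypesException("Type extraction is not possible on List (element type unknown).");
        }

        return Types.LIST((TypeInformation<E>) elementType);
    }
}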

Then annotate your groupIds field with @TypeInfo(ListInfoFactory.class).
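
As a rough illustration of why the factory can look up the key "E": Java keeps a field's generic signature at runtime, and "E" is simply the name of the type variable declared by java.util.List. The sketch below uses plain reflection only; it is not Flink's type extraction code, and ElementTypeDemo with its nested EnrichmentInfo stand-in is a hypothetical name introduced for the example.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

class ElementTypeDemo {
    // Hypothetical stand-in for the class from the question.
    static class EnrichmentInfo {
        private List<String> groupIds;
    }

    // Returns the first type argument declared on the named field,
    // e.g. String for a field declared as List<String>.
    static Type elementTypeOf(Class<?> owner, String fieldName) throws NoSuchFieldException {
        Field field = owner.getDeclaredField(fieldName);
        Type generic = field.getGenericType();
        if (!(generic instanceof ParameterizedType)) {
            throw new IllegalArgumentException(fieldName + " has no type arguments");
        }
        return ((ParameterizedType) generic).getActualTypeArguments()[0];
    }

    // Name of the type variable declared by java.util.List.
    static String listTypeVariableName() {
        return List.class.getTypeParameters()[0].getName();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(elementTypeOf(EnrichmentInfo.class, "groupIds"));
        System.out.println(listTypeVariableName());
    }
}
```

If the field were declared as a raw List, there would be no type argument to recover, which corresponds to the "element type unknown" branch in the factory.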

-1

Add Supporting Getters/Setters

Flink has built-in type information for lists, so a List<String> field can be supported. However, the declaration above shows a private field, so whether Flink can treat it as a POJO field depends on a getter and setter being available (getGroupIds() and setGroupIds() respectively). You may need to implement those explicitly to back this field:

public class EnrichmentInfo {
  private List<String> groupIds;
  ...
  public List<String> getGroupIds() { return groupIds; }
  public void setGroupIds(List<String> groupIds) { this.groupIds = groupIds; }
}

General Serialization / POJO Rules

Generally, when you implement your own classes, make sure they follow the POJO rules defined by Flink so that the class and its fields don't fall back to generic types/Kryo:

  • The class must be public.
  • It must have a public constructor without arguments (default constructor).
  • All fields are either public or must be accessible through getter and setter functions. For a field called foo the getter and setter methods must be named getFoo() and setFoo().
  • The type of a field must be supported by a registered serializer.
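
The first three rules can be approximated with plain reflection, which is a quick way to sanity-check a class before handing it to Flink. This is only a sketch under stated assumptions: PojoRuleCheck, Good and Bad are hypothetical names, the check is not Flink's own type extraction logic, it skips static and transient fields on the assumption that they are not serialized, and it does not cover the fourth rule about registered serializers.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class PojoRuleCheck {
    // Returns the rule violations found; an empty list means rules 1-3 hold.
    static List<String> violations(Class<?> type) {
        List<String> problems = new ArrayList<>();
        if (!Modifier.isPublic(type.getModifiers())) {
            problems.add("class is not public");
        }
        try {
            type.getConstructor(); // only finds public constructors
        } catch (NoSuchMethodException e) {
            problems.add("no public no-argument constructor");
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            // Assumption: static, transient and synthetic fields are ignored.
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)
                    || field.isSynthetic() || Modifier.isPublic(mods)) {
                continue;
            }
            String name = field.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            if (!hasMethod(type, "get" + suffix) || !hasMethod(type, "set" + suffix, field.getType())) {
                problems.add("field " + name + " lacks a public getter/setter");
            }
        }
        return problems;
    }

    // True if the class exposes a public method with this name and parameter list.
    private static boolean hasMethod(Class<?> type, String name, Class<?>... params) {
        try {
            type.getMethod(name, params);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Follows rules 1-3: public class, public no-arg constructor, getter and setter.
    public static class Good {
        private List<String> groupIds;
        public Good() {}
        public List<String> getGroupIds() { return groupIds; }
        public void setGroupIds(List<String> groupIds) { this.groupIds = groupIds; }
    }

    // Breaks rule 3: the private field has no accessors.
    public static class Bad {
        private List<String> groupIds;
        public Bad() {}
    }

    public static void main(String[] args) {
        System.out.println(violations(Good.class));
        System.out.println(violations(Bad.class));
    }
}
```

Accessors generated by Lombok's @Getter and @Setter are ordinary methods in the compiled class, so a reflection check like this sees them the same way it sees hand-written ones.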

3 Comments

I have just read all the documentation about POJO rules. As you can see in my code, the @Getter and @Setter annotations already satisfy the "All fields are either public or must be accessible through getter and setter functions" rule.
As far as your class is concerned, you are using lombok.Getter/Setter for it. You have to be very careful when using Flink with non-native libraries, as they do not always work as expected. Have you tried declaring the class explicitly, as you would in plain Java (e.g. a no-args constructor and hand-written getters/setters)? Your error mentions groupIds as a private field, but your example has no explicit getters/setters for it.
With all that being said, Ken's answer above should help you out if you need to explicitly provide type information for the groupIds field.
