1

Error while creating a DataStream using fromElements function

Below is the expeption -

Caused by: java.io.IOException: Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions. Serializer is org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@599fcdda at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:121) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745)

3
  • 1
    What is the type you're providing to the fromElements method? Commented Dec 20, 2015 at 8:48
  • it's InputStreamReader Commented Dec 20, 2015 at 9:40
  • Below is the sample - private static DataStream<InputStreamReader> getStream(StreamExecutionEnvironment env) { InputStreamReader isr=null; try { URL url = new URL("ex.in/res"); HttpURLConnection httpconn = (HttpURLConnection) url.openConnection(); if (httpconn.getResponseCode() != 200) throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode()); isr = new InputStreamReader((httpconn.getInputStream())); } catch (Exception e){} return env.fromElements(isr); } Commented Dec 20, 2015 at 16:25

2 Answers 2

1

Why do you want to process InputStreamReader tuples? I guess there is some miss understanding here. The generic type specifies the type of the data you want to process. For example

DataStream<Integer> intStream = env.fromElements(1, 2, 3, 4, 5);

Generate a finite data stream with 5 Integer tuples. I assume that you actually want to use an InputStreamReader to generate the actual tuples.

If you want to read via HttpURLConnection you could implement your own SourceFunction (or RichSourceFunction) as follows (replace OUT with you actual data type you want to use -- also consider Flinks Tuple0 to Tuple25 types):

env.addSource(new SourceFunction<OUT> {
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<OUT> ctx) {
        InputStreamReader isr = null;
        try {
            URL url = new URL("ex.in/res");
            HttpURLConnection httpconn = (HttpURLConnection) url.openConnection();
            if (httpconn.getResponseCode() != 200)
                throw new RuntimeException("Failed : HTTP error code : " + httpconn.getResponseCode());
            isr = new InputStreamReader((httpconn.getInputStream()));
        } catch (Exception e) {
            // clean up; log error
            return;
        }

        while(isRunning) {
            OUT tuple = ... // get data from isr
            ctx.collect(tuple);
        }
    }

    @Override
    public void cancel() {
         this.isRunning = false;
    }
});
Sign up to request clarification or add additional context in comments.

3 Comments

I am very much new to Flink. I am trying to read a Web log which is accessible through a HTTP URL. Hence I was trying that way. Which methods/objects does Flink provide in order to enable users to read HTTP streams?
Maybe it would make sense for you to implement a custom source. Just have a look at the SourceFunction interface. The SourceContext from the run() method allows you to emit elements.
Just extend my answer.
0

You cannot create a DataStream<InputStreamReader> with fromElements since the InputStreamReader is not serializable. That's required by the fromElements method. Furthermore, it probably does not make so much sense to work on InputStreamReaders. I guess it would be better to simply read the data from the HttpURLConnection and then continue working on this data.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.