1

I have a Flink demo that should find the values of a column in one DataSet that are not present in another DataSet. I wrote it with Flink SQL. The code looks OK, but it does not work.

the version i use is:

  • flink.version: 1.7.1
  • java.version: 1.8
  • scala.binary.version: 2.12

this is my flink demo:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

/**
 * Small Flink batch demo: registers two in-memory DataSets as tables and uses
 * a SQL {@code NOT IN} sub-query to select the (user, item) rows of
 * {@code userScoreTable} whose item does not appear in {@code allItemTable}.
 * The resulting rows are printed to standard output.
 */
public class TestUnScoreItem {

    /**
     * Builds the sample tables, runs the query and prints the result.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job cannot be translated or executed
     */
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // Left side: (user, item, score) triples.
        DataSet<Tuple3<String, String, Integer>> scores = env.fromCollection(sampleUserScores());
        // Right side: the single-column list of item ids.
        DataSet<Tuple1<String>> items = env.fromCollection(sampleItems());

        tableEnv.registerDataSet("userScoreTable", scores, "user,item,score");
        tableEnv.registerDataSet("allItemTable", items, "item2");

        final String query =
                "select user, item from userScoreTable where item not in (select item2 from allItemTable) ";
        Table unScoreTable = tableEnv.sqlQuery(query);

        // Converting back to a DataSet is the step that translates the plan.
        tableEnv.toDataSet(unScoreTable, Row.class).print();
    }

    /** Returns the hard-coded (user, item, score) sample rows. */
    private static List<Tuple3<String, String, Integer>> sampleUserScores() {
        List<Tuple3<String, String, Integer>> rows = new ArrayList<>();
        rows.add(new Tuple3<>("U1", "Item1", 4));
        rows.add(new Tuple3<>("U1", "Item3", 7));
        rows.add(new Tuple3<>("U1", "Item5", 2));
        rows.add(new Tuple3<>("U2", "Item2", 9));
        rows.add(new Tuple3<>("U2", "Item3", 3));
        rows.add(new Tuple3<>("U3", "Item1", 3));
        return rows;
    }

    /** Returns the hard-coded item ids, each wrapped in a one-field tuple. */
    private static List<Tuple1<String>> sampleItems() {
        String[] ids = {"Item1", "Item2", "Item3", "Item4", "Item5"};
        List<Tuple1<String>> rows = new ArrayList<>();
        for (String id : ids) {
            rows.add(new Tuple1<>(id));
        }
        return rows;
    }
}

and i get this exception

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/ProcessFunction
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.flink.table.plan.nodes.dataset.DataSetAggregate.translateToPlan(DataSetAggregate.scala:107)
    at org.apache.flink.table.plan.nodes.dataset.DataSetSingleRowJoin.translateToPlan(DataSetSingleRowJoin.scala:99)
    at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
    at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
    at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
    at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
    at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:147)
    at com.jychan.easycode.recommend.training.TestUnScoreItem.main(TestUnScoreItem.java:65)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.functions.ProcessFunction
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 21 more

Process finished with exit code 1

Does anyone know how to fix it? Or is there any other way to get the same result? Thanks!

Added the dependencies:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.7.1</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
4
  • Hey, can You also paste your dependencies here? Also, do You run it on some cluster or is it just local execution ? Commented May 5, 2019 at 12:48
  • Your application compiles and runs just fine in my IDE. You probably have a missing dependency. Commented May 5, 2019 at 15:43
  • @DavidAnderson already add the dependency Commented May 6, 2019 at 2:01
  • @DominikWosiński i just run it local with idea Commented May 6, 2019 at 2:02

1 Answer 1

2

oh, i found what's wrong

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

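Remove the "provided" scope and it works. With "provided", flink-streaming-java is available at compile time but is not put on the runtime classpath when the job is launched locally from the IDE, which is why org.apache.flink.streaming.api.functions.ProcessFunction could not be found. Thanks, all.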

Sign up to request clarification or add additional context in comments.

1 Comment

That worked! Here's more info about the "provided" scope: ci.apache.org/projects/flink/flink-docs-release-1.8/dev/… In my case, the problems started after I updated IntelliJ to 2019.2 (had 2018.3 previously). So maybe the workaround suggested in the above docs doesn't work anymore with the newer versions of the IDE?

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.