2

I currently am trying to spark-submit a fat jar to a local cluster, which I developed using Spark 2.4.6; Scala 2.11.12. Upon submitting to the cluster, I receive this error:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2

My spark submit command (run in cmd prompt): spark-submit --class main.app --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6 my_app_name-1.0-SNAPSHOT-jar-with-dependencies.jar

Other details:

  • Scala version: 2.11.12
  • Spark 2.4.6
  • When I submit using Spark 3.0.0 (i.e. pointing my SPARK_HOME to Spark 3.0.0 directory and submitting), it works fine, but when I submit using Spark 2.4.6 (i.e. pointing my SPARK_HOME to Spark 2.4.6 directory and submitting) I get that error
  • I have to use 2.4.6 (this cannot be changed)

My pom file

[....headers and stuff]
<groupId>org.example</groupId>
<artifactId>my_app_name</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <scala.version>2.11.12</scala.version>
</properties>

<repositories>
    <repository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </repository>
</repositories>

<pluginRepositories>
    <pluginRepository>
        <id>scala-tools.org</id>
        <name>Scala-Tools Maven2 Repository</name>
        <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
</pluginRepositories>

<dependencies>
        <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api -->
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>5.6.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.specs</groupId>
        <artifactId>specs</artifactId>
        <version>1.2.5</version>
        <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-avro -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>2.4.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.3</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-tools -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-tools</artifactId>
        <version>2.4.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.databricks/spark-csv -->
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.11</artifactId>
        <version>1.5.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.4</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.3.3</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <!-- see http://davidb.github.com/scala-maven-plugin -->
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.3.2</version>
            <configuration>
                <recompileMode>incremental</recompileMode>   <!-- NOTE: incremental compilation although faster requires passing to MAVEN_OPTS="-XX:MaxPermSize=128m" -->
                <!-- addScalacArgs>-feature</addScalacArgs -->
                <args>
                    <arg>-Yresolve-term-conflict:object</arg>   <!-- required for package/object name conflict in Jenkins jar -->
                </args>
                <javacArgs>
                    <javacArg>-Xlint:unchecked</javacArg>
                    <javacArg>-Xlint:deprecation</javacArg>
                </javacArgs>
            </configuration>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>
                                    ingest_package.object_ingest
                                </mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

[....footers and stuff]

My Main App File

package main

import java.nio.file.{Files, Paths}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.functions.{date_format, struct}

object app {

def main(args: Array[String]): Unit = {

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("parquet_ingest_engine")
  .getOrCreate()

Logger.getLogger("org").setLevel(Level.ERROR)
val accessKeyId = System.getenv("AWS_ACCESS_KEY_ID")
val secretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY")


val person_df = spark.read.format("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat").load("s3_parquet_path_here")
val person_df_reformatted = person_df.withColumn("registration_dttm_string", date_format(person_df("registration_dttm"), "MM/dd/yyyy hh:mm"))
val person_df_final = person_df_reformatted.select("registration_dttm_string", "id", "first_name", "last_name", "email", "gender", "ip_address", "cc", "country", "birthdate", "salary", "title", "comments")

person_df_final.printSchema()
person_df_final.show(5)

val person_avro_schema = new String(Files.readAllBytes(Paths.get("input\\person_schema.avsc")))
print(person_avro_schema)

person_df_final.write.format("avro").mode("overwrite").option("avroSchema", person_avro_schema).save("output/person.avro")
print("\n" + "=====================successfully wrote avro to local path=====================" + "\n")


person_df_final.select(to_avro(struct("registration_dttm_string", "id", "first_name", "last_name", "email", "gender", "ip_address", "cc", "country", "birthdate", "salary", "title", "comments")) as "value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "spark_topic_test")
  .save()

print("\n" + "========================Successfully wrote to avro consumer on localhost kafka consumer========================" + "\n"+ "\n")


  }
 }

2 Answers 2

1

I met the same error. And solved it by using the jar with same scala version and spark version. I see the jar version you are using (org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6 ) is consistent with your spark, maybe you can try to change the version to a close one (e.g. org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 etc).

My spark is "version 2.4.4 using Scala version 2.11.12", when I read an avro file using the following jar(spark-avro_2.12), I got the exactly same error. spark-shell --packages org.apache.spark:spark-avro_2.12:3.1.2

It was fixed after changing to "spark-shell --packages com.databricks:spark-avro_2.11:2.4.0".

Sign up to request clarification or add additional context in comments.

Comments

1

First, you have problems with dependencies:

  • you don't need com.databricks:spark-csv_2.11 - CSV support is in the Spark itself for a long time
  • you don't need Kafka dependencies except org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6
  • spark-sql and spark-core need to be declared with <scope>provided</scope> like here
  • it's better to use the same version of Spark dependencies as you're using for submission

Second, the problem could be from the incorrect Scala version (for example, you didn't do mvn clean when you changed it) - if you said that code works with Spark 3.0 then it should be compiled with Scala 2.12, while 2.4.6 works only with 2.11

I strongly recommend to get rid of unnecessary dependencies, use provided, do mvn clean, etc.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.