
I have a simple PySpark program which publishes data to Kafka. When I do a spark-submit, it gives an error.

Command being run:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0 ~/PycharmProjects/Kafka/PySpark_Kafka_SSL.py

Error:

Traceback (most recent call last):
  File "/Users/karanalang/PycharmProjects/Kafka/PySpark_Kafka_SSL.py", line 33, in <module>
    df.write.format('kafka')\
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 738, in save
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py", line 1309, in __call__
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2/python/lib/py4j-0.10.9.2-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o41.save.
: java.lang.NoClassDefFoundError: scala/$less$colon$less
    at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:180)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
    ... 42 more

Spark version: 3.2.0. I have Confluent Kafka installed on my machine; here is the version:

Karans-MacBook-Pro:confluent-6.2.1 karanalang$ confluent local services kafka version
The local commands are intended for a single-node development environment only,
NOT for production usage. https://docs.confluent.io/current/cli/index.html

6.2.1-ce

Here is the code:

import sys, datetime, time, os
from pyspark.sql.functions import col, rank, dense_rank, to_date, to_timestamp, format_number, row_number, lead, lag, monotonically_increasing_id
from pyspark.sql import SparkSession, Window


spark = SparkSession.builder.appName('StructuredStreaming_KafkaProducer').getOrCreate()

kafkaBrokers='host:port'
# CA Root certificate ca.crt
caRootLocation='/Users/karanalang/Documents/Technology/strimzi/gcp-certs-dec3/caroot.pem'
# user public (user.crt)
certLocation='/Users/karanalang/Documents/Technology/strimzi/gcp-certs-dec3/my-bridge/my-bridge-user-crt.pem'
# user.key
keyLocation='/Users/karanalang/Documents/Technology/strimzi/gcp-certs-dec3/my-bridge/user-with-certs.pem'
password='passwd'
topic = "my-topic"

df = spark.read.csv("data/input.txt", header=False)

df.write.format('kafka')\
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("security.protocol","SSL")\
    .option("ssl.ca.location",caRootLocation)\
    .option("ssl.certificate.location", certLocation)\
    .option("ssl.key.location",keyLocation)\
    .option("ssl.key.password",password)\
    .option("subscribe", topic) \
    .save()

Any ideas what the issue is? The Spark version seems to match the jar. TIA!

1 Answer


The error:

Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less

Usually pops up when something is not right with the Scala version.

If you run spark-shell, you'll get the output:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/
         
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 1.8.0_292)

It says: Using Scala version 2.12.15
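
If you want to check the same thing from PySpark rather than spark-shell, a quick sanity check along these lines should work. This is only a sketch that goes through the py4j gateway; scala.util.Properties.versionString comes from the Scala standard library and is not part of the question's code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('scala-version-check').getOrCreate()

# Ask the JVM which Scala version Spark was built against;
# this returns a string like "version 2.12.15".
scala_version = spark.sparkContext._jvm.scala.util.Properties.versionString()
print(scala_version)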

The docs also mention: "For the Scala API, Spark 3.2.0 uses Scala 2.12. You will need to use a compatible Scala version (2.12.x)."

But when we look at spark-sql-kafka-0-10_2.13:3.2.0 in the Maven repository (Kafka 0.10+ Source For Structured Streaming » 3.2.0), it says: Scala target: Scala 2.13.

I would try to specify the matching Scala version in the spark-sql-kafka package; you can find the desired Scala version by going to "View all targets" on the Maven page.

Try with the Scala 2.12 build: Kafka 0.10+ Source For Structured Streaming » 3.2.0:

Note the change: spark-sql-kafka-0-10_2.13:3.2.0 -> spark-sql-kafka-0-10_2.12:3.2.0

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 ~/PycharmProjects/Kafka/PySpark_Kafka_SSL.py
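
If you would rather not touch the spark-submit command line, the same dependency can be pinned inside the script via spark.jars.packages (a standard Spark config). This is just a sketch reusing the app name from the question, and it only takes effect if it runs before the SparkSession is created:

from pyspark.sql import SparkSession

# Pull in the Scala 2.12 build of the Kafka connector,
# matching the Scala version the Spark 3.2.0 distribution was built with.
spark = SparkSession.builder \
    .appName('StructuredStreaming_KafkaProducer') \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0") \
    .getOrCreate()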

2 Comments

Thanks for the input, that makes sense and helped me resolve the issue. However, I'm wondering why the Maven repository shows both Scala 2.13 and 2.12 support for Spark 3.2.0, please refer to mvnrepository.com/artifact/org.apache.spark/… , any ideas on this?
@KaranAlang You're welcome, I'm glad I could help! To be honest, I'm not sure; I'll have to check it out. Maybe they expect a bump in the Scala version. Also, if this answered your question, please mark it as answered.
