
I am trying to read a stream from Kafka using PySpark. I am using Spark version 3.0.0-preview2 and spark-streaming-kafka-0-10_2.12. Before this, I start ZooKeeper and Kafka and create a new topic:

/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties 
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic data_wm

This is my code:

import pandas as pd
import os
import findspark
findspark.init("/usr/local/spark")
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TestApp").getOrCreate()
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "data_wm") \
  .load() 
value = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 

This is how I run my script:

sudo --preserve-env=pyspark /usr/local/spark/bin/pyspark --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0-preview

As a result of this command, I get:

:: resolving dependencies :: org.apache.spark#spark-submit-parent-0d7b2a8d-a860-4766-a4c7-141a902d8365;1.0
        confs: [default]
        found org.apache.spark#spark-streaming-kafka-0-10_2.12;3.0.0-preview in central
        found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0-preview in central
        found org.apache.kafka#kafka-clients;2.3.1 in central
        found com.github.luben#zstd-jni;1.4.3-1 in central
        found org.lz4#lz4-java;1.6.0 in central
        found org.xerial.snappy#snappy-java;1.1.7.3 in central
        found org.slf4j#slf4j-api;1.7.16 in central
        found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 380ms :: artifacts dl 7ms
        :: modules in use:
        com.github.luben#zstd-jni;1.4.3-1 from central in [default]
        org.apache.kafka#kafka-clients;2.3.1 from central in [default]
        org.apache.spark#spark-streaming-kafka-0-10_2.12;3.0.0-preview from central in [default]
        org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0-preview from central in [default]
        org.lz4#lz4-java;1.6.0 from central in [default]
        org.slf4j#slf4j-api;1.7.16 from central in [default]
        org.spark-project.spark#unused;1.0.0 from central in [default]
        org.xerial.snappy#snappy-java;1.1.7.3 from central in [default]

But I always get this error:

>>> df = spark \
...   .readStream \
...   .format("kafka") \
...   .option("kafka.bootstrap.servers", "localhost:9092") \
...   .option("subscribe", "data_wm") \
...   .load()
Traceback (most recent call last):
  File "<stdin>", line 5, in <module>
  File "/usr/local/spark/python/pyspark/sql/streaming.py", line 406, in load
    return self._df(self._jreader.load())
  File "/usr/local/spark/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 102, in deco
    raise converted
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

I don't know the cause of this error. Please help.

  • Unrelated: why are you using sudo to run pyspark? Commented Feb 14, 2020 at 23:42
  • And did you actually install Spark 3 preview? Commented Feb 14, 2020 at 23:43
  • Even when I don't use sudo I have the same problem, and yes, I have Spark 3.0.0-preview2 installed on my laptop. Commented Feb 15, 2020 at 12:46
  • Do you have the same problem when not using the preview version? Commented Feb 15, 2020 at 17:21
  • No, even with the preview version I have the same problem. Commented Feb 15, 2020 at 17:46

3 Answers


I have successfully resolved this error on Spark 3.0.1 (using PySpark).

I would keep things simple and provide the desired packages through the --packages argument:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 MyPythonScript.py

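Mind the order of the arguments: spark-submit expects its options before the application file (spark-submit [options] <python file> [app arguments]), so --packages must come before the script name. Anything placed after the script is passed to the script as its own arguments, and Spark never sees the package.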
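To make the ordering concrete, here is a minimal sketch using a hypothetical helper, build_spark_submit_cmd, that assembles the command as an argument list. It only encodes the rule above (options, then the script, then the script's own arguments); it is not part of Spark.

```python
import shlex


def build_spark_submit_cmd(script, packages, app_args=()):
    """Return a spark-submit argv list with options placed before the script.

    Hypothetical helper for illustration: spark-submit parses
    [options] <python file> [app arguments], so anything after the script
    is handed to the script, not to spark-submit.
    """
    cmd = ["spark-submit"]
    if packages:
        # --packages takes a comma-separated list of Maven coordinates
        cmd += ["--packages", ",".join(packages)]
    cmd.append(script)       # the application file comes after all options
    cmd += list(app_args)    # these are passed to the script itself
    return cmd


cmd = build_spark_submit_cmd(
    "MyPythonScript.py",
    ["org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"],
)
# prints: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 MyPythonScript.py
print(shlex.join(cmd))
```

If you launch jobs from Python, the resulting list can be passed directly to subprocess.run(cmd, check=True).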

Where MyPythonScript.py has:

from pyspark.sql import SparkSession

KAFKA_TOPIC = "data_wm"
KAFKA_SERVER = "localhost:9092"

# creating an instance of SparkSession
spark_session = SparkSession \
    .builder \
    .appName("Python Spark create RDD") \
    .getOrCreate()

# Subscribe to 1 topic
df = spark_session \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_SERVER) \
    .option("subscribe", KAFKA_TOPIC) \
    .load()

# printing a streaming DataFrame only shows its schema, e.g.
# DataFrame[key: string, value: string]; use writeStream to consume records
print(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)"))

1 Comment

You're a life-saver! It was the order of arguments causing my error! I was doing spark-submit my-script.py --packages... but it should have been spark-submit --packages... my-script.py

If you check the documentation mentioned in the error, it says to use a different package: spark-sql-kafka-0-10, not spark-streaming-kafka-0-10. As your dependency-resolution log shows, spark-sql-kafka is not being loaded.
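A small sketch of how the correct coordinate is put together, using a hypothetical helper kafka_sql_package. The format("kafka") source lives in spark-sql-kafka-0-10, whose artifact name carries the Scala version and whose version must match the installed Spark. For the question's setup this would give something like pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2, assuming that artifact is published for the preview release and that Spark was built with Scala 2.12.

```python
def kafka_sql_package(spark_version, scala_version="2.12"):
    """Return the Maven coordinate for Spark's Structured Streaming Kafka source.

    Hypothetical helper. format("kafka") is registered by spark-sql-kafka-0-10;
    the spark-streaming-kafka-0-10 artifact is the older DStream integration
    and does not provide that data source.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


# Assumed values for the question's setup: Spark 3.0.0-preview2 built with Scala 2.12.
# Inside PySpark, pyspark.__version__ reports the installed Spark version.
package = kafka_sql_package("3.0.0-preview2")
print(package)
# prints org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2
```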

You can also add packages via findspark rather than on the command line.
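A minimal sketch, assuming the script is run with a plain Python interpreter as in the question's findspark setup. Packages can be passed through the PYSPARK_SUBMIT_ARGS environment variable, which must be set before the first SparkSession is created and must end with pyspark-shell. As far as I know, findspark.add_packages() sets this same variable; the helper set_submit_packages below is hypothetical and does it by hand.

```python
import os


def set_submit_packages(packages):
    """Add --packages to PYSPARK_SUBMIT_ARGS (hypothetical helper).

    Must run before the first SparkSession/SparkContext starts the JVM;
    changing the variable afterwards has no effect on a running session.
    Assumes any existing value contains no quoted arguments with spaces.
    """
    current = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
    parts = current.split()
    # Drop a trailing "pyspark-shell" so the new options land before it.
    if parts and parts[-1] == "pyspark-shell":
        parts = parts[:-1]
    parts += ["--packages", ",".join(packages), "pyspark-shell"]
    os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join(parts)
    return os.environ["PYSPARK_SUBMIT_ARGS"]


args = set_submit_packages(["org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1"])
print(args)
# Then, in the same process, create the SparkSession as in the question:
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.appName("TestApp").getOrCreate()
```

An alternative that avoids environment variables is .config("spark.jars.packages", "<coordinate>") on SparkSession.builder, which likewise only takes effect when the session is first created.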



For Spark 3.3.0, this helped me (bringing relevant jars to project)
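A rough sketch of the "bring the jars yourself" approach, assuming the jars have already been downloaded into a local folder (the path /opt/spark-kafka-jars and the helper spark_jars_conf are hypothetical). The jar paths are joined with commas and passed through the spark.jars configuration key. As far as I know, spark-sql-kafka-0-10 alone is not enough on Spark 3.x: its dependencies (kafka-clients, spark-token-provider-kafka-0-10, commons-pool2) must be present too, which is what --packages otherwise resolves for you.

```python
from pathlib import Path


def spark_jars_conf(paths):
    """Build the comma-separated value for the spark.jars config key.

    Keeps only .jar files and sorts them so the value is deterministic.
    """
    jars = sorted(str(p) for p in paths if str(p).endswith(".jar"))
    if not jars:
        raise ValueError("no .jar files given")
    return ",".join(jars)


# Hypothetical folder containing spark-sql-kafka-0-10 plus its dependencies,
# all matching the installed Spark 3.3.0 / Scala 2.12 build.
JAR_DIR = Path("/opt/spark-kafka-jars")

# Usage (requires PySpark and the jars on disk, so it is left commented out):
# from pyspark.sql import SparkSession
# spark = (SparkSession.builder
#          .appName("TestApp")
#          .config("spark.jars", spark_jars_conf(JAR_DIR.glob("*.jar")))
#          .getOrCreate())
```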

