I have created this Docker Compose file:
# Command: docker stack deploy streaming-stack --compose-file docker/spark-kstreams-stack.yml
# Gary A. Stafford (2022-09-14)
# Updated: 2022-12-28
version: "3.9"
services:
  spark-master:
    image: "apache/spark:latest"
    container_name: spark-master
    hostname: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - "8080:8080"
    volumes:
      - ./all_logs:/opt/spark/logs
      - ./all_conf/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf
      - ./masterivycache:/opt/spark/ivycache:rw
    command:
      - "bash"
      - "-c"
      - "/opt/spark/sbin/start-master.sh --host spark-master && tail -f /dev/null"
    networks:
      - streaming-stack
  spark-worker:
    image: "apache/spark:latest"
    container_name: spark-worker
    hostname: spark-worker
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:
      - ./all_logs:/opt/spark/logs
      - ./all_conf/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf
      - ./workerivycache:/opt/spark/ivycache:rw
    command:
      - "bash"
      - "-c"
      - "/opt/spark/sbin/start-worker.sh spark://spark-master:7077 --host spark-worker && tail -f /dev/null"
    networks:
      - streaming-stack
  spark-connect:
    image: apache/spark:latest
    container_name: spark-connect
    hostname: spark-connect
    ports:
      - "4040:4040"
      - "15002:15002"
    depends_on:
      - spark-master
    volumes:
      - ./jars/spark-connect_2.13-4.0.0.jar:/opt/spark/jars/spark-connect_2.13-4.0.0.jar
      - ./sc-work-dir:/opt/spark/work-dir:rw
      - ./ivycache:/opt/spark/ivycache:rw
      - ./all_logs:/opt/spark/logs
      - ./all_conf/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf
    command:
      - "bash"
      - "-c"
      - "/opt/spark/sbin/start-connect-server.sh --jars /opt/spark/jars/spark-connect_2.13-4.0.1.jar && tail -f /dev/null"
    networks:
      - streaming-stack
  kafka:
    image: docker.io/bitnami/kafka:latest
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=LISTENER_BOB://kafka:29092,LISTENER_FRED://kafka:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=LISTENER_BOB://kafka:29092,LISTENER_FRED://localhost:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=LISTENER_BOB
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - streaming-stack
  kafka-ui:
    image: "provectuslabs/kafka-ui"
    ports:
      - "9080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=streaming-demo
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
    depends_on:
      - kafka
    networks:
      - streaming-stack
  # jupyter:
  #   image: quay.io/jupyter/datascience-notebook:latest
  #   ports:
  #     - "8888:8888"
  #   volumes:
  #     - "jupyter_data:/home/jovyan/work"
  #   networks:
  #     - streaming-stack
volumes:
  kafka_data:
  jupyter_data:
networks:
  streaming-stack:
which I deploy on Rancher on Windows 10 x64. I also have this spark-defaults.conf:
spark.jars.packages org.apache.spark:spark-streaming-kafka-0-10_2.13:4.0.0
spark.jars.ivy /opt/spark/ivycache
which I mount into the containers, as shown in the volumes above. I tested the setup from a Jupyter notebook:
# Import SparkSession
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder.remote("sc://127.0.0.1:15002").getOrCreate()
df = spark.range(100)
df.write.mode("overwrite").parquet("example.parquet")
df.show()
print(spark.conf.get("spark.jars.packages"))
spark.stop()
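As a sanity check on the mounted spark-defaults.conf, the jar-related settings can also be read back from a Connect session; a small sketch (the "<not set>" fallback string is my own placeholder):
from pyspark.sql import SparkSession

# Sketch: print the jar/package-related settings as the Spark Connect server
# reports them; the second argument to conf.get() is a fallback used when the
# key is not set on the server side.
spark = SparkSession.builder.remote("sc://127.0.0.1:15002").getOrCreate()
for key in ("spark.jars.packages", "spark.jars.ivy"):
    print(key, "=", spark.conf.get(key, "<not set>"))
spark.stop()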
Everything works fine until I try to connect to Kafka with the following (the business logic itself is not critical here):
import os
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField,
    StructType,
    IntegerType,
    StringType,
    FloatType,
    TimestampType,
    BooleanType,
)
from pyspark.sql.window import Window
spark = SparkSession.builder.remote("sc://127.0.0.1:15002").getOrCreate()
BOOTSTRAP_SERVERS = "kafka:29092"
TOPIC_PURCHASES = "demo.purchases"
options = {
    "kafka.bootstrap.servers": BOOTSTRAP_SERVERS,
    "subscribe": TOPIC_PURCHASES,
    "startingOffsets": "earliest",
    "endingOffsets": "latest",
    "includeHeaders": "true",
}
df_sales = spark.read.format("kafka").options(**options).load()
schema = StructType(
    [
        StructField("transaction_time", TimestampType(), False),
        StructField("transaction_id", StringType(), False),
        StructField("product_id", StringType(), False),
        StructField("price", FloatType(), False),
        StructField("quantity", IntegerType(), False),
        StructField("is_member", BooleanType(), True),
        StructField("member_discount", FloatType(), True),
        StructField("add_supplements", BooleanType(), True),
        StructField("supplement_price", FloatType(), True),
        # StructField("total_purchase", FloatType(), False),
    ]
)
window = Window.partitionBy("product_id").orderBy("quantity")
window_agg = Window.partitionBy("product_id")
(
    df_sales.selectExpr("CAST(value AS STRING)")
    .select(F.from_json("value", schema=schema).alias("data"))
    .select("data.*")
    .withColumn("row", F.row_number().over(window))
    .withColumn("quantity", F.sum(F.col("quantity")).over(window_agg))
    .withColumn("sales", F.sum(F.col("quantity") * F.col("price")).over(window_agg))
    .filter(F.col("row") == 1)
    .drop("row")
    .select(
        "product_id",
        F.format_number("sales", 2).alias("sales"),
        F.format_number("quantity", 0).alias("quantity"),
    )
    .coalesce(1)
    .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False)
    .show(10)
)
Then I hit the dreaded AnalysisException: Failed to find data source: kafka, which I traced through the logs to
org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindKafkaDataSourceError(QueryCompilationErrors.scala:1741)
I have spent a whole day searching for an answer and found nothing.
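To rule out the windowing logic, I assume the same lookup failure can be reproduced with nothing but a bare batch read against the topic; a minimal sketch, using the same endpoints as in the stack above:
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://127.0.0.1:15002").getOrCreate()

# The "kafka" format is resolved on the Spark Connect server, not in the
# client's pip environment, so the failure points at the server's classpath.
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "demo.purchases")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)
df.selectExpr("CAST(value AS STRING)").show(5, truncate=False)
Why does the Spark Connect server fail to find the Kafka data source, even though spark.jars.packages is set in the mounted spark-defaults.conf?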