I have created this Docker Compose file:

# Command: docker stack deploy streaming-stack --compose-file docker/spark-kstreams-stack.yml
# Gary A. Stafford (2022-09-14)
# Updated: 2022-12-28

version: "3.9"

services:
  spark-master:
    image: "apache/spark:latest"
    container_name: spark-master
    hostname: spark-master
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - "8080:8080"
    volumes:
      - ./all_logs:/opt/spark/logs
      - ./all_conf/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf  
      - ./masterivycache:/opt/spark/ivycache:rw
    command: 
      - "bash"
      - "-c"
      - "/opt/spark/sbin/start-master.sh --host spark-master && tail -f /dev/null"
    networks:
      - streaming-stack
  spark-worker:
    image: "apache/spark:latest"
    container_name: spark-worker
    hostname: spark-worker
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    volumes:    
      - ./all_logs:/opt/spark/logs
      - ./all_conf/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf  
      - ./workerivycache:/opt/spark/ivycache:rw      
    command: 
      - "bash"
      - "-c"
      - "/opt/spark/sbin/start-worker.sh spark://spark-master:7077 --host spark-worker  && tail -f /dev/null"      
    networks:
      - streaming-stack
  spark-connect:
    image: apache/spark:latest
    container_name: spark-connect
    hostname: spark-connect
    ports:
      - "4040:4040"
      - "15002:15002"
    depends_on:
      - spark-master
    volumes:
      - ./jars/spark-connect_2.13-4.0.0.jar:/opt/spark/jars/spark-connect_2.13-4.0.0.jar
      - ./sc-work-dir:/opt/spark/work-dir:rw
      - ./ivycache:/opt/spark/ivycache:rw
      - ./all_logs:/opt/spark/logs
      - ./all_conf/spark-defaults.conf:/opt/spark/conf/spark-defaults.conf      
    command: 
      - "bash"
      - "-c"
      - "/opt/spark/sbin/start-connect-server.sh --jars /opt/spark/jars/spark-connect_2.13-4.0.1.jar && tail -f /dev/null"
    networks:
      - streaming-stack
  kafka:
    image: docker.io/bitnami/kafka:latest
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=LISTENER_BOB://kafka:29092,LISTENER_FRED://kafka:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=LISTENER_BOB://kafka:29092,LISTENER_FRED://localhost:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=LISTENER_BOB
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - streaming-stack
  kafka-ui:
    image: "provectuslabs/kafka-ui"
    ports:
      - "9080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=streaming-demo
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:29092
    depends_on:
      - kafka
    networks:
      - streaming-stack
  # jupyter:
  #   image: quay.io/jupyter/datascience-notebook:latest
  #   ports:
  #     - "8888:8888"
  #   volumes:
  #     - "jupyter_data:/home/jovyan/work"
  #   networks:
  #     - streaming-stack

volumes:
  kafka_data:
  jupyter_data:
  
networks:
  streaming-stack:

which I deploy with Rancher on Windows 10 x64. I also have a spark-defaults.conf:

spark.jars.packages org.apache.spark:spark-streaming-kafka-0-10_2.13:4.0.0
spark.jars.ivy  /opt/spark/ivycache

that I mount into the containers. I tested the setup from a Jupyter notebook:

# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder.remote("sc://127.0.0.1:15002").getOrCreate() 

df = spark.range(100)

df.write.mode("overwrite").parquet("example.parquet")

df.show()

print(spark.conf.get("spark.jars.packages"))

spark.stop()
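
For context, the demo.purchases topic that I read below is populated by a separate producer; a minimal stand-in run from the host could look like this (a hypothetical sketch, assuming the kafka-python package and the LISTENER_FRED address advertised as localhost:9092; the record values are made up to match the schema below):

import json

from kafka import KafkaProducer  # pip install kafka-python

# From the host, use the listener advertised as localhost:9092.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

producer.send(
    "demo.purchases",
    {
        "transaction_time": "2022-12-28 12:00:00",
        "transaction_id": "1",
        "product_id": "SC01",
        "price": 5.99,
        "quantity": 2,
        "is_member": True,
        "member_discount": 0.1,
        "add_supplements": False,
        "supplement_price": 0.0,
    },
)
producer.flush()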

Everything works fine until I try to connect to Kafka with this (the business logic is not critical here):

import os

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField,
    StructType,
    IntegerType,
    StringType,
    FloatType,
    TimestampType,
    BooleanType,
)
from pyspark.sql.window import Window

spark = SparkSession.builder.remote("sc://127.0.0.1:15002").getOrCreate() 

BOOTSTRAP_SERVERS = "kafka:29092"
TOPIC_PURCHASES = "demo.purchases"

options = {
    "kafka.bootstrap.servers": BOOTSTRAP_SERVERS,
    "subscribe": TOPIC_PURCHASES,
    "startingOffsets": "earliest",
    "endingOffsets": "latest",
    "includeHeaders": "true"
}

df_sales = spark.read.format("kafka").options(**options).load()

schema = StructType(
    [
        StructField("transaction_time", TimestampType(), False),
        StructField("transaction_id", StringType(), False),
        StructField("product_id", StringType(), False),
        StructField("price", FloatType(), False),
        StructField("quantity", IntegerType(), False),
        StructField("is_member", BooleanType(), True),
        StructField("member_discount", FloatType(), True),
        StructField("add_supplements", BooleanType(), True),
        StructField("supplement_price", FloatType(), True),
        # StructField("total_purchase", FloatType(), False),
    ]
)

window = Window.partitionBy("product_id").orderBy("quantity")
window_agg = Window.partitionBy("product_id")

(
    df_sales.selectExpr("CAST(value AS STRING)")
    .select(F.from_json("value", schema=schema).alias("data"))
    .select("data.*")
    .withColumn("row", F.row_number().over(window))
    .withColumn("quantity", F.sum(F.col("quantity")).over(window_agg))
    .withColumn("sales", F.sum(F.col("quantity") * F.col("price")).over(window_agg))
    .filter(F.col("row") == 1)
    .drop("row")
    .select(
        "product_id",
        F.format_number("sales", 2).alias("sales"),
        F.format_number("quantity", 0).alias("quantity"),
    )
    .coalesce(1)
    .orderBy(F.regexp_replace("sales", ",", "").cast("float"), ascending=False)
    .show(10)
)

then I hit the dreaded AnalysisException: Failed to find data source: kafka.

which I traced through the logs to a failure at

org.apache.spark.sql.errors.QueryCompilationErrors$.failedToFindKafkaDataSourceError(QueryCompilationErrors.scala:1741)

I spent the whole day trying to find an answer. Nothing.
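
For anyone debugging the same thing: the Connect server's Ivy cache is bind-mounted to ./ivycache on the host, so you can check from the host whether the Kafka connector jar was ever resolved (a sketch, assuming it runs from the directory the stack was deployed from):

# List any Kafka-related jars that Ivy resolved into the bind-mounted
# cache (./ivycache on the host is /opt/spark/ivycache in spark-connect).
import pathlib

for jar in pathlib.Path("ivycache").rglob("*kafka*.jar"):
    print(jar)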


2 Answers


Kafka is a stream, not a format, so use readStream:

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sparktest") \
    .option("startingOffsets", "earliest") \
    .load()
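
Note that show() does not work on a streaming DataFrame; to inspect the stream you need a writeStream sink, for example the console sink (a minimal sketch reusing the df above):

# Streaming reads need a sink; print micro-batches to the console.
query = (
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream.format("console")
    .outputMode("append")
    .start()
)
query.awaitTermination()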

2 Comments

Thank you @Frank, I will test tomorrow and see if it solves it.
Unfortunately, this did not solve the problem. From what I understand, the problem has to do with the dependencies; maybe they are not propagated through Spark Connect.

I posted an article on Medium with the details.

In summary, you need to mount a spark-defaults.conf with the content:

spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.1
spark.jars.ivy /opt/spark/ivycache
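
After redeploying the stack with this file in place, a quick way to verify from the client that the connector resolves is a plain batch read (a sketch reusing the endpoint and topic from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://127.0.0.1:15002").getOrCreate()

# If spark.jars.packages was picked up, format("kafka") now resolves.
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "demo.purchases")
    .option("startingOffsets", "earliest")
    .load()
)
df.selectExpr("CAST(value AS STRING)").show(5, truncate=False)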
