
I tried to consume from Kafka using Spark, more specifically PySpark and Structured Streaming.

import os
import time

from pyspark.sql import SparkSession

# The Kafka connector has to be on the classpath before the JVM starts,
# so this must be set before the SparkSession is created.
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell'

spark = SparkSession \
    .builder \
    .appName("Structured Streaming") \
    .getOrCreate()

requests = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "ip-ec2:9092") \
    .option("subscribe", "ssp.requests") \
    .option("startingOffsets", "earliest") \
    .load()

requests.printSchema()

# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)
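The `--packages` coordinate passed through `PYSPARK_SUBMIT_ARGS` encodes two versions: the Scala binary version (`_2.11`) and the Spark version (`2.3.0`), and it is generally expected to match the Spark installation that runs the job. PySpark reads this variable when it launches the JVM gateway, so it has to be set before the first `SparkSession` is created. A minimal sketch that builds the value from those two versions; `kafka_package` and `set_submit_args` are hypothetical helpers, not part of PySpark:

```python
import os


def kafka_package(spark_version: str, scala_binary: str = "2.11") -> str:
    """Maven coordinate of the Structured Streaming Kafka connector (hypothetical helper)."""
    # Artifact name pattern: spark-sql-kafka-0-10_<Scala binary version>:<Spark version>
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(scala_binary, spark_version)


def set_submit_args(spark_version: str, scala_binary: str = "2.11") -> str:
    """Set PYSPARK_SUBMIT_ARGS (hypothetical helper); call it before building the SparkSession."""
    # 'pyspark-shell' has to be included as the last token so PySpark can start its gateway.
    args = "--packages {} pyspark-shell".format(kafka_package(spark_version, scala_binary))
    os.environ["PYSPARK_SUBMIT_ARGS"] = args
    return args


# Reproduces the value used in the question (Spark 2.3.0 built for Scala 2.11).
print(set_submit_args("2.3.0"))
```

For a different installation, pass the version reported by `pyspark.__version__` and the Scala version that Spark build uses.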

When I ran the following code, the query failed:

# key and value arrive as binary, so cast them to strings before writing
rawQuery = requests \
    .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .trigger(processingTime="5 seconds") \
    .format("parquet") \
    .option("checkpointLocation", "/home/user/folder/applicationHistory") \
    .option("path", "/home/user/folder") \
    .start()
# Blocks until the query stops; re-raises the query's error if it failed
rawQuery.awaitTermination()

Py4JJavaError                             Traceback (most recent call last)
/opt/conda/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/opt/conda/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:

Py4JJavaError: An error occurred while calling o70.awaitTermination.
: org.apache.spark.sql.streaming.StreamingQueryException: Job aborted.
=== Streaming Query ===
Identifier: [id = c2b48840-5ba4-416e-a192-dcae94007856, runId = 4afcca20-00cd-4187-a70b-1b742f1f5c0d]
Current Committed Offsets: {}
Current Available Offsets: {KafkaSource[Subscribe[ssp.requests]]:

I can't work out what causes this error:

Py4JJavaError: An error occurred while calling o70.awaitTermination

  • Could you post the full stack trace? It is a Py4J exception, so the JVM part matters. Commented Apr 26, 2018 at 12:29
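To get at the JVM-side details the comment asks for, one option is to catch the failure around `awaitTermination()` and print what the query itself recorded: `StreamingQuery.exception()` returns the `StreamingQueryException` that stopped the query (or `None`), and `lastProgress` holds the most recent progress report. A minimal sketch; `await_and_report` is a hypothetical helper, and reading `stackTrace` through `getattr` is an assumption meant to cope with differences between PySpark versions:

```python
def await_and_report(query, timeout_seconds=None):
    """Wait on a StreamingQuery and print diagnostics if it failed (hypothetical helper).

    `query` is expected to be a pyspark.sql.streaming.StreamingQuery.
    """
    try:
        # Blocks (up to timeout_seconds, if given) and re-raises the query's
        # exception if the query terminated with one.
        return query.awaitTermination(timeout_seconds)
    except Exception:
        failure = query.exception()  # StreamingQueryException or None
        print("Query id:", query.id)
        print("Last progress:", query.lastProgress)
        print("Failure:", failure)
        # PySpark's captured exceptions carry the JVM stack trace as text;
        # getattr guards against objects/versions without that attribute.
        trace = getattr(failure, "stackTrace", None)
        if trace:
            print(trace)
        raise
```

Calling `await_and_report(rawQuery)` in place of `rawQuery.awaitTermination()` still fails the script, but prints the query's own error and stack trace first.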

1 Answer


I replaced the line rawQuery.awaitTermination() with

print(rawQuery.status)
time.sleep(60)
print(rawQuery.status)
rawQuery.stop()

and it works.
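Note that `time.sleep(60)` does not report failures: if the query dies during those 60 seconds, the script carries on and only `status` hints at it. `awaitTermination(timeout)` gives the same bounded run while still surfacing errors: it returns `True` if the query terminated within the timeout, `False` if the timeout elapsed, and re-raises the query's exception if it failed. A minimal sketch; `run_for` is a hypothetical helper:

```python
def run_for(query, seconds):
    """Run a StreamingQuery for at most `seconds`, then stop it (hypothetical helper).

    `query` is expected to be a pyspark.sql.streaming.StreamingQuery.
    """
    print(query.status)
    try:
        # True if the query terminated within `seconds`, False on timeout.
        # Raises the query's exception if it failed while we were waiting.
        terminated = query.awaitTermination(seconds)
        print(query.status)
        return terminated
    finally:
        # Stop the query only if it is still running (e.g. after a timeout).
        if query.isActive:
            query.stop()
```

Here `run_for(rawQuery, 60)` stands in for the four lines above.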
