
I have events streaming into multiple Kafka topics in the form of key:value JSONs (without nested structure), for example:

event_1: {"name": "Alex", "age": 27, "hobby": "pc games"},
event_2: {"name": "Bob", "age": 33, "hobby": "swimming"},
event_3: {"name": "Charlie", "age": 12, "hobby": "collecting stamps"}

I am working in Python 3.7 and wish to consume a batch of events from those topics, let's say every 5 minutes, transform it into a dataframe, do some processing and enrichment with this data, and save the result to a CSV file.

I'm new to Spark and searched for documentation to help me with this task, but did not find any. Is there an up-to-date source of information you would recommend?
Also, if there is another Big Data framework that would suit this task better, I'd love to hear about it.

  • You found nothing on the Spark Streaming documentation about consuming from Kafka? Commented Aug 19, 2021 at 11:30

1 Answer


Refer to the triggers section of the Structured Streaming Programming Guide. There are several types of trigger; the default is micro-batch mode, where the next micro-batch is generated as soon as the previous micro-batch has finished processing.

In your case you need fixed-interval micro-batches, where you specify the interval at which the query should be triggered. The following snippet shows how to do that (note that the file sink also requires a checkpoint location):

# Fixed interval trigger: one micro-batch every 5 minutes
df.writeStream \
    .format("csv") \
    .option("header", True) \
    .option("path", "path/to/destination/dir") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .trigger(processingTime='5 minutes') \
    .start()
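
For reference, the other trigger modes described in that section are set through the same trigger() call. A sketch, assuming Spark 2.3+ (the console sink is used here purely for illustration):

# Default: no trigger specified, the next micro-batch starts as soon as the previous one finishes
query = df.writeStream.format("console").start()

# One-time micro-batch: process whatever is available, then stop
query = df.writeStream.format("console").trigger(once=True).start()

# Continuous processing (experimental, low latency) with a 1-second checkpoint interval
query = df.writeStream.format("console").trigger(continuous='1 second').start()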

Brief code

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


# Define the schema of the Kafka message

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("hobby", StringType(), True),
])

# Initialize spark session

spark = SparkSession.builder.appName("example").getOrCreate()

# Read Kafka topic and load data using schema

df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers","x.x.x.x:2181")\
    .option("startingOffsets", "latest")\
    .option("subscribe","testdata")\
    .load()\
    .select(from_json(col("value").cast("string"), schema).alias("data"))\
    .select(f.col("data.*"))\

# Do some transformation (an illustrative sketch is shown after this listing)
df1 = df...

# Write the resultant dataframe as CSV file

query = df1.writeStream \
    .format("csv") \
    .option("header", True) \
    .option("path", "path/to/destination/dir") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .trigger(processingTime='5 minutes') \
    .start()

# Keep the application running until the streaming query is stopped
query.awaitTermination()
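
The transformation step above is left as a placeholder. As a rough, purely illustrative sketch of what the processing/enrichment might look like (the age_group and ingested_at columns are invented for this example):

from pyspark.sql.functions import col, when, lit, current_timestamp

# Example enrichment: bucket people by age and stamp each record with the ingestion time
df1 = df \
    .withColumn("age_group", when(col("age") < 18, lit("minor")).otherwise(lit("adult"))) \
    .withColumn("ingested_at", current_timestamp())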

You can also repartition the final dataframe before writing it out as CSV, if needed, to control the number of output files produced per micro-batch.
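
For example (a sketch only; repartition(1) collapses each micro-batch into a single output file, which may or may not be what you want):

df1.repartition(1) \
    .writeStream \
    .format("csv") \
    .option("header", True) \
    .option("path", "path/to/destination/dir") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .trigger(processingTime='5 minutes') \
    .start()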

