I am running into the error "Append output mode not supported when there are streaming aggregations". As mentioned here, I need to add modifiedon to the groupBy statement in the code below, like this:

agg = df.groupBy("id","modifiedon").agg(max("modifiedon").alias("modifiedon"))

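from pyspark.sql.functions import max  # note: shadows Python's built-in max
# Drop exact duplicate rows, then set the event-time watermark
df = df.dropDuplicates()
df = df.withWatermark("modifiedon", "1 day")
# Latest modifiedon per id
agg = df.groupBy("id").agg(max("modifiedon").alias("modifiedon"))
# Keep only the rows that carry the latest modifiedon for each id
final = df.join(agg, on=["id", "modifiedon"], how="inner")
dfUpdates = final.withColumnRenamed("id", "BK_id")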

But this causes a problem, because final still contains duplicate ids. Since I don't add that column to the groupBy, I later have a problem doing the merge into the Delta table.

final.writeStream.format("delta").foreachBatch(update_insert).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start("abfss://[email protected]/D365/msdyn_workorder_autoloader_nodups")
1 Answer

Here, you need to include a timestamp-type column in the groupBy, either through a window function or as the timestamp column itself. In your case you cannot group by the modifiedon column directly, even though it is of type timestamp, because your requirement is to aggregate on the modifiedon column itself.

So, as I mentioned earlier, use a window with a number of days large enough that you are sure it covers the range of your data.

Here, I used 20000 days.

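from pyspark.sql.functions import max, window
# A very wide event-time window in the groupBy is what allows append mode here
agg = df.groupBy(window("modifiedon", "20000 day"), "id").agg(max("modifiedon").alias("modifiedon"))
final = df.join(agg, on=["id", "modifiedon"], how="inner")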

Then write the output:

final.writeStream.format("delta").option("checkpointLocation", "/csv_chk_pnt/").start("/out_csv/final/")

Use a larger number of days if you have very old records, maybe even 50000 days.
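
One thing worth checking when picking the number of days: as far as I know, Spark's tumbling windows are aligned to 1970-01-01 00:00:00 UTC (unless startTime is set), not to your first record. A 20000-day window therefore ends at a fixed date, and rows on either side of that date fall into different windows. Below is a minimal plain-Python sketch of that arithmetic, assuming this alignment; window_bounds and same_window are hypothetical helper names, not Spark APIs.

```python
from datetime import datetime, timedelta, timezone

# Assumption: tumbling windows start at the Unix epoch (Spark's default startTime of 0).
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_bounds(ts, days):
    """Return (start, end) of the epoch-aligned window of `days` days containing ts."""
    size = timedelta(days=days)
    index = (ts - EPOCH) // size  # which window, counted from the epoch
    start = EPOCH + index * size
    return start, start + size


def same_window(earliest, latest, days):
    """True if both timestamps land in the same epoch-aligned window."""
    return window_bounds(earliest, days) == window_bounds(latest, days)


if __name__ == "__main__":
    old = datetime(2015, 6, 1, tzinfo=timezone.utc)
    new = datetime(2025, 1, 1, tzinfo=timezone.utc)
    print(window_bounds(old, 20000)[1])   # 2024-10-04 00:00:00+00:00
    print(same_window(old, new, 20000))   # False: the data spans two windows
    print(same_window(old, new, 50000))   # True
```

If same_window returns False for the oldest and newest modifiedon you expect, the same id can produce one max per window, so a larger value such as 50000 days is the safer choice.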
