I am getting the error "Append output mode not supported when there are streaming aggregations". As mentioned here, I need to add modifiedon to my groupBy statement in the code below, like this:
agg = df.groupBy("id","modifiedon").agg(max("modifiedon").alias("modifiedon"))
from pyspark.sql.functions import max

df = df.dropDuplicates()
df = df.withWatermark("modifiedon", "1 day")

# Latest modifiedon per id, then join back to keep only the newest row per id
agg = df.groupBy("id").agg(max("modifiedon").alias("modifiedon"))
final = df.join(agg, on=["id", "modifiedon"], how="inner")
dfUpdates = final.withColumnRenamed("id", "BK_id")
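For reference, the max-then-join pattern with groupBy("id") does collapse the data to one row per id; it is only when modifiedon is added to the groupBy that each (id, modifiedon) pair becomes its own group and duplicates survive. A pure-Python sketch of the max-then-join logic on made-up toy rows (not my real data):

```python
# Toy rows for illustration only: id 1 appears twice with different timestamps.
rows = [
    {"id": 1, "modifiedon": "2023-01-01"},
    {"id": 1, "modifiedon": "2023-01-02"},
    {"id": 2, "modifiedon": "2023-01-01"},
]

# Step 1: equivalent of groupBy("id").agg(max("modifiedon")) -- latest timestamp per id.
latest = {}
for r in rows:
    if r["id"] not in latest or r["modifiedon"] > latest[r["id"]]:
        latest[r["id"]] = r["modifiedon"]

# Step 2: equivalent of the inner join on (id, modifiedon) -- keep only the newest row per id.
final = [r for r in rows if latest[r["id"]] == r["modifiedon"]]
print(final)  # one row per id: the 2023-01-02 row for id 1, the 2023-01-01 row for id 2
```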
But this creates a problem: final still contains duplicated ids. If I don't add that column to the groupBy I get the Append-mode error, and if I do add it, the duplicates survive and I then have a problem doing the merge into the Delta table.
(
    final.writeStream
    .format("delta")
    .foreachBatch(update_insert)
    .option("checkpointLocation", checkpoint_directory)
    .trigger(availableNow=True)
    .start("abfss://[email protected]/D365/msdyn_workorder_autoloader_nodups")
)
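The update_insert batch function is not shown in the question; for context, a minimal sketch of what such a function typically looks like with the delta-spark MERGE API (the path, aliases, and merge condition here are assumptions, not the actual code). MERGE raises an error when multiple source rows match the same target row, which is why duplicate BK_id values in the batch are a problem:

```python
# Hypothetical sketch of an update_insert batch function (names and path assumed).
def update_insert(batch_df, batch_id):
    # Deferred import so the sketch can be read without delta-spark installed.
    from delta.tables import DeltaTable

    target = DeltaTable.forPath(
        batch_df.sparkSession,  # DataFrame.sparkSession requires Spark 3.3+
        "abfss://<container>@<account>.dfs.core.windows.net/D365/msdyn_workorder_autoloader_nodups",
    )
    (
        target.alias("t")
        .merge(batch_df.alias("s"), "t.BK_id = s.BK_id")
        # MERGE fails if two source rows match the same target BK_id,
        # hence the need to dedupe the stream before this point.
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```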
