I am working in Azure Synapse and getting used to working with PySpark. I want to create pairing logic between lines in my DataFrame, but I can't get it to work. I have an ID column and a sequence number. For example:
| ID | seqNum |
|---|---|
| 100 | 3609 |
| 100 | 3610 |
| 100 | 3616 |
| 100 | 3617 |
| 100 | 3622 |
| 100 | 3623 |
| 100 | 3634 |
| 100 | 3642 |
| 100 | 3643 |
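For reference, the sample above can be built like this (assuming the default `spark` session that a Synapse notebook provides):

```python
df = spark.createDataFrame(
    [(100, 3609), (100, 3610), (100, 3616), (100, 3617),
     (100, 3622), (100, 3623), (100, 3634), (100, 3642), (100, 3643)],
    ["ID", "seqNum"],
)
```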
This is what the code should output:
| ID | seqNum | pairID |
|---|---|---|
| 100 | 3609 | 1 |
| 100 | 3610 | 1 |
| 100 | 3616 | 2 |
| 100 | 3617 | 2 |
| 100 | 3622 | 3 |
| 100 | 3623 | 3 |
| 100 | 3634 | Null |
| 100 | 3642 | 4 |
| 100 | 3643 | 4 |
The line with 3634 should not be paired, because the difference between the sequence numbers of a pair must be exactly one.
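In SQL terms this looks like a gaps-and-islands problem, as far as I understand it: `seqNum - row_number()` is constant within a run of consecutive numbers (3609 − 1 = 3608 and 3610 − 2 = 3608, but 3616 − 3 = 3613). A minimal sketch of that run-detection trick, where `run` is a name I made up:

```python
from pyspark.sql import Window, functions as F

# Rows of one consecutive stretch share the same value of seqNum - row_number(),
# e.g. both 3609 and 3610 get run = 3608.
w = Window.partitionBy("ID").orderBy("seqNum")
runs = df.withColumn("run", F.col("seqNum") - F.row_number().over(w))
```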
I have logic in Python that seems to work, but with it I cannot make use of the distributed processing abilities of Spark (it collects everything to the driver). Can someone help me create the logic in PySpark?
```python
from pyspark.sql import Window
from pyspark.sql.functions import col, lag, lead, lit

# window specification
windowSpec = Window.orderBy("seqNum")

# Add prev and next sequence numbers
df = df.withColumn("prev_seq", lag("seqNum").over(windowSpec))
df = df.withColumn("next_seq", lead("seqNum").over(windowSpec))

# Add flags to indicate proximity
df = df.withColumn("diff_prev", col("ID") - col("prev_seq"))  # <-- the line I ask about below
df = df.withColumn("diff_next", col("next_seq") - col("seqNum"))

# make PairID
df = df.withColumn("PairID", lit(None).cast("int"))

# Assign PairID based on proximity logic
pair_id = 1
rows = df.collect()  # Collect rows for iterative processing (pulls everything to the driver)
paired_indices = set()  # Track already paired rows
result = []

for i, row in enumerate(rows):
    if i in paired_indices:
        continue  # Skip already paired rows
    current = row["seqNum"]
    prev_diff = row["diff_prev"]
    next_diff = row["diff_next"]
    # Pair with the row above if diff_prev == 1 and it is not already paired
    if prev_diff == 1 and (i - 1) not in paired_indices:
        result.append((current, pair_id, rows[i - 1]["seqNum"]))
        result.append((rows[i - 1]["seqNum"], pair_id, current))
        paired_indices.update([i, i - 1])
        pair_id += 1
    # Pair with the row below if diff_next == 1 and it is not already paired
    elif next_diff == 1 and (i + 1) not in paired_indices:
        result.append((current, pair_id, rows[i + 1]["seqNum"]))
        result.append((rows[i + 1]["seqNum"], pair_id, current))
        paired_indices.update([i, i + 1])
        pair_id += 1
    else:
        result.append((current, None, None))

# to DataFrame
result_df = spark.createDataFrame(result, ["seqNum", "PairID", "Closest"])
```
Is `df = df.withColumn("diff_prev", col("ID") - col("prev_seq"))` correct, or should it be `col("seqNum")` instead of `col("ID")`? Is the ID column of any relevance here at all?
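Update: building on the run-detection trick above, this is the window-only direction I have been sketching. It is untested beyond the sample data, and it assumes pairing restarts per ID (drop `partitionBy("ID")` if seqNum is global) and that pairs are formed greedily from the lowest seqNum of each run:

```python
from pyspark.sql import Window, functions as F

w = Window.partitionBy("ID").orderBy("seqNum")

runs = (
    df
    .withColumn("rn", F.row_number().over(w))
    # seqNum - rn is constant within a run of consecutive numbers
    .withColumn("run", F.col("seqNum") - F.col("rn"))
    # position within the run and the run's total length
    .withColumn("pos", F.row_number().over(Window.partitionBy("ID", "run").orderBy("seqNum")))
    .withColumn("run_len", F.count("*").over(Window.partitionBy("ID", "run")))
    # greedy pairing from the start of a run: the last row of an odd-length
    # run (including runs of length 1, like 3634) stays unpaired
    .withColumn(
        "is_paired",
        ~((F.col("pos") == F.col("run_len")) & (F.col("run_len") % 2 == 1)),
    )
    # rows at positions 1-2 form the run's first pair, 3-4 the second, ...
    .withColumn("pair_in_run", F.floor((F.col("pos") - 1) / 2))
)

# number the pairs sequentially; unpaired rows get a Null pairID
paired = runs.filter("is_paired").withColumn(
    "pairID",
    F.dense_rank().over(Window.partitionBy("ID").orderBy("run", "pair_in_run")),
)
unpaired = runs.filter("NOT is_paired").withColumn("pairID", F.lit(None).cast("int"))

result_df = (
    paired.unionByName(unpaired)
    .select("ID", "seqNum", "pairID")
    .orderBy("ID", "seqNum")
)
```

On the sample this should yield pairIDs 1 through 4 exactly as in the expected output above, with 3634 left as Null, but I would appreciate a check on whether this is the idiomatic way to do it.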