
I am working in Azure Synapse and getting used to PySpark. I want to create pairing logic between rows in my DataFrame, but I can't get it to work. I have an ID column and a sequence number column. For example:

ID seqNum
100 3609
100 3610
100 3616
100 3617
100 3622
100 3623
100 3634
100 3642
100 3643

This is what the code should output:

ID seqNum pairID
100 3609 1
100 3610 1
100 3616 2
100 3617 2
100 3622 3
100 3623 3
100 3634 Null
100 3642 4
100 3643 4

The row with 3634 should not be paired, because the sequence numbers of two paired rows must differ by exactly one.

I have logic in Python that seems to work, but it collects all rows to the driver, so I cannot make use of Spark's processing abilities. Can someone help me create this logic in PySpark?


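from pyspark.sql import Window
from pyspark.sql.functions import col, lag, lead, lit

# Window specification: order rows by sequence number
windowSpec = Window.orderBy("seqNum")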

# Add prev and next sequence numbers
df = df.withColumn("prev_seq", lag("seqNum").over(windowSpec))
df = df.withColumn("next_seq", lead("seqNum").over(windowSpec))

# Add flags to indicate proximity
df = df.withColumn("diff_prev", col("ID") - col("prev_seq"))
df = df.withColumn("diff_next", col("next_seq") - col("seqNum"))

# Create PairID as an empty (null) integer column
df = df.withColumn("PairID", lit(None).cast("int"))

# Assign PairID based on proximity logic
pair_id = 1
rows = df.collect()  # Collect rows for iterative processing
paired_indices = set()  # Track already paired rows
result = []

for i, row in enumerate(rows):
    if i in paired_indices:
        continue  # Skip already paired rows

    current = row["seqNum"]
    prev_diff = row["diff_prev"]
    next_diff = row["diff_next"]

    # Pair with the row above if diff_prev == 1 and it is not already paired
    if prev_diff == 1 and (i - 1) not in paired_indices:
        result.append((current, pair_id, rows[i - 1]["seqNum"]))
        result.append((rows[i - 1]["seqNum"], pair_id, current))
        paired_indices.update([i, i - 1])
        pair_id += 1

    # Pair with the row below if diff_next == 1 and it is not already paired
    elif next_diff == 1 and (i + 1) not in paired_indices:
        result.append((current, pair_id, rows[i + 1]["seqNum"]))
        result.append((rows[i + 1]["seqNum"], pair_id, current))
        paired_indices.update([i, i + 1])
        pair_id += 1

   
    else:
        result.append((current, None, None))

# to DataFrame
result_df = spark.createDataFrame(result, ["seqNum", "PairID", "Closest"])
  • The last 2 rows don't pair, I guess: 3642 and 3443. Or is it a typo? Commented Jan 10 at 10:11
  • Yes, you are correct, it is a typo; it should be 3643! I adjusted it, thanks! Commented Jan 10 at 11:45
  • Is the line df = df.withColumn("diff_prev", col("ID") - col("prev_seq")) correct? Or is it col("seqNum") instead of col("ID")? Is the column ID of any relevance at all? Commented Jan 11 at 14:31
  • Do you want pairs of 2 numbers, or more? Commented Jan 13 at 6:27

1 Answer

If you only want a pair ID for groups of 2 numbers, you can use the code below.

Note that the pair IDs will not follow the order of the sequence numbers, but both rows of each pair will share the same number.

from pyspark.sql import Window
from pyspark.sql.functions import col, dense_rank, lag, lead, lit, randn, when

# windowSpec is the window from the question: Window.orderBy("seqNum")
df.withColumn("prev_seq", lag("seqNum").over(windowSpec))\
    .withColumn("next_seq", lead("seqNum").over(windowSpec))\
    .withColumn("pair_with_prev", when((col("seqNum") - col("prev_seq")) == 1, lit(1)).otherwise(lit(0)))\
    .withColumn("pair_with_next", when((col("next_seq") - col("seqNum")) == 1, lit(1)).otherwise(lit(0)))\
    .withColumn("cond1", when((col("pair_with_prev") == 0) & (col("pair_with_next") == 1), randn()).otherwise(lit(0)))\
    .withColumn("cond2", when((col("pair_with_prev") == 1) & (col("pair_with_next") == 0), lag("cond1").over(windowSpec)).otherwise(lit(0)))\
    .withColumn("all_cond", col("cond1") + col("cond2"))\
    .withColumn("res", dense_rank().over(Window.partitionBy("ID").orderBy("all_cond")))\
    .withColumn("res", when(col("all_cond") == 0, lit(None)).otherwise(col("res")))\
    .select("ID", "seqNum", "res")\
    .display()

Here I check two conditions for each row: whether it pairs with the previous row and whether it pairs with the next row.

If a row does not pair with the previous row but does pair with the next one, a new pair starts there, so I generate a random value for it.

If a row pairs with the previous row but not with the next one, it is the end of the pair, so I assign it the random value of the previous row.

Finally, I compute a dense rank over the combined value of both conditions and set the ID to null for rows without a pair.
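To make the steps above easier to follow, here is a minimal plain-Python sketch that mirrors the same columns on an ordinary list. It is not the Spark code itself: the function name `pair_ids_window_style` is made up for illustration, and as an assumption it uses the opening row's own seqNum as the key instead of `randn()`, so that the ranks come out in seqNum order.

```python
from typing import List, Optional, Tuple


def pair_ids_window_style(seq_nums: List[int]) -> List[Tuple[int, Optional[int]]]:
    """Mirror the lag/lead + dense_rank steps on a plain Python list."""
    s = sorted(seq_nums)
    n = len(s)

    # pair_with_prev / pair_with_next: is the neighbour exactly 1 away?
    with_prev = [i > 0 and s[i] - s[i - 1] == 1 for i in range(n)]
    with_next = [i < n - 1 and s[i + 1] - s[i] == 1 for i in range(n)]

    # cond1: the row opens a pair. Assumption: use its seqNum as the key
    # (the answer uses randn() here).
    cond1 = [s[i] if (not with_prev[i] and with_next[i]) else None for i in range(n)]

    # cond2: the row closes a pair, so it inherits the previous row's key.
    cond2 = [cond1[i - 1] if (with_prev[i] and not with_next[i]) else None for i in range(n)]

    keys = [c1 if c1 is not None else c2 for c1, c2 in zip(cond1, cond2)]

    # Dense rank over the distinct keys; rows without a key stay None.
    ranks = {key: rank for rank, key in enumerate(sorted({k for k in keys if k is not None}), start=1)}
    return [(value, ranks.get(key)) for value, key in zip(s, keys)]


if __name__ == "__main__":
    data = [3609, 3610, 3616, 3617, 3622, 3623, 3634, 3642, 3643]
    for seq_num, pair_id in pair_ids_window_style(data):
        print(seq_num, pair_id)
```

One thing the sketch makes visible: a row in the middle of a run of three or more consecutive numbers pairs with both neighbours, so it gets no key, and the run's last row inherits nothing from it. The scheme therefore only labels isolated pairs cleanly. In the Spark version, the equivalent swap would presumably be `col("seqNum")` in place of `randn()`, but I have not tested that here.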

output:

ID seqNum res
100 3634 null
100 3616 2
100 3617 2
100 3609 3
100 3610 3
100 3622 4
100 3623 4
100 3642 5
100 3643 5

3 Comments

This works, thank you so much! Later I also tried to pair items that are less than 5 apart instead of exactly 1 seqNum apart, because some sequence numbers are missing. I changed that in your code, but then it stopped working. Do you know how I should adjust it instead? For example, with these numbers: 8 11 13 14 16 18 20 22 24 28
Just check the difference between them as you need. Do you want to pair more than 2 numbers?
No, I just want to pair 2!
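
For the "less than 5 apart, but still only pairs of 2" case, the rule from the question's loop can be written down as a small reference function to check results against. This is only a sketch in plain Python, not a Spark solution: `greedy_pairs` and its `max_gap` parameter are hypothetical names, and it assumes the sequence numbers are distinct integers and that each row should pair with the next unpaired row when the gap is small enough.

```python
from typing import List, Optional, Tuple


def greedy_pairs(seq_nums: List[int], max_gap: int = 1) -> List[Tuple[int, Optional[int]]]:
    """Walk the sorted numbers once and pair each row with the next one
    if their difference is at most max_gap (assumed pairing rule)."""
    ordered = sorted(seq_nums)
    result: List[Tuple[int, Optional[int]]] = []
    pair_id = 0
    i = 0
    while i < len(ordered):
        has_partner = i + 1 < len(ordered) and ordered[i + 1] - ordered[i] <= max_gap
        if has_partner:
            # Both rows get the same id and are consumed together.
            pair_id += 1
            result.append((ordered[i], pair_id))
            result.append((ordered[i + 1], pair_id))
            i += 2
        else:
            # No close-enough neighbour: the row stays unpaired.
            result.append((ordered[i], None))
            i += 1
    return result


if __name__ == "__main__":
    # "Less than 5 apart" for integers means a gap of at most 4.
    for seq_num, pair_id in greedy_pairs([8, 11, 13, 14, 16, 18, 20, 22, 24, 28], max_gap=4):
        print(seq_num, pair_id)
```

With `max_gap=1` this reproduces the pairIDs from the question's expected output. Because each decision depends on whether the previous row was already consumed, the rule is sequential within one ID; one option, which I have not tried, would be to run such a function per ID group (for example through `groupBy("ID").applyInPandas(...)`) instead of collecting the whole DataFrame.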
