
I am trying to do fast enrichment in Spark with conditional queries.

I have two key/value data sets: "Event Data" and "Session Map". The "Session Map" is used to find out who was using a given IP between two timestamps. The "Event Data" is a large collection of events, each with an IP and a timestamp, which need to be correlated against the "Session Map" to enrich them with a username.

Is there an efficient way to enrich the Event Data against Session Map in Spark, or something else?

Session map:

(IP, start_time, end_time) -> Name
(192.168.0.1, 2016-01-01 10:00:00, 2016-01-01 22:00:00) -> John
(192.168.0.1, 2016-01-01 22:00:01, 2016-01-02 04:35:00) -> Dana
(10.0.0.12,   2016-01-02 06:00:13, 2016-01-02 09:23:24) -> John
...

Event data:

IP, timestamp
192.168.0.1, 2016-01-01 10:00:00
192.168.0.1, 2016-01-01 10:00:01
192.168.0.1, 2016-01-01 10:00:02
192.168.0.1, 2016-01-01 10:05:23
...
192.168.0.1, 2016-01-01 22:00:01
192.168.0.1, 2016-01-01 22:12:35
192.168.0.1, 2016-01-02 04:12:00
...
5 Comments

  • Do either of the sets fit in memory?
  • Unfortunately no, it does not fit in memory.
  • Did you try a regular join? What API are you using for your data? Regular RDDs or DataFrames?
  • A regular join in Spark requires an exact match, I believe. Am I wrong? The data is regular RDDs.
  • Ah, I didn't read your question correctly. I have an idea; I will post it as an answer. Quick question: what is the distribution of sessions/events per IP?

2 Answers


What you could do is join the two sets on the IP. This will generate a very large table, which you can then filter to keep only the combinations where the event timestamp falls within the session's range. So:

session_RDD = (IP, (start_time, end_time, name))
event_RDD   = (IP, timestamp)
joined_RDD  = session_RDD.join(event_RDD)
result_RDD  = joined_RDD.filter(start_time <= timestamp <= end_time)

This pseudocode should do it; you would need to write a function that checks the timestamp and gets the result back into a proper format. I don't know if this is fast enough, but unless the session start_time and end_time fall on fixed boundaries (e.g. a new session every 2 hours), I don't see a better way.
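
For concreteness, here is a minimal runnable Scala sketch of this join-and-filter approach; the sample data, tuple layout, and epoch-second timestamps are all assumptions made for the illustration:

    import org.apache.spark.{SparkConf, SparkContext}

    object EnrichByJoin {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("enrich").setMaster("local[*]"))

        // Sessions keyed by IP: (IP, (start, end, name)), timestamps as epoch seconds
        val sessions = sc.parallelize(Seq(
          ("192.168.0.1", (1451642400L, 1451685600L, "John")),
          ("192.168.0.1", (1451685601L, 1451709300L, "Dana"))
        ))

        // Events keyed by IP: (IP, timestamp)
        val events = sc.parallelize(Seq(
          ("192.168.0.1", 1451642401L),
          ("192.168.0.1", 1451686355L)
        ))

        // The join pairs every event with every session for the same IP;
        // the filter then keeps only the pairs whose window contains the event.
        val enriched = sessions.join(events)
          .filter { case (_, ((start, end, _), ts)) => start <= ts && ts <= end }
          .map { case (ip, ((_, _, name), ts)) => (ip, ts, name) }

        enriched.collect().foreach(println)
        sc.stop()
      }
    }

One caveat, echoing the comment below: the join materializes every (session, event) pair for an IP before the filter prunes them, so a single very hot IP can still make this expensive.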


2 Comments

If one of the IPs has many, many entries, would that crash the join?
I am uncertain, but I think that, thanks to the filter, combinations that do not match will not be kept, so it should be fine.

I think an even simpler way is to first zipWithIndex (to reduce the cost of the join operation):

 val SessionUnion = Session.zipWithIndex.map(x => (x._1.IP, x._1.start_time, x._2 + 1)) // an RDD of (IP, Date, Index); the +1 keeps any real session from getting index 0
 val UnionEvent = Event.map(x => (x.IP, x.timeStamp, 0L)).union(SessionUnion)

This basically makes one flat table with all your Sessions and Events; the catch is that the Session rows carry their (now 1-based) indexes while the Event rows only carry 0s.

Now take advantage of sorting in the RDD:

val sortAns = UnionEvent.map(x => ((x._1, x._2), x._3)).sortBy(_._1) // key by (IP, timestamp) so the sort interleaves sessions and events

This sort should align the IPs and date ranges for you; now all you need to do is adjust the Index:

 IP1,DateFromSession1,Index1
 IP1,DateFromEvent1,0
 IP1,DateFromEvent2,0
 IP1,DateFromSession2,Index2
 ...

So you can either do a foldLeft in which every 0 index is replaced by the previous non-zero Index, so that you end up with (IP, DateFromEvent, IndexFromSession), or I guess there are some other interesting ways to do this; a sketch of that fill-forward follows.
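
Here is a minimal sketch of that fill-forward step. As a judgment call not in the original answer, it runs the fold per IP group via groupByKey rather than on the driver, and it assumes any single IP's rows fit in executor memory; the sample data and epoch-second timestamps are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    object FillForwardIndex {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("fill").setMaster("local[*]"))

        // (IP, (timestamp, index)): session starts carry a non-zero index, events carry 0
        val tagged = sc.parallelize(Seq(
          ("192.168.0.1", (1451642400L, 1L)), // session 1 starts (John)
          ("192.168.0.1", (1451642401L, 0L)), // event -> should get index 1
          ("192.168.0.1", (1451685601L, 2L)), // session 2 starts (Dana)
          ("192.168.0.1", (1451686355L, 0L))  // event -> should get index 2
        ))

        // Per IP: sort by timestamp, then fold the last session index
        // forward onto every 0-tagged event row.
        val filled = tagged.groupByKey().flatMap { case (ip, rows) =>
          rows.toSeq.sortBy(_._1).foldLeft((0L, List.empty[(String, Long, Long)])) {
            case ((carry, out), (ts, idx)) =>
              if (idx != 0L) (idx, out)            // session row: update the carry, emit nothing
              else (carry, (ip, ts, carry) :: out) // event row: attach the carried index
          }._2.reverse
        }

        filled.collect().foreach(println) // (192.168.0.1,1451642401,1), (192.168.0.1,1451686355,2)
        sc.stop()
      }
    }

Because the fold runs inside flatMap on each IP's group, nothing is collected to the driver, which sidesteps the concern raised in the comments below; the trade-off is that one very hot IP still lands on a single executor.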

With the Index attached to each Event, you can now do a join using the Index, which should be really fast.
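
Continuing the illustrative names from the sketch above, the final lookup might then look like this, with `sessionNames` standing in for a hypothetical (index, name) RDD kept from the zipWithIndex step:

    // Hypothetical (index, name) pairs saved when the sessions were indexed
    val sessionNames = sc.parallelize(Seq((1L, "John"), (2L, "Dana")))

    val enriched = filled
      .filter { case (_, _, idx) => idx != 0L }      // drop events that precede any session
      .map { case (ip, ts, idx) => (idx, (ip, ts)) } // key events by their session index
      .join(sessionNames)                            // exact-match join on the index
      .map { case (_, ((ip, ts), name)) => (ip, ts, name) }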

2 Comments

How can you do a foldLeft in Spark / Hadoop?
Spark is built on Scala, and foldLeft is in the Scala standard library, so you should be able to use it; however, it is not recommended, as it means collecting the entire RDD to the master. I am simply offering an alternative approach.
