
Is there any way to collect all the pairs of an RDD[(String, String)] into a single RDD[Map[String, String]]?

E.g., for file input.csv:

1,one
2,two
3,three

Code:

val file = sc.textFile("input.csv")
val pairs = file.map(line => { val a = line.split(","); (a(0), a(1)) })
val rddMap = ???

Output (approximate):

val map = rddMap.collect
map: Array[scala.collection.immutable.Map[String,String]] = Array(Map(1 -> one, 2 -> two, 3 -> three))

I tried pairs.collectAsMap, but it returns a plain Map, not a Map inside an RDD.
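For completeness, the literal shape asked for can be produced by collecting to the driver and re-parallelizing the single resulting Map, as sketched below (one of the comments under the second answer suggests essentially the same hack), though as the comments here explain this gives up all parallelism:

// Sketch only: collect everything to the driver, then wrap the single
// resulting Map in a one-element RDD. This defeats the point of an RDD.
val rddMap = sc.parallelize(Seq(pairs.collectAsMap().toMap))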

  • But why do you want a map inside an RDD? An RDD is a collection, and as far as I can tell from your code you just want one map, so there is no point in wrapping a single map in a one-element RDD. Commented Jul 13, 2015 at 6:26
  • I want to cache this map between several jobs. All the solutions I found work with RDDs, not with plain objects. Commented Jul 13, 2015 at 6:28
  • Still, ask yourself whether you really want to share an RDD[Map[String, String]]; this way you can't take advantage of the parallelism. If the map is small and you really need a map, maybe take a look at broadcast variables and accumulators (spark.apache.org/docs/latest/…). Commented Jul 13, 2015 at 6:33
  • input.csv is about 1.5 GB, and the map is constructed in a much more complicated way than in the example provided. That's why I want the precomputed map in memory, not just the cached input file. Commented Jul 13, 2015 at 6:43
  • If you do so, your map will be distributed across the cluster, but it won't be one map! So whatever you are trying to do is not a good approach. You can use a key-value pair RDD and its lookup method to find the value for a given key. Commented Jul 13, 2015 at 8:13

3 Answers


I don't actually agree with what you are trying to do, because if you do, your map will be distributed across the cluster, but it won't be one map!

You can use a key-value pair RDD and its lookup method to find the value for a given key:

def lookup(key: K): Seq[V]  // Return the list of values in the RDD for key key.

And here is an example of its usage:

// Build a pair RDD keyed by word length, then look up all values for key 5.
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.lookup(5)
res0: Seq[String] = WrappedArray(tiger, eagle)

For more information about pair RDDs, I suggest you read Chapter 4, "Working with Key/Value Pairs", of Learning Spark.




If you want to preserve your map only for the duration of a single driver program, you can collect it to a local map in the driver; for the next task it will then be available in the closure (you can just use it in the function passed to that task). If you don't want to transport it many times, you can broadcast it.
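A minimal sketch of the collect-and-broadcast approach, assuming the pairs RDD from the question (the variable names here are illustrative):

// Collect the pair RDD into an immutable Map on the driver.
val localMap: Map[String, String] = pairs.collectAsMap().toMap

// Broadcast it once so every executor gets a read-only copy.
val mapBc = sc.broadcast(localMap)

// Use the broadcast value inside closures passed to later tasks.
val decoded = sc.parallelize(Seq("1", "3")).map(k => mapBc.value.getOrElse(k, "?"))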

On the other hand, if you want to use it in different driver programs, you can just serialize it and save it to HDFS (or any other storage system you use). In this case, even if you had an RDD, you could not preserve it between drivers without saving it to the file system.
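As a sketch of that idea using Spark's own primitives (the path here is made up), the first job could save the pair RDD as an object file and the next job could rebuild the map from it:

// Job 1: persist the pairs once, after the expensive construction.
pairs.saveAsObjectFile("hdfs:///tmp/precomputed-pairs")  // hypothetical path

// Job 2 (a different driver program): reload the pairs and rebuild the map.
val reloaded = sc.objectFile[(String, String)]("hdfs:///tmp/precomputed-pairs")
val rebuilt: Map[String, String] = reloaded.collectAsMap().toMap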

5 Comments

There are external caching systems like Tachyon; it's not mandatory to save to a file system. In any case, I need to use RDD.persist().
That persists in a distributed fashion, and there is no sense in doing this with a local object. Is persisting the map between flows and reading it back into the driver too costly for you?
An easy hack would be sc.parallelize(List(localMap)).persist(). I don't think this is a good idea, though.
This map should be accessible from the executors. Constructing this map takes 50% of the job time, and the map does not change between job executions.
Still, I would split this into two jobs: one creating the map and persisting it to the file system, and a second loading and using it. That way the first job can be run once and the second can be run repeatedly.

How many maps would you get in an RDD[Map[String, String]]? Just one, right? An RDD distributes its contents, because it's a distributed collection, but if it contains only one element, that collection becomes rather hard to distribute, doesn't it?

I would suggest that what you need is hash-based lookup in your pair RDD of Strings. Thankfully, you already have this in the API, with the lookup function.

Look at the code for lookup: it uses hashing to get to your key, in a similar way to how a Map would. Building the keys and values correctly in your PairRDD is enough for your purposes, even if building them is complex.
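As a sketch, applied to the pairs RDD built from input.csv in the question:

// pairs is the RDD[(String, String)] built in the question.
val values = pairs.lookup("2")   // expected: Seq containing "two"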

