Many times in a distributed systems, need of shuffle arises. Map/Reduce implementations like Hadoop,Spark heavily depend upon effective shuffling to do the distributed processing. So whenever we build a new distributed system from scratch, it will be nice to have the ability to do shuffle.
This post talks how to implement shuffle on mesos.
This post extends the custom scala executor discussed here post. If you are new to mesos please go through that post before continuing.
tl;dr Access the complete code on github.
What is shuffle?
Shuffle is an operation where result produced at one machine is moved to another machine over the network, in order to combine the results. The operations like reduceByKey, join in Hadoop or Spark uses shuffling.
Implementing Shuffle in Mesos
The following steps are one of the ways to implement shuffle in mesos.This shuffle supports implementing operations like reduce,reduceByKey,groupByKey etc. The following implementation is inspired by the spark implementation.
The trait has three types
K - key type
V - value type
C - combined value type (Final result type)
compute is an abstract method which takes the following functions
createCombiner - As name suggests, its a function creates combiner for each partition. It will start with initial value provided by v parameter.
mergeValue - for every value in partition, mergeValue will be used to update the combine
mergeCombiners - its a reduce side job of merging two different combiners
numberOfOutputSplit allows us to control number of reduce tasks and final parameter scheduler allow us as to access the context.
Local file based shuffle implementation
The following code explains the implementation of shuffle using local file system and http server.
Step 1 : Implement Shuffle trait
The rows variable simulating a in memory collection with multiple rows. I am using an in memory collection to keep things simple. You can replace it with RDD or a hadoop file data.
Step 2 : Number each split
we number each split with index so that we can use it to identify it when we shuffle.
Step 3: Create local combiner tasks
For each split, we create a forEachTask
Implementation of forEach task.
forEachTask returns a function task which does following.
We create a temporary directory for each split, where we are going to write the output files for that split.
We create buckets(partitions) using hashParitioner.The for each value, we determine which bucket it belongs to using hashCode method. We handle negative hash code case too.
Once we have the bucket, then we use createCombiner or mergeValue to run combining for that pair.
Step 4 : Writing results to local disk
Once we have results for a given split, we are going to write them to the disk.
Step 5 : Serve the local files using a HTTP server
Once we write the files to disk, we start a http server which serves these output files to reduce tasks.
Step 6: Run forEach tasks
Step 7 : Map the splits to uri
After map side combiners are completed, we are going to create a hashmap which going to contain uri’s of http servers and the different splits they contain.
Step 8 : Generate reduce task for output split
We are going to create reduce tasks as specified by numberOfOutputSplit.
Step 9: Implementation of reduce task
The following is the implementation of reduce.
reduce task just downloads the uri’s specific to the bucket and runs mergeCombiners to get the final reduce value for that bucket.
Step 10 : Run the reduce tasks and return result
runs the above created reduce tasks and returns the result as a list.
Using the shuffle to implement hadoop word count
The following code uses our shuffle to implement word count. Here we do the mapping inline , only implement combiner and reducer functionality.
Building and Running
Download source code from github.
Refer to this post for building and running instructions.