MapReduce
See http://en.wikipedia.org/wiki/Mapreduce
The MapReduce pattern aims to handle large-scale computations across a cluster of servers, often involving massive amounts of data.
"The computation takes a set of input key/value pairs, and produces a set of output key/value pairs. The developer expresses the computation as two Func delegates: Map and Reduce.
Map - takes a single input pair and produces a set of intermediate key/value pairs. The MapReduce function groups results by key and passes them to the Reduce function.
Reduce - accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values. Typically just zero or one output value is produced per Reduce invocation. The intermediate values are supplied to the user's Reduce function via an iterator."
The canonical MapReduce example is counting word frequency in a text file.
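To make the Map, group, and Reduce phases described above concrete, here is a minimal sequential sketch of word counting with two Func delegates. The class name SequentialMapReduce and the helpers Run and CountWords are hypothetical names chosen for illustration; they do not come from any library, and a real MapReduce system would distribute this work across machines rather than run it in one loop.

```csharp
using System;
using System.Collections.Generic;

public static class SequentialMapReduce
{
    // Hypothetical helper: runs map over every input line, groups the
    // intermediate pairs by key, then calls reduce once per key.
    public static Dictionary<string, int> Run(
        IEnumerable<string> lines,
        Func<string, IEnumerable<KeyValuePair<string, int>>> map,
        Func<string, IEnumerable<int>, int> reduce)
    {
        // Grouping phase: collect every intermediate value under its key.
        var groups = new Dictionary<string, List<int>>();
        foreach (string line in lines)
        {
            foreach (KeyValuePair<string, int> pair in map(line))
            {
                if (!groups.TryGetValue(pair.Key, out List<int> values))
                {
                    values = new List<int>();
                    groups[pair.Key] = values;
                }
                values.Add(pair.Value);
            }
        }

        // Reduce phase: one invocation per intermediate key.
        var output = new Dictionary<string, int>();
        foreach (KeyValuePair<string, List<int>> group in groups)
        {
            output[group.Key] = reduce(group.Key, group.Value);
        }
        return output;
    }

    public static Dictionary<string, int> CountWords(IEnumerable<string> lines)
    {
        // Map: emit (word, 1) for each word in the line.
        Func<string, IEnumerable<KeyValuePair<string, int>>> map = line =>
        {
            var pairs = new List<KeyValuePair<string, int>>();
            foreach (string word in line.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries))
            {
                pairs.Add(new KeyValuePair<string, int>(word, 1));
            }
            return pairs;
        };

        // Reduce: sum the 1s emitted for a single word.
        Func<string, IEnumerable<int>, int> reduce = (word, ones) =>
        {
            int total = 0;
            foreach (int one in ones) total += one;
            return total;
        };

        return Run(lines, map, reduce);
    }

    public static void Main()
    {
        var counts = CountWords(new[] { "the quick fox", "the lazy dog", "the fox" });
        foreach (KeyValuePair<string, int> entry in counts)
        {
            Console.WriteLine(entry.Key + ":" + entry.Value);
        }
    }
}
```

For the three sample lines in Main, "the" is counted 3 times, "fox" twice, and the remaining words once each. The order in which the entries print is not something to rely on, because Dictionary does not promise an enumeration order.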
MapReduce using DryadLINQ
See http://research.microsoft.com/en-us/projects/dryadlinq/ and http://connect.microsoft.com/Dryad
DryadLINQ provides a simple and straightforward way to implement MapReduce operations. The implementation has two primary components: a Pair structure, which serves as a data container, and a MapReduce method, which counts word frequency and returns the most common words.
The Pair Structure - Pair has two properties: Word is a string that holds a word or key. Count is an int that holds the word count. The structure also overrides ToString to simplify printing the results. The following example shows the Pair implementation.
public struct Pair
{
    private string word;
    private int count;

    public Pair(string w, int c)
    {
        word = w;
        count = c;
    }

    public int Count { get { return count; } }
    public string Word { get { return word; } }

    public override string ToString()
    {
        return word + ":" + count.ToString();
    }
}
The MapReduce method computes the results. The input data could be partitioned and distributed across the cluster. The method does the following:
1. Creates a DryadTable<LineRecord> object, inputTable, to represent the lines of input text. For partitioned data, use GetPartitionedTable<T> instead of GetTable<T> and pass the method a metadata file.
2. Applies the SelectMany operator to inputTable to transform the collection of lines into a collection of words. The String.Split method converts each line into a collection of words. SelectMany concatenates the collections created by Split into a single IQueryable<string> collection named words, which represents all the words in the file.
3. Performs the Map part of the operation by applying GroupBy to the words object. The GroupBy operation groups elements with the same key, which is defined by the selector delegate. This creates a higher-order collection whose elements are groups. In this case, the delegate is an identity function, so the key is the word itself and the operation creates a collection named groups that consists of groups of identical words.
4. Performs the Reduce part of the operation by applying Select to groups. This operation reduces the groups of words from Step 3 to an IQueryable<Pair> collection named counts that represents the unique words in the file and how many instances there are of each word. Each key value in groups represents a unique word, so Select creates one Pair object for each unique word. IGrouping.Count returns the number of items in the group, so each Pair object's Count member is set to the number of instances of the word.
5. Applies OrderByDescending to counts. This operation sorts the input collection in descending order of frequency and creates an ordered collection named ordered.
6. Applies Take to ordered to create an IQueryable<Pair> collection named top, which contains the k most common words in the input file and their frequencies.
The test code then passes k = 100 and uses the Pair object's ToString implementation to print the top one hundred words and their frequencies.
public static IQueryable<Pair> MapReduce(
    string directory,
    string fileName,
    int k)
{
    DryadDataContext ddc = new DryadDataContext("file://" + directory);
    DryadTable<LineRecord> inputTable = ddc.GetTable<LineRecord>(fileName);
    IQueryable<string> words = inputTable.SelectMany(x => x.line.Split(' '));
    IQueryable<IGrouping<string, string>> groups = words.GroupBy(x => x);
    IQueryable<Pair> counts = groups.Select(x => new Pair(x.Key, x.Count()));
    IQueryable<Pair> ordered = counts.OrderByDescending(x => x.Count);
    IQueryable<Pair> top = ordered.Take(k);
    return top;
}
To test:
// Debug.Print requires: using System.Diagnostics;
IQueryable<Pair> results = MapReduce(@"c:\DryadData\input",
                                     "TestFile.txt",
                                     100);
foreach (Pair pair in results)
    Debug.Print(pair.ToString());
Note: DryadLINQ applications can use a more compact way to represent the query:
return inputTable
    .SelectMany(x => x.line.Split(' '))
    .GroupBy(x => x)
    .Select(x => new Pair(x.Key, x.Count()))
    .OrderByDescending(x => x.Count)
    .Take(k);
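The same chain of operators can be tried without a Dryad cluster. The following is a rough sketch that uses ordinary LINQ to Objects over an in-memory array of lines. It is not DryadLINQ code: the class LocalWordCount and the method TopWords are hypothetical names, and KeyValuePair<string, int> stands in for the Pair structure so that the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LocalWordCount
{
    // Hypothetical stand-in for the DryadLINQ query: the same operators,
    // applied to an in-memory sequence instead of a DryadTable<LineRecord>.
    public static List<KeyValuePair<string, int>> TopWords(IEnumerable<string> lines, int k)
    {
        return lines
            // Step 2: flatten the lines into a single sequence of words.
            .SelectMany(line => line.Split(' '))
            // Step 3: group identical words; the key is the word itself.
            .GroupBy(word => word)
            // Step 4: reduce each group to a (word, count) pair.
            .Select(g => new KeyValuePair<string, int>(g.Key, g.Count()))
            // Step 5: most frequent words first.
            .OrderByDescending(pair => pair.Value)
            // Step 6: keep only the k most frequent words.
            .Take(k)
            .ToList();
    }

    public static void Main()
    {
        string[] lines = { "a b a", "b a c" };
        foreach (KeyValuePair<string, int> pair in TopWords(lines, 2))
        {
            Console.WriteLine(pair.Key + ":" + pair.Value);
        }
        // Prints:
        // a:3
        // b:2
    }
}
```

One thing to watch for with Split(' '): consecutive spaces in the input produce empty strings, which are then counted as a word. Passing StringSplitOptions.RemoveEmptyEntries to Split avoids that when the input is not cleanly single-spaced.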
MapReduce using PLINQ
The pattern is also relevant on a single multi-core machine. We can write our own PLINQ MapReduce in a few lines.
The Map function takes a single input value and returns a set of mapped values → LINQ's SelectMany operator.
These are then grouped according to an intermediate key → LINQ's GroupBy operator.
The Reduce function takes each intermediate key and a set of values for that key, and produces any number of outputs per key → LINQ's SelectMany again.
We can put all of this together to implement a MapReduce method in PLINQ that returns a ParallelQuery<TResult>:
// Extension methods must be declared inside a non-generic static class.
public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
    this ParallelQuery<TSource> source,
    Func<TSource, IEnumerable<TMapped>> map,
    Func<TMapped, TKey> keySelector,
    Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
{
    return source
        .SelectMany(map)
        .GroupBy(keySelector)
        .SelectMany(reduce);
}
For word counting, the map function takes an input document and outputs all of the words in that document. The grouping phase groups identical words together, so that the reduce phase can count the words in each group and output a word/count pair for each group:
// dirPath (string) and delimiters (char[]) are assumed to be defined by the caller.
var files = Directory.EnumerateFiles(dirPath, "*.txt").AsParallel();
var counts = files.MapReduce(
    path => File.ReadLines(path).SelectMany(line => line.Split(delimiters)),
    word => word,
    group => new[] { new KeyValuePair<string, int>(group.Key, group.Count()) });
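For readers who want to run the PLINQ version end to end, here is a small self-contained sketch. It repeats the MapReduce extension method so that the block compiles on its own, and it counts words in in-memory strings rather than files. The class name ParallelMapReduceDemo, the method CountWords, the delimiter set, and the choice to lower-case the key are all assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ParallelMapReduceDemo
{
    // Same shape as the MapReduce extension method in the text.
    public static ParallelQuery<TResult> MapReduce<TSource, TMapped, TKey, TResult>(
        this ParallelQuery<TSource> source,
        Func<TSource, IEnumerable<TMapped>> map,
        Func<TMapped, TKey> keySelector,
        Func<IGrouping<TKey, TMapped>, IEnumerable<TResult>> reduce)
    {
        return source
            .SelectMany(map)
            .GroupBy(keySelector)
            .SelectMany(reduce);
    }

    // Hypothetical helper: counts words across several in-memory documents.
    public static Dictionary<string, int> CountWords(IEnumerable<string> documents)
    {
        // Assumed delimiter set; adjust for the input at hand.
        char[] delimiters = { ' ', ',', '.', ';', ':', '!', '?' };

        return documents
            .AsParallel()
            .MapReduce(
                // Map: split a document into words, dropping empty entries.
                doc => doc.Split(delimiters, StringSplitOptions.RemoveEmptyEntries),
                // Key: the lower-cased word, so "The" and "the" share a group.
                word => word.ToLowerInvariant(),
                // Reduce: one word/count pair per group.
                group => new[] { new KeyValuePair<string, int>(group.Key, group.Count()) })
            // Materialize into a dictionary; a ParallelQuery has no guaranteed order.
            .ToDictionary(pair => pair.Key, pair => pair.Value);
    }

    public static void Main()
    {
        var counts = CountWords(new[] { "The cat. The dog!", "the bird, the cat" });
        Console.WriteLine("the:" + counts["the"]); // prints "the:4"
        Console.WriteLine("cat:" + counts["cat"]); // prints "cat:2"
    }
}
```

The results are collected into a dictionary because PLINQ does not preserve ordering unless asked to, so looking values up by key is more reliable than depending on the order of the output sequence.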