Incremental Map/Reduce/Combiner?
Because people keep asking me about a Combiner function (which is described in the Google Map/Reduce paper and also used in Hadoop), here's a follow-up to yesterday's post. The reason I'm not talking about a combiner function is that I'm pretty sure the design I propose is isomorphic to a Map/Reduce/Combiner combo design. That is to say it's the same damn thing, just re-jiggered a little.
To illustrate, here is an example that does its own combiner operations inside the reduce.
Let's say we have a database full of receipts and we want to calculate the total spent in a month and the average amount of each purchase.
Here is an example receipt:
{
"_id":"2DA92AAA628CB4156134F36927CF4876",
"type":"receipt",
"amount":19.95,
"note":"Enzyte, i sure hope this works",
"year":2008,
"month":1,
"day":12,
....
"}
}
Calculating the total spent in a reduction is easy: it's simple addition, which is commutative and associative. Calculating an average, however, is not intrinsically associative (the average of partial averages is generally not the overall average), so we'll have to make the Javascript reduce function a little more complex for it to behave properly.
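To make the associativity problem concrete, here is a minimal sketch in plain Javascript, run outside CouchDB. The helper names (`average`, `toPair`, `combinePairs`) are made up for illustration. It shows that averaging partial averages goes wrong when the partitions differ in size, while carrying a total and a count does not.

```javascript
// Hypothetical helpers for illustration only; none of this is CouchDB API.
function average(values) {
  var sum = 0;
  for (var i = 0; i < values.length; i++) sum += values[i];
  return sum / values.length;
}

// Summarize raw amounts as a {total, count} pair.
function toPair(values) {
  var total = 0;
  for (var i = 0; i < values.length; i++) total += values[i];
  return { total: total, count: values.length };
}

// Combine partial pairs; the grouping of the inputs does not matter.
function combinePairs(pairs) {
  var result = { total: 0, count: 0 };
  for (var i = 0; i < pairs.length; i++) {
    result.total += pairs[i].total;
    result.count += pairs[i].count;
  }
  return result;
}

var partA = [10, 20]; // two purchases reduced on one node
var partB = [60];     // one purchase reduced on another

// Correct answer over all three purchases: 90 / 3
var trueAverage = average(partA.concat(partB));

// Average of the partial averages: (15 + 60) / 2, which is wrong
var naiveAverage = average([average(partA), average(partB)]);

// Pairs combine cleanly, and the division happens once at the end
var combined = combinePairs([toPair(partA), toPair(partB)]);
var pairAverage = combined.total / combined.count;
```

Here `trueAverage` and `pairAverage` agree, while `naiveAverage` over-weights the single large purchase.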
This map function outputs each receipt's year and month as the key, and the amount as the value:
function (doc) {
if (doc.type == "receipt") {
var key = [doc.year, doc.month];
emit(key, doc.amount);
}
}
And a Reduce function to compute the totals of the receipts:
function (date, values) {
var result = {totalSpent:0, numPurchases:0};
for(var i=0; i<values.length; i++)
{
switch (typeof values[i]) {
case "number":
// this is a raw amount emitted by the map function
result.totalSpent += values[i];
result.numPurchases++;
break;
case "object":
// this is an output object from a previous reduction
result.totalSpent += values[i].totalSpent;
result.numPurchases += values[i].numPurchases;
break;
}
}
return result;
}
Once this is saved into a design document, you can get the values for January of '08:
GET /db/_view/receipts/amount_spent_month?key=[2008,1]
Result:
{"total_rows":12,
"rows":[
{ "key":[2008,1],
"value": { "totalSpent": 1252.1,
"numPurchases": 13}}
]}
Now calculating the final average is left to the client.
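As a rough sketch of that client-side step, the snippet below parses a response shaped like the result above and performs the division. The helper name `averagesFromView` is hypothetical, and the response text is hard-coded here where a real client would fetch it over HTTP.

```javascript
// Response body shaped like the view result; normally this would come from
// an HTTP GET against the view rather than a string literal.
var body = '{"total_rows":12,"rows":[' +
  '{"key":[2008,1],"value":{"totalSpent":1252.1,"numPurchases":13}}]}';

// Hypothetical helper: turn each reduced row into {key, average}.
function averagesFromView(responseText) {
  var rows = JSON.parse(responseText).rows;
  var out = [];
  for (var i = 0; i < rows.length; i++) {
    var v = rows[i].value;
    out.push({
      key: rows[i].key,
      // Guard against an empty group so we never divide by zero.
      average: v.numPurchases > 0 ? v.totalSpent / v.numPurchases : null
    });
  }
  return out;
}

var averages = averagesFromView(body);
```

Because the function loops over every row, the same code would cover a query that returns several months at once, with one division per row.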
However, we can avoid extra combiner logic by simply changing the map function to output the values to look like their final form anyway:
function (doc) {
if (doc.type == "receipt") {
var key = [doc.year, doc.month];
emit(key, {totalSpent: doc.amount, numPurchases:1});
}
}
And the Reduce function:
function (date, values) {
var result = {totalSpent:0, numPurchases:0};
for(var i=0; i<values.length; i++)
{
result.totalSpent += values[i].totalSpent;
result.numPurchases += values[i].numPurchases;
}
return result;
}
This code is a lot simpler than the first example, but it has a disadvantage: the map output values, which can be very numerous, now carry extra, redundant information for the sake of algorithmic simplicity, possibly slowing things down or using extra resources.
Anyway, I think the addition of a separate Combiner function makes things more complicated for the general case, and a separate combiner can be built on top of this design anyway, using wrapper functions. So I don't see the point of an explicit combiner function, though I might be convinced otherwise.
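One way such a wrapper might look is sketched below. Everything in it is an assumption made for illustration: the names `withCombiner` and `mark`, and in particular the `__reduced` marker property used to tell earlier reduction outputs apart from raw map values (which also means reduction outputs must be objects).

```javascript
// Tag a reduction output so later calls can recognize it (illustrative only).
function mark(obj) {
  obj.__reduced = true;
  return obj;
}

// Hypothetical wrapper: builds one reduce function from a separate reducer
// (takes raw map values) and combiner (takes earlier reduction outputs).
function withCombiner(reduceFn, combineFn) {
  return function (key, values) {
    var raw = [], partial = [];
    for (var i = 0; i < values.length; i++) {
      var v = values[i];
      if (v !== null && typeof v === "object" && v.__reduced === true) {
        partial.push(v);
      } else {
        raw.push(v);
      }
    }
    // Reduce any raw values first, then combine all partial results.
    if (raw.length > 0) partial.push(mark(reduceFn(key, raw)));
    return mark(combineFn(key, partial));
  };
}

var reduceReceipts = withCombiner(
  function (key, amounts) {            // reducer: raw amounts in
    var total = 0;
    for (var i = 0; i < amounts.length; i++) total += amounts[i];
    return { totalSpent: total, numPurchases: amounts.length };
  },
  function (key, partials) {           // combiner: earlier outputs in
    var result = { totalSpent: 0, numPurchases: 0 };
    for (var i = 0; i < partials.length; i++) {
      result.totalSpent += partials[i].totalSpent;
      result.numPurchases += partials[i].numPurchases;
    }
    return result;
  }
);

var k = [2008, 1];
var direct = reduceReceipts(k, [5, 10, 15]);
var rereduced = reduceReceipts(k, [reduceReceipts(k, [5]), reduceReceipts(k, [10, 15])]);
var mixed = reduceReceipts(k, [5, reduceReceipts(k, [10, 15])]);
```

All three calls produce the same totals, so from the caller's side the wrapped pair behaves like a single reduce function that accepts its own output.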
Posted February 8, 2008 1:47 PM
Comments
I may just be being a pedant here, it's been known, so tell me to go fornicate with a badger if you like, but from my understanding of things, what you are implementing is MapCombine rather than MapReduce.
Now this may be all anyone needs to do in practice if you jiggle the problem around enough. It's certainly a good next step from simple views, and having it in CouchDB will be great for huge numbers of problems.
So it's not that you need to add combiner functionality, you already have it; it's that you aren't implementing a full reduce.
Kerr, February 8, 2008 2:40 PM
Kerr, can you explain a little more what's missing from this version of reduce?
Damien Katz, February 8, 2008 2:58 PM
Just reread your post and now I might have got myself confused.
You want (I assume) the server to not care about the code in map and reduce functions.
You also want the reduce to run on subset of the total mapped output for a given key. Correct?
This is so you can run batches of map and reduce on discrete nodes and then group partially run reduces later. It also means any incremental update doesn't have to recalculate the entire reduce for a given key.
But this requires that your reduce function be written to handle being passed partial data. If you just don't include the combiner logic then you are going to break the run. So you need a way to tell the server that the reduce doesn't include combiner logic, so it shouldn't try to feed it results from itself. You could do this by using a different name for the function: say, combine() if it does contain combiner logic, reduce() if it doesn't.
Kerr, February 8, 2008 3:08 PM
Damien, my second post was not a response to your request for more info. I've got to shoot off and won't be back on line for the rest of your day.
Quickly though: I think a "Full Reduce" will take in a complete set of map data for a given key and output whatever it likes, with no restriction. As soon as you say the reduce has to take in its own output, you are placing a restriction on the system and doing what the Google paper calls a combiner. It's mostly a definition thang.
I'll be able to pick up mail if you want to chat more.
Kerr, February 8, 2008 3:20 PM
Kerr, in this design, the reductions happen at index-update time, and the reductions are stored directly inside the inner nodes of the view b+tree index. Then at query time, the intermediate results are reduced to their final result. The number of reductions that happen at query time is logarithmic with respect to the number of matching key/values.
One thing about the requirement that the Reduce function be commutative and associative: if the reduce function cannot be written that way, then it is also not possible to write a combiner function for it.
In that case, the reductions are both not incrementally buildable AND not parallelizable, and every reduction is serialized at some point to a single processor on a single machine. This cost of updating the reduced value will be O(log N) and there will always be the single machine bottleneck.
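The idea of stored intermediate reductions can be sketched with a toy structure. This is not CouchDB's actual b+tree: it ignores keys, ordering and node splitting, and the names `chunk` and `buildLevels` are made up. It only shows that re-reducing stored partial results reaches the same answer as one pass over everything, and that the number of levels grows slowly with the number of values.

```javascript
// Reduce that accepts raw amounts or earlier outputs, as in the post.
function reduceReceipts(key, values) {
  var result = { totalSpent: 0, numPurchases: 0 };
  for (var i = 0; i < values.length; i++) {
    if (typeof values[i] === "number") {
      result.totalSpent += values[i];
      result.numPurchases++;
    } else {
      result.totalSpent += values[i].totalSpent;
      result.numPurchases += values[i].numPurchases;
    }
  }
  return result;
}

// Hypothetical helper: split an array into chunks of at most `size` items.
function chunk(items, size) {
  var out = [];
  for (var i = 0; i < items.length; i += size) out.push(items.slice(i, i + size));
  return out;
}

// Build levels of stored reductions bottom-up until one root remains.
// Level 0 reduces raw values; each higher level re-reduces the level below.
function buildLevels(key, values, fanout) {
  var reduceChunk = function (c) { return reduceReceipts(key, c); };
  var current = chunk(values, fanout).map(reduceChunk);
  var levels = [current];
  while (current.length > 1) {
    current = chunk(current, fanout).map(reduceChunk);
    levels.push(current);
  }
  return levels;
}

var amounts = [];
for (var n = 1; n <= 100; n++) amounts.push(n); // 100 receipts: 1, 2, ..., 100

var levels = buildLevels([2008, 1], amounts, 4);
var root = levels[levels.length - 1][0];
var direct = reduceReceipts([2008, 1], amounts);
```

With 100 values and a fanout of 4, the levels hold 25, 7, 2 and 1 stored reductions, and `root` matches `direct`. Changing one receipt would only require recomputing one stored reduction per level.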
Damien Katz, February 8, 2008 4:42 PM
Perhaps you could simply reduce functions by making the API more like Ruby's inject? You could write it in Ruby like:
inject({:totalSpent=> 0, :numPurchases => 0}) do |result, (date, values)|
values.each do |v|
result[:totalSpent] += v
result[:numPurchases] += 1
end
result
end
And in Javascript:
function (result, date, values){
// take care of initialization that ruby's inject gives us for free
if (typeof(result) == 'undefined'){
result = {totalSpent: 0, numPurchases: 0};
}
for(var i=0; i<values.length; i++){
result.totalSpent += values[i];
result.numPurchases += 1;
}
return result;
}
ryan king, February 8, 2008 5:27 PM
Damien:
I think Kerr is correct: This is more like MapCombine. In terms of the original MapReduce paradigm (or at least how it is implemented in Hadoop), what you are storing in the B+Tree indices is actually the output of a Combine function. The Combine function is a function just like Reduce, except that it can be executed on a _subset_ of the Map outputs for a given key.
Reduces on the other hand must execute on the entire set of Map outputs for a key. In this example, being guaranteed that you were executing on all outputs would allow you to actually perform the division to calculate the final average (as opposed to pushing it to the client).
Don't get me wrong though, this will be incredibly useful: but I think you would be better off renaming what you have called Reduce above to Combine. Then users could define a Reduce function that is guaranteed to be passed the outputs of all Map -> Combine instances.
Stu Hood, February 11, 2008 12:38 AM
Hi Damien, I think this is just a point of definition. I think I understand what you are doing and why you are doing it. I also think that having this feature is excellent, possibly all anyone actually ever needs. I just think that it is also exactly what the Google paper calls Combine, which is a special case of Reduce.
From the Google Paper:
"In some cases, there is significant repetition in the intermediate keys produced by each map task, and *the user-specified Reduce function is commutative and associative*."
"The Combiner function is executed on each machine that performs a map task. Typically the same code is used to implement both the combiner and the reduce functions."
"The only difference between a reduce function and a combiner function is how the MapReduce library handles the output of the function."
Do you think that there is something else you are doing that is different from what google calls the combiner function?
I'm not clear as to why calling this MapCombine would be a problem. Is it just a PR issue? I'm also not clear why leaving the door open for a full-fat reduce, even though it has a bottleneck problem, would be technically difficult or undesirable.
Thanks for the extra detail on how it will work internally, I'll follow up my posting on the google group with some more detailed thoughts.
Kerr, February 11, 2008 5:32 AM
Stu> Reduces on the other hand must execute on the entire set of Map outputs for a key
If that's the case, then CouchDB already has a full, non-parallelizable reduce you can do today, in any language: just GET the view results for a desired key and compute the results client side.
Granted, you'll want to do this only when you have a fast connection to the server, but that will usually be the case with CouchDB sitting behind the application server, often on the same machine. With a streaming JSON parser you can easily do it while keeping only a single map output value in memory at once.
In the not-too-distant future, when it will be possible to run server-side scripts that can query the database, there will be no network bottleneck.
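As an illustration of such a client-side full reduce, here is a sketch that computes a median, which cannot be built from partial medians. The rows are hypothetical, shaped like unreduced view rows for one key, and the helper name `medianOfRows` is made up. For simplicity it holds all values in memory rather than streaming them.

```javascript
// Hypothetical rows, as a client might receive them for one key.
var rows = [
  { key: [2008, 1], value: 19.95 },
  { key: [2008, 1], value: 5.0 },
  { key: [2008, 1], value: 42.0 },
  { key: [2008, 1], value: 7.5 }
];

// A "full" reduce: it needs every map output for the key at once.
function medianOfRows(rows) {
  if (rows.length === 0) return null;
  var amounts = rows.map(function (r) { return r.value; });
  amounts.sort(function (a, b) { return a - b; }); // numeric sort
  var mid = Math.floor(amounts.length / 2);
  return amounts.length % 2 === 1
    ? amounts[mid]
    : (amounts[mid - 1] + amounts[mid]) / 2;
}

var median = medianOfRows(rows);
```

With an even number of rows the result is the mean of the two middle amounts, here 7.5 and 19.95.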
Stu> In this example, being guaranteed that you were executing on all outputs would allow you to actually perform the division to calculate the final average (as opposed to pushing it to the client).
Actually, we could keep a running average, but the math was more complicated for no good reason and a simple division at the end is easy for a client.
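For the curious, a running-average reduce might look something like the sketch below. It is one possible formulation, not code from the design: the function name `reduceAverage` and the {average, count} output shape are assumptions. Each incoming value shifts the running average in proportion to its share of the new count.

```javascript
// Reduce that carries {average, count}. It accepts raw amounts or its own
// earlier outputs, so it can be applied to partial results.
function reduceAverage(key, values) {
  var result = { average: 0, count: 0 };
  for (var i = 0; i < values.length; i++) {
    var v = values[i];
    var avg = typeof v === "number" ? v : v.average;
    var cnt = typeof v === "number" ? 1 : v.count;
    if (cnt === 0) continue; // an empty partial result contributes nothing
    var newCount = result.count + cnt;
    // Weighted update: move toward the incoming average by its weight.
    result.average += (avg - result.average) * (cnt / newCount);
    result.count = newCount;
  }
  return result;
}

var k = [2008, 1];
var direct = reduceAverage(k, [10, 20, 60]);
var rereduced = reduceAverage(k, [reduceAverage(k, [10, 20]), reduceAverage(k, [60])]);
```

Both results describe an average of about 30 over 3 purchases, up to floating-point rounding. The count still has to be carried along, which is why this buys little over a total plus a final division.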
Kerr> Do you think that there is something else you are doing that is different from what google calls the combiner function?
No I don't.
Any reduce function that is associative doesn't need a combiner; it is already its own combiner. And any valid reduce/combiner function combo can be mechanically rewritten to a single associative reduce function. If the function cannot be written associatively, then a combiner also cannot be written.
So the issue here is, should an explicit combiner function be a part of the incremental CouchDB Javascript view design? I think not. In Java, and other statically typed languages, an explicit combiner function may simplify things and could be implemented on CouchDB that way. But for Javascript I don't like it.
However, I think I have a good compromise: perhaps a combiner "flag" could be passed in to the reduce functions, to let the reduce function know it is operating on its own output. If the reduce function is already intrinsically associative, it just ignores the argument.
Here is a Reduce function, with a combiner argument flag, to compute the totals of the receipts:
function (date, values, combine) {
var result = {totalSpent:0, numPurchases:0};
if (combine) {
// these are output from a previous reduction
for(var i=0; i<values.length; i++) {
result.totalSpent += values[i].totalSpent;
result.numPurchases += values[i].numPurchases;
}
} else {
for(var i=0; i<values.length; i++) {
result.totalSpent += values[i];
result.numPurchases++;
}
}
return result;
}
In the case where there is no difference between the combiner and reducer (the reduction operation is intrinsically associative), you can completely ignore the argument to the point you don't even declare it.
Simple summation is an example; the combine arg isn't used:
function (tag, counts, combine) {
var sum=0;
for(var i=0; i<counts.length; i++)
sum+=counts[i];
return sum;
}
Same example without the arg even declared (yet it is passed in by the caller):
function (tag, counts) {
var sum=0;
for(var i=0; i<counts.length; i++)
sum+=counts[i];
return sum;
}
For Javascript, I think I like this way: it's simple in the simple case and the complex stuff is only a little more complex, if at all.
But for statically typed languages like Java, the explicit separation of functions might be simpler to code. They could be integrated with CouchDB either way.
Damien Katz, February 11, 2008 7:48 PM
Damien-
The point I was trying to get at in my comment is that I don't think you need a combine flag– you just treat everything as a combination, with the first iteration being the degenerate case of combining with the empty result set (or some initialized set).
ryan king, February 12, 2008 1:30 AM
Hi Damien,
I think I'd have to play with this a bit before coming to any further conclusions. I think it's less confusing to just call it "MapCombine" rather than "MapReduce where reduce must be commutative and associative"
I also still think it would be useful to offer a full reduce. While this could be done currently by a client, or app server on demand, I think there is scope for CouchDB to manage the process and make it more efficient. For example, if the map ends up with 100 distinct keys, there's an awful lot of scope for distributing the reducing of these key sets, even if you can't run combiner code on subsets. I think it should also be possible to effectively cache reduce output and have it updated on demand in much the same way as views, even if the hit on providing the update could be large. The alternative for an application that required such a function would be a roll-your-own affair that didn't have access to CouchDB internals.
By the way, I'm furiously trying to get up to speed on the actual CouchDB source code with Joe Armstrong's book in one hand. So I'm not just shouting from the sidelines, I'm tying my shoelaces and hoping not to fall flat on my face when I head out onto the field.
Cheers
Kerr
Kerr, February 12, 2008 11:18 AM
Damien:
Adding the explicit combine flag doesn't really change much. The problem is still that if the user wanted to perform the equivalent of "select date, avg(totalSpent) ... group by date limit N" they would need to perform N divisions on the client.
Division isn't that scary, but this is a simple aggregation example... imagine a GIS application that might want to perform an image composition in the reduce, or a Machine Learning problem that wanted to 'gather canopies'.
I just want to encourage you not to limit the possibilities of this functionality, because integrating MR into a semi-structured data store is extremely exciting.
Thanks for your patience.
Stu Hood, February 12, 2008 1:23 PM