Your Daily Geekery

top-k with mapreduce


Providing time series data about plays/views/visits is a rather simple problem compared to providing top-k information for arbitrary time windows. Most research found in this area has a different focus of what the usual web startup needs. The following approach seems to work fairly well.

The Problem

For exact order information an ordered list with all counters has to be maintained.

track1, 1040 plays
track2, 100 plays
track3, 10 plays

Of course this does not scale if you add another dimension into the mix. Adding a user to the above example suddenly means maintaining such a list of tracks - per user. Which isn't really feasible for any reasonable number of users. Another problem is that with a an arbitrary time window you would have to re-build the counters for that very window. Even when counting based on aggregates this still does not perform well enough.

The Approximation

The first thing to realize here is that the order probably is more important to the user than the exact count. Using a similar approach as for the counts, top-k information can also be aggregated based on buckets. A query for the time range 31.01.2011 until 01.03.2011 could look like this:

topk(01.03.2011) = [[track5,  6], [track3,  3], [track2, 1]] # day
topk(02.2011)    = [[track1, 15], [track2, 13], [track5, 5]] # month
topk(31.01.2011) = [[track2,  7], [track1,  5], [track5, 1]] # day

Merging these top-k information results in the following array:

topk(31.01.2011 .. 01.03.2011) =
  [[track2, 1+13+7=21], [track1, 1+15=16], [track5, 6+5+1=12],
   [track3, 3]]

Restricting this array to k=3 gives an approximate of the top-k for the given time window. Approximation because the individual top-k results are cut off. The bigger the timespan (the more different time buckets used) the closer is the result to the exact order. A hybrid solution could be put in place for shorter timespans.

Calculating the top-k

If the approximation is acceptable the only problem left is to generate the top-k information for the various time buckets. What would require a lot of sorting can be solved with the single sort of a mapreduce. Let's run through this with a concrete example.

The mapper has to construct the keys so they are sorted by date and grouped by the additional dimension "user".

user1, 2009, 01, 01, track2     # -> user1, 2009
                                # store top-k
user1, 2011, 01, 01, track2     # -> user1, 2011
user1, 2011, 01, 02, track2     # -> user1, 2011
user1, 2011, 02, 01, track1     # -> user1, 2011
                                # store top-k
user2, 2009, 01, 01, track1     # -> user2, 2009

It's important to use a custom partitioner so that the grouped data is not scattered across different reducers. Since a reducer now sees all the data of a single user the problem is a little more confined. The idea is to build the top-k information while the data passes through the reducer, keep them in memory and then flush them to the store.

The mindful reader probably noticed that the this organization of data is not the best approach. The reducer for the above example requires to keep the information per [user, year] in memory. This means (because of the hopefully increasing number of tracks and users in the system) it does not even have an upper bound. The trick is to move out the unbound fields and change the grouping by applying a new order inside the mapper.

user1, 2009, track2, 01, 01     # -> user1, 2009, track2
                                # merge top-k for track2
                                # store top-k
user1, 2011, track1, 02, 01     # -> user1, 2011, track1
                                # merge top-k for track1
user1, 2011, track2, 01, 01     # -> user1, 2011, track2
user1, 2011, track2, 02, 01     # -> user1, 2011, track2
                                # merge top-k for track2
                                # store top-k
user2, 2009, track1, 01, 01     # -> user2, 2009, track1

Now while data passes through the reducer the maximum memory requirement is bound as we key by [user, year, track]. A whole year of counts means keeping track of 365 + 12 + 1 = 378 values plus top-k lists for all possible buckets. For k=3 this means another 365 + 12 + 1 = 378 lists * k=3 * (1 key + 1 value) = 2268 values. Whenever a track year is complete an insert sort into a length limited array throws away all unnecessary top-k k>3 information, again restricting the memory requirement to a fix bound. Which means the maximum memory required is 378 * (1 + 3 * 2) = 2646 values. Now we are talking.

The Implementation

Here is quick implementation in ruby that should explain it even better. The TopArray does an insert sort but only keeps the max number of values. It's the structure to keep the top-k information.

class TopArray < Array
  def initialize(max)
    @max = max
  def insert_sort(name, value)
    pos = find_index { |(n,v)| value >= v } || length
    if pos && pos <= @max
      insert(pos, [ name, value ])
    while length > @max

The reducer does the counting for for all the different time buckets. On track or year changes it merges the top-k information. On user or year changes it writes out the final top-k information for the year and user.

class Reducer
  def initialize
    @counts, @topk = {}, {}

  def reduce(line)
    fields = line.chomp.split("\t")

    # current fields
    user, year, track, month, day, hour = fields

    # previous fields
    prev_user, prev_year, prev_track = @prev_fields
    @prev_fields = fields

    # check if there was a change
    if (track != prev_track || year != prev_year) && @counts.length > 0
      puts "#{prev_year}: merge #{prev_track} counts #{@counts} into topk"
      # merge the top-k and reset
      @counts.each do |k,v|
        topk = @topk[k] ||
        topk.insert_sort(prev_track, v)
        @topk[k] = topk
      @counts = {}

    # add "#{year}-#{month}-#{day}@#{hour}" if you also want top-k per hour
    [ "#{year}", "#{year}-#{month}", "#{year}-#{month}-#{day}" ].each do |k|
      # increment counters per time bucket
      @counts[k] = (@counts[k] || 0) + 1

    if (user != prev_user || year != prev_year) && @topk.length > 0
      puts "#{prev_year}: write top-k for #{prev_user} = #{@topk}"
      # write out top-k and reset
      @topk = {}

Feeding some data (as a mapper would produce it) into the reducer

r =
].each { |l| r.reduce(l) }

produces the following output:

2010: merge track1 counts {"2010"=>3, "2010-02"=>3, "2010-02-01"=>3} into topk
2010: merge track2 counts {"2010"=>1, "2010-02"=>1, "2010-02-01"=>1} into topk
2010: write top-k for user1 = {
  "2010"=>[["track1", 3], ["track2", 1]],
  "2010-02"=>[["track1", 3], ["track2", 1]],
  "2010-02-01"=>[["track1", 3], ["track2", 1]]}
2011: merge track1 counts {"2011"=>5, "2011-01"=>1, "2011-01-01"=>1, "2011-03"=>4, "2011-03-03"=>4} into topk
2011: merge track2 counts {"2011"=>2, "2011-02"=>2, "2011-02-01"=>1, "2011-02-02"=>1} into topk
2011: merge track3 counts {"2011"=>1, "2011-03"=>1, "2011-03-03"=>1} into topk
2011: write top-k for user1 = {
  "2011"=>[["track1", 5], ["track2", 2], ["track3", 1]],
  "2011-01"=>[["track1", 1]],
  "2011-01-01"=>[["track1", 1]],
  "2011-03"=>[["track1", 4], ["track3", 1]],
  "2011-03-03"=>[["track1", 4], ["track3", 1]],
  "2011-02"=>[["track2", 2]],
  "2011-02-01"=>[["track2", 1]],
  "2011-02-02"=>[["track2", 1]]}

The reducer is the work horse and is a little more complex than just for counts but it's still quite an easy solution to the otherwise very complex top-k dilemma. It requires only a single sort and a fix amount of memory.