Menu
Cart

Joins in MapReduce Pt. 1 - Implementations in PySpark

Posted by Cameron Davidson-Pilon at

This is Part 1 on a series on joining datasets in a MapReduce environment.

Introduction

In traditional databases, the JOIN algorithm has been exhaustively optimized: it's likely the bottleneck for most queries. On the other hand, MapReduce, being so primitive, has a simpler implementation. Let's look at a standard join in MapReduce (with syntax from PySpark). 

Equality Join in PySpark

We start off with two datasets, that might resemble something like below

# left               # right 
(1, 'A', 'B' )   |   (1, 'Z', 'Y' ) 
(2, 'C', 'D' )   |   (1, 'X', 'V' ) 
(2, 'E', 'F' )   |   (2, 'W', 'U' ) 
(3, 'E', 'F' )   |   (4, 'T', 'S' ) 

We need to specify a key for each record, something that we will use to join on. In PySpark, there is a keyBy function that allows you to set a key. Below is the resulting output. 

left = left.keyBy(lambda r: r[0])
right = right.keyBy(lambda r: r[0])

# left                    # right 
(1, (1, 'A', 'B') )   |   (1, (1, 'Z', 'Y' ))
(2, (2, 'C', 'D') )   |   (1, (1, 'X', 'V' ))
(2, (2, 'E', 'F') )   |   (2, (2, 'W', 'U' )) 
(3, (3, 'E', 'F') )   |   (4, (4, 'T', 'S' )) 

Cool. Next we will add some values to denote the origin of the record: 1 if it is from left, and 2 if it is from right.

left = left.map(lambda (k, v): (k, (1, v)))
right = right.map(lambda (k, v): (k, (2, v)))

# left # right (1, (1, (1, 'A', 'B')) ) | (1, (2, (1, 'Z', 'Y')) ) (2, (1, (2, 'C', 'D')) ) | (1, (2, (1, 'X', 'V')) ) (2, (1, (2, 'E', 'F')) ) | (2, (2, (2, 'W', 'U')) ) (3, (1, (3, 'E', 'F')) ) | (4, (2, (4, 'T', 'S')) ) # We now have (key, (origin, values)) tuples

Next we add the datasets together, using the union function.

unioned = left.union(right)

# unioned
(1, (1, (1, 'A', 'B')) )  # key = 1
(2, (1, (2, 'C', 'D')) )  # key = 2
(2, (1, (2, 'E', 'F')) )  # key = 2
(3, (1, (3, 'E', 'F')) )  # key = 3
(1, (2, (1, 'Z', 'Y')) )  # key = 1
(1, (2, (1, 'X', 'V')) )  # key = 1
(2, (2, (2, 'W', 'U')) )  # key = 2
(4, (2, (4, 'T', 'S')) )  # key = 4

Up to now, we've really only been mapping. Next we want to start reducing. We start by performing a groupByKey on the dataset, which will send items with the same key to the same reducer. The group by key will also add up all records with the same key into a list, as we see below:

grouped = unioned.groupByKey()

# grouped (each key goes to a different reducer)
(1, [(1, (1, 'A', 'B')), (2, (1, 'Z', 'Y')), (2, (1, 'X', 'V'))])
(2, [(1, (2, 'C', 'D')), (1, (2, 'E', 'F')), (2, (2, 'W', 'U'))])
(3, [(1, (3, 'E', 'F'))])
(4, [(2, (4, 'T', 'S'))])
We know have (key, [list of all elements with that key])

Oh my god we're almost done! Finally, we take each array and map a function that will perform a mini-cross product:

result = grouped.flatMapValues(lambda x : dispatch(x))
# Take each value (the list in the second position), and perform the dispatch function (below) on it. 
# For example, let's look at the first element of grouped, i.e. key=1:

def dispatch(seq):
    left_origin = []
    right_origin = []
    for (n, v) in seq:
        if n == 1:
            left_origin.append(v)
        elif n == 2:
            right_origin.append(v)
    return [(v, w) for v in left_origin for w in right_origin]

For example:
> d = [(1, (1, 'A', 'B')), (2, (1, 'Z', 'Y')),(2, (1, 'X', 'V'))] 
> dispatch(d)
[
 ((1, 'A', 'B'), (1, 'Z', 'Y')), 
 ((1, 'A', 'B'), (1, 'X', 'V'))
 ]

Which is the correct result for joining left 1 keys against right 1 keys.

Limitations and Extensions

This is a neat way to perform a distributed map-reduce join. But there are limitations:

  1. Suppose there is a key that is incredibly common across the datasets. The groupby action will send all the records with that key to the same reducer, so at the very least slowing down the entire join, but also potentially blowing up its memory. This is called the skew-problem.
  2. The above algorithm works well for equality joins, that is, where we want keys to be equal. Sometimes we have different requirements on the join condition. For example, suppose I want to join on the lefthand key being equal to or smaller than the righthand key. Or more generally, any function \( l, r \mapsto f(l,r) \) that returns a boolean. 

This latter problem is called a \(theta\)-join, where in the case of an equality join \(theta\) is the equality operator. We'll explore how to solve both these problems at once in the next blog post.

Related Posts


Latest Data Science screencasts available


Comments

Leave a comment

Please note: comments will be approved before they are published