# Joins in MapReduce Pt. 1 - Implementations in PySpark

## Introduction

In traditional databases, the JOIN algorithm has been exhaustively optimized: it's likely the bottleneck for most queries. On the other hand, MapReduce, being so primitive, has a simpler implementation. Let's look at a standard join in MapReduce (with syntax from PySpark).

## Equality Join in PySpark

We start off with two datasets, that might resemble something like below

```
# left # right
(1, 'A', 'B' ) | (1, 'Z', 'Y' )
(2, 'C', 'D' ) | (1, 'X', 'V' )
(2, 'E', 'F' ) | (2, 'W', 'U' )
(3, 'E', 'F' ) | (4, 'T', 'S' )
```

We need to specify a *key* for each record, something that we will use to join on. In PySpark, there is a `keyBy`

function that allows you to set a key. Below is the resulting output.

```
left = left.keyBy(lambda r: r[0])
right = right.keyBy(lambda r: r[0])
# left # right
(1, (1, 'A', 'B') ) | (1, (1, 'Z', 'Y' ))
(2, (2, 'C', 'D') ) | (1, (1, 'X', 'V' ))
(2, (2, 'E', 'F') ) | (2, (2, 'W', 'U' ))
(3, (3, 'E', 'F') ) | (4, (4, 'T', 'S' ))
```

Cool. Next we will add some values to denote the origin of the record: 1 if it is from *left, and 2 *if it is from* right.*

```
left = left.map(lambda (k, v): (k, (1, v)))
right = right.map(lambda (k, v): (k, (2, v)))
```

# left # right
(1, (1, (1, 'A', 'B')) ) | (1, (2, (1, 'Z', 'Y')) )
(2, (1, (2, 'C', 'D')) ) | (1, (2, (1, 'X', 'V')) )
(2, (1, (2, 'E', 'F')) ) | (2, (2, (2, 'W', 'U')) )
(3, (1, (3, 'E', 'F')) ) | (4, (2, (4, 'T', 'S')) )
# We now have (key, (origin, values)) tuples

Next we add the datasets together, using the `union`

function.

```
unioned = left.union(right)
# unioned
(1, (1, (1, 'A', 'B')) ) # key = 1
(2, (1, (2, 'C', 'D')) ) # key = 2
(2, (1, (2, 'E', 'F')) ) # key = 2
(3, (1, (3, 'E', 'F')) ) # key = 3
(1, (2, (1, 'Z', 'Y')) ) # key = 1
(1, (2, (1, 'X', 'V')) ) # key = 1
(2, (2, (2, 'W', 'U')) ) # key = 2
(4, (2, (4, 'T', 'S')) ) # key = 4
```

Up to now, we've really only been mapping. Next we want to start reducing. We start by performing a `groupByKey`

on the dataset, which will send items with the same key to the same reducer. The group by key will also add up all records with the same key into a list, as we see below:

`grouped = unioned.groupByKey() # grouped (each key goes to a different reducer) (1, [(1, (1, 'A', 'B')), (2, (1, 'Z', 'Y')), (2, (1, 'X', 'V'))]) (2, [(1, (2, 'C', 'D')), (1, (2, 'E', 'F')),`

(2, (2, 'W', 'U'))])

(3, [(1, (3, 'E', 'F'))])

(4, [(2, (4, 'T', 'S'))])

We know have (key, [list of all elements with that key])

Oh my god we're almost done! Finally, we take each array and map a function that will perform a mini-cross product:

```
result = grouped.flatMapValues(lambda x : dispatch(x))
# Take each value (the list in the second position), and perform the dispatch function (below) on it.
# For example, let's look at the first element of grouped, i.e. key=1:
def dispatch(seq):
left_origin = []
right_origin = []
for (n, v) in seq:
if n == 1:
left_origin.append(v)
elif n == 2:
right_origin.append(v)
return [(v, w) for v in left_origin for w in right_origin]
For example:
> d = [(1, (1, 'A', 'B')), (2, (1, 'Z', 'Y')),(2, (1, 'X', 'V'))]
> dispatch(d)
[
((1, 'A', 'B'), (1, 'Z', 'Y')),
((1, 'A', 'B'), (1, 'X', 'V'))
]
```

Which is the correct result for joining left 1 keys against right 1 keys.

## Limitations and Extensions

This is a neat way to perform a distributed map-reduce join. But there are limitations:

- Suppose there is a key that is incredibly common across the datasets. The groupby action will send all the records with that key to the same reducer, so at the very least slowing down the entire join, but also potentially blowing up its memory. This is called the
*skew-problem*. - The above algorithm works well for equality joins, that is, where we want keys to be equal. Sometimes we have different requirements on the join condition. For example, suppose I want to join on the lefthand key being equal to
*or smaller*than the righthand key. Or more generally, any function \( l, r \mapsto f(l,r) \) that returns a boolean.

This latter problem is called a \(theta\)-join, where in the case of an equality join \(theta\) is the equality operator. We'll explore how to solve *both* these problems at once in the next blog post.