# Joins in MapReduce Pt. 2 - Generalizing Joins in PySpark

Posted by **Cameron Davidson-Pilon**

## Introduction

In the previous article in this series on Joins in MapReduce, we looked at how a traditional join is performed in a distributed map-reduce setting. Next, I want to generalize the idea of a join: typically we think of joining rows on *equal keys*, but more generally you can join two rows on any function that returns a boolean: \( l, r \mapsto \theta(l, r) \). We call these generalized joins \(\theta\)-joins, or *theta-joins*. Because \(\theta\) is arbitrary, theta-joins can be far more computationally challenging. Here are some examples of theta-join predicates:

- `R.key < L.key`
- `|R.key - L.key| < 2`
- `R.key1 = L.key1 and R.key2 = L.key2`
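Concretely, each of these predicates is just a boolean function of a left row and a right row. A minimal sketch in plain Python (the dict-based rows and key names are illustrative, not from any particular API):

```python
# Theta-join predicates written as plain boolean functions of a
# left row l and a right row r (rows here are dicts; names are illustrative).

less_than = lambda l, r: r["key"] < l["key"]             # R.key < L.key
within_two = lambda l, r: abs(r["key"] - l["key"]) < 2   # |R.key - L.key| < 2
two_key_eq = lambda l, r: (r["key1"] == l["key1"]
                           and r["key2"] == l["key2"])   # composite equality

print(less_than({"key": 5}, {"key": 3}))  # True
```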

Implementing a theta-join in MapReduce is even more challenging. Recall the previous join algorithm, where we implemented an *equality-join* (so \(\theta\) was the equality operator): we relied on an implicit trick, namely that rows with the same key on the lefthand side and the righthand side map to the same reducer in the `groupByKey` step. Really, no joining is actually done pre-reducer, just organizing. So how do we implement a theta-join in MapReduce?
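To make that recap concrete, here is a minimal pure-Python sketch of a reduce-side equality join (the function name and row format are my own, not the code from the previous article): the map phase tags each row with its side, the group phase gathers rows by key, and the reduce phase pairs rows from opposite sides.

```python
from collections import defaultdict

def equality_join(left, right):
    """Reduce-side equality join; rows are (key, value) pairs."""
    # Map phase: tag each row with its side so the reducer can tell them apart.
    tagged = [(k, ("L", v)) for k, v in left] + [(k, ("R", v)) for k, v in right]

    # Group phase: this is what groupByKey does -- rows with the same key
    # land in the same bucket (reducer). No joining happens yet.
    buckets = defaultdict(list)
    for k, row in tagged:
        buckets[k].append(row)

    # Reduce phase: inside each bucket, pair every L row with every R row.
    out = []
    for k, rows in buckets.items():
        lefts = [v for side, v in rows if side == "L"]
        rights = [v for side, v in rows if side == "R"]
        out.extend((k, lv, rv) for lv in lefts for rv in rights)
    return out
```

The key observation for what follows: this only works because equality lets us route matching rows to the same reducer ahead of time, something an arbitrary \(\theta\) does not allow.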

## 1-Bucket-Theta Algorithm

Unfortunately, there is no general solution to this problem. For very specific join types there may exist fragile tricks, but they don't generalize. To accommodate an arbitrary join, you'll need to check all elements of the RHS (righthand side) against all elements of the LHS (lefthand side). This is inefficient, or just generally crappy, so how can we reduce the crappiness? The authors of a pretty accessible paper, *Processing Theta-Joins using MapReduce*, present an efficient way to partition this cross-product (what they call the join-space) across the reducers. What's unique about this algorithm is that, because of its randomness (yes, it's stochastic), it handles the skew problem common in MapReduce joins gracefully.

Below is a PySpark implementation of the 1-Bucket-Theta join:
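The original embedded gist is not reproduced here, but the core of the algorithm can be sketched in plain Python (the function name, grid shape, and row format below are my own, not the original implementation). The \(|L| \times |R|\) join-space is cut into an `n_rows` x `n_cols` grid of regions, one per reducer; each left row is sent to one random grid row (replicated across all columns), each right row to one random grid column (replicated across all rows), so every (l, r) pair meets in exactly one region, where \(\theta\) is finally checked.

```python
import random
from collections import defaultdict

def one_bucket_theta_join(left, right, theta, n_rows=2, n_cols=2, seed=0):
    """Sketch of the 1-Bucket-Theta join over plain Python lists.

    The |left| x |right| cross-product is partitioned into an
    n_rows x n_cols grid of regions (one region per reducer).
    """
    rng = random.Random(seed)
    regions = defaultdict(lambda: ([], []))  # (lefts, rights) per grid cell

    # Map phase: a left row picks a random grid row and is replicated
    # across every column of that row; symmetrically for right rows.
    for l in left:
        i = rng.randrange(n_rows)
        for j in range(n_cols):
            regions[(i, j)][0].append(l)
    for r in right:
        j = rng.randrange(n_cols)
        for i in range(n_rows):
            regions[(i, j)][1].append(r)

    # Reduce phase: each region holds a slice of the cross-product;
    # every (l, r) pair lands in exactly one region, where theta is checked.
    out = []
    for lefts, rights in regions.values():
        out.extend((l, r) for l in lefts for r in rights if theta(l, r))
    return out
```

In an RDD version, the map phase would be a `flatMap` emitting `((i, j), row)` pairs, the grid cell would be the key for `groupByKey`, and the reduce phase would filter each grouped cell with \(\theta\). Note that the random assignments only decide *where* each pair is checked, never *whether* it is checked, which is why the result is exact regardless of the seed.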

And here is some example usage with datetimes; this might be called an *interval-join*, as I'm searching for rows whose datetime falls into an interval.
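The original usage gist is likewise not shown; the self-contained sketch below (my own illustrative data, with a brute-force cross-product filter standing in for the distributed join) pairs timestamped events with the datetime intervals that contain them.

```python
from datetime import datetime
from itertools import product

# Intervals on the left, timestamped events on the right (illustrative data).
intervals = [("session-1", datetime(2015, 1, 1, 9), datetime(2015, 1, 1, 10)),
             ("session-2", datetime(2015, 1, 1, 12), datetime(2015, 1, 1, 13))]
events = [("click", datetime(2015, 1, 1, 9, 30)),
          ("click", datetime(2015, 1, 1, 11, 0)),
          ("purchase", datetime(2015, 1, 1, 12, 45))]

# The theta predicate: the event's timestamp falls inside the interval.
def theta(interval, event):
    _, start, end = interval
    return start <= event[1] < end

# Stand-in for the distributed join: filter the full cross-product.
matches = [(i[0], e[0]) for i, e in product(intervals, events) if theta(i, e)]
print(matches)  # [('session-1', 'click'), ('session-2', 'purchase')]
```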

## Caveat

Sorry, but the above implementation is just too inefficient: the join-space I mentioned above, the cross-product of the two datasets, is simply too big. Most rows will not be included in the result set, yet we check them anyway. This is the burden of the theta-join. Though I don't follow suit here, the paper does examine using summary statistics from the LHS and RHS to improve the performance of certain types of theta-joins.

