Searching through distributed datasets: The Mod-Binary Search
Posted by Cameron Davidson-Pilon at
On a not-too-unusual day, one of my Spark jobs failed in production. Typically this means a row of bad data entered the job. As I’m one to write regression tests, this “type” of bad data had likely never been seen before, so I needed to inspect the individual offending row (or rows). Typical debugging steps include:
1. Manually inspecting all the recent data, either by hand or on a local machine.
2. The failed job might print the offending row in the logs.
3. The failed job might print the stacktrace of the failure, giving a clue as to what the original row looked like.
My problem was not as easy as this:
1. The data is too big to inspect manually, and would not all fit on a single machine.
2. Nope, unfortunately printing the offending row is not a feature of Spark logs.
3. Also nope, not in Spark logs. With Spark DataFrames, all the code is auto-generated at a lower layer anyway, so a line number in the stacktrace wouldn’t tell me much.
Essentially, all the information I had was the following: my job threw a NullPointerException somewhere when it tried to process this new dataset:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 533 in stage 1.0 failed 4 times, most recent failure: Lost task 533.3 in stage 1.0. java.lang.NullPointerException
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
...
But I still needed to find this offending row. The idea of a binary search over the dataset appealed to me, but the data was distributed: there was no sense of an “order” to it, that is, I had no concept of a middle value to split the data on. The next idea I thought of was using the modulus. Suppose each row had an integer 1,2,3,… associated with it; then I could split the data modulo 2 and test either side, 0 or 1, for a failure. Suppose that the 1s-half contained the failure. Then I should split that half into the rows congruent to 1 and to 3 modulo 4. And so on: the idea can be applied recursively. It relies on the fact that \(x \equiv k \bmod 2^n\) implies either \(x \equiv k \bmod 2^{n+1}\) or \(x \equiv k + 2^n \bmod 2^{n+1}\). Using this, I can recursively halve my dataset until I have found a single row (or at least a much smaller set). I’ll explain how to associate each row to an integer in a moment, but first, here’s a sample algorithm in Python:
This is a toy example, but the gist of it is that each round I halve the dataset and test one of the halves for the desired behaviour (implemented in `test`). The running time of this algorithm is \(\mathcal{O}(T\log{n})\), where \(n\) is the size of the dataset and \(T\) is the running time of the job.
This assumes there is an easy way to assign an integer to each row. When applying this idea in Spark to a distributed dataset, there is no easy way to index each row consecutively (Spark does have `zipWithIndex` in its RDD layer, though this does an internal collect, which isn’t very desirable). However, there is a `monotonically_increasing_id` function, which gives each row a monotonically increasing integer, though not necessarily a consecutive one. Fortunately this algorithm still works (and typically this is where a normal binary search would break). The runtime, however, is no longer \(\mathcal{O}(T\log{n})\), but \(\mathcal{O}(T\log{M})\), where \(M\) is the maximum integer assigned. So in Spark, my implementation to find the offending row looked like:
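A minimal PySpark sketch of the search follows, under some loud assumptions: `run_job` is a hypothetical callable that runs the failing job on a DataFrame and raises on bad data; the names `search_residue_class`, `find_offending_rows`, `_row_id` and `stop_at` are made up for illustration; and the IDs are assumed to come out the same each time the DataFrame is recomputed, which requires stable input partitioning. Treat it as an illustration of the approach, not a definitive implementation.

```python
def search_residue_class(fails, size, stop_at=1, max_rounds=62):
    """Find a residue class (residue, modulus) that still contains the failure.

    fails(residue, modulus) -> True if the job fails on the rows whose id
                               is congruent to `residue` mod `modulus`.
    size(residue, modulus)  -> number of rows in that class.
    """
    residue, modulus = 0, 1              # every id is congruent to 0 mod 1
    for _ in range(max_rounds):          # keeps modulus within a signed 64-bit long
        if size(residue, modulus) <= stop_at:
            break
        modulus *= 2
        if not fails(residue, modulus):  # so the failure is in the other half
            residue += modulus // 2
    return residue, modulus


def find_offending_rows(df, run_job, stop_at=1):
    """Spark adapter. `run_job(df)` is hypothetical: it should raise on bad data."""
    # Imported here so the generic driver above has no Spark dependency.
    from pyspark.sql import functions as F

    indexed = df.withColumn("_row_id", F.monotonically_increasing_id())

    def subset(residue, modulus):
        # Keep one residue class, then hide the helper column from the job.
        return indexed.filter(F.col("_row_id") % modulus == residue).drop("_row_id")

    def fails(residue, modulus):
        try:
            run_job(subset(residue, modulus))
            return False
        except Exception:                # assumption: any error means the bad row is here
            return True

    def size(residue, modulus):
        return subset(residue, modulus).count()

    residue, modulus = search_residue_class(fails, size, stop_at)
    return subset(residue, modulus).collect()
```

Usage would look like `rows = find_offending_rows(df, run_job)`, where `df` is the input DataFrame. Because the IDs are sparse, the number of rounds follows the bit length of the largest ID rather than the row count.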
So I let this run for an hour or so, and it worked! I was very happy to find the offending row. Next time, though, I would change two things:
1. Don’t bother filtering down to just one row. Better to stop early and collect locally, then inspect the smaller dataset.
2. `cache` the data right before the step where your job fails, or else you’ll be rerunning the same computations over and over.
I’m not totally sure this is the fastest way to find the offending line (though, given the tools plus information provided, it might be), but it works and is reliable.