Spark JOIN using REGEX

Categories: Data Science

A more technical post about how I ended up efficiently joining two datasets on regex patterns using a custom UDF in Spark.


Context

For the past couple of months I had been struggling with a small problem: I have a list of regex patterns and I want to know which Wikipedia articles contain them.

What I wanted to end up with was a table with the following columns:

  • Wikipedia Article ID
  • Wikipedia Article Text
  • Matching Pattern (or null if no pattern got triggered)
ID | Text                                      | Pattern
---|-------------------------------------------|----------------------
1  | This Wikipedia:Statistics page measures…  | '\bbot.*generated\b'
1  | This Wikipedia:Statistics page measures…  | '\bgompertz\b'
2  | In the context of network theory,…        | '\bpower law\b'
3  | In probability theory and statistics…     | null

My list of regexes was roughly 500 patterns long. Some were simple word searches, but others were more complex expressions. I needed a good way to search for these patterns and get the results into the format above. Some sort of LEFT OUTER JOIN.

Setup

There are a lot of articles on Wikipedia; apparently, 5.8 million of them. So I needed a tool capable of parallelizing the search, and I decided to use Spark from the Apache foundation. Nowadays you can pick your favourite cloud provider and there is a good chance you can get a cluster with not much more than one click, which is what I did.

Now that I had a working setup, I started looking online for how to do this.

The first thing I found online was to do a LEFT OUTER JOIN with rlike.
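Something like this (a sketch; articles and patterns here are stand-ins for my two DataFrames, with text and pattern columns):

// Naive approach: a LEFT OUTER JOIN whose predicate is "the article text
// matches the regex in the pattern column". Because the right-hand side of
// RLIKE is a column, the regex is evaluated (and recompiled) for every
// article/pattern pair.
import org.apache.spark.sql.functions.expr

val joined = articles.join(
  patterns,
  expr("text RLIKE pattern"),  // non-equi join predicate
  "left_outer"
)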

That looked promising: on a small sample it did exactly what I wanted.

Then I naively launched the join on the whole dataset… and waited for a long time. Actually, just as long as it took me to realize my cluster had run out of memory. From that point, I spent many hours looking online and trying different solutions, but I could not find anything relevant on how to solve the problem.

Since it was nothing really important, I did not touch it for a couple of months. Then one day at work I was talking about it with one of my colleagues, and he figured out the problem.

The issue

Here is what he said:

"First and foremost, non-equijoins perform poorly in Spark because they can only be evaluated using a broadcast nested loop join or a cross join.

Let's assume that articles contains 1,000,000 rows and patterns contains 500 rows. The smaller dataset (patterns in this case) will be broadcast. To evaluate this join, patterns will effectively be scanned 1,000,000 times and the join predicate will be evaluated 500,000,000 times. What makes it even worse is that the regex patterns will be compiled 500,000,000 times."

This basically spells disaster.

– Michael
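You can see this by asking Spark for the physical plan of the rlike join sketched above:

// The plan contains a BroadcastNestedLoopJoin: the broadcast patterns are
// scanned once per article row, and the RLIKE predicate is evaluated (and the
// regex recompiled) for every article/pattern combination.
joined.explain()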

The Solution

And then, after finding the issue, he came back to me a couple of days later with a custom UDF.
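It looked roughly like this (a Scala sketch of the idea rather than his exact code; the important parts are that the join disappears entirely and that each pattern is compiled once, when the UDF is built, rather than 500,000,000 times):

import java.util.regex.Pattern
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Build a UDF over the full list of patterns. Each regex is compiled a single
// time here; the returned UDF only runs the pre-compiled matchers against the
// article text and returns the patterns that matched (or null if none did).
def findMatchingPatterns(regexes: Seq[String]): UserDefinedFunction = {
  val compiled = regexes.map(r => Pattern.compile(r))
  udf((text: String) => {
    for {
      t <- Option(text)                                            // tolerate null article text
      matches = compiled.filter(_.matcher(t).find()).map(_.pattern())
      if matches.nonEmpty                                          // null when nothing matches
    } yield matches
  })
}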

To use it, you simply need to query it like this:
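(Another sketch, with my own column names; explode_outer is what keeps the articles that matched nothing, with a null pattern.)

import org.apache.spark.sql.functions.{col, explode_outer}

// Collect the ~500 patterns to the driver, build the UDF over them once,
// tag every article with the array of patterns it matches, then explode so
// each match gets its own row. explode_outer keeps the no-match articles
// as a single row with pattern = null.
val regexes: Seq[String] =
  patterns.select("pattern").collect().map(_.getString(0)).toSeq

val result = articles
  .withColumn("patterns", findMatchingPatterns(regexes)(col("text")))
  .withColumn("pattern", explode_outer(col("patterns")))
  .select("id", "text", "pattern")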

Result

I tried it, and within a couple of hours I had exactly what I wanted.

I decided to do a small benchmark. With both datasets loaded and cached in Spark, I selected only 20,000 articles to try both methods. Using the exact same sample, here are the results:

  • First technique, rlike: 3 articles per second
  • Second technique, UDF: ~5,000 articles per second

I know most of this delay is due to caching, reads and other memory management, but still, what a difference.

Conclusion

I do not usually write this type of blog post, but since I looked for a solution and could not find anything, I thought it was worth sharing back.

I would like to thank Michael Styles for his help on this project.

3 thoughts on "Spark JOIN using REGEX"

  1. thanks for the great post.
    I have encountered a similar requirement and am working on it.
    getting some syntax errors when I create the UDF. could you pls share full code.

  2. could you pls check and let me know the error message reason. Appreciate your help…
    %pyspark
    from pyspark.sql import functions as F
    regex = sc.parallelize([(1,"stackoverflow.com/questions/.*$"), (2,"(?i:google)"),(3,"nn.com/*/politics"),(7,"(?i:^social|^blogs)")]).toDF(["id","pattern"])
    wiki = sc.parallelize([(1,"stackoverflow.com/questions/47272330"),(2,"www.google.de/url"),(7,"social-GLO-LinkedIn-SMEDaily")]).toDF(["id","text"])
    regexes = regex.agg(F.collect_list(F.col("pattern"))).collect()[0][0]

    def findMatchingPatterns(regexes: ArrayList[String]): UserDefinedFunction = {
    udf((value: String) => {
    for {
    text <- Option(value)
    matches = regexes.filter(r => Pattern.matches(r, text))
    if matches.nonEmpty
    } yield matches
    }, ArrayType(StringType))
    }

    from utils.scala_functions import find_matching_patterns
    from pyspark.sql import functions as F

    regexes = regex.agg(F.collect_list(F.col("pattern"))).collect()[0][0]
    regexes = sc.broadcast(regexes)

    articles = articles \
    .withColumn("patterns", find_matching_patterns(F.col("text"), regexes.value)) \
    .withColumn("patterns", F.when(F.col("patterns").isNull(), F.array(F.lit(None))).otherwise(F.col("patterns"))) \
    .withColumn("pattern", F.explode(F.col("patterns")))

    error message:
    invalid syntax (, line 7)
    File "", line 7
    def findMatchingPatterns(regexes: ArrayList[String]): UserDefinedFunction = {
    ^
