
Scala (and Java) Spark UDFs in Python

Posted By Jakub Nowacki, 30 October 2017

Many SQL-based systems, including Apache Spark, support User-Defined Functions (UDFs). While it is possible to write UDFs directly in Python, doing so puts a substantial burden on the efficiency of computations, because Spark's internals are written in Java and Scala and run in the JVM; see the figure from PySpark's Confluence page for details.

Since Spark SQL is essentially a declarative interface, the actual computations take place mostly in the JVM. But if we write and use UDFs in Python, every call has to cross over to the Python interpreter, which runs as a separate process. This round trip introduces considerable overhead, as the figure illustrates.

The simplest way to avoid Python UDFs is to use the built-in functions from pyspark.sql.functions, which are quite rich. These functions take and return Column objects, so they can be composed into more complex expressions, as sketched below.
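For instance, a minimal illustration of such composition (assuming a hypothetical DataFrame df with a string column text):

import pyspark.sql.functions as func

# Built-in functions take and return Column, so they compose freely
# and the whole expression stays in the JVM:
df.withColumn('normalized', func.trim(func.lower(func.col('text')))) \
  .withColumn('n_words', func.size(func.split(func.col('text'), r'\s+')))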

Nonetheless, sometimes we need an external library to perform some task. As I've explained in the previous post about language-detector usage, we can use external libraries inside Spark quite effectively. I have updated my GitHub repository with some extra code that creates a UDF in Scala. We are mostly interested in the piece of code below:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

object LanguageDetector {

  // Single lazily-created detector instance shared by all UDF calls
  private lazy val languageDetector: LanguageDetector = new LanguageDetector()

  def apply(): LanguageDetector = languageDetector

  // Returns None (null in a DataFrame) when the language cannot be detected
  def detect(text: String): Option[String] = languageDetector.detect(text)

  // Wraps detect as a UserDefinedFunction usable with the DataFrame API
  def getUdf: UserDefinedFunction = udf((text: String) => detect(text))

  // Registers the UDF for use in SQL expressions under the given alias
  def registerUdf(alias: String): UserDefinedFunction = {
    val spark = SparkSession.builder().getOrCreate()

    spark.udf.register(alias, (text: String) => detect(text))
  }

  // Alias can be done with default, but this way plays better with Py4j
  def registerUdf: UserDefinedFunction = registerUdf("lang")

}

The above works fine in Scala (and Java), but one may want to use it in Python. Since LanguageDetector is exposed as a UDF, it can detect the language of each string value it receives. The function yields null in the resulting DataFrame if it cannot detect anything, which typically happens when the sentence is too short or missing altogether.

First, we need to build the JAR file as usual, using SBT. I suggest running sbt assembly in this case, as the resulting assembly (fat) JAR contains all the required dependencies, with Spark marked as provided; it is nonetheless quite small, about 4 MB in total. Note that you may need to change the Spark version, as the current default set in the SBT files is 2.2.0.

We start a Spark session as usual, though we need to include the JAR via the configuration parameter spark.jars. The driver class path extension is optional, as it is only required for a local run, where the driver also plays the role of an executor. A sample session configuration is provided below; note that you may need to alter the JAR path and name.

import pyspark
import pyspark.sql.functions as func

# spark.driver.extraClassPath is only needed in this case for local execution
spark = pyspark.sql.SparkSession.builder\
    .appName('udfs')\
    .config('spark.jars', 'spark-nlp-tools-assembly-0.1.jar')\
    .config('spark.driver.extraClassPath', 'spark-nlp-tools-assembly-0.1.jar')\
    .getOrCreate()

sc = spark.sparkContext
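
As an optional sanity check (a small sketch, not part of the original notebook), we can confirm that the JAR made it into the session configuration:

# Optional: verify that the JAR is present in the session configuration
print(sc.getConf().get('spark.jars'))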

If the above completes without any issues, we can go further and prepare a small sample set to test the UDF on.

texts = [("To będzie po polsku.", ),
         ("And this is in English.", ),
         ("Und das ist auf Deutsch.", )]

lines = spark.createDataFrame(texts, ['text'])
lines.show()
+--------------------+
|                text|
+--------------------+
|To będzie po polsku.|
|And this is in En...|
|Und das ist auf D...|
+--------------------+

Having registered the JAR, we can approach the problem of porting the function to Python. The business logic will be executed in the JVM, but we need a way to call it from Python, in a form similar to Spark's built-in functions.

To communicate between the driver's JVM and the Python instance, Spark uses a gateway provided by Py4j; this is a general-purpose project with no dependency on Spark, so you may use it in your other projects as well. The JVM gateway is already present in the Spark session and context as the property _jvm. For example, we can use Java's random number generator as follows.

random = spark._jvm.java.util.Random()
random.nextDouble()
0.36608641852242674

Note that the types are automatically cast to Python. This is not always the case, though. Let's call our getUdf method.

spark._jvm.com.sigdelta.spark.nlp.tools.LanguageDetector.getUdf()
JavaObject id=o51

We get back a Java object, but it does not seem very usable on its own. Indeed, we need to create a Python function like the one below to map the object and types correctly.

def lang(column):
    from pyspark.sql.column import Column, _to_java_column, _to_seq
    # Get the Scala UDF's apply method through the Py4j gateway
    jc = spark._jvm.com.sigdelta.spark.nlp.tools.LanguageDetector.getUdf().apply
    # Convert the column to a Java sequence and wrap the result back into a Column
    return Column(jc(_to_seq(sc, [column], _to_java_column)))

First, we obtain the Java callable jc. We call getUdf directly on the class, as it is defined in a Scala object and is therefore effectively a static method; should you need to create an instance of a new object instead, you can use the substitute below.

pcls = "com.sigdelta.spark.nlp.tools.LanguageDetector"
jc = spark._jvm.java.lang.Thread.currentThread() \
    .getContextClassLoader().loadClass(pcls).newInstance().getUdf().apply

The function _to_seq turns the list of columns into a Java sequence. It requires the Spark context and a conversion function, here _to_java_column, to transform the objects correctly. The created sequence is then passed to the apply method of our UDF. Note that we need to wrap the result in a Column object ourselves, as this is not done automatically.

Once the above function is defined, we can use it with a DataFrame by passing it the name or the Column object of interest.

lines.withColumn('lang', lang('text')).show()
+--------------------+----+
|                text|lang|
+--------------------+----+
|To będzie po polsku.|  pl|
|And this is in En...|  en|
|Und das ist auf D...|  de|
+--------------------+----+

Alternatively, if we need the function in SQL expressions, we can use the provided method registerUdf and, well, register the UDF for use in SQL.

spark._jvm.com.sigdelta.spark.nlp.tools.LanguageDetector.registerUdf()
JavaObject id=o63

Since the default name is also lang, we can use it as follows.

lines.createOrReplaceTempView('lines')

spark.sql('SELECT text, lang(text) AS lang FROM lines').show()
+--------------------+----+
|                text|lang|
+--------------------+----+
|To będzie po polsku.|  pl|
|And this is in En...|  en|
|Und das ist auf D...|  de|
+--------------------+----+
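
As a side note, a UDF registered this way can also be invoked from the DataFrame API through an SQL expression; a minimal sketch:

# The SQL-registered UDF can also be called from the DataFrame API:
lines.withColumn('lang', func.expr('lang(text)')).show()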

If required, we can register the UDF under a different alias as follows:

spark._jvm.com.sigdelta.spark.nlp.tools.LanguageDetector.registerUdf('lang_detect')
JavaObject id=o65

Now the UDF lang_detect should also work.

spark.sql('SELECT text, lang_detect(text) AS lang FROM lines').show()
+--------------------+----+
|                text|lang|
+--------------------+----+
|To będzie po polsku.|  pl|
|And this is in En...|  en|
|Und das ist auf D...|  de|
+--------------------+----+

Above, we have shown only one relatively simple UDF. Nonetheless, this method, with small alterations, can be used to expose other JVM functions, UDFs or otherwise, to Python; a generalized sketch follows.
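For instance, a minimal generalized version of the wrapping pattern used above (the helper name wrap_scala_udf is ours, not part of the library):

from pyspark.sql.column import Column, _to_java_column, _to_seq

def wrap_scala_udf(java_udf, *cols):
    # java_udf is the JavaObject returned by a Scala-side getUdf();
    # the columns are converted to a Java sequence and the result is
    # wrapped back into a Python Column.
    return Column(java_udf.apply(_to_seq(sc, list(cols), _to_java_column)))

# Hypothetical usage with the language detector from above:
java_udf = spark._jvm.com.sigdelta.spark.nlp.tools.LanguageDetector.getUdf()
lines.withColumn('lang', wrap_scala_udf(java_udf, 'text')).show()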

The original source notebook is available here.
