
Using language-detector, a.k.a. large non-serializable objects, in Spark

Posted By Jakub Nowacki, 25 September 2017

Often we find ourselves in a situation where we need to incorporate an external library to do some extra work with our data. This is especially true when we work with natural language or need to apply machine learning. These libraries can create large objects and may not be suitable for use in Big Data clusters out of the box. The majority of distributed computing libraries, Apache Spark being no exception, require the objects they use to be serializable. On the other hand, not all libraries and their classes implement the Serializable interface. Below I discuss how to deal with that in Scala and use these objects efficiently in Spark.

One of the first tasks when dealing with natural language text from a multilingual source is language detection. There are many ways of detecting the language of a text, but most of the practical ones rely on a dictionary approach with some word statistics, often combined into n-grams. A great example of such a ready-to-use tool is language-detector. It's written in Java and thus well suited to work with Spark. Moreover, it provides 71 built-in language profiles to be used out of the box. There are two issues we need to work around: its relatively large size, i.e. about 74 MB for all language profiles, and the fact that its classes are not serializable.

Let's start with the non-serializable issue. When you try to use an object created in the driver directly on some test data, e.g. inside a map function, you'll get java.io.NotSerializableException, as Spark needs to serialize the object before it can be used in the executors.
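
As an illustration, here is a minimal sketch of the failing pattern, assuming ds is a Dataset[String], spark.implicits._ is in scope, and the com.optimaize.langdetect classes used throughout this post are already imported:

val languageProfiles = new LanguageProfileReader().readAllBuiltIn
val languageDetector = LanguageDetectorBuilder.create(NgramExtractors.standard)
  .withProfiles(languageProfiles)
  .build
val textObjectFactory = CommonTextObjectFactories.forDetectingShortCleanText

// Spark has to ship languageDetector and textObjectFactory inside the closure;
// neither is Serializable, so the job fails as soon as an action is run
ds.map((text: String) => {
  val lang = languageDetector.detect(textObjectFactory.forText(text))
  if (lang.isPresent) lang.get.getLanguage else "unknown"
})

Indeed, you may create the object inside the function instead, which won't throw the not-serializable exception, e.g.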

ds.flatMap((text: String) => {
  // a new detector (with all language profiles) is built for every element
  val languageDetector: langdetect.LanguageDetector = {
    val languageProfiles = new LanguageProfileReader()
      .readAllBuiltIn
    LanguageDetectorBuilder.create(NgramExtractors.standard)
      .withProfiles(languageProfiles).build
  }

  val textObjectFactory: TextObjectFactory = CommonTextObjectFactories
    .forDetectingShortCleanText

  val textObject = textObjectFactory.forText(text)
  val lang = languageDetector.detect(textObject)
  if (lang.isPresent)
    Some(lang.get.getLanguage)
  else
    None
})

While this is a valid piece of code, it is also suboptimal. For every mapped element a new detector object is created, which occupies about 74 MB of memory and takes a relatively long time to build, only to be used for a single element. Note that this pattern can be used quite efficiently for small objects, but not in this case.

The best solution is to propagate the model somehow and minimize the number of object creations. The first approach we could use, without changing the model, is the mapPartitions method, which is available for both RDD and Dataset. We can use it as follows:

ds.mapPartitions((partition: Iterator[String]) => {
  // created once per partition, not once per element
  val languageDetector: langdetect.LanguageDetector = {
    val languageProfiles = new LanguageProfileReader().readAllBuiltIn
    LanguageDetectorBuilder.create(NgramExtractors.standard).withProfiles(languageProfiles).build
  }

  val textObjectFactory: TextObjectFactory = CommonTextObjectFactories.forDetectingShortCleanText

  partition.flatMap((text: String) => {
    val textObject = textObjectFactory.forText(text)
    val lang = languageDetector.detect(textObject)
    if (lang.isPresent)
      Some(lang.get.getLanguage)
    else
      None
  })
})

In this case the language detector object is created once for every partition and applied to all the elements in the partition via the iterator passed as the input argument. This is much more efficient than before, but we still create quite a lot of objects. Note that the language detector's detect method returns a Java Optional, which needs to be repackaged into a Scala Option if we intend to use it in flatMap as shown above.
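
If you prefer, that repackaging can be pulled out into a tiny helper. A minimal sketch, written here for java.util.Optional; the Optional returned by the detector exposes the same isPresent/get pattern used inline above:

// repackage a Java-style Optional into a Scala Option
def toScalaOption[A](opt: java.util.Optional[A]): Option[A] =
  if (opt.isPresent) Some(opt.get) else None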

But there is a better way: to have one object per executor. Since the detector object is not serializable, we cannot create it in the driver directly; we need to postpone its creation somehow and create it on each executor. Also, the functions passed to Spark transformations do not have any open or before method which would allow some general object creation before they get used on an executor. In Java we could use a singleton pattern and check whether the object has already been created via some getInstance method. Since we are in Scala, we can use the language features to perform roughly the same operation in a more concise fashion. We can take the original language detector and wrap it into our own class as follows:

class LanguageDetector extends Serializable {

  // not serialized with the class; built lazily on first use, i.e. once per executor JVM
  @transient lazy val languageDetector: langdetect.LanguageDetector = {
    val languageProfiles = new LanguageProfileReader().readAllBuiltIn
    LanguageDetectorBuilder.create(NgramExtractors.standard).withProfiles(languageProfiles).build
  }

  @transient lazy val textObjectFactory: TextObjectFactory = CommonTextObjectFactories.forDetectingShortCleanText

  def detect(text: String): Option[String] = {
    val textObject = textObjectFactory.forText(text)
    val lang = languageDetector.detect(textObject)
    if (lang.isPresent)
      Some(lang.get.getLanguage)
    else
      None
  }
}

object LanguageDetector {
  def apply(): LanguageDetector = new LanguageDetector()
}

First of all, our class is now serializable, even though it contains objects which are not. To stop those fields from being serialized we use Scala's @transient annotation, which marks a field that should not be serialized. The @transient annotation alone would suffice if there were some open method to initialize the fields later, but since there isn't, a better solution is to also make the detector fields lazy vals. This means that the object behind a field will only be created the first time the field is used; in our case, when the detect method is called inside some Spark action.

To put it all together: our LanguageDetector object is created in the driver without the non-serializable objects, serialized and sent to the executors, and the underlying detector objects are created on each executor the first time the detect method is used, e.g. inside a flatMap function. Thus, we'll have just one object per executor.
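
For example, the wrapper can then be used directly in a transformation; a minimal sketch, assuming ds is a Dataset[String] as before:

val detector = LanguageDetector()

// the wrapper itself is tiny and serializable; the heavyweight language
// profiles are only loaded lazily on the executors
ds.flatMap(text => detector.detect(text))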

Now let's turn it into something more useful. First, to distribute the object we could rely on standard serialization, but it is better to wrap it into a Spark broadcast variable. Broadcast variables are distributed more efficiently and also allow for better management. We can also wrap the code into a User Defined Function (UDF), which can be used from all of Spark's languages; this is especially useful when working in Python and R, as creating UDFs in those languages is less efficient. We can use the above as follows:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object LangDetectUdfUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("LangDetectUdfTest")
      .getOrCreate()
    import spark.implicits._

    val languageDetector = LanguageDetector()

    val sampleText = "Now what is that?"
    println(sampleText, languageDetector.detect(sampleText))

    // distribute the small serializable wrapper; profiles are built lazily on executors
    val ldBcast = spark.sparkContext.broadcast(languageDetector)

    val ldUdf = udf((text: String) => ldBcast.value.detect(text))

    val df = List(
      "To będzie po polsku.",
      "And this is in English.",
      "Und das ist auf Deutsch.").toDF("text")

    df.show()

    df.select('text, ldUdf('text).as('lang)).show()

  }
}

The result of running the above code is:

(Now what is that?,Some(en))

+--------------------+
|                text|
+--------------------+
|To będzie po polsku.|
|And this is in En...|
|Und das ist auf D...|
+--------------------+

+--------------------+----+
|                text|lang|
+--------------------+----+
|To będzie po polsku.|  pl|
|And this is in En...|  en|
|Und das ist auf D...|  de|
+--------------------+----+

Note that to run the above inside an IDE, you may need to add -Dspark.master=local[*] to VM options for the run configuration.

To sum up, I've shown how you can use a very useful language detection tool inside Spark efficiently, even though it's not serializable. This approach can be used with other objects, like machine learning models, whose creation you need to control. Note that the original language detector is thread-safe, so we didn't have to synchronize its usage. If your objects are not thread-safe, you may need to use synchronization or an object pool to ensure that only one Spark function execution uses a given object at a time. You can obtain the above code from my GitHub repository.
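
For the non-thread-safe case, one simple alternative to locking or a pool is to keep one instance per executor thread via ThreadLocal, reusing the same @transient lazy val trick. A minimal sketch, using a hypothetical Parser class as a stand-in for a non-thread-safe, non-serializable library object:

// hypothetical stand-in for a non-thread-safe, non-serializable library class
class Parser {
  def parse(text: String): String = text.toLowerCase
}

class ParserWrapper extends Serializable {

  // one Parser per executor thread; built lazily, never serialized
  @transient lazy val parser: ThreadLocal[Parser] = new ThreadLocal[Parser] {
    override def initialValue(): Parser = new Parser()
  }

  def parse(text: String): String = parser.get().parse(text)
}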
