Spark is a framework that tries to provide answers to many problems at once. At its core it allows for the distribution of generic workloads to a cluster. On top of that it provides a SQL-friendly API to work with structured data, a streaming engine to support applications with fast-data requirements, and an ML library. The latter is the one we are interested in in this post: a distributed machine learning library with several models and general feature extraction, transformation and selection implementations. Supporting abstractions for composing ML pipelines or hyperparameter tuning, among others, are also provided.
Even though we get a lot out of the box from Spark ML, there will eventually be cases where you need to develop your own custom transformations. Maybe the data science team you are working with has come up with some new, complex features that turned out to be really valuable to the problem, and now you need to implement these transformations at scale. Ideally, you will want to write them in Scala and expose a Python wrapper to facilitate their use.
For a better understanding, I recommend studying Spark’s code. Start with an easy model like the CountVectorizer and understand what is being done. It will give you all the tools you need to build your own customizations.
We will use Spark 2.2.1 and the ML API that makes use of the DataFrame abstraction.
The complete example can be found in this repository. It contains the Scala code plus the Python wrapper implementation, and boilerplate for testing in both languages.
Custom Estimator/Transformer
Let’s create a custom Bucketizer that will divide the range of a continuous numerical column into numberBins bins (an input parameter) and then, for each row, decide the appropriate bin.
Given an input column:
+-----+
|input|
+-----+
| 1.0|
| 5.0|
| 0.0|
| 7.0|
| 4.0|
| 8.0|
| 10.0|
+-----+
We expect the following output:
+-----+---+
|input|bin|
+-----+---+
| 1.0| 0|
| 5.0| 2|
| 0.0| 0|
| 7.0| 2|
| 4.0| 1|
| 8.0| 3|
| 10.0| 4|
+-----+---+
In order to create a custom Transformer or Estimator we need to follow some contracts defined by Spark. Very briefly, a Transformer must provide a .transform implementation, in the same way as an Estimator must provide one for the .fit method.
You need an Estimator every time you need to calculate something prior to the actual application of the transformation. For instance, if you need to normalize the values of a column between 0 and 1, you must necessarily first know the maximum and the minimum of that particular column. So you would create an Estimator with a .fit method that calculates this data and then returns a Model that already has all it needs to apply the operation.
First of all, declare the parameters needed by our Bucketizer:
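A simplified sketch of what this parameter trait can look like follows. The parameter names (inputCol, outputCol, numberBins) match the ones used throughout the post, but the validation details and error messages are illustrative; the repository's version may differ:

```scala
import org.apache.spark.ml.param.{IntParam, Param, ParamValidators, Params}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

trait BucketizerParams extends Params {

  // Column holding the continuous values we want to bucketize
  final val inputCol = new Param[String](this, "inputCol", "The input column")

  // Column where the computed bin index will be written
  final val outputCol = new Param[String](this, "outputCol", "The output column")

  // Number of bins the range of the input column is divided into
  final val numberBins = new IntParam(this, "numberBins",
    "Number of bins to divide the range of the input column into",
    ParamValidators.gtEq(1))

  def getInputCol: String = $(inputCol)
  def getOutputCol: String = $(outputCol)
  def getNumberBins: Int = $(numberBins)

  protected def validateAndTransformSchema(schema: StructType): StructType = {
    val field = schema($(inputCol))
    if (field.dataType != DoubleType) {
      throw new IllegalArgumentException(
        s"Input column ${$(inputCol)} must be of type DoubleType but is ${field.dataType}")
    }
    schema.add(StructField($(outputCol), IntegerType, nullable = false))
  }
}
```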
validateAndTransformSchema just validates the model’s operating conditions, like the input type of the column: if (field.dataType != DoubleType).
We then declare that our Bucketizer will respect the Estimator contract, by returning a BucketizerModel with the transform method implemented. Additionally, BucketizerParams provides functionality to manage the parameters that we have defined above.
class Bucketizer(override val uid: String) extends Estimator[BucketizerModel] with BucketizerParams
And here is the implementation:
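Below is a condensed sketch of its shape rather than the repository’s exact code; the equal-width split and the SortedMap keyed by each bin’s upper boundary are illustrative assumptions:

```scala
import scala.collection.immutable.SortedMap

import org.apache.spark.ml.Estimator
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{max, min}
import org.apache.spark.sql.types.StructType

class Bucketizer(override val uid: String)
    extends Estimator[BucketizerModel] with BucketizerParams {

  def this() = this(Identifiable.randomUID("bucketizer"))

  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)
  def setNumberBins(value: Int): this.type = set(numberBins, value)

  override def fit(dataset: Dataset[_]): BucketizerModel = {
    transformSchema(dataset.schema)

    // Compute the range of the input column in a single aggregation
    val Row(minValue: Double, maxValue: Double) =
      dataset.agg(min($(inputCol)), max($(inputCol))).head()

    // Split the range into numberBins equal-width bins, keyed by their upper boundary
    val binWidth = (maxValue - minValue) / $(numberBins)
    val bins = SortedMap(
      (1 to $(numberBins)).map(i => (minValue + i * binWidth) -> (i - 1)): _*)

    copyValues(new BucketizerModel(uid, bins).setParent(this))
  }

  override def transformSchema(schema: StructType): StructType =
    validateAndTransformSchema(schema)

  override def copy(extra: ParamMap): Bucketizer = defaultCopy(extra)
}
```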
The interesting part is the fit method, which calculates the minimum and maximum values of the input column, creates a SortedMap with the bin boundaries and returns a BucketizerModel with this pre-calculated data. This model, having knowledge about the boundaries, just needs to map each value to the right bin:
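Sketched out, the model can look like the following; the lookup convention and the javaBins helper follow the description in this post but are simplified relative to the repository:

```scala
import scala.collection.JavaConverters._
import scala.collection.immutable.SortedMap

import org.apache.spark.ml.Model
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.StructType

class BucketizerModel(override val uid: String, val bins: SortedMap[Double, Int])
    extends Model[BucketizerModel] with BucketizerParams {

  // Java-friendly view of the bins so that py4j can expose them to Python as a dict
  def javaBins: java.util.Map[Double, Int] = bins.asJava

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema)
    val localBins = bins // avoid serializing the whole model in the UDF closure
    // Assign each value to the first bin whose upper boundary is strictly greater;
    // values equal to the maximum fall into the last (overflow) bin
    val toBin = udf { value: Double =>
      localBins.find { case (boundary, _) => value < boundary }
        .map { case (_, bin) => bin }
        .getOrElse(localBins.size)
    }
    dataset.withColumn($(outputCol), toBin(col($(inputCol))))
  }

  override def transformSchema(schema: StructType): StructType =
    validateAndTransformSchema(schema)

  override def copy(extra: ParamMap): BucketizerModel =
    copyValues(new BucketizerModel(uid, bins), extra).setParent(parent)
}
```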
javaBins is needed to map the bins data structure to a more Java-friendly version. Otherwise, when we ask for this structure from Python (through py4j), we cannot directly cast it to a Python dict.
In the companion object of BucketizerModel we provide support for model persistence to disk. Spark ML has some modules that are marked as private, so we need to reimplement some behaviour. In the GitHub repository this is done in ReadWrite.scala and Utils.scala.
To create the jar:
sbt clean assembly
Python wrapper
In case we need to provide access to our Python friends, we will need to create a wrapper on top of the Estimator. First of all, we need to inject our custom jar into the Spark context:
import pyspark
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
conf.set("spark.jars", 'spark-mllib-custom-models-assembly-0.1.jar')
conf.set("spark.app.name", "sparkTestApp")
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
We will need to write a wrapper on top of both the Estimator and the Model. For the Estimator it is basically just boilerplate regarding the input arguments; we also specify our package name in _classpath.
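A minimal sketch of that wrapper, assuming the Scala class is reachable at com.custom.spark.feature.Bucketizer (the repository derives this from _classpath):

```python
from pyspark import keyword_only
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.wrapper import JavaEstimator


class Bucketizer(JavaEstimator, HasInputCol, HasOutputCol):

    numberBins = Param(
        Params._dummy(), "numberBins", "The number of bins",
        typeConverter=TypeConverters.toInt)

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, numberBins=None):
        super(Bucketizer, self).__init__()
        # Fully qualified name of the Scala Estimator (assumed here;
        # the repository keeps it in _classpath)
        self._java_obj = self._new_java_obj(
            "com.custom.spark.feature.Bucketizer", self.uid)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, numberBins=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _create_model(self, java_model):
        # Wrap the fitted Scala model in its Python counterpart
        return BucketizerModel(java_model)
```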
HasInputCol and HasOutputCol save us the trouble of having to write:
inputCol = Param(
    Params._dummy(), "inputCol", "The input column",
    typeConverter=TypeConverters.toString)
outputCol = Param(
    Params._dummy(), "outputCol", "The output column",
    typeConverter=TypeConverters.toString)
And here is the model:
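What follows is a simplified sketch of it; the import path of CustomJavaMLReader and its constructor arguments are assumptions, the real wiring lives in the repository:

```python
from pyspark.ml.util import JavaMLReadable, JavaMLWritable
from pyspark.ml.wrapper import JavaModel

# Hypothetical import path for the reader re-implemented in the repository
from custom_spark.util import CustomJavaMLReader


class BucketizerModel(JavaModel, JavaMLReadable, JavaMLWritable):

    # Fully qualified name of the Scala class backing this wrapper
    _classpath = "com.custom.spark.feature.BucketizerModel"

    @property
    def bins(self):
        # Ask the JVM side for the java-friendly version of the bins,
        # which py4j converts into a Python dict
        return self._call_java("javaBins")

    @classmethod
    def read(cls):
        # Custom reading behaviour that enables save/load of the model
        return CustomJavaMLReader(cls, cls._classpath)
```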
Note that we are calling the java-friendly version to retrieve the bins data structure: self._call_java("javaBins"). Additionally, we provide the fully qualified name of the class where the model is implemented, com.custom.spark.feature.BucketizerModel.
Finally, in the read method we are returning a CustomJavaMLReader. This is a custom reading behaviour that we had to reimplement in order to allow for model persistence, i.e. being able to save/load the model. You can check the details in the repository.
Additional support must be provided to persist this model in Spark’s Pipeline context.
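Once that support is in place, the Bucketizer can be used and persisted inside a regular Pipeline, along these lines (a usage sketch; the import path of the wrapper is hypothetical):

```python
from pyspark.ml import Pipeline, PipelineModel

# Hypothetical import path for the wrapper defined above
from custom_spark.feature import Bucketizer

df = spark.createDataFrame(
    [(1.0,), (5.0,), (0.0,), (7.0,), (4.0,), (8.0,), (10.0,)], ["input"])

bucketizer = Bucketizer(inputCol="input", outputCol="bin", numberBins=4)
pipeline = Pipeline(stages=[bucketizer])
model = pipeline.fit(df)
model.transform(df).show()

# Save the fitted pipeline and load it back
model.write().overwrite().save("/tmp/bucketizer-pipeline")
reloaded = PipelineModel.load("/tmp/bucketizer-pipeline")
```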