mlflow.spark
The mlflow.spark
module provides an API for logging and loading Spark MLlib models. This module
exports Spark MLlib models with the following flavors:
- Spark MLlib (native) format
Allows models to be loaded as Spark Transformers for scoring in a Spark session. Models with this flavor can be loaded as PySpark PipelineModel objects in Python. This is the main flavor and is always produced.
mlflow.pyfunc
Supports deployment outside of Spark by instantiating a SparkContext and reading input data as a Spark DataFrame prior to scoring. Also supports deployment in Spark as a Spark UDF. Models with this flavor can be loaded as Python functions for performing inference. This flavor is always produced.
mlflow.mleap
Enables high-performance deployment outside of Spark by leveraging MLeap’s custom dataframe and pipeline representations. Models with this flavor cannot be loaded back as Python objects. Rather, they must be deserialized in Java using the
mlflow/java
package. This flavor is produced only if you specify MLeap-compatible arguments.
-
mlflow.spark.
autolog
()[source] Note
Experimental: This method may change or be removed in a future release without warning.
Enables automatic logging of Spark datasource paths, versions (if applicable), and formats when they are read. This method is not threadsafe and assumes a SparkSession already exists with the mlflow-spark JAR attached. It should be called on the Spark driver, not on the executors (i.e. do not call this method within a function parallelized by Spark). This API requires Spark 3.0 or above.
Datasource information is logged under the current active MLflow run. If no active run exists, datasource information is cached in memory & logged to the next-created active run (but not to successive runs). Note that autologging of Spark ML (MLlib) models is not currently supported via this API. Datasource-autologging is best-effort, meaning that if Spark is under heavy load or MLflow logging fails for any reason (e.g., if the MLflow server is unavailable), logging may be dropped.
For any unexpected issues with autologging, check Spark driver and executor logs in addition to stderr & stdout generated from your MLflow code - datasource information is pulled from Spark, so logs relevant to debugging may show up amongst the Spark logs.
import mlflow.spark from pyspark.sql import SparkSession # Create and persist some dummy data spark = (SparkSession.builder .config("spark.jars.packages", "org.mlflow.mlflow-spark") .getOrCreate()) df = spark.createDataFrame([ (4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")], ["id", "text"]) import tempfile tempdir = tempfile.mkdtemp() df.write.format("csv").save(tempdir) # Enable Spark datasource autologging. mlflow.spark.autolog() loaded_df = spark.read.format("csv").load(tempdir) # Call collect() to trigger a read of the Spark datasource. Datasource info # (path and format)is automatically logged to an MLflow run. loaded_df.collect() shutil.rmtree(tempdir) # clean up tempdir
-
mlflow.spark.
get_default_conda_env
()[source] - Returns
The default Conda environment for MLflow Models produced by calls to
save_model()
andlog_model()
. This Conda environment contains the current version of PySpark that is installed on the caller’s system.dev
versions of PySpark are replaced with stable versions in the resulting Conda environment (e.g., if you are running PySpark version2.4.5.dev0
, invoking this method produces a Conda environment with a dependency on PySpark version2.4.5
).
-
mlflow.spark.
load_model
(model_uri, dfs_tmpdir=None)[source] Load the Spark MLlib model from the path.
- Parameters
model_uri –
The location, in URI format, of the MLflow model, for example:
/Users/me/path/to/local/model
relative/path/to/local/model
s3://my_bucket/path/to/model
runs:/<mlflow_run_id>/run-relative/path/to/model
models:/<model_name>/<model_version>
models:/<model_name>/<stage>
For more information about supported URI schemes, see Referencing Artifacts.
dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is loaded from this destination. Defaults to
/tmp/mlflow
.
- Returns
pyspark.ml.pipeline.PipelineModel
from mlflow import spark model = mlflow.spark.load_model("spark-model") # Prepare test documents, which are unlabeled (id, text) tuples. test = spark.createDataFrame([ (4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")], ["id", "text"]) # Make predictions on test documents prediction = model.transform(test)
-
mlflow.spark.
log_model
(spark_model, artifact_path, conda_env=None, dfs_tmpdir=None, sample_input=None, registered_model_name=None)[source] Log a Spark MLlib model as an MLflow artifact for the current run. This uses the MLlib persistence format and produces an MLflow Model with the Spark flavor.
Note: If no run is active, it will instantiate a run to obtain a run_id.
- Parameters
spark_model – Spark model to be saved - MLflow can only save descendants of pyspark.ml.Model which implement MLReadable and MLWritable.
artifact_path – Run relative artifact path.
conda_env –
Either a dictionary representation of a Conda environment or the path to a Conda environment yaml file. If provided, this decsribes the environment this model should be run in. At minimum, it should specify the dependencies contained in
get_default_conda_env()
. If None, the defaultget_default_conda_env()
environment is added to the model. The following is an example dictionary representation of a Conda environment:{ 'name': 'mlflow-env', 'channels': ['defaults'], 'dependencies': [ 'python=3.7.0', 'pyspark=2.3.0' ] }
dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is written in this destination and then copied into the model’s artifact directory. This is necessary as Spark ML models read from and write to DFS if running on a cluster. If this operation completes successfully, all temporary files created on the DFS are removed. Defaults to
/tmp/mlflow
.sample_input – A sample input used to add the MLeap flavor to the model. This must be a PySpark DataFrame that the model can evaluate. If
sample_input
isNone
, the MLeap flavor is not added.registered_model_name – Note:: Experimental: This argument may change or be removed in a future release without warning. If given, create a model version under
registered_model_name
, also creating a registered model if one with the given name does not exist.
from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer training = spark.createDataFrame([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0) ], ["id", "text", "label"]) tokenizer = Tokenizer(inputCol="text", outputCol="words") hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features") lr = LogisticRegression(maxIter=10, regParam=0.001) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) model = pipeline.fit(training) mlflow.spark.log_model(model, "spark-model")
-
mlflow.spark.
save_model
(spark_model, path, mlflow_model=<mlflow.models.Model object>, conda_env=None, dfs_tmpdir=None, sample_input=None)[source] Save a Spark MLlib Model to a local path.
By default, this function saves models using the Spark MLlib persistence mechanism. Additionally, if a sample input is specified using the
sample_input
parameter, the model is also serialized in MLeap format and the MLeap flavor is added.- Parameters
spark_model – Spark model to be saved - MLflow can only save descendants of pyspark.ml.Model which implement MLReadable and MLWritable.
path – Local path where the model is to be saved.
mlflow_model – MLflow model config this flavor is being added to.
conda_env –
Either a dictionary representation of a Conda environment or the path to a Conda environment yaml file. If provided, this decsribes the environment this model should be run in. At minimum, it should specify the dependencies contained in
get_default_conda_env()
. If None, the defaultget_default_conda_env()
environment is added to the model. The following is an example dictionary representation of a Conda environment:{ 'name': 'mlflow-env', 'channels': ['defaults'], 'dependencies': [ 'python=3.7.0', 'pyspark=2.3.0' ] }
dfs_tmpdir – Temporary directory path on Distributed (Hadoop) File System (DFS) or local filesystem if running in local mode. The model is be written in this destination and then copied to the requested local path. This is necessary as Spark ML models read from and write to DFS if running on a cluster. All temporary files created on the DFS are removed if this operation completes successfully. Defaults to
/tmp/mlflow
.sample_input – A sample input that is used to add the MLeap flavor to the model. This must be a PySpark DataFrame that the model can evaluate. If
sample_input
isNone
, the MLeap flavor is not added.
from mlflow import spark from pyspark.ml.pipeline.PipelineModel # your pyspark.ml.pipeline.PipelineModel type model = ... mlflow.spark.save_model(model, "spark-model")