Source code for mlflow.gluon

import os

import gorilla
import mxnet as mx
import pandas as pd
import yaml
from mxnet import gluon
from mxnet import sym
from mxnet.gluon.contrib.estimator import Estimator, EpochEnd, TrainBegin, TrainEnd
from mxnet.gluon.nn import HybridSequential

import mlflow
from mlflow import pyfunc
from mlflow.exceptions import MlflowException
from mlflow.models import Model
from mlflow.tracking.artifact_utils import _download_artifact_from_uri
from mlflow.utils import experimental
from mlflow.utils.autologging_utils import try_mlflow_log
from mlflow.utils.environment import _mlflow_conda_env

FLAVOR_NAME = "gluon"
_MODEL_SAVE_PATH = "net"


[docs]@experimental def load_model(model_uri, ctx): """ Load a Gluon model from a local file or a run. :param model_uri: The location, in URI format, of the MLflow model. For example: - ``/Users/me/path/to/local/model`` - ``relative/path/to/local/model`` - ``s3://my_bucket/path/to/model`` - ``runs:/<mlflow_run_id>/run-relative/path/to/model`` - ``models:/<model_name>/<model_version>`` - ``models:/<model_name>/<stage>`` For more information about supported URI schemes, see `Referencing Artifacts <https://www.mlflow.org/docs/latest/concepts.html# artifact-locations>`_. :param ctx: Either CPU or GPU. :return: A Gluon model instance. .. code-block:: python :caption: Example # Load persisted model as a Gluon model, make inferences against an NDArray model = mlflow.gluon.load_model("runs:/" + gluon_random_data_run.info.run_id + "/model") model(nd.array(np.random.rand(1000, 1, 32))) """ local_model_path = _download_artifact_from_uri(artifact_uri=model_uri) model_arch_path = os.path.join(local_model_path, "data", _MODEL_SAVE_PATH) + "-symbol.json" model_params_path = os.path.join(local_model_path, "data", _MODEL_SAVE_PATH) + "-0000.params" symbol = sym.load(model_arch_path) inputs = sym.var('data', dtype='float32') net = gluon.SymbolBlock(symbol, inputs) net.collect_params().load(model_params_path, ctx) return net
class _GluonModelWrapper: def __init__(self, gluon_model): self.gluon_model = gluon_model def predict(self, df): """ :param df: A Pandas DataFrame containing input array values. A DataFrame input, `df` is converted to an MXNet ndarray via `ndarray = mx.nd.array(df.values)`. :return: A Pandas DataFrame containing output array values. The underlying MXNet array can be extracted from the output DataFrame as `ndarray = mx.nd.array(df.values)`. """ ndarray = mx.nd.array(df.values) return pd.DataFrame(self.gluon_model(ndarray).asnumpy()) def _load_pyfunc(path): """ Load PyFunc implementation. Called by ``pyfunc.load_pyfunc``. :param path: Local filesystem path to the MLflow Model with the ``gluon`` flavor. """ m = load_model(path, mx.current_context()) return _GluonModelWrapper(m)
[docs]@experimental def save_model(gluon_model, path, mlflow_model=Model(), conda_env=None): """ Save a Gluon model to a path on the local file system. :param gluon_model: Gluon model to be saved. Must be already hybridized. :param path: Local path where the model is to be saved. :param mlflow_model: MLflow model config this flavor is being added to. :param conda_env: Either a dictionary representation of a Conda environment or the path to a Conda environment yaml file. If provided, this decribes the environment this model should be run in. At minimum, it should specify the dependencies contained in :func:`get_default_conda_env()`. If ``None``, the default :func:`mlflow.gluon.get_default_conda_env()` environment is added to the model. The following is an *example* dictionary representation of a Conda environment:: { 'name': 'mlflow-env', 'channels': ['defaults'], 'dependencies': [ 'python=3.7.0', 'mxnet=1.5.0' ] } .. code-block:: python :caption: Example from mxnet.gluon import Trainer from mxnet.gluon.contrib import estimator from mxnet.gluon.loss import SoftmaxCrossEntropyLoss from mxnet.gluon.nn import HybridSequential from mxnet.metric import Accuracy import mlflow # Build, compile, and train your model gluon_model_path = ... net = HybridSequential() with net.name_scope(): ... net.hybridize() net.collect_params().initialize() softmax_loss = SoftmaxCrossEntropyLoss() trainer = Trainer(net.collect_params()) est = estimator.Estimator(net=net, loss=softmax_loss, metrics=Accuracy(), trainer=trainer) est.fit(train_data=train_data, epochs=100, val_data=validation_data) # Save the model as an MLflow Model mlflow.gluon.save_model(net, gluon_model_path) """ path = os.path.abspath(path) if os.path.exists(path): raise MlflowException("Path '{}' already exists".format(path)) data_subpath = "data" data_path = os.path.join(path, data_subpath) os.makedirs(data_path) # The epoch argument of the export method does not play any role in selecting # a specific epoch's paramaters, and is there only for display purposes. gluon_model.export(os.path.join(data_path, _MODEL_SAVE_PATH)) with open(os.path.join(path, "architecture.txt"), "w") as fp: fp.write(str(gluon_model)) conda_env_subpath = "conda.yaml" if conda_env is None: conda_env = get_default_conda_env() elif not isinstance(conda_env, dict): with open(conda_env, "r") as f: conda_env = yaml.safe_load(f) with open(os.path.join(path, conda_env_subpath), "w") as f: yaml.safe_dump(conda_env, stream=f, default_flow_style=False) pyfunc.add_to_model(mlflow_model, loader_module="mlflow.gluon", env=conda_env_subpath) mlflow_model.save(os.path.join(path, "MLmodel"))
[docs]def get_default_conda_env(): """ :return: The default Conda environment for MLflow Models produced by calls to :func:`save_model()` and :func:`log_model()`. """ pip_deps = ["mxnet=={}".format(mx.__version__)] return _mlflow_conda_env(additional_pip_deps=pip_deps)
[docs]@experimental def log_model(gluon_model, artifact_path, conda_env=None): """ Log a Gluon model as an MLflow artifact for the current run. :param gluon_model: Gluon model to be saved. Must be already hybridized. :param artifact_path: Run-relative artifact path. :param conda_env: Either a dictionary representation of a Conda environment or the path to a Conda environment yaml file. If provided, this decribes the environment this model should be run in. At minimum, it should specify the dependencies contained in :func:`get_default_conda_env()`. If ``None``, the default :func:`mlflow.gluon.get_default_conda_env()` environment is added to the model. The following is an *example* dictionary representation of a Conda environment:: { 'name': 'mlflow-env', 'channels': ['defaults'], 'dependencies': [ 'python=3.7.0', 'mxnet=1.5.0' ] } .. code-block:: python :caption: Example from mxnet.gluon import Trainer from mxnet.gluon.contrib import estimator from mxnet.gluon.loss import SoftmaxCrossEntropyLoss from mxnet.gluon.nn import HybridSequential from mxnet.metric import Accuracy import mlflow # Build, compile, and train your model net = HybridSequential() with net.name_scope(): ... net.hybridize() net.collect_params().initialize() softmax_loss = SoftmaxCrossEntropyLoss() trainer = Trainer(net.collect_params()) est = estimator.Estimator(net=net, loss=softmax_loss, metrics=Accuracy(), trainer=trainer) # Log metrics and log the model with mlflow.start_run(): est.fit(train_data=train_data, epochs=100, val_data=validation_data) mlflow.gluon.log_model(net, "model") """ Model.log(artifact_path=artifact_path, flavor=mlflow.gluon, gluon_model=gluon_model, conda_env=conda_env)
[docs]@experimental def autolog(): """ Enable automatic logging from Gluon to MLflow. Logs loss and any other metrics specified in the fit function, and optimizer data as parameters. Model checkpoints are logged as artifacts to a 'models' directory. """ class __MLflowGluonCallback(EpochEnd, TrainEnd, TrainBegin): def __init__(self): self.current_epoch = 0 def epoch_end(self, estimator, *args, **kwargs): logs = {} for metric in estimator.train_metrics: metric_name, metric_val = metric.get() logs[metric_name] = metric_val for metric in estimator.val_metrics: metric_name, metric_val = metric.get() logs[metric_name] = metric_val try_mlflow_log(mlflow.log_metrics, logs, step=self.current_epoch) self.current_epoch += 1 def train_begin(self, estimator, *args, **kwargs): try_mlflow_log(mlflow.log_param, "num_layers", len(estimator.net)) if estimator.max_epoch is not None: try_mlflow_log(mlflow.log_param, "epochs", estimator.max_epoch) if estimator.max_batch is not None: try_mlflow_log(mlflow.log_param, "batches", estimator.max_batch) try_mlflow_log(mlflow.log_param, "optimizer_name", type(estimator.trainer.optimizer).__name__) if hasattr(estimator.trainer.optimizer, "lr"): try_mlflow_log(mlflow.log_param, "learning_rate", estimator.trainer.optimizer.lr) if hasattr(estimator.trainer.optimizer, "epsilon"): try_mlflow_log(mlflow.log_param, "epsilon", estimator.trainer.optimizer.epsilon) def train_end(self, estimator, *args, **kwargs): if isinstance(estimator.net, HybridSequential): try_mlflow_log(log_model, estimator.net, artifact_path="model") @gorilla.patch(Estimator) def fit(self, *args, **kwargs): if not mlflow.active_run(): auto_end_run = True else: auto_end_run = False original = gorilla.get_original_attribute(Estimator, "fit") if len(args) >= 4: l = list(args) l[3] += [__MLflowGluonCallback()] args = tuple(l) elif "event_handlers" in kwargs: kwargs["event_handlers"] += [__MLflowGluonCallback()] else: kwargs["event_handlers"] = [__MLflowGluonCallback()] result = original(self, *args, **kwargs) if auto_end_run: mlflow.end_run() return result settings = gorilla.Settings(allow_hit=True, store_hit=True) gorilla.apply(gorilla.Patch(Estimator, "fit", fit, settings=settings))