Build your Notebook
This notebook requires Kaptain SDK 1.3.x or later.
Kaptain supports training a model in one cluster, uploading the model and artifacts to shared object storage (such as S3), and deploying it in another cluster. In this notebook, you build a simple model based on the MNIST dataset and upload that model and its training state to S3, so you can deploy the model in another cluster.
What You Need
Ensure you go through all steps included in the prerequisites page:
Create a Docker secret and an AWS credentials secret.
Create a PodDefault configuration referencing the created secrets.
Launch a Jupyter notebook server with said PodDefault configuration.
You will be able to open this notebook after launching the notebook server.
Ensure You Are Ready to Start
Before proceeding, let's verify that the notebook server was configured and launched correctly:
Ensure you are using the correct notebook image by verifying that TensorFlow is available:
%%sh
pip list | grep tensorflow
Output:
tensorflow                    2.9.1
tensorflow-datasets           4.5.2
tensorflow-estimator          2.9.0
tensorflow-io-gcs-filesystem  0.27.0
tensorflow-metadata           1.10.0
Ensure that the Docker secret is mounted. You should not see an error:
%%sh
ls -la ~/.docker/config.json
Output:
lrwxrwxrwx 1 root istio 18 Oct 6 07:45 /home/kubeflow/.docker/config.json -> ..data/config.json
Verify that the AWS environment variables are set. You should see AWS_ACCESS_KEY_ID, AWS_REGION, and AWS_SECRET_ACCESS_KEY:
%%sh
set | egrep ^AWS_ | cut -f 1 -d '='
Output:
AWS_ACCESS_KEY_ID
AWS_REGION
AWS_SECRET_ACCESS_KEY
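The same checks can also be run from a Python cell. The following is a minimal sketch, not part of the Kaptain SDK: the helper name `missing_prerequisites` is hypothetical, and it only checks that the three AWS variables listed above are set and that the Docker config file exists at the path used above.

```python
import os
from pathlib import Path

# The variable names are the ones this notebook expects to be set.
REQUIRED_AWS_VARS = ("AWS_ACCESS_KEY_ID", "AWS_REGION", "AWS_SECRET_ACCESS_KEY")


def missing_prerequisites(env=None, docker_config=None):
    """Return a list of problems; an empty list means every check passed.

    Hypothetical helper for illustration only.
    """
    env = os.environ if env is None else env
    if docker_config is None:
        docker_config = Path.home() / ".docker" / "config.json"
    docker_config = Path(docker_config)

    problems = [
        f"environment variable {name} is not set"
        for name in REQUIRED_AWS_VARS
        if not env.get(name)
    ]
    # Path.exists() follows symlinks, so a mounted secret is handled too.
    if not docker_config.exists():
        problems.append(f"Docker config not found at {docker_config}")
    return problems


for problem in missing_prerequisites():
    print(problem)
```

If nothing is printed, the notebook server has the environment this tutorial expects.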
Adapt the Model Code
To use the Kaptain SDK, you need to add two lines of code to the original model code:
One right after the model training (here: Keras' fit method), to save the trained model to the configured object storage, S3.
Another right after the model evaluation (here: Keras' evaluate method), to record the metrics of interest.
%%writefile trainer.py
import argparse
import logging

import tensorflow as tf
import tensorflow_datasets as tfds

from kaptain.platform.model_export_util import ModelExportUtil
from kaptain.platform.metadata_util import MetadataUtil

logging.getLogger().setLevel(logging.INFO)


def get_datasets(buffer_size):
    # The dataset is read from the local "datasets" directory, not downloaded
    datasets, ds_info = tfds.load(
        name="mnist",
        data_dir="datasets",
        download=False,
        with_info=True,
        as_supervised=True,
    )
    mnist_train, mnist_test = datasets["train"], datasets["test"]

    def scale(image, label):
        # Convert pixel values from [0, 255] to [0.0, 1.0]
        image = tf.cast(image, tf.float32) / 255.0
        return image, label

    train_dataset = mnist_train.map(scale).cache().shuffle(buffer_size).repeat()
    test_dataset = mnist_test.map(scale)

    return train_dataset, test_dataset


def compile_model(args):
    model = tf.keras.Sequential(
        [
            tf.keras.layers.Conv2D(32, 3, activation="relu", input_shape=(28, 28, 1)),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation="relu"),
            tf.keras.layers.Dense(10, activation="softmax"),
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(
            learning_rate=args.learning_rate, momentum=args.momentum
        ),
        metrics=["accuracy"],
    )
    return model


def main():
    parser = argparse.ArgumentParser(description="TensorFlow MNIST Trainer")
    parser.add_argument(
        "--batch-size",
        type=int,
        default=64,
        metavar="N",
        help="Batch size for training (default: 64)",
    )
    parser.add_argument(
        "--buffer-size",
        type=int,
        default=10000,
        metavar="N",
        help="Number of training examples to buffer before shuffling "
        "(default: 10000)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=5,
        metavar="N",
        help="Number of epochs to train (default: 5)",
    )
    parser.add_argument(
        "--steps",
        type=int,
        default=10,
        metavar="N",
        help="Number of batches to train the model on in each epoch (default: 10)",
    )
    parser.add_argument(
        "--learning-rate",
        type=float,
        default=0.5,
        metavar="N",
        help="Learning rate (default: 0.5)",
    )
    parser.add_argument(
        "--momentum",
        type=float,
        default=0.1,
        metavar="N",
        help="Accelerates SGD in the relevant direction and dampens oscillations (default: 0.1)",
    )
    # Unrecognized arguments are ignored instead of causing an error
    args, _ = parser.parse_known_args()

    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    logging.debug(f"num_replicas_in_sync: {strategy.num_replicas_in_sync}")
    # Scale the per-replica batch size by the number of replicas
    global_batch_size = args.batch_size * strategy.num_replicas_in_sync

    train_dataset, test_dataset = get_datasets(buffer_size=args.buffer_size)
    train_dataset = train_dataset.batch(batch_size=global_batch_size)
    test_dataset = test_dataset.batch(batch_size=global_batch_size)

    # Shard the data (rather than the input files) across workers
    dataset_options = tf.data.Options()
    dataset_options.experimental_distribute.auto_shard_policy = (
        tf.data.experimental.AutoShardPolicy.DATA
    )
    train_datasets_sharded = train_dataset.with_options(dataset_options)
    test_dataset_sharded = test_dataset.with_options(dataset_options)

    with strategy.scope():
        model = compile_model(args=args)

    # Train the model
    model.fit(train_datasets_sharded, epochs=args.epochs, steps_per_epoch=args.steps)

    # Save the trained model with the Kaptain SDK exporter utility
    model.save("mnist")
    ModelExportUtil().upload_model("mnist")

    eval_loss, eval_acc = model.evaluate(test_dataset_sharded, verbose=0, steps=args.steps)

    # Record the evaluation metrics for use with the hyperparameter tuner
    MetadataUtil.record_metrics({"loss": eval_loss, "accuracy": eval_acc})


if __name__ == "__main__":
    main()
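Two details of trainer.py are easy to miss: it calls parse_known_args, so flags it does not define are returned separately instead of aborting the run, and it multiplies the per-replica batch size by the number of replicas. The sketch below reproduces both with the standard library only. The flag `--some-other-flag` and the helper `global_batch_size` are made up for illustration, and the replica count of 2 is an assumption that mirrors a two-worker job.

```python
import argparse


def build_parser():
    # A reduced version of the trainer's parser, with three of its flags.
    parser = argparse.ArgumentParser(description="Argument-handling sketch")
    parser.add_argument("--batch-size", type=int, default=64)
    parser.add_argument("--epochs", type=int, default=5)
    parser.add_argument("--steps", type=int, default=10)
    return parser


def global_batch_size(per_replica_batch_size, num_replicas):
    # Same arithmetic as in trainer.py: batch size times replicas in sync.
    return per_replica_batch_size * num_replicas


# "--some-other-flag" stands in for any argument the trainer does not define.
argv = ["--steps", "20", "--some-other-flag", "value"]
args, unknown = build_parser().parse_known_args(argv)

print(args.steps, args.epochs, args.batch_size)  # parsed and default values
print(unknown)  # arguments the parser did not recognize
print(global_batch_size(args.batch_size, num_replicas=2))
```

With parse_args instead of parse_known_args, the unknown flag would end the program with a usage error.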
Define the Model
The central abstraction of the Kaptain SDK is a model. The model contains configuration about how the model is built, and also contains state about what was built and where files were uploaded.
extra_files = ["datasets/mnist"]
base_image = "mesosphere/kubeflow:2.2.0-tensorflow-2.9.1"
# Replace with your docker repository and a tag (optional), e.g. "repository/image" or "repository/image:tag"
image_name = "mesosphere/kubeflow:mnist-sdk-example"
# Use the name of the file with additional python packages to install into model image (e.g. "requirements.txt")
requirements = None
from kaptain.model.models import Model
from kaptain.model.frameworks import ModelFramework
from kaptain.config import Config
from kaptain.platform.config.s3 import S3ConfigurationProvider
from kaptain.platform.config.docker import DockerConfigurationProvider
config = Config(
    docker_config_provider=DockerConfigurationProvider.default(),
    storage_config_provider=S3ConfigurationProvider.from_env(),
)
model = Model(
    id="dev/mnist",
    name="MNIST",
    description="MNIST Model",
    version="0.0.1",
    framework=ModelFramework.TENSORFLOW,
    framework_version="2.8.0",
    main_file="trainer.py",
    extra_files=extra_files,
    image_name=image_name,
    base_image=base_image,
    requirements=requirements,
    config=config,
)
The id is a unique identifier of the model. The identifier shown indicates it is an MNIST model in development.
The fields name and description are for humans: to inform your colleagues and yourself of what the model is about. version is the model's own version, so it is easy to identify models by their iteration. The framework and framework_version fields are recorded as metadata.
Since a Docker image is built in the background when you train or tune a Model instance, you must provide a base_image. Provide the name of the final image, image_name, with or without an image tag. If the tag is omitted, a concatenation of model id, framework, and framework_version is used.
The main_file specifies the name of the file that contains the model code, that is, trainer.py for the purposes of this tutorial.
To specify additional Python packages required for training or serving, provide the path to your requirements file via the requirements parameter of the Model class. You can find details on the format of the requirements file in the official pip documentation.
Refer to ?Model for more details.
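Whether the generated fallback tag applies depends on whether image_name already carries a tag. As a rough illustration, the sketch below separates the repository from the optional tag. The helper `split_image_name` is hypothetical and not part of the Kaptain SDK, it does not reproduce the SDK's fallback tag format, and it ignores image digests (`@sha256:...`).

```python
def split_image_name(image_name):
    """Split "repository/image[:tag]" into (repository, tag).

    The tag is None when the name has no tag. Only the last path segment
    is inspected, so a registry port such as "localhost:5000" is not
    mistaken for a tag.
    """
    last_segment = image_name.rsplit("/", 1)[-1]
    if ":" in last_segment:
        repository, tag = image_name.rsplit(":", 1)
        return repository, tag
    return image_name, None


print(split_image_name("mesosphere/kubeflow:mnist-sdk-example"))
print(split_image_name("repository/image"))
```

The first call returns a tag, so the name would be used as given; the second returns None, which is the case where the tag is derived from the model fields.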
Train the Model
workers = 2
gpus = 0
memory = "5G"
cpu = "1"
model.train(
    workers=workers,
    cpu=cpu,
    memory=memory,
    gpus=gpus,
    hyperparameters={"--steps": 10, "--epochs": 5},
    args={},  # additional command line arguments for the training job.
)
[I 221006 06:22:58 image_builder:80] Skipping image build for the model - the image 'mesosphere/kubeflow:mnist-sdk-example' with the same contents has already been published to the registry.
[I 221006 06:22:58 job_runner:132] Submitting a new training job "mnist-tfjob-65818540".
[I 221006 06:22:58 job_runner:58] Waiting for the training job to complete...
[I 221006 06:23:02 kubernetes:268] Waiting for Master Node Training Model to start...
[I 221006 06:23:05 kubernetes:274] Master Node Training Model started in pod: mnist-tfjob-65818540-chief-0.
10/10 [==============================] - 4s 89ms/step - loss: 2.1121 - accuracy: 0.3289
[I 221006 06:23:17 kubernetes:334] [mnist-tfjob-65818540-chief-0/tensorflow] logs:
Epoch 2/5
10/10 [==============================] - 1s 97ms/step - loss: 2.2689 - accuracy: 0.3844
Epoch 3/5
10/10 [==============================] - 1s 97ms/step - loss: 2.0894 - accuracy: 0.2984
Epoch 4/5
10/10 [==============================] - 1s 79ms/step - loss: 1.9951 - accuracy: 0.3484
[I 221006 06:23:22 kubernetes:334] [mnist-tfjob-65818540-chief-0/tensorflow] logs:
Epoch 5/5
10/10 [==============================] - 1s 100ms/step - loss: 1.2166 - accuracy: 0.6273
[I 221006 06:23:54 models:418] Model training is completed.
[I 221006 06:23:54 model_util:48] Saved model to /tmp/tmprqsm360t
[I 221006 06:23:55 model_util:53] Model uploaded to s3://kaptain/models/dev/mnist/trained/b69dc6f6e3c246858cf43a1eba8be5f5/0001/.state.
[I 221006 06:23:55 model_util:54] Model state saved.
The output should be:
True
If you set resource quotas for a namespace, users have to specify cpu and memory explicitly in the SDK. Otherwise, tasks such as training and tuning will fail with Error creating: pods ... is forbidden: failed quota: kf-resource-quota: must specify cpu,memory. These fields are optional when resource quotas are not set. If the issue appears for other types of workloads, we recommend that you configure defaults for the user namespace using a LimitRange.
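As an illustration of such namespace defaults, the sketch below builds a Kubernetes LimitRange manifest as a Python dictionary and prints it as JSON. The object name `default-resources`, the namespace `my-namespace`, and all CPU and memory values are placeholders chosen for this example, not values recommended by Kaptain.

```python
import json

# A LimitRange that gives every container default requests and limits,
# so pods that omit cpu/memory still satisfy the namespace quota.
limit_range = {
    "apiVersion": "v1",
    "kind": "LimitRange",
    "metadata": {
        "name": "default-resources",  # placeholder name
        "namespace": "my-namespace",  # placeholder: your user namespace
    },
    "spec": {
        "limits": [
            {
                "type": "Container",
                # Applied as limits when a container specifies none
                "default": {"cpu": "1", "memory": "1Gi"},
                # Applied as requests when a container specifies none
                "defaultRequest": {"cpu": "500m", "memory": "512Mi"},
            }
        ]
    },
}

print(json.dumps(limit_range, indent=2))
```

The printed JSON can be saved to a file and applied by a cluster administrator with kubectl apply -f, since kubectl accepts JSON manifests as well as YAML.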
The model's accuracy is low because training is deliberately short (10 steps per epoch for 5 epochs), which keeps the demonstration of distributed training quick; the hyperparameters are not tuned at this stage.
Save Model State to S3
Saving the model state to S3 allows it to be imported to another cluster, so that the cluster can access the current training configuration, version information, and other necessary metadata.
model.save_as_json()
Output:
[I 221006 06:26:06 model_util:48] Saved model to /tmp/tmpkd7d5spr
[I 221006 06:26:07 model_util:53] Model uploaded to s3://kaptain/models/dev/mnist/trained/b69dc6f6e3c246858cf43a1eba8be5f5/0001/.state.
[I 221006 06:26:07 model_util:54] Model state saved.
Verify that the Model Is Exported to S3, Along with the Saved State
Run the following command to see a list of files including, but not limited to, saved_model.pb and .state/model.json:
from kaptain.platform.storage import storage_factory
storage_client = storage_factory.get_client(config.storage_config_provider)
storage_client.list(model.meta().saved_model_uri)
Output:
['s3://kaptain/models/dev/mnist/trained/b69dc6f6e3c246858cf43a1eba8be5f5/0001/.state/model.json',
's3://kaptain/models/dev/mnist/trained/b69dc6f6e3c246858cf43a1eba8be5f5/0001/keras_metadata.pb',
's3://kaptain/models/dev/mnist/trained/b69dc6f6e3c246858cf43a1eba8be5f5/0001/saved_model.pb',
's3://kaptain/models/dev/mnist/trained/b69dc6f6e3c246858cf43a1eba8be5f5/0001/variables/variables.data-00000-of-00001',
's3://kaptain/models/dev/mnist/trained/b69dc6f6e3c246858cf43a1eba8be5f5/0001/variables/variables.index']
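Instead of reading the listing by eye, the check can be automated. The sketch below is a hypothetical helper, not an SDK function; it assumes the listing is a list of full URIs under the model URI, as in the output above, and uses a copy of that output as sample data.

```python
def missing_artifacts(listed_uris, base_uri,
                      required=("saved_model.pb", ".state/model.json")):
    """Return the required files that do not appear under base_uri."""
    base = base_uri.rstrip("/") + "/"
    # Reduce each URI to its path relative to the model URI.
    relative = {uri[len(base):] for uri in listed_uris if uri.startswith(base)}
    return [name for name in required if name not in relative]


# Sample data copied from the listing shown above.
base_uri = "s3://kaptain/models/dev/mnist/trained/b69dc6f6e3c246858cf43a1eba8be5f5/0001"
sample_listing = [
    base_uri + "/.state/model.json",
    base_uri + "/keras_metadata.pb",
    base_uri + "/saved_model.pb",
    base_uri + "/variables/variables.data-00000-of-00001",
    base_uri + "/variables/variables.index",
]

print(missing_artifacts(sample_listing, base_uri))
```

In the notebook, the two arguments would be storage_client.list(model.meta().saved_model_uri) and model.meta().saved_model_uri. An empty result means both the exported model and the saved state are present.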
Make Note of the Model URL, and Proceed to Deploy the Model on a New Cluster
You need this URL to load the model state in your target deployment cluster. Run the following command, and note the output:
model.meta().saved_model_uri
Output:
's3://kaptain/models/dev/mnist/trained/b69dc6f6e3c246858cf43a1eba8be5f5/0001'
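If you need the bucket and object prefix separately, for example to inspect the files with other S3 tooling, the URL can be split with the Python standard library. This is a minimal sketch; `split_s3_uri` is a hypothetical helper and not part of the Kaptain SDK.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split "s3://bucket/key/prefix" into (bucket, key_prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


bucket, prefix = split_s3_uri(
    "s3://kaptain/models/dev/mnist/trained/b69dc6f6e3c246858cf43a1eba8be5f5/0001"
)
print(bucket)
print(prefix)
```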
Continue to Deploy your Notebook in your target model deployment cluster.