Integrate with Ray¶
Ray is an open-source project that makes it simple to scale any compute-intensive Python workload — from deep learning to production model serving.
Instrument your runs with Comet to start managing experiments, create dataset versions and track hyperparameters for faster and easier reproducibility and collaboration.
Ray Train¶
Ray Train scales model training for popular ML frameworks such as PyTorch, XGBoost, TensorFlow, and more. It integrates with other Ray libraries such as Tune and Predictors.
Comet integrates with Ray Train by allowing you to easily monitor the resource usage of all of your workers, making sure you are fully using your expensive GPUs and that your CPUs are not the bottleneck in your training.
Connect Comet to your existing code by adding the following lines of code to your script or notebook:
import comet_ml.integration.ray
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig, RunConfig

def train_func(config):
    from comet_ml.integration.ray import comet_worker_logger

    with comet_worker_logger(config) as experiment:
        # Your distributed training code here; it should compute `accuracy`
        ...
        experiment.log_metric("accuracy", accuracy)

config = {"epochs": 42}
use_gpu = False  # Set to True to train on GPU workers

callback = comet_ml.integration.ray.CometTrainLoggerCallback(
    config, api_key="<Your API Key>", project_name="<Your Project Name>"
)

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    train_loop_config=config,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=use_gpu),
    run_config=RunConfig(callbacks=[callback]),
)
results = trainer.fit()
Note
There are other ways to configure Comet. See Configure Comet for more details.
Log automatically¶
The Ray Train integration requires you to use both the CometTrainLoggerCallback in the driver script and comet_worker_logger in each worker node.
On the driver script¶
In the driver script, the callback logs all the environment details that Comet normally logs, except for system metrics, which are logged on each worker node instead.
Note
If you are using the Ray Tune Comet callback, CometLoggerCallback, you need to replace it with the new Ray Train Comet callback, comet_ml.integration.ray.CometTrainLoggerCallback, which is fully compatible except for the following two limitations:
- It can only be used with Ray Train, not Ray Tune.
- It won't create Offline experiments, only Online experiments.
You can easily turn the automatic logging on and off for any or all items. See Configure Comet for Ray Train for more details.
On each worker node¶
On each worker node, system metrics are logged automatically and appear in the UI prefixed with the worker rank. The integration also hooks into Ray's reporting mechanism, so you can keep reporting data through it. The comet_worker_logger context manager returns an Experiment object that you can use to log additional data manually.
You can easily turn the automatic logging on and off for any or all items. See Configure Comet for Ray Train for more details.
Note
Don't see what you need to log here? We have your back. You can manually log any kind of data to Comet using the Experiment object. For example, use experiment.log_image to log images, or experiment.log_audio to log audio.
End-to-end example¶
The following is a basic example of using Comet with Ray Train.
If you can't wait, check out the results of this example Ray Train experiment for a preview of what's to come.
Install dependencies¶
python -m pip install "comet_ml>=3.44.0" "ray[air]>=2.1.0" tensorflow
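Before running the example, you may want to confirm that the installed packages meet the minimum versions in the command above. The sketch below uses only the standard library; `parse_version` and `meets_minimum` are hypothetical helper names for illustration, and the naive parser keeps only the leading numeric parts of a version (so `3.44.0rc1` is treated like `3.44.0`), which is good enough for a rough check but is not a full version comparison.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# Minimum versions taken from the pip command above.
MINIMUMS = {"comet_ml": "3.44.0", "ray": "2.1.0"}


def parse_version(text):
    """Turn '3.44.0' into (3, 44, 0), stopping at the first non-numeric part."""
    parts = []
    for piece in text.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def meets_minimum(installed, minimum):
    """Hypothetical helper: True if `installed` >= `minimum` (zero-padded)."""
    a, b = parse_version(installed), parse_version(minimum)
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


if __name__ == "__main__":
    for package, minimum in MINIMUMS.items():
        try:
            installed = version(package)
        except PackageNotFoundError:
            print(f"{package}: not installed")
            continue
        status = "ok" if meets_minimum(installed, minimum) else "too old"
        print(f"{package} {installed}: {status} (need >= {minimum})")
```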
Run the example¶
# This example showcases how to use TensorFlow with Ray Train.
# Original code:
# https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
import argparse
import json
import os

import comet_ml
import comet_ml.integration.ray
import numpy as np
import ray
from ray.air.config import RunConfig, ScalingConfig
from ray.air.result import Result
from ray.train.tensorflow import TensorflowTrainer
import tensorflow as tf

comet_ml.login()


def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()
        .batch(batch_size)
    )
    return train_dataset


def build_and_compile_cnn_model(config):
    learning_rate = config.get("lr", 0.001)
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
        metrics=["accuracy"],
    )
    return model


def train_func(config: dict):
    from comet_ml.integration.ray import comet_worker_logger
    from ray.air import session

    per_worker_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)
    steps_per_epoch = config.get("steps_per_epoch", 70)

    with comet_worker_logger(config) as experiment:
        tf_config = json.loads(os.environ["TF_CONFIG"])
        num_workers = len(tf_config["cluster"]["worker"])
        strategy = tf.distribute.MultiWorkerMirroredStrategy()

        global_batch_size = per_worker_batch_size * num_workers
        multi_worker_dataset = mnist_dataset(global_batch_size)

        with strategy.scope():
            # Model building/compiling need to be within `strategy.scope()`.
            multi_worker_model = build_and_compile_cnn_model(config)

        # Attach the Comet Keras callback on the rank 0 worker only
        callbacks = []
        if session.get_world_rank() == 0:
            callbacks.append(experiment.get_callback("tf-keras"))

        history = multi_worker_model.fit(
            multi_worker_dataset,
            epochs=epochs,
            steps_per_epoch=steps_per_epoch,
            callbacks=callbacks,
        )
        results = history.history
        return results


def train_tensorflow_mnist(
    num_workers: int = 2, use_gpu: bool = False, epochs: int = 4
) -> Result:
    config = {"lr": 1e-3, "batch_size": 64, "epochs": epochs}
    callback = comet_ml.integration.ray.CometTrainLoggerCallback(
        config, project_name="comet-example-ray-train-keras"
    )
    trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        run_config=RunConfig(callbacks=[callback]),
    )
    results = trainer.fit()
    return results


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--address", required=False, type=str, help="the address to use for Ray"
    )
    parser.add_argument(
        "--num-workers",
        "-n",
        type=int,
        default=2,
        help="Sets number of workers for training.",
    )
    parser.add_argument(
        "--use-gpu", action="store_true", default=False, help="Enables GPU training"
    )
    parser.add_argument(
        "--epochs", type=int, default=10, help="Number of epochs to train for."
    )
    parser.add_argument(
        "--smoke-test",
        action="store_true",
        default=False,
        help="Finish quickly for testing.",
    )
    args, _ = parser.parse_known_args()

    if args.smoke_test:
        ray.init(num_cpus=4)
        train_tensorflow_mnist()
    else:
        ray.init(address=args.address)
        train_tensorflow_mnist(
            num_workers=args.num_workers, use_gpu=args.use_gpu, epochs=args.epochs
        )
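Inside train_func, each worker reads the TF_CONFIG environment variable, counts the entries under cluster.worker, and multiplies the per-worker batch size by that count before building the dataset. The standalone sketch below reproduces only that arithmetic, using a made-up two-worker TF_CONFIG value (the host addresses are placeholders), so you can check the numbers without starting Ray or TensorFlow; `global_batch_size` is a hypothetical helper, not part of either library.

```python
import json

# Made-up TF_CONFIG for a two-worker cluster; the addresses are placeholders.
EXAMPLE_TF_CONFIG = json.dumps(
    {
        "cluster": {"worker": ["10.0.0.1:12345", "10.0.0.2:12345"]},
        "task": {"type": "worker", "index": 0},
    }
)


def global_batch_size(tf_config_json, per_worker_batch_size=64):
    """Mirror the example: per-worker batch size times the number of workers."""
    tf_config = json.loads(tf_config_json)
    num_workers = len(tf_config["cluster"]["worker"])
    return per_worker_batch_size * num_workers


print(global_batch_size(EXAMPLE_TF_CONFIG))  # 128
```

With the example's default of 64 samples per worker and two workers, each training step consumes a global batch of 128 samples. MultiWorkerMirroredStrategy splits that global batch across the workers, which is why the dataset is batched with the global size rather than the per-worker size.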
Try it out!¶
Don't just take our word for it; try it out for yourself.
- For more examples using Ray Train, see our examples GitHub repository.
- Run the end-to-end example above in Colab.
Configure Comet for Ray Train¶
You can control which items are logged automatically.
In the driver script, you can pass additional Experiment arguments to the callback. For example, here is how to use a different project and disable logging command-line arguments as hyperparameters:
callback = comet_ml.integration.ray.CometTrainLoggerCallback(
    config, project_name="scoring-model", parse_args=False
)
On each worker node, you can also pass Experiment parameters to the context manager. For example, here is how to change the reporting frequency for metrics:
with comet_worker_logger(config, auto_metric_step_rate=1) as experiment:
    # Your code here
    ...
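The auto_metric_step_rate parameter controls how often Comet's automatic logging records batch-level metrics. The exact rule is internal to Comet; the sketch below assumes a simple "every N steps" throttle (record a batch metric when the step number is a multiple of the rate) purely to illustrate why a rate of 1 records every batch while a larger rate records fewer points. `should_log` and `logged_steps` are hypothetical helpers and are not part of comet_ml.

```python
def should_log(step, step_rate):
    """Assumed throttle: keep a batch metric only every `step_rate` steps."""
    if step_rate < 1:
        raise ValueError("step_rate must be >= 1")
    return step % step_rate == 0


def logged_steps(total_steps, step_rate):
    """Return the batch steps that would be recorded under this assumed rule."""
    return [step for step in range(total_steps) if should_log(step, step_rate)]


print(len(logged_steps(100, 1)))   # 100
print(len(logged_steps(100, 10)))  # 10
```

Under this assumption, lowering the rate gives denser metric curves at the cost of more logging traffic from every worker.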
For more information about configuring Comet, see Configure Comet.
Ray Tune¶
Ray Tune is a Python library for experiment execution and hyperparameter tuning at any scale.
import comet_ml
from ray import tune
from ray.air.integrations.comet import CometLoggerCallback

# Your code here...

tune.run(
    ...,  # Your trainable and other tune.run arguments
    callbacks=[CometLoggerCallback()],
)
Log automatically¶
Once integrated, Comet automatically logs the following items from Ray:
- Hyperparameters
- Metrics
When using the CometLoggerCallback with tune.run, Comet automatically logs parameters from the Ray config and metrics reported to Ray through tune.report().
When using rllib, the callback also logs episode-level metrics to Comet as curves.
End-to-end example¶
Install dependencies¶
python -m pip install -U "ray[tune]" comet_ml
Run the example¶
import comet_ml
import numpy as np
from ray import tune
from ray.air.integrations.comet import CometLoggerCallback


def train_function(config, checkpoint_dir=None):
    # Report 30 noisy loss values centered on config["mean"]
    for _ in range(30):
        loss = config["mean"] + config["sd"] * np.random.randn()
        tune.report(loss=loss)


def tune_function():
    analysis = tune.run(
        train_function,
        name="comet-ray-demo",
        metric="loss",
        mode="min",
        callbacks=[CometLoggerCallback(tags=["my-trial"])],
        config={"mean": tune.grid_search([1, 2, 3]), "sd": tune.uniform(0.2, 0.8)},
    )
    return analysis.best_config


best_config = tune_function()
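To see what the tuning run above explores, the sketch below re-creates its search space in plain Python without Ray: the grid search over mean yields three trials, each draws sd uniformly from [0.2, 0.8], and each produces 30 noisy loss values with the same formula as train_function. Choosing the config with the lowest final reported loss is an assumption about how best_config is selected (mirroring Ray's default of comparing each trial's last result); `simulated_losses` and `simulate_tuning` are hypothetical helpers, not Ray APIs.

```python
import random


def simulated_losses(config, rng, steps=30):
    """Same formula as train_function: mean + sd * standard normal noise."""
    return [config["mean"] + config["sd"] * rng.gauss(0.0, 1.0) for _ in range(steps)]


def simulate_tuning(seed=0):
    """Hypothetical stand-in for tune.run: one trial per grid value of `mean`."""
    rng = random.Random(seed)
    trials = []
    for mean in [1, 2, 3]:  # like tune.grid_search([1, 2, 3])
        config = {"mean": mean, "sd": rng.uniform(0.2, 0.8)}  # like tune.uniform
        trials.append((config, simulated_losses(config, rng)))
    # Assumed rule: the best trial has the lowest *last* reported loss (mode="min")
    best_config, _ = min(trials, key=lambda trial: trial[1][-1])
    return trials, best_config


if __name__ == "__main__":
    trials, best_config = simulate_tuning()
    for config, losses in trials:
        print(config, "final loss:", round(losses[-1], 3))
    print("best config:", best_config)
```

Because mean shifts the whole loss distribution while sd only widens it, the mean=1 trial usually, but not always, comes out best. In the real run, each of these trials appears as a separate Comet experiment with its loss curve.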
Try it out!¶
Here's an example Colab Notebook for using Comet with Ray.
Configure Comet for Ray Tune¶
You can configure the CometLoggerCallback by passing the same configuration arguments as you would to the Experiment object. Learn more about CometLoggerCallback in the Ray documentation.