
Integrate with Metaflow¶

Metaflow is a human-friendly Python library that helps scientists and engineers build and manage real-life data science projects. Metaflow was originally developed at Netflix to boost productivity of data scientists who work on a wide variety of projects from classical statistics to state-of-the-art deep learning.


Start logging¶

  1. Import the Metaflow integration at the top of your pipeline script: from comet_ml.integration.metaflow import comet_flow
  2. Annotate your Flow class with the @comet_flow decorator.

from comet_ml.integration.metaflow import comet_flow
from metaflow import FlowSpec, step


@comet_flow
class HelloFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    HelloFlow()

Visualize Metaflow runs¶

The Comet-Metaflow integration allows you to track both individual tasks and the state of the flow as a whole. The state of the flow can be visualized using the Metaflow Panel available in the Featured tab.

metaflow-flow-integration.png

You can recreate the view above by:

The Metaflow Panel can show either the latest state of the DAG or the static graph, similar to what is available in the Metaflow UI. In addition, every task that has been tracked in Comet can be opened directly by clicking on it in the graph.

Log automatically¶

The Comet Metaflow integration creates several Experiments, one for the run itself and one for each task.

Run Experiment¶

The Run Experiment logs the following information automatically:

| Item Name | Item Type | Description |
| --- | --- | --- |
| {current_flow_name} - {current_run_id} -graph.json | Asset | The Metaflow Flow graph exported as JSON |
| comet_run_id | Other | The Metaflow unique Run ID |
| metaflow_branch_name | Other | Metaflow branch name |
| metaflow_flow_name | Other | The Metaflow Flow name |
| metaflow_graph_file | Other | The name of the Comet Experiment Asset containing the Metaflow Flow graph |
| metaflow_is_production | Other | Metaflow "is production" flag |
| metaflow_is_user_branch | Other | Metaflow "is user branch" flag |
| metaflow_project_flow_name | Other | Metaflow full project name |
| metaflow_project_name | Other | The user-provided Metaflow project name |
| metaflow_run_id | Other | The Metaflow unique Run ID |
| metaflow_status | Other | The status of the Metaflow Flow: Running, Completed, or Failed |
| pipeline_type | Other | Internal field used to distinguish between integrations |

In addition, the run Experiment is tagged with run and with all tags set on the flow itself. For more details on tagging, see the Metaflow documentation.

Metaflow Parameters are saved as hyperparameters automatically.

You can access the Comet Experiment key for the current run from any step using self.run_comet_experiment_key. See the example below.

    @step
    def evaluate(self):
        from comet_ml import API
        from sklearn.metrics import accuracy_score

        accuracy = accuracy_score(self.prediction, self.Y_test)

        run_experiment = API().get_experiment_by_key(self.run_comet_experiment_key)
        run_experiment.log_metric("Run accuracy", accuracy)

Task Experiment¶

By default, the Comet integration creates a Comet Experiment for each step. If a step is executed with a foreach loop, the Comet integration will create a Comet Experiment for each parallel execution of that step. See below how to skip creating a Comet Experiment for some of your steps.

Each of these Comet Experiments logs the following information automatically:

| Item Name | Item Type | Description |
| --- | --- | --- |
| comet_run_id | Other | The Metaflow unique Run ID |
| comet_step_id | Other | The current Metaflow step ID |
| comet_task_id | Other | The current Metaflow task ID |
| metaflow_branch_name | Other | Metaflow branch name |
| metaflow_flow_name | Other | The Metaflow Flow name |
| metaflow_is_production | Other | Metaflow "is production" flag |
| metaflow_is_user_branch | Other | Metaflow "is user branch" flag |
| metaflow_origin_run_id | Other | Metaflow Run ID of the original run when a flow is resumed |
| metaflow_project_flow_name | Other | Metaflow full project name |
| metaflow_project_name | Other | The user-provided Metaflow project name |
| metaflow_run_experiment | Other | The Comet Experiment ID of the Run Experiment |
| metaflow_run_id | Other | The Metaflow unique Run ID |
| metaflow_status | Other | The status of the Metaflow step: Running, Completed, or Failed |
| metaflow_step_name | Other | The name of the Metaflow step |
| metaflow-card-{step_name}-{card_number}.html | HTML Asset | Each Metaflow card is logged as a separate HTML Asset |
| pipeline_type | Other | Internal field used to distinguish between integrations |

In addition, each task Experiment is tagged with task, with the step name, and with all tags set on the flow itself. For more details on tagging, see the Metaflow documentation.
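
Because run Experiments carry the run tag and task Experiments carry the task tag, the two kinds can be told apart after the fact. The following is a minimal sketch under stated assumptions, not part of the integration: split_run_and_task and StandInExperiment are hypothetical names, and the function only assumes that each object exposes a get_tags() method returning a list of strings, as the APIExperiment objects returned by comet_ml.API().get_experiments(...) do.

```python
def split_run_and_task(experiments):
    """Partition experiments into run, task, and untagged groups by their tags."""
    runs, tasks, other = [], [], []
    for exp in experiments:
        tags = set(exp.get_tags())
        if "run" in tags:
            runs.append(exp)
        elif "task" in tags:
            tasks.append(exp)
        else:
            other.append(exp)
    return runs, tasks, other


class StandInExperiment:
    """Hypothetical stand-in so the sketch runs without a Comet account."""

    def __init__(self, name, tags):
        self.name = name
        self._tags = list(tags)

    def get_tags(self):
        return self._tags


demo = [
    StandInExperiment("flow-run", ["run"]),
    StandInExperiment("fit-task", ["task", "fit"]),
    StandInExperiment("unrelated", []),
]
runs, tasks, other = split_run_and_task(demo)
print([e.name for e in runs], [e.name for e in tasks], [e.name for e in other])
```

With real data, the list passed in would come from the Comet API rather than from stand-in objects. A flow-level tag that happens to be named run or task would confuse this simple check.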

Metaflow Parameters are saved as hyperparameters automatically.

You can access the Comet experiment inside each task using self.comet_experiment. See the example below.

    @step
    def fit(self):
        # Import model
        from sklearn.naive_bayes import GaussianNB

        model = GaussianNB()

        model.fit(self.X_train, self.Y_train)

        self.prediction = model.predict(self.X_test)

        self.comet_experiment.log_confusion_matrix(self.Y_test, self.prediction)

Metaflow Cards¶

The Metaflow integration automatically logs Metaflow cards for each Task. Metaflow cards are exported to HTML and are logged as assets to the corresponding Comet Experiment.

To visualize them, you can use the HTML Asset Viewer featured panel at both the single Experiment and project level.

metaflow-card-viewer.png
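
Card assets follow the metaflow-card-{step_name}-{card_number}.html naming pattern listed in the Task Experiment table, so asset file names can be grouped by step. The sketch below is an illustration only: cards_by_step is a hypothetical helper, the file names are made up, and how you obtain the list of asset names from an Experiment is left out.

```python
import re
from collections import defaultdict

# Matches the documented pattern metaflow-card-{step_name}-{card_number}.html
CARD_NAME = re.compile(r"^metaflow-card-(?P<step>.+)-(?P<number>\d+)\.html$")


def cards_by_step(file_names):
    """Group card numbers by step name, ignoring assets that are not cards."""
    grouped = defaultdict(list)
    for name in file_names:
        match = CARD_NAME.match(name)
        if match:
            grouped[match["step"]].append(int(match["number"]))
    return {step: sorted(numbers) for step, numbers in grouped.items()}


# Made-up asset names for illustration.
names = [
    "metaflow-card-fit-1.html",
    "metaflow-card-fit-0.html",
    "metaflow-card-evaluate-0.html",
    "confusion-matrix.json",
]
print(cards_by_step(names))
```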

End-to-end example¶

The following is a basic example of using Comet with Metaflow.

If you can't wait, check out the results of this example Metaflow project for a preview of what's to come.

Install dependencies¶

python -m pip install "comet_ml>=3.44.0" metaflow

Run the example¶

Save the following Metaflow example as helloworld.py and run it with: python helloworld.py run

# coding: utf-8

from comet_ml import login
from comet_ml.integration.metaflow import comet_flow

from metaflow import FlowSpec, step

# Login to Comet if needed
login()


@comet_flow(project_name="comet-example-metaflow-hello-world")
class HelloFlow(FlowSpec):
    """
    A flow where Metaflow prints 'Hi'.

    Run this flow to validate that Metaflow is installed correctly.

    """

    @step
    def start(self):
        """
        This is the 'start' step. All flows must have a step named 'start' that
        is the first step in the flow.

        """
        print("HelloFlow is starting.")
        self.next(self.hello)

    @step
    def hello(self):
        """
        A step for metaflow to introduce itself.

        """
        print("Metaflow says: Hi!")
        self.next(self.end)

    @step
    def end(self):
        """
        This is the 'end' step. All flows must have an 'end' step, which is the
        last step in the flow.

        """
        print("HelloFlow is all done.")


if __name__ == "__main__":
    HelloFlow()


Skip Metaflow steps¶

If your flow contains many steps, or a loop with many iterations, and the resulting Experiments have little or no value, you can skip those steps. When a step is skipped, the Comet integration does not create a live Comet Experiment for it.

To do that, apply the decorator @comet_ml.integration.metaflow.comet_skip to the Metaflow step you want to skip. Here is an example where the iteration step won't create 20 Comet experiments:

from comet_ml.integration.metaflow import comet_flow, comet_skip
from metaflow import FlowSpec, step

@comet_flow(project_name="metaflow-loop-acceptance-test")
class LoopFlow(FlowSpec):

    @step
    def start(self):
        self.values = list(range(20))
        self.next(self.iteration, foreach="values")

    @comet_skip
    @step
    def iteration(self):
        print("Called with %d" % self.input)
        self.double = self.input * 2
        self.next(self.join)

    @step
    def join(self, inputs):
        self.results = [input.double for input in inputs]
        self.next(self.end)

    @step
    def end(self):
        print('\n'.join([str(x) for x in self.results]))
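
As a rough check on the numbers in this example, the sketch below estimates how many Experiments a single run creates: one for the run itself plus one per task, minus the tasks of skipped steps. expected_experiment_count is a hypothetical helper written for this page, and the per-step task counts are read off the LoopFlow example by hand.

```python
def expected_experiment_count(tasks_per_step, skipped_steps=()):
    """Estimate how many live Comet Experiments one flow run creates.

    tasks_per_step maps each step name to the number of tasks it executes:
    1 for a plain step, N for a step fanned out by a foreach over N items.
    """
    skipped = set(skipped_steps)
    task_experiments = sum(
        count for step, count in tasks_per_step.items() if step not in skipped
    )
    # One extra Experiment is created for the run itself.
    return 1 + task_experiments


# Task counts for LoopFlow: the foreach over 20 values runs iteration 20 times.
loop_flow = {"start": 1, "iteration": 20, "join": 1, "end": 1}
print(expected_experiment_count(loop_flow))                               # 24
print(expected_experiment_count(loop_flow, skipped_steps=["iteration"]))  # 4
```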

Warning

When a step is skipped, a Comet experiment is still accessible through self.comet_experiment. Calls to logging methods (like self.comet_experiment.log_metrics) will continue to work but all data will be discarded.
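
The practical consequence is that step code does not need to check whether it is being skipped before logging. The following sketch illustrates that behaviour with a generic "null object"; DiscardingExperiment is a hypothetical class written for this page and is not how Comet implements skipped steps.

```python
class DiscardingExperiment:
    """Hypothetical stand-in: accepts any logging call and drops the data."""

    def __getattr__(self, name):
        # Any method name (log_metric, log_metrics, ...) resolves to a no-op.
        def discard(*args, **kwargs):
            return None

        return discard


def log_results(experiment, accuracy):
    # The same logging code works whether the experiment is live or discarding.
    experiment.log_metrics({"accuracy": accuracy})
    experiment.log_metric("run_accuracy", accuracy)


log_results(DiscardingExperiment(), 0.93)  # runs without error, stores nothing
```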

Configure Comet for Metaflow¶

The comet_flow decorator can be called either without arguments or with the following arguments:

| Argument Name | Argument Type | Description |
| --- | --- | --- |
| project_name | String | The Comet Project name to use. |
| workspace | String | The Comet Workspace name to use. |

# TODO
experiment = comet_ml.start(
    log_graph=True, # Can be True or False.
    auto_metric_logging=True # Can be True or False
)

Add or remove these fields from your .comet.config file under the [comet_auto_log] section to enable or disable logging.

[comet_auto_log]
graph=true # can be true or false
metrics=true # can be true or false

Alternatively, set the corresponding environment variables:

export COMET_AUTO_LOG_GRAPH=true # Can be true or false
export COMET_AUTO_LOG_METRICS=true # Can be true or false
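
To sanity-check the values in a [comet_auto_log] section before a run, the section can be read with Python's standard configparser. This is a sketch under assumptions: read_auto_log_flags is a hypothetical helper, the config text is inlined rather than read from your .comet.config file, and Comet's own config loader may parse the file differently.

```python
import configparser

# Inlined copy of a [comet_auto_log] section, for illustration only.
CONFIG_TEXT = """
[comet_auto_log]
graph=true # can be true or false
metrics=false # can be true or false
"""


def read_auto_log_flags(text):
    """Return the [comet_auto_log] settings as a dict of booleans."""
    # inline_comment_prefixes lets the parser strip the trailing "# ..." notes.
    parser = configparser.ConfigParser(inline_comment_prefixes=("#",))
    parser.read_string(text)
    section = parser["comet_auto_log"]
    return {key: section.getboolean(key) for key in section}


flags = read_auto_log_flags(CONFIG_TEXT)
print(flags)  # {'graph': True, 'metrics': False}
```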

For more details on the arguments, see Experiment.__init__.

These arguments affect all Experiments created by the integration: the run Experiment and every task Experiment.

Nov. 18, 2024