Skip to content

comet_mpm.CometMPM ΒΆ

CometMPM(
    api_key: Optional[str] = None,
    workspace_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_version: Optional[str] = None,
    disabled: Optional[bool] = None,
    asyncio: bool = False,
    max_batch_size: Optional[int] = None,
    max_batch_time: Optional[int] = None,
)

The Comet MPM class is used to upload a model's input and output features to MPM

Creates the Comet MPM Event logger object.

Parameters:

  • api_key (Optional[str], default: None ) –

    The Comet API Key

  • workspace_name (Optional[str], default: None ) –

    The Comet Workspace Name of the model

  • model_name (Optional[str], default: None ) –

    The Comet Model Name of the model

  • model_version (Optional[str], default: None ) –

    The Comet Model Version of the model

  • disabled (Optional[bool], default: None ) –

    If set to True, CometMPM will not send anything to the backend.

  • asyncio (bool, default: False ) –

    Set to True if you are using an Asyncio-based framework like FastAPI.

  • max_batch_size (Optional[int], default: None ) –

    Maximum number of MPM events sent in a batch, can also be configured using the environment variable MPM_MAX_BATCH_SIZE.

  • max_batch_time (Optional[int], default: None ) –

    Maximum time before a batch of events is submitted to MPM, can also be configured using the environment variable MPM_MAX_BATCH_SIZE.

FunctionsΒΆ

connect ΒΆ

connect() -> Optional[Awaitable[None]]

When using CometMPM in asyncio mode, this coroutine needs to be awaited at the server start.

end ΒΆ

end(timeout: Optional[int] = None) -> Optional[Awaitable[None]]

Ensure that all data has been sent to Comet and close the MPM object. After that, no data can be logged anymore. Waits for up to 30 seconds if timeout is not set.

join ΒΆ

join(timeout: Optional[int] = None) -> Optional[Awaitable[None]]

MPM.join is deprecated, use MPM.end instead.

log_dataframe ΒΆ

log_dataframe(
    dataframe,
    prediction_id_column: str,
    feature_columns: Optional[List[str]] = None,
    output_value_column: Optional[str] = None,
    output_probability_column: Optional[str] = None,
    output_features_columns: Optional[List[str]] = None,
    labels_columns: Optional[List[str]] = None,
    timestamp_column: Optional[str] = None,
) -> LogEventsResult

This function logs each row of a Pandas DataFrame as an MPM event. The events are structured as described in the log_event method, so please refer to it for full context.

Parameters:

  • dataframe –

    The Pandas DataFrame to be logged.

  • prediction_id_column (str) –

    This column should contain the prediction_id values for the events.

  • feature_columns (Optional[List[str]], default: None ) –

    If provided, these columns will be used as the input_features for the events.

  • output_features_columns (Optional[List[str]], default: None ) –

    If provided, these columns will be used as the output_features for the events.

  • output_value_column (Optional[str], default: None ) –

    Deprecated, please use the output_features_column field instead. If provided, this column will be used as the output_value for the events.

  • output_probability_column (Optional[str], default: None ) –

    Deprecated, please use the output_features_column field instead. If provided, this column will be used as the output_probability for the events.

  • labels_columns (Optional[List[str]], default: None ) –

    If provided, these columns will be used as the labels for the events.

  • timestamp_column (Optional[str], default: None ) –

    If provided, this column will be used as the timestamp (seconds since epoch start in UTC timezone) for the events.

log_event ΒΆ

log_event(
    prediction_id: str,
    input_features: Optional[Dict[str, Any]] = None,
    output_value: Optional[Any] = None,
    output_probability: Optional[Any] = None,
    output_features: Optional[Dict[str, Any]] = None,
    labels: Optional[Dict[str, Union[int, float, bool, str]]] = None,
    timestamp: Optional[float] = None,
) -> Optional[Awaitable[None]]

Asynchronously log a single event to MPM. Events are identified by the mandatory prediction_id parameter. If you send multiple events with the same prediction_id, the Comet platform with automatically reject the duplicate events.

Parameters:

  • prediction_id (str) –

    The unique prediction ID. It can be provided by the framework, you, or a random unique value such as str(uuid4()).

  • input_features (Optional[Dict[str, Any]], default: None ) –

    If provided, it must be a flat dictionary where the keys are the feature names, and the values are native Python scalars, such as integers, floats, booleans, or strings. For example: {"age": 42, "income": 42894.89}.

  • output_value (Optional[Any], default: None ) –

    The prediction as a native Python scalar, such as an integer, float, boolean, or string.

  • output_probability (Optional[Any], default: None ) –

    If provided, it must be a float between 0 and 1, indicating the model's confidence in the prediction.

  • output_features (Optional[Dict[str, Any]], default: None ) –

    A dictionary of output features.

  • labels (Optional[Dict[str, Union[int, float, bool, str]]], default: None ) –

    If provided, it must be a flat dictionary where the keys are the label names, and the values are native Python scalars, such as integers, floats, booleans, or strings. For example: {"person": 2, "bicycle": 1, "car": 3}.

  • timestamp (Optional[float], default: None ) –

    An optional timestamp to associate with the event (seconds since epoch in UTC timezone). If not provided, the current time will be used.

log_label ΒΆ

log_label(
    prediction_id: str,
    label: Optional[Any] = None,
    labels: Optional[Dict[str, Union[int, float, bool, str]]] = None,
    timestamp: Optional[float] = None,
) -> Optional[Awaitable[None]]

Send an MPM event containing the ground truth value for a prediction whose input and output features are already stored in Comet.

Parameters:

  • prediction_id (str) –

    The unique prediction ID

  • label (Optional[Any], default: None ) –

    Deprecated, please use the labels instead. If provided, this value will be used put as 'value' within the labels.

  • labels (Optional[Dict[str, Union[int, float, bool, str]]], default: None ) –

    The ground truth values for the prediction. It must be a flat dictionary where the keys are the label names, and the values are native Python scalars, such as integers, floats, booleans, or strings. For example: {"person": 2, "bicycle": 1, "car": 3}.

  • timestamp (Optional[float], default: None ) –

    An optional timestamp to associate with the label (seconds since epoch in UTC timezone). If not provided, the current time will be used.

upload_dataset_csv ΒΆ

upload_dataset_csv(
    file_path: str,
    dataset_type: str,
    dataset_name: Optional[str] = None,
    na_values: Optional[str] = None,
    keep_default_na_values: Optional[str] = None,
) -> None

Uploads dataset from a local CSV file to the backend, streaming the data line by line.

This method facilitates the creation or updating of a dataset with data streamed from a specified CSV file. The uploaded data is used as a reference for detecting drift in a production model within the same workspace. Each line of the CSV file is sent as an event to the dataset, allowing for incremental updates.

Parameters:

  • file_path (str) –

    The path to the local CSV file whose data is to be streamed to the backend.

  • dataset_type (Literal[EVENTS, LATE_LABELS, TRAINING_EVENTS]) –

    Type of the dataset to be updated

  • dataset_name (str, default: None ) –

    The name of the dataset where the data will be stored. If a model with this name does not exist, a new model will be created. If the model already exists, new records will be added to it (duplicated predictionIds will be ignored). In case dataset_type is TRAINING_EVENTS this is mandatory as the MPM model_name is the production model, which the dataset_name parameter is referring to.

  • na_values (str, default: None ) –

    Additional strings to recognize as NA/NaN. By default, the system recognizes standard missing values (like empty fields, 'NaN', 'NULL', etc.). Specifying this parameter allows for the inclusion of custom missing value identifiers, enhancing the flexibility in data handling. If specified, it should be as comma delimiter string. The default list is (Note that empty string is also in this list): None,,null,NULL,N/A,NA,NaN,n/a,nan

  • keep_default_na_values (str, default: None ) –

    A boolean that determines whether to include the default set of NA identifiers in addition to the values specified in 'na_values'. If True, both default and specified missing value identifiers are used. If False, only the values specified in 'na_values' are considered.

Note

CSV Format:

  • The first line of the CSV file must contain headers.
  • Columns:

    1. timestamp (optional): If missing, the current timestamp will be used as the event time. If specified, it should be the millis since epoch.
    2. predictionId (optional): Unique identifier for each event. If missing, a UUID will be generated. Duplicate predictionIds in new events will be ignored.
    3. feature_* columns: These prefixed columns specify the input features for the model, e.g., 'feature_age' or 'feature_color'.
    4. prediction_* columns: These prefixed columns are for the output features, e.g., 'prediction_animal' or 'prediction_probability'.
    5. label_value_* columns: These columns are for the label values of the event, e.g., 'label_value_price' or 'label_value_animal'.

Sample CSV content:

timestamp,predictionId,feature_oneMoreFeature,feature_anotherFeature,feature_someFeature,prediction_fingers_count,prediction_probability,prediction_value,label_value_fingers_count,label_value_animal
1713006000001,someAssetId_-1895825684,Dog,special,53.09863247819340,7,0.87,Bird,4,Fish
1713006600001,someAssetId_926457604,null,special,55.73110218323990,1,0.69,Fish,6,Fish
1713007200001,someAssetId_2145792990,Rabbit,special,49.40627545548700,4,0.59,Bird,1,Fish
Example
1
2
3
4
5
6
7
8
from comet_mpm import CometMPM

MPM = CometMPM()
MPM.upload_dataset_csv(
    file_path="path/to/your/data.csv",
    dataset_type="TRAINING_EVENTS",  # Or use 'EVENTS', 'LATE_LABELS' as needed
    dataset_name="your-dataset-name"
)
Jan. 17, 2025