OpikConnector

class opik.integrations.haystack.OpikConnector(*args, **kwargs)

Bases: object

OpikConnector connects the Haystack LLM framework with [Opik](https://github.com/comet-ml/opik) to enable tracing of operations and data flow across the components of a pipeline.

Simply add this component to your pipeline, but do not connect it to any other component. The OpikConnector will automatically trace the operations and data flow within the pipeline.

To configure Opik, first install the Opik SDK with pip install opik, then run opik configure from the command line. Alternatively, you can configure Opik using environment variables; see the [SDK configuration guide](https://www.comet.com/docs/opik/tracing/sdk_configuration) for details.

In addition, you need to set the HAYSTACK_CONTENT_TRACING_ENABLED environment variable to true in order to enable Haystack tracing in your pipeline.
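As a rough sketch of the environment-variable route, the snippet below sets everything from Python before any Haystack or Opik import. Only HAYSTACK_CONTENT_TRACING_ENABLED comes from this page; the OPIK_* names and all values are assumptions for illustration and should be checked against the SDK configuration guide linked above.

```python
import os

# Assumed variable names and placeholder values; verify against the
# Opik SDK configuration guide and replace with your own settings.
opik_settings = {
    "OPIK_API_KEY": "<your-api-key>",
    "OPIK_WORKSPACE": "<your-workspace>",
    "OPIK_PROJECT_NAME": "haystack-demo",
}

# setdefault keeps any value that is already exported in the shell.
for name, value in opik_settings.items():
    os.environ.setdefault(name, value)

# Documented on this page: required for Haystack tracing in the pipeline.
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"
```

Using setdefault here is a design choice: values exported in the shell take precedence over the placeholders in the script.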

Example

You can use the OpikConnector in the following way:

import os

os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

from haystack import Pipeline
from haystack.components.builders import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from opik.integrations.haystack import (
    OpikConnector,
)

if __name__ == "__main__":
    pipe = Pipeline()
    pipe.add_component("tracer", OpikConnector("Chat example"))
    pipe.add_component("prompt_builder", ChatPromptBuilder())
    pipe.add_component("llm", OpenAIChatGenerator(model="gpt-3.5-turbo"))

    pipe.connect("prompt_builder.prompt", "llm.messages")

    messages = [
        ChatMessage.from_system(
            "Always respond in German even if some input data is in other languages."
        ),
        ChatMessage.from_user("Tell me about {{location}}"),
    ]

    response = pipe.run(
        data={
            "prompt_builder": {
                "template_variables": {"location": "Berlin"},
                "template": messages,
            }
        }
    )
    print(response["llm"]["replies"][0])
    # The tracer output exposes name, trace_id and project_url (see run() below)
    print(response["tracer"]["project_url"])

Note

You can disable flushing after each component by setting the HAYSTACK_OPIK_ENFORCE_FLUSH environment variable to false. By default, data is flushed after each component, which blocks the thread until the data has been sent to Opik. Caution: disabling this feature may result in data loss if the program crashes before the data is sent to Opik. Make sure you call the flush() method explicitly before the program exits, for example with tracer.actual_tracer.flush():

from haystack.tracing import tracer

tracer.actual_tracer.flush()
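
One way to make sure the explicit flush is not skipped when a pipeline run raises is to wrap the run in a small context manager. This is a minimal sketch, not part of the Opik API: flush_on_exit is a hypothetical helper, and setting the variable before importing the integration is an assumption about when the setting is read.

```python
import os
from contextlib import contextmanager

# Assumption: set this before importing the Opik integration so the
# setting is already in place whenever it is read.
os.environ["HAYSTACK_OPIK_ENFORCE_FLUSH"] = "false"


@contextmanager
def flush_on_exit(tracer):
    """Hypothetical helper: yield the tracer and call flush() even if the body raises."""
    try:
        yield tracer
    finally:
        tracer.flush()


# Usage with Haystack (requires haystack and opik to be installed):
#   from haystack.tracing import tracer
#   with flush_on_exit(tracer.actual_tracer):
#       pipe.run(data=...)
```

This covers ordinary exceptions only; a hard crash or a killed process can still lose unflushed data.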

run(invocation_context: Dict[str, Any] | None = None) → Dict[str, Any]

Runs the OpikConnector component.

Parameters:

invocation_context – A dictionary with additional context for the invocation. This parameter is useful when users want to mark this particular invocation with additional information, e.g. a run id from their own execution framework, user id, etc. These key-value pairs are then visible in the Opik traces.

Returns:

A dictionary with the following keys:

  • name: The name of the tracing component.

  • trace_id: The Opik trace id.

  • project_url: The URL to the Opik project with tracing data.

Return type:

Dict[str, Any]
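
To illustrate how invocation_context can be supplied, the sketch below builds the data argument for Pipeline.run, addressing the connector by the name it was registered under ("tracer" in the example above). build_run_data is a hypothetical helper, and the run_id and user_id values are made up for illustration.

```python
from typing import Any, Dict


def build_run_data(
    template: Any,
    variables: Dict[str, Any],
    invocation_context: Dict[str, Any],
) -> Dict[str, Dict[str, Any]]:
    """Hypothetical helper: build the `data` dict for Pipeline.run, keyed by component name."""
    return {
        "prompt_builder": {
            "template": template,
            "template_variables": variables,
        },
        # "tracer" must match the name used in pipe.add_component(...)
        "tracer": {"invocation_context": invocation_context},
    }


data = build_run_data(
    template=[],  # placeholder; pass your list of ChatMessage objects here
    variables={"location": "Berlin"},
    invocation_context={"run_id": "run-42", "user_id": "user-7"},
)

# With a pipeline built as in the example above:
#   response = pipe.run(data=data)
#   print(response["tracer"]["trace_id"])
#   print(response["tracer"]["project_url"])
```

The key-value pairs passed this way should then appear alongside the corresponding trace in Opik, as described for the invocation_context parameter.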