Observability for LangGraph with Opik

Opik provides a seamless integration with LangGraph, allowing you to easily log and trace your LangGraph-based applications. By using the OpikTracer callback, you can automatically capture detailed information about your LangGraph graph executions during both development and production.

Account Setup

Comet provides a hosted version of the Opik platform; simply create an account and grab your API key.

You can also run the Opik platform locally, see the installation guide for more information.

Getting Started

Installation

To use the OpikTracer with LangGraph, you’ll need the opik, langgraph, and langchain packages installed. You can install them using pip:

pip install opik langgraph langchain

Configuring Opik

Configure the Opik Python SDK for your deployment type (a minimal sketch follows the list below). See the Python SDK Configuration guide for detailed instructions on:

  • CLI configuration: opik configure
  • Code configuration: opik.configure()
  • Self-hosted vs Cloud vs Enterprise setup
  • Configuration files and environment variables
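
For example, a minimal code-based setup for the hosted platform might look like the sketch below; the API key and workspace values are placeholders you would replace with your own:

import opik

# Code-based configuration for Opik Cloud; the values below are placeholders
opik.configure(
    api_key="YOUR_API_KEY",
    workspace="YOUR_WORKSPACE",
)

# For a self-hosted deployment, point the SDK at your local instance instead:
# opik.configure(use_local=True)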

Using Opik with LangGraph

Opik provides two ways to track LangGraph applications. We recommend using the track_langgraph function for a simpler experience, but you can also use the OpikTracer callback directly if you need more control.

Option 1: Using track_langgraph (recommended)

The simplest way to track your LangGraph applications is with the track_langgraph function. It wraps your compiled graph once, and all subsequent invocations are automatically tracked without needing to pass callbacks:

from typing import Annotated

from pydantic import BaseModel
from opik.integrations.langchain import OpikTracer, track_langgraph
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# Create your LangGraph graph
class State(BaseModel):
    messages: Annotated[list, add_messages]

def chatbot(state):
    # Typically your LLM calls would be done here
    return {"messages": "Hello, how can I help you today?"}

graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)
app = graph.compile()

# Create the OpikTracer and track the graph once - no need to pass callbacks anymore!
# The graph visualization is automatically extracted by track_langgraph
opik_tracer = OpikTracer(
    tags=["production"],
    metadata={"version": "1.0"},
)
app = track_langgraph(app, opik_tracer)

# Now all invocations are automatically tracked
for s in app.stream({"messages": [HumanMessage(content="How to use LangGraph?")]}):
    print(s)

# No callbacks needed here either!
result = app.invoke({"messages": [HumanMessage(content="How to use LangGraph?")]})

This is similar to how other Opik integrations work (like OpenAI, Anthropic, etc.), where you wrap the client or object once and then use it normally.
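
For example, the OpenAI integration follows the same wrap-once pattern; this minimal sketch assumes the openai package is installed and uses an illustrative model name:

from openai import OpenAI
from opik.integrations.openai import track_openai

# Wrap the client once; subsequent calls are traced automatically
client = track_openai(OpenAI())

response = client.chat.completions.create(
    model="gpt-4o-mini",  # illustrative model name
    messages=[{"role": "user", "content": "Hello!"}],
)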

Option 2: Using OpikTracer callback

If you need more fine-grained control or want to use different tracers for different invocations, you can use the OpikTracer callback directly:

from typing import Annotated

from pydantic import BaseModel
from opik.integrations.langchain import OpikTracer
from langchain_core.messages import HumanMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

# Create your LangGraph graph
class State(BaseModel):
    messages: Annotated[list, add_messages]

def chatbot(state):
    # Typically your LLM calls would be done here
    return {"messages": "Hello, how can I help you today?"}

graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)
app = graph.compile()

# Create the OpikTracer
opik_tracer = OpikTracer()

# Pass the OpikTracer callback to each invocation
for s in app.stream(
    {"messages": [HumanMessage(content="How to use LangGraph?")]},
    config={"callbacks": [opik_tracer]},
):
    print(s)

result = app.invoke(
    {"messages": [HumanMessage(content="How to use LangGraph?")]},
    config={"callbacks": [opik_tracer]},
)

Viewing Traces in the UI

Once tracking is enabled using either method, you will start to see the traces in the Opik UI.

Practical Example: Classification Workflow

Let’s walk through a real-world example of using LangGraph with Opik for a classification workflow. This example demonstrates how to create a graph with conditional routing and track its execution.

Setting up the Environment

First, let’s set up our environment with the necessary dependencies:

import opik

# Configure Opik
opik.configure(use_local=False)

Creating the LangGraph Workflow

We’ll create a LangGraph workflow with 3 nodes that demonstrates conditional routing:

from typing import Optional, TypedDict

from langgraph.graph import StateGraph, END

# Define the graph state
# Note: TypedDict fields cannot have default values; keys are simply declared
class GraphState(TypedDict):
    question: Optional[str]
    classification: Optional[str]
    response: Optional[str]

# Create the node functions
def classify(question: str) -> str:
    return "greeting" if question.startswith("Hello") else "search"

def classify_input_node(state):
    question = state.get("question", "").strip()
    classification = classify(question)
    return {"classification": classification}

def handle_greeting_node(state):
    return {"response": "Hello! How can I help you today?"}

def handle_search_node(state):
    question = state.get("question", "").strip()
    search_result = f"Search result for '{question}'"
    return {"response": search_result}

# Create the workflow
workflow = StateGraph(GraphState)
workflow.add_node("classify_input", classify_input_node)
workflow.add_node("handle_greeting", handle_greeting_node)
workflow.add_node("handle_search", handle_search_node)

# Add conditional routing
def decide_next_node(state):
    return (
        "handle_greeting"
        if state.get("classification") == "greeting"
        else "handle_search"
    )

workflow.add_conditional_edges(
    "classify_input",
    decide_next_node,
    {"handle_greeting": "handle_greeting", "handle_search": "handle_search"},
)

workflow.set_entry_point("classify_input")
workflow.add_edge("handle_greeting", END)
workflow.add_edge("handle_search", END)

app = workflow.compile()

Executing with Opik Tracing

Now let’s execute the workflow with Opik tracing enabled using track_langgraph:

from opik.integrations.langchain import OpikTracer, track_langgraph

# Create the OpikTracer and track the graph once
# The graph visualization is automatically extracted by track_langgraph
opik_tracer = OpikTracer(
    project_name="classification-workflow"
)
app = track_langgraph(app, opik_tracer)

# Execute the workflow - no callbacks needed!
inputs = {"question": "Hello, how are you?"}
result = app.invoke(inputs)
print(result)

# Test with a different input - still tracked automatically
inputs = {"question": "What is machine learning?"}
result = app.invoke(inputs)
print(result)

The graph execution is now logged on the Opik platform and can be viewed in the UI. The trace will show the complete execution path through the graph, including the classification decision and the chosen response path.

Compatibility with Opik tracing context

LangGraph tracing integrates seamlessly with Opik’s tracing context, allowing you to call @track-decorated functions (and use most other native Opik integrations) from within your graph nodes and have them automatically attached to the trace tree.

Synchronous execution (invoke)

For synchronous graph execution using invoke(), everything works out of the box. You can access current spans/traces from LangGraph nodes and call tracked functions inside them:

from opik import opik_context, track
from opik.integrations.langchain import OpikTracer, track_langgraph
from langgraph.graph import StateGraph, START, END

@track
def process_data(value: int) -> int:
    """Custom tracked function that will be attached to the trace tree."""
    return value * 2

def my_node(state):
    current_trace_data = opik_context.get_current_trace_data()
    current_span_data = opik_context.get_current_span_data()  # returns the span for `my_node`, created by OpikTracer

    # This tracked function call will automatically be part of the trace tree
    result = process_data(state["value"])
    return {"value": result}

# Build and execute the graph
graph = StateGraph(dict)
graph.add_node("processor", my_node)
graph.add_edge(START, "processor")
graph.add_edge("processor", END)

app = graph.compile()
opik_tracer = OpikTracer()
app = track_langgraph(app, opik_tracer)

# Synchronous execution - tracked functions work automatically
result = app.invoke({"value": 21})

Asynchronous execution (ainvoke)

For asynchronous graph execution using ainvoke(), you need to explicitly propagate the trace context to @track-decorated functions using the extract_current_langgraph_span_data helper.

This is due to a LangChain framework limitation: in async scenarios, the execution context is not automatically shared between callbacks (like OpikTracer) and node code, so explicit trace context propagation via distributed headers is required for seamless tracking across async boundaries.

from opik import track
from opik.integrations.langchain import OpikTracer, track_langgraph, extract_current_langgraph_span_data
from langgraph.graph import StateGraph, START, END

@track
def process_data(value: int) -> int:
    """Custom tracked function that needs distributed trace headers in an async context."""
    return value * 2

async def my_async_node(state, config):
    # Extract the current span data from the LangGraph config. `opik_context` doesn't
    # work here due to LangGraph limitations around context propagation.
    span_data = extract_current_langgraph_span_data(config)

    # Pass distributed trace headers to attach the tracked function to the trace tree
    result = process_data(
        state["value"],
        opik_distributed_trace_headers=span_data.get_distributed_trace_headers(),  # all tracked functions implicitly support this parameter
    )
    return {"value": result}

# Build and execute the graph
graph = StateGraph(dict)
graph.add_node("processor", my_async_node)
graph.add_edge(START, "processor")
graph.add_edge("processor", END)

app = graph.compile()
opik_tracer = OpikTracer()
app = track_langgraph(app, opik_tracer)

# Asynchronous execution - requires explicit trace context propagation
result = await app.ainvoke({"value": 21})

Alternatively, if you don’t want to use the @track decorator, you can use the opik.start_as_current_span context manager with distributed headers:

import opik
from opik.integrations.langchain import OpikTracer, track_langgraph, extract_current_langgraph_span_data
from langgraph.graph import StateGraph, START, END

async def my_async_node(state, config):
    span_data = extract_current_langgraph_span_data(config)

    # Use the context manager with distributed headers
    with opik.start_as_current_span(
        name="custom_operation",
        input={"input": state["value"]},
        opik_distributed_trace_headers=span_data.get_distributed_trace_headers(),
    ) as span:
        # Your custom logic here
        result = state["value"] * 2
        span.output = {"output": result}

    return {"value": result}

# Build and execute the graph
graph = StateGraph(dict)
graph.add_node("processor", my_async_node)
graph.add_edge(START, "processor")
graph.add_edge("processor", END)

app = graph.compile()
opik_tracer = OpikTracer()
app = track_langgraph(app, opik_tracer)

result = await app.ainvoke({"value": 21})

Logging threads

When you run multi-turn conversations using LangGraph persistence, Opik uses LangGraph’s thread_id as the Opik thread_id. Here is an example:

import sqlite3
from typing import Annotated

from pydantic import BaseModel
from opik.integrations.langchain import OpikTracer, track_langgraph
from langchain.chat_models import init_chat_model
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

llm = init_chat_model("openai:gpt-4.1")


# Create your LangGraph graph
class State(BaseModel):
    messages: Annotated[list, add_messages]


def chatbot(state):
    # Typically your LLM calls would be done here
    return {"messages": [llm.invoke(state.messages)]}


graph = StateGraph(State)
graph.add_node("chatbot", chatbot)
graph.add_edge(START, "chatbot")
graph.add_edge("chatbot", END)

# Create a new SqliteSaver instance
# Note: check_same_thread=False is OK as the implementation uses a lock
# to ensure thread safety.
conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(conn)

app = graph.compile(checkpointer=memory)

# Create the OpikTracer and track the graph
opik_tracer = OpikTracer()
app = track_langgraph(app, opik_tracer)

thread_id = "e424a45e-7763-443a-94ae-434b39b67b72"
config = {"configurable": {"thread_id": thread_id}}

# Initialize the state from the checkpoint, or start fresh if it is empty
values = app.get_state(config).values
state = State(**values) if values else State(messages=[])
print("STATE", state)

# Add the user message
state.messages.append(HumanMessage(content="Hello, my name is Bob, how are you doing?"))
# state.messages.append(HumanMessage(content="What is my name?"))

result = app.invoke(state, config=config)

print("Result", result)

Updating logged traces

You can use the OpikTracer.created_traces method to access the trace IDs collected by the OpikTracer callback:

from opik.integrations.langchain import OpikTracer

opik_tracer = OpikTracer()

# Call LangGraph stream or invoke functions here

traces = opik_tracer.created_traces()
print([trace.id for trace in traces])

These can then be used with the Opik.log_traces_feedback_scores method to update the logged traces.
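
For example, a minimal sketch might look like this; the score name and value are illustrative, and the Opik client is assumed to use your default configuration:

import opik
from opik.integrations.langchain import OpikTracer

opik_tracer = OpikTracer()

# ... run your LangGraph stream or invoke calls here ...

client = opik.Opik()
traces = opik_tracer.created_traces()

# Attach an illustrative feedback score to each collected trace
client.log_traces_feedback_scores(
    scores=[
        {"id": trace.id, "name": "user_feedback", "value": 1.0}
        for trace in traces
    ]
)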

Advanced usage

The OpikTracer object has a flush method that can be used to make sure all traces are logged to the Opik platform before you exit a script. This method returns once all traces have been logged or the timeout is reached, whichever comes first.

from opik.integrations.langchain import OpikTracer

opik_tracer = OpikTracer()
opik_tracer.flush()