# Streaming
LangGraph implements a streaming system to surface real-time updates. Streaming is crucial for enhancing the responsiveness of applications built on LLMs. By displaying output progressively, even before a complete response is ready, streaming significantly improves user experience (UX), particularly when dealing with the latency of LLMs.
What's possible with LangGraph streaming:
* [**Stream graph state**](#stream-graph-state) — get state updates / values with `updates` and `values` modes.
* [**Stream subgraph outputs**](#stream-subgraph-outputs) — include outputs from both the parent graph and any nested subgraphs.
* [**Stream LLM tokens**](#messages) — capture token streams from anywhere: inside nodes, subgraphs, or tools.
* [**Stream custom data**](#stream-custom-data) — send custom updates or progress signals directly from tool functions.
* [**Use multiple streaming modes**](#stream-multiple-modes) — choose from `values` (full state), `updates` (state deltas), `messages` (LLM tokens + metadata), `custom` (arbitrary user data), or `debug` (detailed traces).
## Supported stream modes
Pass one or more of the following stream modes as a list to the [`stream`](https://reference.langchain.com/python/langgraph/graphs/#langgraph.graph.state.CompiledStateGraph.stream) or [`astream`](https://reference.langchain.com/python/langgraph/graphs/#langgraph.graph.state.CompiledStateGraph.astream) methods:
| Mode | Description |
| ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `values` | Streams the full value of the state after each step of the graph. |
| `updates` | Streams the updates to the state after each step of the graph. If multiple updates are made in the same step (e.g., multiple nodes are run), those updates are streamed separately. |
| `custom` | Streams custom data from inside your graph nodes. |
| `messages` | Streams 2-tuples (LLM token, metadata) from any graph nodes where an LLM is invoked. |
| `debug` | Streams as much information as possible throughout the execution of the graph. |
## Basic usage example
LangGraph graphs expose the [`stream`](https://reference.langchain.com/python/langgraph/pregel/#langgraph.pregel.Pregel.stream) (sync) and [`astream`](https://reference.langchain.com/python/langgraph/pregel/#langgraph.pregel.Pregel.astream) (async) methods to yield streamed outputs as iterators.
```python theme={null}
for chunk in graph.stream(inputs, stream_mode="updates"):
    print(chunk)
```
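For example, the following builds a minimal two-node graph and streams the update each node produces: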
```python theme={null}
from typing import TypedDict

from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)

# The stream() method returns an iterator that yields streamed outputs
for chunk in graph.stream( # [!code highlight]
    {"topic": "ice cream"},
    # Set stream_mode="updates" to stream only the updates to the graph state after each node
    # Other stream modes are also available. See supported stream modes for details
    stream_mode="updates", # [!code highlight]
):
    print(chunk)
```
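Running this produces one update per node: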
```python theme={null}
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}
```
## Stream multiple modes
You can pass a list as the `stream_mode` parameter to stream multiple modes at once.
The streamed outputs will be tuples of `(mode, chunk)` where `mode` is the name of the stream mode and `chunk` is the data streamed by that mode.
```python theme={null}
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    print(f"{mode}: {chunk}")
```
## Stream graph state
Use the stream modes `updates` and `values` to stream the state of the graph as it executes.
* `updates` streams the **updates** to the state after each step of the graph.
* `values` streams the **full value** of the state after each step of the graph.
```python theme={null}
from typing import TypedDict

from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)
```
Use `stream_mode="updates"` to stream only the **state updates** returned by the nodes after each step. The streamed outputs include the name of the node as well as the update.
```python theme={null}
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="updates", # [!code highlight]
):
    print(chunk)
```
Use `stream_mode="values"` to stream the **full state** of the graph after each step.
```python theme={null}
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="values", # [!code highlight]
):
    print(chunk)
```
## Stream subgraph outputs
To include outputs from [subgraphs](/oss/python/langgraph/use-subgraphs) in the streamed outputs, you can set `subgraphs=True` in the `.stream()` method of the parent graph. This will stream outputs from both the parent graph and any subgraphs.
The outputs will be streamed as tuples `(namespace, data)`, where `namespace` is a tuple with the path to the node where a subgraph is invoked, e.g. `("parent_node:", "child_node:")`.
```python theme={null}
for chunk in graph.stream(
    {"foo": "foo"},
    # Set subgraphs=True to stream outputs from subgraphs
    subgraphs=True, # [!code highlight]
    stream_mode="updates",
):
    print(chunk)
```
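If you only care about the events coming from a particular subgraph, you can filter on the namespace prefix. The sketch below assumes the subgraph is invoked from a node named `node_2`, as in the extended example that follows; the namespace format shown in the comments is illustrative:

```python theme={null}
for namespace, chunk in graph.stream(
    {"foo": "foo"},
    subgraphs=True,
    stream_mode="updates",
):
    # namespace is an empty tuple for the parent graph, or a tuple like
    # ("node_2:<task_id>",) for updates emitted from inside a subgraph
    if namespace and namespace[0].startswith("node_2:"):
        print(chunk)
```

The extended example below defines a subgraph, embeds it in a parent graph as `node_2`, and streams updates from both: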
```python theme={null}
from langgraph.graph import START, StateGraph
from typing import TypedDict

# Define subgraph
class SubgraphState(TypedDict):
    foo: str  # note that this key is shared with the parent graph state
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# Define parent graph
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()

for chunk in graph.stream(
    {"foo": "foo"},
    stream_mode="updates",
    # Set subgraphs=True to stream outputs from subgraphs
    subgraphs=True, # [!code highlight]
):
    print(chunk)
```
```
((), {'node_1': {'foo': 'hi! foo'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_1': {'bar': 'bar'}})
(('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',), {'subgraph_node_2': {'foo': 'hi! foobar'}})
((), {'node_2': {'foo': 'hi! foobar'}})
```
**Note** that we receive not just the node updates, but also the namespaces, which tell us which graph (or subgraph) we are streaming from.
### Debugging
Use the `debug` streaming mode to stream as much information as possible throughout the execution of the graph. The streamed outputs include the name of the node as well as the full state.
```python theme={null}
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="debug", # [!code highlight]
):
    print(chunk)
```
## LLM tokens
Use the `messages` streaming mode to stream Large Language Model (LLM) outputs **token by token** from any part of your graph, including nodes, tools, subgraphs, or tasks.
The streamed output from [`messages` mode](#supported-stream-modes) is a tuple `(message_chunk, metadata)` where:
* `message_chunk`: the token or message segment from the LLM.
* `metadata`: a dictionary containing details about the graph node and LLM invocation.
> If your LLM is not available as a LangChain integration, you can stream its outputs using `custom` mode instead. See [use with any LLM](#use-with-any-llm) for details.
**Manual config required for async in Python \< 3.11**
When using Python \< 3.11 with async code, you must explicitly pass [`RunnableConfig`](https://reference.langchain.com/python/langchain_core/runnables/#langchain_core.runnables.RunnableConfig) to `ainvoke()` to enable proper streaming. See [Async with Python \< 3.11](#async) for details or upgrade to Python 3.11+.
```python theme={null}
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START

@dataclass
class MyState:
    topic: str
    joke: str = ""

model = init_chat_model(model="gpt-4.1-mini")

def call_model(state: MyState):
    """Call the LLM to generate a joke about a topic"""
    # Note that message events are emitted even when the LLM is run using .invoke rather than .stream
    model_response = model.invoke( # [!code highlight]
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": model_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# The "messages" stream mode returns an iterator of tuples (message_chunk, metadata)
# where message_chunk is the token streamed by the LLM and metadata is a dictionary
# with information about the graph node where the LLM was called and other information
for message_chunk, metadata in graph.stream(
    {"topic": "ice cream"},
    stream_mode="messages", # [!code highlight]
):
    if message_chunk.content:
        print(message_chunk.content, end="|", flush=True)
```
#### Filter by LLM invocation
You can associate `tags` with LLM invocations to filter the streamed tokens by LLM invocation.
```python theme={null}
from langchain.chat_models import init_chat_model

# model_1 is tagged with "joke"
model_1 = init_chat_model(model="gpt-4.1-mini", tags=["joke"])
# model_2 is tagged with "poem"
model_2 = init_chat_model(model="gpt-4.1-mini", tags=["poem"])

graph = ... # define a graph that uses these LLMs

# The stream_mode is set to "messages" to stream LLM tokens
# The metadata contains information about the LLM invocation, including the tags
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages", # [!code highlight]
):
    # Filter the streamed tokens by the tags field in the metadata to only include
    # the tokens from the LLM invocation tagged with "joke"
    if "joke" in metadata.get("tags", []):
        print(msg.content, end="|", flush=True)
```
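The extended example below shows the full graph behind this pattern: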
```python theme={null}
from typing import TypedDict

from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph

# The joke_model is tagged with "joke"
joke_model = init_chat_model(model="gpt-4.1-mini", tags=["joke"])
# The poem_model is tagged with "poem"
poem_model = init_chat_model(model="gpt-4.1-mini", tags=["poem"])

class State(TypedDict):
    topic: str
    joke: str
    poem: str

async def call_model(state, config):
    topic = state["topic"]
    print("Writing joke...")
    # Note: passing the config through explicitly is required for Python < 3.11,
    # since asyncio tasks did not support the context parameter until then:
    # https://docs.python.org/3/library/asyncio-task.html#creating-tasks
    # Passing the config ensures callbacks (and token streaming) propagate correctly.
    # See the async section below for more details.
    joke_response = await joke_model.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    print("\n\nWriting poem...")
    poem_response = await poem_model.ainvoke(
        [{"role": "user", "content": f"Write a short poem about {topic}"}],
        config,
    )
    return {"joke": joke_response.content, "poem": poem_response.content}

graph = (
    StateGraph(State)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# The stream_mode is set to "messages" to stream LLM tokens
# The metadata contains information about the LLM invocation, including the tags
async for msg, metadata in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
):
    if "joke" in metadata.get("tags", []):
        print(msg.content, end="|", flush=True)
```
#### Filter by node
To stream tokens only from specific nodes, use `stream_mode="messages"` and filter the outputs by the `langgraph_node` field in the streamed metadata:
```python theme={null}
# The "messages" stream mode returns a tuple of (message_chunk, metadata)
# where message_chunk is the token streamed by the LLM and metadata is a dictionary
# with information about the graph node where the LLM was called and other information
for msg, metadata in graph.stream(
    inputs,
    stream_mode="messages", # [!code highlight]
):
    # Filter the streamed tokens by the langgraph_node field in the metadata
    # to only include the tokens from the specified node
    if msg.content and metadata["langgraph_node"] == "some_node_name":
        ...
```
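The extended example below runs two LLM-calling nodes concurrently and streams tokens only from `write_poem`: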
```python theme={null}
from typing import TypedDict

from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4.1-mini")

class State(TypedDict):
    topic: str
    joke: str
    poem: str

def write_joke(state: State):
    topic = state["topic"]
    joke_response = model.invoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}]
    )
    return {"joke": joke_response.content}

def write_poem(state: State):
    topic = state["topic"]
    poem_response = model.invoke(
        [{"role": "user", "content": f"Write a short poem about {topic}"}]
    )
    return {"poem": poem_response.content}

graph = (
    StateGraph(State)
    .add_node(write_joke)
    .add_node(write_poem)
    # write both the joke and the poem concurrently
    .add_edge(START, "write_joke")
    .add_edge(START, "write_poem")
    .compile()
)

# The "messages" stream mode returns a tuple of (message_chunk, metadata)
# where message_chunk is the token streamed by the LLM and metadata is a dictionary
# with information about the graph node where the LLM was called and other information
for msg, metadata in graph.stream(
    {"topic": "cats"},
    stream_mode="messages", # [!code highlight]
):
    # Filter the streamed tokens by the langgraph_node field in the metadata
    # to only include the tokens from the write_poem node
    if msg.content and metadata["langgraph_node"] == "write_poem":
        print(msg.content, end="|", flush=True)
```
## Stream custom data
To send **custom user-defined data** from inside a LangGraph node or tool, follow these steps:
1. Use [`get_stream_writer`](https://reference.langchain.com/python/langgraph/config/#langgraph.config.get_stream_writer) to access the stream writer and emit custom data.
2. Set `stream_mode="custom"` when calling `.stream()` or `.astream()` to get the custom data in the stream. You can combine multiple modes (e.g., `["updates", "custom"]`), but `"custom"` must be included to receive the custom data.
**No [`get_stream_writer`](https://reference.langchain.com/python/langgraph/config/#langgraph.config.get_stream_writer) in async for Python \< 3.11**
In async code running on Python \< 3.11, [`get_stream_writer`](https://reference.langchain.com/python/langgraph/config/#langgraph.config.get_stream_writer) will not work.
Instead, add a `writer` parameter to your node or tool and pass it manually.
See [Async with Python \< 3.11](#async) for usage examples.
```python theme={null}
from typing import TypedDict

from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START

class State(TypedDict):
    query: str
    answer: str

def node(state: State):
    # Get the stream writer to send custom data
    writer = get_stream_writer()
    # Emit a custom key-value pair (e.g., progress update)
    writer({"custom_key": "Generating custom data inside node"})
    return {"answer": "some data"}

graph = (
    StateGraph(State)
    .add_node(node)
    .add_edge(START, "node")
    .compile()
)

inputs = {"query": "example"}

# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(inputs, stream_mode="custom"):
    print(chunk)
```
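You can also emit custom data from inside a tool: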
```python theme={null}
from langchain.tools import tool
from langgraph.config import get_stream_writer

@tool
def query_database(query: str) -> str:
    """Query the database."""
    # Access the stream writer to send custom data
    writer = get_stream_writer() # [!code highlight]
    # Emit a custom key-value pair (e.g., progress update)
    writer({"data": "Retrieved 0/100 records", "type": "progress"}) # [!code highlight]
    # perform query
    # Emit another custom key-value pair
    writer({"data": "Retrieved 100/100 records", "type": "progress"})
    return "some-answer"

graph = ... # define a graph that uses this tool

# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(inputs, stream_mode="custom"):
    print(chunk)
```
## Use with any LLM
You can use `stream_mode="custom"` to stream data from **any LLM API** — even if that API does **not** implement the LangChain chat model interface.
This lets you integrate raw LLM clients or external services that provide their own streaming interfaces, making LangGraph highly flexible for custom setups.
```python theme={null}
from typing import TypedDict

from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START

class State(TypedDict):
    topic: str
    result: str

def call_arbitrary_model(state: State):
    """Example node that calls an arbitrary model and streams the output"""
    # Get the stream writer to send custom data
    writer = get_stream_writer() # [!code highlight]
    # Assume you have a streaming client that yields chunks
    # Generate LLM tokens using your custom streaming client
    for chunk in your_custom_streaming_client(state["topic"]):
        # Use the writer to send custom data to the stream
        writer({"custom_llm_chunk": chunk}) # [!code highlight]
    return {"result": "completed"}

graph = (
    StateGraph(State)
    .add_node(call_arbitrary_model)
    .add_edge(START, "call_arbitrary_model")
    # Add other nodes and edges as needed
    .compile()
)

# Set stream_mode="custom" to receive the custom data in the stream
for chunk in graph.stream(
    {"topic": "cats"},
    stream_mode="custom", # [!code highlight]
):
    # The chunk will contain the custom data streamed from the llm
    print(chunk)
```
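The extended example below calls the OpenAI client directly from inside a tool and forwards its tokens through the stream writer: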
```python theme={null}
import operator
import json
from typing import TypedDict

from typing_extensions import Annotated
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START
from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4.1-mini"

async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta
        if delta.role is not None:
            role = delta.role
        if delta.content:
            yield {"role": role, "content": delta.content}

# this is our tool
async def get_items(place: str) -> str:
    """Use this tool to list items one might find in a place you're asked about."""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)
    return response

class State(TypedDict):
    messages: Annotated[list[dict], operator.add]

# this is the tool-calling graph node
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}

graph = (
    StateGraph(State)
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)
```
Let's invoke the graph with an AI message (represented here as an OpenAI-format dict) that includes a tool call:
```python theme={null}
inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
):
    print(chunk["content"], end="|", flush=True)
```
## Disable streaming for specific chat models
If your application mixes models that support streaming with those that do not, you may need to explicitly disable streaming for
models that do not support it.
Set `streaming=False` when initializing the model.
```python theme={null}
from langchain.chat_models import init_chat_model
model = init_chat_model(
    "claude-sonnet-4-5-20250929",
    # Set streaming=False to disable streaming for the chat model
    streaming=False, # [!code highlight]
)
```
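Or disable it when instantiating the chat model class directly: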
```python theme={null}
from langchain_openai import ChatOpenAI
# Set streaming=False to disable streaming for the chat model
model = ChatOpenAI(model="o1-preview", streaming=False)
```
Not all chat model integrations support the `streaming` parameter. If your model doesn't support it, use `disable_streaming=True` instead. This parameter is available on all chat models via the base class.
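As a minimal sketch (the model name is only a placeholder; `disable_streaming` is accepted because it is defined on the base chat model class):

```python theme={null}
from langchain.chat_models import init_chat_model

# disable_streaming is defined on the base chat model class, so any
# integration accepts it; the model name here is only a placeholder
model = init_chat_model(
    "gpt-4.1-mini",
    disable_streaming=True,
)
```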
### Async with Python \< 3.11
In Python versions \< 3.11, [asyncio tasks](https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task) do not support the `context` parameter.
This limits LangGraph's ability to automatically propagate context, and affects LangGraph's streaming mechanisms in two key ways:
1. You **must** explicitly pass [`RunnableConfig`](https://python.langchain.com/docs/concepts/runnables/#runnableconfig) into async LLM calls (e.g., `ainvoke()`), as callbacks are not automatically propagated.
2. You **cannot** use [`get_stream_writer`](https://reference.langchain.com/python/langgraph/config/#langgraph.config.get_stream_writer) in async nodes or tools — you must pass a `writer` argument directly.
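For example, to stream LLM tokens on Python \< 3.11, accept `config` in the async node and pass it through to `ainvoke()`: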
```python theme={null}
from typing import TypedDict

from langgraph.graph import START, StateGraph
from langchain.chat_models import init_chat_model

model = init_chat_model(model="gpt-4.1-mini")

class State(TypedDict):
    topic: str
    joke: str

# Accept config as an argument in the async node function
async def call_model(state, config):
    topic = state["topic"]
    print("Generating joke...")
    # Pass config to model.ainvoke() to ensure proper context propagation
    joke_response = await model.ainvoke( # [!code highlight]
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    return {"joke": joke_response.content}

graph = (
    StateGraph(State)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# Set stream_mode="messages" to stream LLM tokens
async for chunk, metadata in graph.astream(
    {"topic": "ice cream"},
    stream_mode="messages", # [!code highlight]
):
    if chunk.content:
        print(chunk.content, end="|", flush=True)
```
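Similarly, to stream custom data on Python \< 3.11, declare a `writer` parameter on the async node or tool instead of calling `get_stream_writer`: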
```python theme={null}
from typing import TypedDict

from langgraph.graph import START, StateGraph
from langgraph.types import StreamWriter

class State(TypedDict):
    topic: str
    joke: str

# Add writer as an argument in the function signature of the async node or tool
# LangGraph will automatically pass the stream writer to the function
async def generate_joke(state: State, writer: StreamWriter): # [!code highlight]
    writer({"custom_key": "Streaming custom data while generating a joke"})
    return {"joke": f"This is a joke about {state['topic']}"}

graph = (
    StateGraph(State)
    .add_node(generate_joke)
    .add_edge(START, "generate_joke")
    .compile()
)

# Set stream_mode="custom" to receive the custom data in the stream # [!code highlight]
async for chunk in graph.astream(
    {"topic": "ice cream"},
    stream_mode="custom",
):
    print(chunk)
```