Strands Agents integration

Temporal's integration with Strands Agents gives your Strands agents durable execution, automatic retries, and timeouts via the Temporal platform. The plugin routes Strands model invocations, tool calls, MCP tool calls, and hooks through Temporal Activities, so every step the agent takes is recorded in workflow history.

info

The Temporal Python SDK integration with Strands Agents is currently at an experimental release stage. The API may change in future versions.

Code snippets in this guide are taken from the Strands Agents plugin samples. Refer to the samples for the complete code.

Prerequisites

Install the plugin

Install the Temporal Python SDK with Strands Agents support (requires temporalio 1.28.0 or later):

uv add "temporalio[strands-agents]"

or with pip:

pip install "temporalio[strands-agents]"

Quickstart

Define a Workflow that holds a TemporalAgent, then register StrandsPlugin on the Worker:

import asyncio
from datetime import timedelta

from temporalio import workflow
from temporalio.client import Client
from temporalio.contrib.strands import StrandsPlugin, TemporalAgent
from temporalio.worker import Worker


@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(start_to_close_timeout=timedelta(seconds=60))

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        return str(result)


async def main() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="strands",
        workflows=[MyWorkflow],
        plugins=[StrandsPlugin()],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

Start the Workflow from a client:

import asyncio

from temporalio.client import Client

from workflow import MyWorkflow


async def main() -> None:
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        MyWorkflow.run,
        "Hello",
        id="strands-quickstart",
        task_queue="strands",
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())

danger

Inside a Workflow, always call agent.invoke_async(message) — not agent(message). The synchronous form spawns a worker thread, which the Workflow sandbox blocks.

Models

StrandsPlugin(models=...) takes a mapping of name → factory. Each factory is called lazily on first use (on the Worker, outside the Workflow sandbox) and the constructed model is cached for the Worker's lifetime. If models is omitted, the plugin registers a single BedrockModel() factory under the name "bedrock", matching Strands' own implicit default. Select a model per agent with TemporalAgent(model="name", ...):

from strands.models.anthropic import AnthropicModel
from strands.models.bedrock import BedrockModel

# Workflow
@workflow.defn
class MultiModelWorkflow:
    def __init__(self) -> None:
        self.agent_a = TemporalAgent(
            model="claude",
            start_to_close_timeout=timedelta(seconds=60),
        )
        self.agent_b = TemporalAgent(
            model="bedrock",
            start_to_close_timeout=timedelta(seconds=60),
        )

# Worker
Worker(..., plugins=[StrandsPlugin(models={
    "claude": lambda: AnthropicModel(client_args={"api_key": "..."}),
    "bedrock": lambda: BedrockModel(),
})])

Each TemporalAgent carries its own Activity options (timeouts, retry policy, task queue, streaming topic) and dispatches to the shared model Activity, which resolves the model name against the registered factories at runtime. A name not present in models raises ValueError inside the Activity.
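The lazy-factory behavior described above can be pictured with a small, plugin-independent sketch. `LazyRegistry` and `resolve` are hypothetical names used only for illustration, not the plugin's implementation. The sketch assumes only what the text states: a factory runs on first use, its result is cached, and an unknown name raises ValueError.

```python
from typing import Any, Callable, Dict


class LazyRegistry:
    """Hypothetical stand-in for a name -> factory mapping with caching."""

    def __init__(self, factories: Dict[str, Callable[[], Any]]) -> None:
        self._factories = dict(factories)
        self._cache: Dict[str, Any] = {}

    def resolve(self, name: str) -> Any:
        # Unknown names fail loudly, mirroring the ValueError described above.
        if name not in self._factories:
            raise ValueError(f"unknown model name: {name!r}")
        # Build the object on first use only, then reuse the cached instance.
        if name not in self._cache:
            self._cache[name] = self._factories[name]()
        return self._cache[name]


calls = []
# The factory records each invocation so the caching is observable.
registry = LazyRegistry({"claude": lambda: calls.append("built") or object()})
first = registry.resolve("claude")
second = registry.resolve("claude")
```

Because construction is deferred until the first call, expensive client setup (credentials, network sessions) would happen once per Worker process rather than at import time.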

Tools

Wrap non-deterministic tools as Temporal Activities, register them with the Worker, and pass them to the agent through activity_as_tool from the temporalio.contrib.strands.workflow module (not temporalio.workflow):

from strands_tools import shell
from temporalio import activity
from temporalio.contrib.strands import StrandsPlugin, TemporalAgent
from temporalio.contrib.strands import workflow as strands_workflow


@activity.defn
async def fetch_user(user_id: str) -> dict:
    ...


@activity.defn(name="shell")
async def shell_activity(command: str) -> dict:
    return shell.shell(command=command, non_interactive=True)


# Workflow
agent = TemporalAgent(
    start_to_close_timeout=timedelta(seconds=60),
    tools=[
        strands_workflow.activity_as_tool(fetch_user, start_to_close_timeout=timedelta(seconds=30)),
        strands_workflow.activity_as_tool(shell_activity, start_to_close_timeout=timedelta(seconds=15)),
    ],
)

# Worker
Worker(
    ...,
    activities=[fetch_user, shell_activity],
    plugins=[StrandsPlugin()],
)

If you're using built-in strands_tools, wrap them in a thin async function decorated with @activity.defn so they run as Temporal Activities.

Hooks

Strands' hook system lets you subscribe callbacks to events in the agent lifecycle — invocation start and end, model call before and after, tool call before and after, and message added. Pass hooks=[MyHookProvider()] to TemporalAgent; single-agent hook events fire in Workflow context, so deterministic callbacks just work:

from strands.hooks import HookProvider, HookRegistry
from strands.hooks.events import AfterToolCallEvent
from temporalio import workflow


class AuditHook(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(AfterToolCallEvent, self._on_tool_call)

    def _on_tool_call(self, event: AfterToolCallEvent) -> None:
        workflow.logger.info(f"tool {event.tool_use['name']} finished")


agent = TemporalAgent(start_to_close_timeout=timedelta(seconds=60), hooks=[AuditHook()])

danger

Hook callbacks run in Workflow context, so they must be deterministic: no time.time(), uuid.uuid4(), or I/O. For callbacks that need I/O (audit logging, metrics, alerting), use activity_as_hook() from temporalio.contrib.strands.workflow to dispatch the work as a Temporal Activity:

from temporalio import activity
from temporalio.contrib.strands.workflow import activity_as_hook


@activity.defn
async def persist_tool_call(tool_name: str) -> None:
    # I/O safely in an activity.
    ...


class AuditHook(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(
            AfterToolCallEvent,
            activity_as_hook(
                persist_tool_call,
                activity_input=lambda event: event.tool_use["name"],
                start_to_close_timeout=timedelta(seconds=10),
            ),
        )

activity_input extracts serializable values from the event to pass as the Activity's input. Use a dataclass or Pydantic model for multiple values. This is needed because hook events hold references to the Agent, AgentTool instances, and other objects that don't cross the Activity boundary.
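As a rough illustration of the "dataclass for multiple values" advice, the sketch below copies two plain fields off an event into a dataclass. `ToolCallRecord` and `extract_record` are hypothetical names, the event is a stand-in built with SimpleNamespace rather than a real AfterToolCallEvent, and the `toolUseId` key is an assumption about the shape of `event.tool_use`.

```python
import json
from dataclasses import asdict, dataclass
from types import SimpleNamespace


@dataclass
class ToolCallRecord:
    """Hypothetical Activity input holding only serializable values."""

    tool_name: str
    tool_use_id: str


def extract_record(event) -> ToolCallRecord:
    # Copy plain strings off the event; never pass the event itself.
    return ToolCallRecord(
        tool_name=event.tool_use["name"],
        tool_use_id=event.tool_use["toolUseId"],  # assumed key name
    )


# Stand-in event: like a real hook event, it also carries a non-serializable reference.
fake_event = SimpleNamespace(
    tool_use={"name": "shell", "toolUseId": "tu-1", "input": {}},
    agent=object(),
)
record = extract_record(fake_event)
# The extracted record serializes cleanly, unlike the event it came from.
payload = json.dumps(asdict(record))
```

A function shaped like `extract_record` would be passed as `activity_input=`, with the Activity declared to accept a single `ToolCallRecord` parameter.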

Human-in-the-loop interrupts

Strands offers two human-in-the-loop surfaces; both work with the plugin. In each case, agent.invoke_async() returns AgentResult(stop_reason="interrupt", interrupts=[...]) instead of raising. Pair this with a Signal handler that supplies responses, then resume by calling agent.invoke_async(responses).

Hook-based interrupts

A hook on an interruptible event (for example, BeforeToolCallEvent) can pause the agent by calling event.interrupt(name, reason=...). The hook runs in Workflow context, so it must be deterministic — no I/O.

from strands.hooks import HookProvider, HookRegistry
from strands.hooks.events import BeforeToolCallEvent
from temporalio import workflow


class ApprovalHook(HookProvider):
    def register_hooks(self, registry: HookRegistry) -> None:
        registry.add_callback(BeforeToolCallEvent, self._gate)

    def _gate(self, event: BeforeToolCallEvent) -> None:
        if event.interrupt("approval", reason="confirm delete") != "approve":
            event.cancel_tool = "denied"


@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[delete_thing],
            hooks=[ApprovalHook()],
        )
        self._approval: str | None = None

    @workflow.signal
    def approve(self, response: str) -> None:
        self._approval = response

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        if result.stop_reason == "interrupt":
            await workflow.wait_condition(lambda: self._approval is not None)
            result = await self.agent.invoke_async([
                {"interruptResponse": {"interruptId": result.interrupts[0].id, "response": self._approval}}
            ])
        return str(result)
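The Workflow above answers only the first interrupt. If a single invocation could surface several, one possible approach is a small pure helper that builds the resume payload for all of them. `build_interrupt_responses` is a hypothetical name, and the stand-in interrupt objects below assume only the `id` attribute used in the example above.

```python
from types import SimpleNamespace
from typing import Any, Dict, List


def build_interrupt_responses(interrupts, answers: Dict[str, Any]) -> List[dict]:
    """Pair each pending interrupt with its recorded answer, in order."""
    responses = []
    for interrupt in interrupts:
        # Refuse to resume with a partial set of answers.
        if interrupt.id not in answers:
            raise KeyError(f"no response recorded for interrupt {interrupt.id!r}")
        responses.append(
            {
                "interruptResponse": {
                    "interruptId": interrupt.id,
                    "response": answers[interrupt.id],
                }
            }
        )
    return responses


# Stand-ins for result.interrupts; real entries come from the AgentResult.
pending = [SimpleNamespace(id="a"), SimpleNamespace(id="b")]
resume = build_interrupt_responses(pending, {"a": "approve", "b": "deny"})
```

The helper does no I/O and reads no clocks, so it would be safe to call from Workflow code; the answers mapping would typically be filled in by a Signal handler keyed on interrupt ID.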

Tool-body interrupts

A @strands.tool function can raise InterruptException(Interrupt(...)) directly. The agent stops with the interrupt, and the Workflow handles the resume the same way as for hooks:

from strands import tool
from strands.interrupt import Interrupt, InterruptException


@tool
def delete_thing(name: str) -> str:
    raise InterruptException(
        Interrupt(id=f"delete:{name}", name="approval", reason=f"delete {name}?")
    )

The same works from an activity_as_tool-wrapped Activity. The plugin's failure converter preserves the Interrupt payload across the Activity boundary, so AgentResult.interrupts is populated just like the in-Workflow case:

from strands.interrupt import Interrupt, InterruptException
from temporalio import activity
from temporalio.contrib.strands.workflow import activity_as_tool


@activity.defn
async def delete_thing(name: str) -> str:
    if not await policy.is_authorized(name):
        raise InterruptException(
            Interrupt(id=f"delete:{name}", name="approval", reason=f"delete {name}?")
        )
    await storage.delete(name)
    return f"deleted {name}"


@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[activity_as_tool(delete_thing, start_to_close_timeout=timedelta(seconds=10))],
        )

danger

Activity-tool interrupts rely on the plugin's failure converter, which is installed via the client's data converter. Attach StrandsPlugin to the client (not just the Worker) for them to work — Workers built from that client pick up the plugin automatically:

client = await Client.connect("localhost:7233", plugins=[StrandsPlugin()])
Worker(client, task_queue="strands", workflows=[MyWorkflow], activities=[delete_thing])

Structured output

Like Strands' Agent, TemporalAgent supports structured output with structured_output_model. The plugin defaults to the pydantic_data_converter, so Pydantic types serialize cleanly across the Activity and Workflow boundary:

from pydantic import BaseModel


class PersonInfo(BaseModel):
    name: str
    age: int


@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            structured_output_model=PersonInfo,
        )

    @workflow.run
    async def run(self, prompt: str) -> PersonInfo:
        result = await self.agent.invoke_async(prompt)
        return result.structured_output

Streaming

To forward model chunks to external consumers, pass streaming_topic="..." to TemporalAgent and host a WorkflowStream on the Workflow. Each StreamEvent is published on the named topic from inside the model Activity; subscribers read via WorkflowStreamClient. Chunks are batched on streaming_batch_interval (default 100ms):

from temporalio.contrib.workflow_streams import WorkflowStream, WorkflowStreamClient


# Workflow
@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        self.stream = WorkflowStream()
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            streaming_topic="events",
        )


# Client
async for item in WorkflowStreamClient.create(client, workflow_id).subscribe(
    ["events"], result_type=StreamEvent,
):
    print(item.data)

MCP

StrandsPlugin(mcp_clients=...) takes a mapping of name → MCPClient factory, mirroring the models= pattern. The plugin registers a per-server {name}-call-tool Activity and connects at Worker startup to enumerate tools. Workflow-side, TemporalMCPClient(server="name") is a pure handle: it references the server by name and carries the per-call Activity options.

from datetime import timedelta

from mcp import StdioServerParameters, stdio_client
from strands.tools.mcp.mcp_client import MCPClient
from temporalio import workflow
from temporalio.contrib.strands import StrandsPlugin, TemporalAgent, TemporalMCPClient


# Workflow
@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        echo = TemporalMCPClient(server="echo", start_to_close_timeout=timedelta(seconds=30))
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[echo],
        )


# Worker
Worker(
    ...,
    plugins=[StrandsPlugin(
        mcp_clients={
            "echo": lambda: MCPClient(
                lambda: stdio_client(
                    StdioServerParameters(command="...", args=[...]),
                ),
            ),
        },
    )],
)

Each factory returns a fully configured MCPClient, so you can pass options like tool_filters, prefix, elicitation_callback, or tasks_config to it.

note

The plugin connects to each MCP server once at Worker startup to enumerate tools. The schema is frozen for the Worker's lifetime; restart Workers to pick up MCP-server changes. If a server is unavailable at startup, the Worker fails to start.

Retries

TemporalAgent disables Strands' built-in ModelRetryStrategy so retries are handled exclusively by Temporal. Configure retries via retry_policy on TemporalAgent, and on the Activity options accepted by workflow.activity_as_tool, workflow.activity_as_hook, and TemporalMCPClient:

from temporalio.common import RetryPolicy


TemporalAgent(
    start_to_close_timeout=timedelta(seconds=60),
    retry_policy=RetryPolicy(maximum_attempts=3),
)

Passing retry_strategy=... to TemporalAgent(...) raises ValueError; remove the argument (or pass retry_strategy=None) and put the retry config on the Activity options instead.
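To get a feel for what a given retry_policy means in wall-clock terms, the sketch below estimates the waits between attempts. It assumes the defaults documented for temporalio.common.RetryPolicy (1-second initial interval, backoff coefficient of 2.0, maximum interval of 100 times the initial interval). `retry_delays` is a hypothetical helper, not part of the SDK, and it only approximates the schedule the Temporal server actually applies.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    maximum_attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Approximate the wait before each retry under exponential backoff."""
    # Assumed default cap: 100x the initial interval.
    cap = maximum_interval or initial_interval * 100
    delays = []
    # N attempts means at most N - 1 retries, each preceded by a wait.
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, cap))
    return delays


three_attempts = retry_delays(3)
ten_attempts = retry_delays(10)
```

Under these assumptions, `RetryPolicy(maximum_attempts=3)` allows two retries, so the worst-case time for a failing call is roughly three times start_to_close_timeout plus the two backoff waits.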

Continue-as-new

A chat-style Workflow accumulates message history with every turn and eventually hits Temporal's per-Workflow history limit. Use continue-as-new to start a fresh execution while carrying agent.messages forward as input:

from dataclasses import dataclass, field

from strands.types.content import Messages
from temporalio import workflow


@dataclass
class ChatInput:
    messages: Messages = field(default_factory=list)


@workflow.defn
class ChatWorkflow:
    def __init__(self) -> None:
        self._pending: list[str] = []
        self._done = False

    @workflow.signal
    def user_says(self, prompt: str) -> None:
        self._pending.append(prompt)

    @workflow.signal
    def end_chat(self) -> None:
        self._done = True

    @workflow.run
    async def run(self, input: ChatInput) -> None:
        agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            messages=list(input.messages),
        )
        while True:
            await workflow.wait_condition(lambda: self._pending or self._done)
            if self._done:
                return
            await agent.invoke_async(self._pending.pop(0))
            if workflow.info().is_continue_as_new_suggested():
                workflow.continue_as_new(ChatInput(messages=agent.messages))

Observability

StrandsPlugin composes cleanly with the OpenTelemetry plugin. Register OpenTelemetryPlugin on the client (Workers built from that client pick it up automatically) and StrandsPlugin on the Worker. You'll get OTel spans around the model, tool, and MCP Activities the plugin schedules, plus any spans Strands itself emits inside invoke_async:

import opentelemetry.trace
from temporalio.client import Client
from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker


opentelemetry.trace.set_tracer_provider(create_tracer_provider())

client = await Client.connect("localhost:7233", plugins=[OpenTelemetryPlugin()])

Worker(
    client,
    task_queue="strands",
    workflows=[MyWorkflow],
    plugins=[StrandsPlugin()],
)

Set the tracer provider before connecting the client.

Snapshots

TemporalAgent.take_snapshot() and TemporalAgent.load_snapshot() raise NotImplementedError. Temporal's event history already persists Workflow state durably at a finer granularity than Strands snapshots, so calling either inside a Workflow is redundant.

Samples

The Strands Agents plugin samples demonstrate all supported patterns end-to-end.