Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Um guia abrangente para migrar do AutoGen para o SDK do Python do Microsoft Agent Framework.
Sumário
- Plano de Fundo
- Principais semelhanças e diferenças
- Criação e configuração do cliente de modelo
-
Mapeamento de recursos doSingle-Agent
- Criação e execução básicas do agente
- Gerenciando o estado de conversa com AgentThread
- Equivalência do Agente Assistente do OpenAI
- Suporte de streaming
- Tipos e criação de mensagens
- Criação e integração de ferramentas
- Ferramentas Hospedadas (Agent Framework Exclusive)
- Suporte ao servidor MCP
- Padrão de agente como ferramenta
- Middleware (recurso do Agent Framework)
- Agentes Personalizados
- Mapeamento de recursos de vários agentes
- de Observabilidade
- Conclusão
Contexto
O AutoGen é uma estrutura para a criação de agentes de IA e sistemas de vários agentes usando LLMs (modelos de linguagem grande). Começou como um projeto de pesquisa na Microsoft Research e foi pioneiro em vários conceitos em orquestração de vários agentes, como GroupChat e runtime de agente controlado por eventos. O projeto tem sido uma colaboração frutífera da comunidade de software livre e muitos recursos importantes vieram de colaboradores externos.
O Microsoft Agent Framework é um novo SDK de vários idiomas para criar agentes de IA e fluxos de trabalho usando LLMs. Representa uma evolução significativa das ideias pioneiras na AutoGen e incorpora lições aprendidas com o uso do mundo real. Ele é desenvolvido pelas principais equipes de AutoGen e Kernel Semântico da Microsoft e foi projetado para ser uma nova base para a criação de aplicativos de IA daqui para frente.
Este guia descreve um caminho prático de migração: ele começa cobrindo o que permanece o mesmo e o que muda rapidamente. Em seguida, aborda a configuração do cliente de modelo, os recursos de agente único e, por fim, a orquestração de vários agentes com código concreto lado a lado. Ao longo do caminho, links para exemplos executáveis no repositório do Agent Framework ajudam você a validar cada etapa.
Principais semelhanças e diferenças
O que permanece o mesmo
As fundações são familiares. Você ainda cria agentes em torno de um cliente modelo, fornece instruções e anexa ferramentas. Ambas as bibliotecas dão suporte a ferramentas de estilo de função, streaming de token, conteúdo multimodal e E/S assíncrona.
# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")
# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")
Principais Diferenças
Estilo de orquestração: o AutoGen emparelha um núcleo controlado por eventos com um alto nível
Team. O Agent Framework centra-se em um tipo baseado emWorkflowgrafo que roteia dados ao longo das bordas e ativa executores quando as entradas estão prontas.Ferramentas: AutoGen encapsula funções com
FunctionTool. O Agent Framework usa@ai_function, infere esquemas automaticamente e adiciona ferramentas hospedadas, como um interpretador de código e uma pesquisa na Web.Comportamento do agente:
AssistantAgenté de turno único, a menos que você aumentemax_tool_iterations.ChatAgenté multi turn por padrão e continua invocando ferramentas até que possa retornar uma resposta final.Runtime: o AutoGen oferece runtimes distribuídos incorporados e experimentais. O Agent Framework se concentra na composição de processo único atualmente; A execução distribuída está planejada.
Criação e configuração do cliente de modelo
Ambas as estruturas fornecem clientes de modelo para os principais provedores de IA, com APIs semelhantes, mas não idênticas.
| Característica | AutoGen | Estrutura do Agente |
|---|---|---|
| Cliente OpenAI | OpenAIChatCompletionClient |
OpenAIChatClient |
| Cliente de respostas OpenAI | ❌ Não disponível | OpenAIResponsesClient |
| OpenAI do Azure | AzureOpenAIChatCompletionClient |
AzureOpenAIChatClient |
| Respostas do Azure OpenAI | ❌ Não disponível | AzureOpenAIResponsesClient |
| IA do Azure | AzureAIChatCompletionClient |
AzureAIAgentClient |
| Anthropic | AnthropicChatCompletionClient |
🚧 Planeado |
| Ollama | OllamaChatCompletionClient |
🚧 Planeado |
| Cache |
ChatCompletionCache capa |
🚧 Planeado |
Clientes do Modelo de AutoGen
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
# OpenAI
client = OpenAIChatCompletionClient(
model="gpt-5",
api_key="your-key"
)
# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
azure_endpoint="https://your-endpoint.openai.azure.com/",
azure_deployment="gpt-5",
api_version="2024-12-01",
api_key="your-key"
)
Agent Framework ChatClients
from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient
# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")
# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")
Para obter exemplos detalhados, consulte:
- Cliente de Chat OpenAI – Configuração básica do cliente OpenAI
- Cliente de chat do Azure OpenAI – Azure OpenAI com autenticação
- Cliente de IA do Azure – Integração de agente de IA do Azure
Suporte à API de respostas (Agent Framework Exclusive)
O Agent Framework e AzureOpenAIResponsesClientOpenAIResponsesClient o suporte especializado para modelos de raciocínio e respostas estruturadas não estão disponíveis no AutoGen:
from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient
# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")
# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")
Para obter exemplos de API de Respostas, consulte:
- Azure Responses Client Basic – Azure OpenAI com respostas
- OpenAI Responses Client Basic – Integração de respostas OpenAI
Mapeamento de recursos do Single-Agent
Esta seção mapeia recursos de agente único entre o AutoGen e o Agent Framework. Com um cliente em vigor, crie um agente, anexe ferramentas e escolha entre não streaming e execução de streaming.
Criação e execução básicas do agente
Depois que você tiver um cliente de modelo configurado, a próxima etapa será criar agentes. Ambas as estruturas fornecem abstrações de agente semelhantes, mas com diferentes comportamentos padrão e opções de configuração.
AutoGen AssistantAgent
from autogen_agentchat.agents import AssistantAgent
agent = AssistantAgent(
name="assistant",
model_client=client,
system_message="You are a helpful assistant.",
tools=[my_tool],
max_tool_iterations=1 # Single-turn by default
)
# Execution
result = await agent.run(task="What's the weather?")
Agent Framework ChatAgent
from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient
# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
@ai_function
def get_time() -> str:
"""Get current time."""
return "Current time: 2:30 PM"
# Create client
client = OpenAIChatClient(model_id="gpt-5")
async def example():
# Direct creation
agent = ChatAgent(
name="assistant",
chat_client=client,
instructions="You are a helpful assistant.",
tools=[get_weather] # Multi-turn by default
)
# Factory method (more convenient)
agent = client.create_agent(
name="assistant",
instructions="You are a helpful assistant.",
tools=[get_weather]
)
# Execution with runtime tool configuration
result = await agent.run(
"What's the weather?",
tools=[get_time], # Can add tools at runtime
tool_choice="auto"
)
Principais diferenças:
-
Comportamento padrão:
ChatAgentitera automaticamente por meio de chamadas de ferramenta, enquantoAssistantAgentrequer configuração explícitamax_tool_iterations -
Configuração de runtime:
ChatAgent.run()aceitatoolsetool_choiceparâmetros para personalização por invocação - Métodos de fábrica: o Agent Framework fornece métodos de fábrica convenientes diretamente de clientes de chat
-
Gerenciamento de estado:
ChatAgenté sem estado e não mantém o histórico de conversas entre invocações, ao contrárioAssistantAgentdo que mantém o histórico de conversas como parte de seu estado
Gerenciando o estado de conversa com AgentThread
Para continuar as conversas com ChatAgent, use AgentThread para gerenciar o histórico de conversas:
# Assume we have an agent from previous examples
async def conversation_example():
# Create a new thread that will be reused
thread = agent.get_new_thread()
# First interaction - thread is empty
result1 = await agent.run("What's 2+2?", thread=thread)
print(result1.text) # "4"
# Continue conversation - thread contains previous messages
result2 = await agent.run("What about that number times 10?", thread=thread)
print(result2.text) # "40" (understands "that number" refers to 4)
# AgentThread can use external storage, similar to ChatCompletionContext in AutoGen
Sem estado por padrão: demonstração rápida
# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text) # for example, "4"
r2 = await agent.run("What about that number times 10?")
print(r2.text) # Likely ambiguous without prior context; cannot be "40"
# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text) # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text) # "40"
Para obter exemplos de gerenciamento de threads, consulte:
- IA do Azure com Thread – Gerenciamento de estado de conversa
- Cliente de chat openai com thread – padrões de uso de thread
- Threads com suporte redis – Persistindo o estado da conversa externamente
Equivalência do Agente Assistente do OpenAI
Ambas as estruturas fornecem integração à API do Assistente do OpenAI:
# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient
Para obter exemplos do Assistente do OpenAI, consulte:
- Assistentes do OpenAI Básico – Configuração básica do assistente
- Assistentes openai com ferramentas de função – integração de ferramentas personalizadas
- Azure OpenAI Assistants Basic – Configuração do assistente do Azure
- Assistentes do OpenAI com Thread – Gerenciamento de threads
Suporte de streaming
Ambas as estruturas transmitem tokens em tempo real, de clientes e de agentes, para manter as interfaces do usuário responsivas.
AutoGen Streaming
# Model client streaming
async for chunk in client.create_stream(messages):
if isinstance(chunk, str):
print(chunk, end="")
# Agent streaming
async for event in agent.run_stream(task="Hello"):
if isinstance(event, ModelClientStreamingChunkEvent):
print(event.content, end="")
elif isinstance(event, TaskResult):
print("Final result received")
Agent Framework Streaming
# Assume we have client, agent, and tools from previous examples
async def streaming_example():
# Chat client streaming
async for chunk in client.get_streaming_response("Hello", tools=tools):
if chunk.text:
print(chunk.text, end="")
# Agent streaming
async for chunk in agent.run_stream("Hello"):
if chunk.text:
print(chunk.text, end="", flush=True)
Dica: no Agent Framework, clientes e agentes produzem a mesma forma de atualização; você pode ler chunk.text em ambos os casos.
Tipos e criação de mensagens
Entender como as mensagens funcionam é crucial para a comunicação efetiva do agente. Ambas as estruturas fornecem abordagens diferentes para criação e manipulação de mensagens, com o AutoGen usando classes de mensagens separadas e o Agent Framework usando um sistema de mensagens unificado.
Tipos de mensagens autogen
from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage
# Text message
text_msg = TextMessage(content="Hello", source="user")
# Multi-modal message
multi_modal_msg = MultiModalMessage(
content=["Describe this image", image_data],
source="user"
)
# Convert to model format for use with model clients
user_message = text_msg.to_model_message()
Tipos de mensagens da Estrutura do Agente
from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64
# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")
# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"
# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
role=Role.USER,
contents=[
TextContent(text="Describe this image"),
DataContent(uri=image_uri, media_type="image/jpeg")
]
)
Principais diferenças:
- O AutoGen usa classes de mensagem separadas (
TextMessage,MultiModalMessage) com umsourcecampo - O Agent Framework usa um unificado
ChatMessagecom objetos de conteúdo tipados e umrolecampo - As mensagens do Agent Framework usam
Roleenum (USER, ASSISTANT, SYSTEM, TOOL) em vez de fontes de cadeia de caracteres
Criação e integração de ferramentas
As ferramentas estendem os recursos do agente além da geração de texto. As estruturas assumem abordagens diferentes para a criação de ferramentas, com o Agent Framework fornecendo uma geração de esquema mais automatizada.
AutoGen FunctionTool
from autogen_core.tools import FunctionTool
async def get_weather(location: str) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Manual tool creation
tool = FunctionTool(
func=get_weather,
description="Get weather information"
)
# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])
Estrutura do Agente @ai_function
from agent_framework import ai_function
from typing import Annotated
from pydantic import Field
@ai_function
def get_weather(
location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
"""Get weather for a location."""
return f"Weather in {location}: sunny"
# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])
Para obter exemplos detalhados, consulte:
- OpenAI Chat Agent Basic – Agente de chat OpenAI simples
- OpenAI com Ferramentas de Funções – Agente com ferramentas personalizadas
- Azure OpenAI Basic – Configuração do agente do Azure OpenAI
Ferramentas Hospedadas (Agent Framework Exclusive)
O Agent Framework fornece ferramentas hospedadas que não estão disponíveis no AutoGen:
from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient
# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")
# Code execution tool
code_tool = HostedCodeInterpreterTool()
# Web search tool
search_tool = HostedWebSearchTool()
agent = ChatAgent(
name="researcher",
chat_client=client,
tools=[code_tool, search_tool]
)
Para obter exemplos detalhados, consulte:
- Ia do Azure com interpretador de código – ferramenta de execução de código
- IA do Azure com várias ferramentas – várias ferramentas hospedadas
- OpenAI com Pesquisa na Web – Integração de pesquisa na Web
Requisitos e advertências:
- As ferramentas hospedadas só estão disponíveis em modelos/contas que dão suporte a elas. Verifique os direitos e o suporte ao modelo para seu provedor antes de habilitar essas ferramentas.
- A configuração difere por provedor; siga os pré-requisitos em cada exemplo para instalação e permissões.
- Nem todo modelo dá suporte a todas as ferramentas hospedadas (por exemplo, pesquisa na Web versus interpretador de código). Escolha um modelo compatível em seu ambiente.
Observação
O AutoGen dá suporte a ferramentas de execução de código local, mas esse recurso está planejado para versões futuras do Agent Framework.
Diferença de chave: o Agent Framework manipula a iteração da ferramenta automaticamente no nível do agente. Ao contrário do parâmetro do AutoGen, os agentes do max_tool_iterations Agent Framework continuam a execução da ferramenta até a conclusão por padrão, com mecanismos de segurança internos para evitar loops infinitos.
Suporte ao servidor MCP
Para integração avançada de ferramentas, ambas as estruturas dão suporte ao PROTOCOLO MCP (Model Context Protocol), permitindo que os agentes interajam com serviços externos e fontes de dados. O Agent Framework fornece suporte interno mais abrangente.
Suporte ao MCP do AutoGen
O AutoGen tem suporte básico do MCP por meio de extensões (os detalhes de implementação específicos variam de acordo com a versão).
Suporte ao MCP do Agent Framework
from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
# Stdio MCP server
mcp_tool = MCPStdioTool(
name="filesystem",
command="uvx mcp-server-filesystem",
args=["/allowed/directory"]
)
# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
name="http_mcp",
url="http://localhost:8000/sse"
)
# WebSocket MCP
ws_mcp = MCPWebsocketTool(
name="websocket_mcp",
url="ws://localhost:8000/ws"
)
agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])
Para obter exemplos de MCP, consulte:
- OpenAI com MCP local – Usando MCPStreamableHTTPTool com OpenAI
- OpenAI com MCP hospedado – Usando serviços MCP hospedados
- IA do Azure com MCP Local – Usando MCP com a IA do Azure
- IA do Azure com MCP hospedado – Usando o MCP hospedado com a IA do Azure
Padrão de agente como ferramenta
Um padrão poderoso é usar os próprios agentes como ferramentas, habilitando arquiteturas de agente hierárquico. Ambas as estruturas dão suporte a esse padrão com implementações diferentes.
AutoGen AgentTool
from autogen_agentchat.tools import AgentTool
# Create specialized agent
writer = AssistantAgent(
name="writer",
model_client=client,
system_message="You are a creative writer."
)
# Wrap as tool
writer_tool = AgentTool(agent=writer)
# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
model="gpt-5",
parallel_tool_calls=False
)
coordinator = AssistantAgent(
name="coordinator",
model_client=coordinator_client,
tools=[writer_tool]
)
As_tool do Agent Framework()
from agent_framework import ChatAgent
# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
name="writer",
chat_client=client,
instructions="You are a creative writer."
)
# Convert to tool
writer_tool = writer.as_tool(
name="creative_writer",
description="Generate creative content",
arg_name="request",
arg_description="What to write"
)
# Use in coordinator
coordinator = ChatAgent(
name="coordinator",
chat_client=client,
tools=[writer_tool]
)
Observação de migração explícita: no AutoGen, defina parallel_tool_calls=False no cliente de modelo do coordenador ao encapsular agentes como ferramentas para evitar problemas de simultaneidade ao invocar a mesma instância do agente.
No Agent Framework, as_tool() não requer desabilitar chamadas de ferramenta paralela, pois os agentes são sem estado por padrão.
Middleware (recurso do Agent Framework)
O Agent Framework apresenta recursos de middleware que a AutoGen não tem. O Middleware permite preocupações avançadas entre cortes, como registro em log, segurança e monitoramento de desempenho.
from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable
# Assume we have client from previous examples
async def logging_middleware(
context: AgentRunContext,
next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
print(f"Agent {context.agent.name} starting")
await next(context)
print(f"Agent {context.agent.name} completed")
async def security_middleware(
context: FunctionInvocationContext,
next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
if "password" in str(context.arguments):
print("Blocking function call with sensitive data")
return # Don't call next()
await next(context)
agent = ChatAgent(
name="secure_agent",
chat_client=client,
middleware=[logging_middleware, security_middleware]
)
Benefícios:
- Segurança: validação de entrada e filtragem de conteúdo
- Observabilidade: registro em log, métricas e rastreamento
- Desempenho: Cache e limitação de taxa
- Tratamento de erros: degradação normal e lógica de repetição
Para obter exemplos detalhados de middleware, consulte:
- Middleware baseado em função – middleware de função simples
- Middleware baseado em classe – middleware orientado a objeto
- Middleware de Tratamento de Exceções – Padrões de tratamento de erros
- Middleware de Estado Compartilhado – Gerenciamento de estado entre agentes
Agentes Personalizados
Às vezes, você não quer um agente com suporte de modelo– você deseja um agente determinístico ou com suporte de API com lógica personalizada. Ambas as estruturas dão suporte à criação de agentes personalizados, mas os padrões diferem.
AutoGen: Subclasse BaseChatAgent
from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken
class StaticAgent(BaseChatAgent):
def __init__(self, name: str = "static", description: str = "Static responder") -> None:
super().__init__(name, description)
@property
def produced_message_types(self) -> Sequence[type[BaseChatMessage]]: # Which message types this agent produces
return (TextMessage,)
async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
# Always return a static response
return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))
Notes:
- Implemente
on_messages(...)e retorne umResponsecom uma mensagem de chat. - Opcionalmente, implemente
on_reset(...)para limpar o estado interno entre as execuções.
Estrutura do Agente: Estender BaseAgent (com reconhecimento de thread)
from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Role,
TextContent,
)
class StaticAgent(BaseAgent):
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
# Build a static reply
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
# Persist conversation to the provided AgentThread (if any)
if thread is not None:
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
return AgentRunResponse(messages=[reply])
async def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
# Stream the same static response in a single chunk for simplicity
yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)
# Notify thread of input and the complete response once streaming ends
if thread is not None:
reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
normalized = self._normalize_messages(messages)
await self._notify_thread_of_new_messages(thread, normalized, reply)
Notes:
-
AgentThreadmantém o estado da conversa externamente; useagent.get_new_thread()e passe pararun/run_stream. - Chame
self._notify_thread_of_new_messages(thread, input_messages, response_messages)para que o thread tenha ambos os lados da troca. - Veja o exemplo completo: Agente Personalizado
Em seguida, vamos examinar a orquestração de vários agentes, a área em que as estruturas diferem mais.
Mapeamento de recursos de vários agentes
Visão geral do modelo de programação
Os modelos de programação de vários agentes representam a diferença mais significativa entre as duas estruturas.
Abordagem de modelo duplo da AutoGen
O AutoGen fornece dois modelos de programação:
-
autogen-core: programação controlada por eventos de baixo nível comRoutedAgentassinaturas e mensagens -
Teamabstração: modelo centrado em execução de alto nível criado com base emautogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
@message_handler
async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
# Handle specific message types
pass
# High-level Team (easier but limited)
team = RoundRobinGroupChat(
participants=[agent1, agent2],
termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")
Desafios:
- O modelo de baixo nível é muito complexo para a maioria dos usuários
- O modelo de alto nível pode se tornar limitador para comportamentos complexos
- A ponte entre os dois modelos adiciona complexidade de implementação
Modelo de fluxo de trabalho unificado do Agent Framework
O Agent Framework fornece uma única Workflow abstração que combina o melhor de ambas as abordagens:
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
response = await agent1.run(input_msg)
await ctx.send_message(response.text)
@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
response = await agent2.run(input_msg)
await ctx.yield_output(response.text) # Final output
# Build typed data flow graph
workflow = (WorkflowBuilder()
.add_edge(agent1_executor, agent2_executor)
.set_start_executor(agent1_executor)
.build())
# Example usage (would be in async context)
# result = await workflow.run("Initial input")
Para obter exemplos detalhados de fluxo de trabalho, consulte:
- Noções básicas de fluxo de trabalho – Introdução a executores e bordas
- Agentes no Fluxo de Trabalho – Integração de agentes em fluxos de trabalho
- Streaming de fluxo de trabalho – Execução de fluxo de trabalho em tempo real
Benefícios:
- Modelo unificado: abstração única para todos os níveis de complexidade
- Segurança do tipo: entradas e saídas fortemente tipada
- Visualização do grafo: Limpar representação de fluxo de dados
- Composição flexível: misturar agentes, funções e sub-fluxos de trabalho
Fluxo de trabalho vs GraphFlow
A abstração do Workflow Agent Framework é inspirada no recurso experimental GraphFlow da AutoGen, mas representa uma evolução significativa na filosofia de design:
- GraphFlow: fluxo de controle baseado em onde as bordas são transições e as mensagens são transmitidas para todos os agentes; as transições são condicionadas no conteúdo da mensagem transmitida
- Fluxo de trabalho: o fluxo de dados baseado em onde as mensagens são roteadas por meio de bordas e executores específicos são ativados por bordas, com suporte para execução simultânea.
Visão geral do visual
O diagrama a seguir contrasta o Fluxo de Grafo de fluxo de controle da AutoGen (à esquerda) com o fluxo de trabalho de fluxo de dados do Agent Framework (à direita). O GraphFlow modela agentes como nós com transições e transmissões condicionais. Executores de modelos de fluxo de trabalho (agentes, funções ou sub-fluxos de trabalho) conectados por bordas tipados; ele também dá suporte a pausas de solicitação/resposta e ponto de verificação.
flowchart LR
subgraph AutoGenGraphFlow
direction TB
U[User / Task] --> A[Agent A]
A -->|success| B[Agent B]
A -->|retry| C[Agent C]
A -. broadcast .- B
A -. broadcast .- C
end
subgraph AgentFrameworkWorkflow
direction TB
I[Input] --> E1[Executor 1]
E1 -->|"str"| E2[Executor 2]
E1 -->|"image"| E3[Executor 3]
E3 -->|"str"| E2
E2 --> OUT[(Final Output)]
end
R[Request / Response Gate]
E2 -. request .-> R
R -. resume .-> E2
CP[Checkpoint]
E1 -. save .-> CP
CP -. load .-> E1
Na prática:
- O GraphFlow usa agentes como nós e transmite mensagens; As bordas representam transições condicionais.
- O fluxo de trabalho roteia mensagens digitadas ao longo das bordas. Nós (executores) podem ser agentes, funções puras ou sub-fluxos de trabalho.
- A solicitação/resposta permite uma pausa de fluxo de trabalho para entrada externa; O ponto de verificação persiste o progresso e habilita a retomada.
Comparação de código
1) Sequencial + Condicional
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)
graph = (
DiGraphBuilder()
.add_node(writer).add_node(reviewer).add_node(editor)
.add_edge(writer, reviewer) # always
.add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
.set_entry_point(writer)
).build()
team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"Draft: {task}")
@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
decision = "approve" if "solar" in draft.lower() else "revise"
await ctx.send_message(f"{decision}:{draft}")
@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
if msg.startswith("approve:"):
await ctx.yield_output(msg.split(":", 1)[1])
else:
await ctx.yield_output("Needs revision")
workflow_seq = (
WorkflowBuilder()
.add_edge(writer_exec, reviewer_exec)
.add_edge(reviewer_exec, editor_exec)
.set_start_executor(writer_exec)
.build()
)
2) Fan-out + Join (ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d
# ALL (default): D runs after both B and C
g_all = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D).add_edge(C, D)
.set_entry_point(A)
).build()
# ANY: D runs when either B or C completes
g_any = (
DiGraphBuilder()
.add_node(A).add_node(B).add_node(C).add_node(D)
.add_edge(A, B).add_edge(A, C)
.add_edge(B, D, activation_group="join_d", activation_condition="any")
.add_edge(C, D, activation_group="join_d", activation_condition="any")
.set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B:{task}", target_id="B")
await ctx.send_message(f"C:{task}", target_id="C")
@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"B_done:{text}")
@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
await ctx.send_message(f"C_done:{text}")
@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"First: {msg}") # ANY join (first arrival)
@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
state = await ctx.get_executor_state() or {"items": []}
state["items"].append(msg)
await ctx.set_executor_state(state)
if len(state["items"]) >= 2:
await ctx.yield_output(" | ".join(state["items"])) # ALL join
wf_any = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_any).add_edge(branch_c, join_any)
.set_start_executor(start)
.build()
)
wf_all = (
WorkflowBuilder()
.add_edge(start, branch_b).add_edge(start, branch_c)
.add_edge(branch_b, join_all).add_edge(branch_c, join_all)
.set_start_executor(start)
.build()
)
3) Roteamento direcionado (sem transmissão)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never
@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
# Route selectively using target_id
if task.startswith("image:"):
await ctx.send_message(task.removeprefix("image:"), target_id="vision")
else:
await ctx.send_message(task, target_id="writer")
@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Draft: {text}")
@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
await ctx.yield_output(f"Caption: {image_ref}")
workflow = (
WorkflowBuilder()
.add_edge(ingest, write)
.add_edge(ingest, caption)
.set_start_executor(ingest)
.build()
)
# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")
O que observar:
- O GraphFlow transmite mensagens e usa transições condicionais. O comportamento de junção é configurado por meio do lado
activationdo destino e por bordaactivation_group/activation_condition(por exemplo, agrupe ambas as bordasjoin_dcom ).activation_condition="any" - O fluxo de trabalho roteia dados explicitamente; use
target_idpara selecionar executores downstream. O comportamento de junção reside no executor de recebimento (por exemplo, produza na primeira entrada versus aguarde por todos) ou por meio de construtores/agregadores de orquestração. - Executores no fluxo de trabalho são de forma livre: encapsular um
ChatAgent, uma função ou um sub-fluxo de trabalho e misturá-los no mesmo grafo.
Principais Diferenças
A tabela a seguir resume as diferenças fundamentais entre o GraphFlow da AutoGen e o fluxo de trabalho do Agent Framework:
| Aspecto | Fluxo de Grafo do AutoGen | Fluxo de trabalho do Agent Framework |
|---|---|---|
| Tipo de fluxo | Fluxo de controle (as bordas são transições) | Fluxo de dados (mensagens de rota de bordas) |
| Tipos de nó | Somente agentes | Agentes, funções, sub-fluxos de trabalho |
| Ativação | Transmissão de mensagem | Ativação baseada em borda |
| Segurança de tipo | Limited | Digitação forte por toda parte |
| Capacidade de composição | Limited | Altamente composável |
Padrões de aninhamento
Aninhamento de equipe do AutoGen
# Inner team
inner_team = RoundRobinGroupChat(
participants=[specialist1, specialist2],
termination_condition=StopAfterNMessages(3)
)
# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
participants=[coordinator, inner_team, reviewer], # Team as participant
termination_condition=StopAfterNMessages(10)
)
# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")
Características de aninhamento do AutoGen:
- A equipe aninhada recebe todas as mensagens da equipe externa
- Mensagens de equipe aninhadas são transmitidas para todos os participantes externos da equipe
- Contexto de mensagem compartilhada em todos os níveis
Aninhamento de fluxo de trabalho do Agent Framework
from agent_framework import WorkflowExecutor, WorkflowBuilder
# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor
# Create sub-workflow
sub_workflow = (WorkflowBuilder()
.add_edge(specialist1_executor, specialist2_executor)
.set_start_executor(specialist1_executor)
.build())
# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
workflow=sub_workflow,
id="sub_process"
)
# Use in parent workflow
parent_workflow = (WorkflowBuilder()
.add_edge(coordinator_executor, sub_workflow_executor)
.add_edge(sub_workflow_executor, reviewer_executor)
.set_start_executor(coordinator_executor)
.build())
Características de aninhamento do Agent Framework:
- Entrada/saída isolada por meio de
WorkflowExecutor - Nenhuma transmissão de mensagens – os dados fluem por meio de conexões específicas
- Gerenciamento de estado independente para cada nível de fluxo de trabalho
Padrões de chat em grupo
Os padrões de chat em grupo permitem que vários agentes colaborem em tarefas complexas. Veja como os padrões comuns se traduzem entre estruturas.
Padrão RoundRobinGroupChat
Implementação do AutoGen:
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages
team = RoundRobinGroupChat(
participants=[agent1, agent2, agent3],
termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")
Implementação da Estrutura do Agente:
from agent_framework import SequentialBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()
# Example usage (would be in async context)
async def sequential_example():
# Each agent appends to shared conversation
async for event in workflow.run_stream("Discuss this topic"):
if isinstance(event, WorkflowOutputEvent):
conversation_history = event.data # list[ChatMessage]
Para obter exemplos de orquestração detalhados, consulte:
- Agentes Sequenciais - Execução do agente de estilo round robin
- Executores Personalizados Sequenciais – Padrões de executor personalizado
Para padrões de execução simultâneos, o Agent Framework também fornece:
from agent_framework import ConcurrentBuilder, WorkflowOutputEvent
# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
.participants([agent1, agent2, agent3])
.build())
# Example usage (would be in async context)
async def concurrent_example():
# All agents process the input concurrently
async for event in workflow.run_stream("Process this in parallel"):
if isinstance(event, WorkflowOutputEvent):
results = event.data # Combined results from all agents
Para obter exemplos de execução simultâneos, consulte:
- Agentes Simultâneos – Execução de agente paralelo
- Executores Personalizados Simultâneos – Padrões paralelos personalizados
- Simultâneo com o Agregador Personalizado – Padrões de agregação de resultados
Padrão MagenticOneGroupChat
Implementação do AutoGen:
from autogen_agentchat.teams import MagenticOneGroupChat
team = MagenticOneGroupChat(
participants=[researcher, coder, executor],
model_client=coordinator_client,
termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")
Implementação da Estrutura do Agente:
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
ChatMessage,
MagenticBuilder,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
# Create a manager agent for orchestration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator that coordinates the workflow",
instructions="You coordinate a team to complete complex tasks efficiently.",
chat_client=OpenAIChatClient(),
)
workflow = (
MagenticBuilder()
.participants(researcher=researcher, coder=coder)
.with_standard_manager(
agent=manager_agent,
max_round_count=20,
max_stall_count=3,
max_reset_count=2,
)
.build()
)
# Example usage (would be in async context)
async def magentic_example():
output: str | None = None
async for event in workflow.run_stream("Complex research task"):
if isinstance(event, AgentRunUpdateEvent):
props = event.data.additional_properties if event.data else None
event_type = props.get("magentic_event_type") if props else None
if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
text = event.data.text if event.data else ""
print(f"[ORCHESTRATOR]: {text}")
elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
if event.data and event.data.text:
print(f"[{agent_id}]: {event.data.text}", end="")
elif isinstance(event, WorkflowOutputEvent):
output_messages = cast(list[ChatMessage], event.data)
if output_messages:
output = output_messages[-1].text
Opções de personalização do Agent Framework:
O fluxo de trabalho magêntico fornece opções abrangentes de personalização:
- Configuração do gerenciador: use um ChatAgent com instruções personalizadas e configurações de modelo
-
Limites redondos:
max_round_count, ,max_stall_countmax_reset_count -
Streaming de eventos: usar
AgentRunUpdateEventcommagentic_event_typemetadados - Especialização do agente: instruções e ferramentas personalizadas por agente
- Human-in-the-loop: Revisão do plano, aprovação da ferramenta e intervenção de parada
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
MAGENTIC_EVENT_TYPE_AGENT_DELTA,
MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
AgentRunUpdateEvent,
ChatAgent,
MagenticBuilder,
MagenticHumanInterventionDecision,
MagenticHumanInterventionKind,
MagenticHumanInterventionReply,
MagenticHumanInterventionRequest,
RequestInfoEvent,
WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient
# Create manager agent with custom configuration
manager_agent = ChatAgent(
name="MagenticManager",
description="Orchestrator for complex tasks",
instructions="Custom orchestration instructions...",
chat_client=OpenAIChatClient(model_id="gpt-4o"),
)
workflow = (
MagenticBuilder()
.participants(
researcher=researcher_agent,
coder=coder_agent,
analyst=analyst_agent,
)
.with_standard_manager(
agent=manager_agent,
max_round_count=15, # Limit total rounds
max_stall_count=2, # Trigger stall handling
max_reset_count=1, # Allow one reset on failure
)
.with_plan_review() # Enable human plan review
.with_human_input_on_stall() # Enable human intervention on stalls
.build()
)
# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
req = cast(MagenticHumanInterventionRequest, event.data)
if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
# Review and approve the plan
reply = MagenticHumanInterventionReply(
decision=MagenticHumanInterventionDecision.APPROVE
)
async for ev in workflow.send_responses_streaming({event.request_id: reply}):
pass # Handle continuation
Para obter exemplos magênticos detalhados, consulte:
- Fluxo de trabalho magêntico básico – Fluxo de trabalho de vários agentes orquestrados padrão
- Magnético com Ponto de Verificação – Fluxos de trabalho orquestrados persistentes
- Atualização do Plano Humano Magêntico - Revisão do plano humano no loop
- Esclarecimento do Agente Magêntico – Aprovação da ferramenta para esclarecimento do agente
- Plano humano magêntico - Intervenção humana em barracas
Padrões futuros
O roteiro do Agent Framework inclui vários padrões de AutoGen atualmente em desenvolvimento:
- Padrão swarm: coordenação de agente baseada em entrega
- SelectorGroupChat: seleção de alto-falante orientada por LLM
Human-in-the-Loop com resposta de solicitação
Um novo recurso importante no Agent Framework Workflow é o conceito de solicitação e resposta, que permite que os fluxos de trabalho pausem a execução e aguardem a entrada externa antes de continuar. Essa funcionalidade não está presente na abstração da Team AutoGen e permite padrões sofisticados de humanos no loop.
Limitações de AutoGen
A abstração do Team AutoGen é executada continuamente uma vez iniciada e não fornece mecanismos internos para pausar a execução para entrada humana. Qualquer funcionalidade humana no loop requer implementações personalizadas fora da estrutura.
Agent Framework Request-Response API
O Agent Framework fornece recursos internos de solicitação-resposta em que qualquer executor pode enviar solicitações usando ctx.request_info() e manipular respostas com o @response_handler decorador.
from agent_framework import (
RequestInfoEvent, WorkflowBuilder, WorkflowContext,
Executor, handler, response_handler
)
from dataclasses import dataclass
# Assume we have agent_executor defined elsewhere
# Define typed request payload
@dataclass
class ApprovalRequest:
"""Request human approval for agent output."""
content: str = ""
agent_name: str = ""
# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
@handler
async def review_content(
self,
agent_response: str,
ctx: WorkflowContext
) -> None:
# Request human input with structured data
approval_request = ApprovalRequest(
content=agent_response,
agent_name="writer_agent"
)
await ctx.request_info(request_data=approval_request, response_type=str)
@response_handler
async def handle_approval_response(
self,
original_request: ApprovalRequest,
decision: str,
ctx: WorkflowContext
) -> None:
decision_lower = decision.strip().lower()
original_content = original_request.content
if decision_lower == "approved":
await ctx.yield_output(f"APPROVED: {original_content}")
else:
await ctx.yield_output(f"REVISION NEEDED: {decision}")
# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")
workflow = (WorkflowBuilder()
.add_edge(agent_executor, reviewer)
.set_start_executor(agent_executor)
.build())
Executando fluxos de trabalho humanos no loop
O Agent Framework fornece APIs de streaming para lidar com o ciclo de pausa e retomada:
from agent_framework import RequestInfoEvent, WorkflowOutputEvent
# Assume we have workflow defined from previous examples
async def run_with_human_input():
pending_responses = None
completed = False
while not completed:
# First iteration uses run_stream, subsequent use send_responses_streaming
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("initial input")
)
events = [event async for event in stream]
pending_responses = None
# Collect human requests and outputs
for event in events:
if isinstance(event, RequestInfoEvent):
# Display request to human and collect response
request_data = event.data # ApprovalRequest instance
print(f"Review needed: {request_data.content}")
human_response = input("Enter 'approved' or revision notes: ")
pending_responses = {event.request_id: human_response}
elif isinstance(event, WorkflowOutputEvent):
print(f"Final result: {event.data}")
completed = True
Para obter exemplos de fluxo de trabalho human-in-the-loop, consulte:
- Adivinhando jogo com entrada humana – fluxo de trabalho interativo com comentários do usuário
- Fluxo de trabalho como agente com entrada humana – fluxos de trabalho aninhados com interação humana
Fluxos de trabalho de ponto de verificação e retomada
Outra vantagem fundamental do Agent Framework Workflow sobre a abstração da Team AutoGen é o suporte interno para o ponto de verificação e a retomada da execução. Isso permite que os fluxos de trabalho sejam pausados, persistentes e retomados posteriormente de qualquer ponto de verificação, fornecendo tolerância a falhas e habilitando fluxos de trabalho de execução longa ou assíncrona.
Limitações de AutoGen
A abstração do Team AutoGen não fornece funcionalidades internas de ponto de verificação. Qualquer mecanismo de persistência ou recuperação deve ser implementado externamente, muitas vezes exigindo lógica complexa de gerenciamento de estado e serialização.
Ponto de Verificação da Estrutura do Agente
O Agent Framework fornece um ponto de verificação FileCheckpointStorage abrangente e o with_checkpointing() método em WorkflowBuilder. Captura de pontos de verificação:
-
Estado do executor: estado local para cada executor usando
ctx.set_executor_state() -
Estado compartilhado: estado entre executores usando
ctx.set_shared_state() - Filas de mensagens: Mensagens pendentes entre executores
- Posição do fluxo de trabalho: Progresso da execução atual e próximas etapas
from agent_framework import (
FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
Executor, handler
)
from typing_extensions import Never
class ProcessingExecutor(Executor):
@handler
async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
# Process the data
result = f"Processed: {data.upper()}"
print(f"Processing: '{data}' -> '{result}'")
# Persist executor-local state
prev_state = await ctx.get_executor_state() or {}
count = prev_state.get("count", 0) + 1
await ctx.set_executor_state({
"count": count,
"last_input": data,
"last_output": result
})
# Persist shared state for other executors
await ctx.set_shared_state("original_input", data)
await ctx.set_shared_state("processed_output", result)
await ctx.send_message(result)
class FinalizeExecutor(Executor):
@handler
async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
result = f"Final: {data}"
await ctx.yield_output(result)
# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")
# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
.add_edge(processing_executor, finalize_executor)
.set_start_executor(processing_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build())
# Example usage (would be in async context)
async def checkpoint_example():
# Run workflow - checkpoints are created automatically
async for event in workflow.run_stream("input data"):
print(f"Event: {event}")
Retomada de Pontos de Verificação
O Agent Framework fornece APIs para listar, inspecionar e retomar de pontos de verificação específicos:
from typing_extensions import Never
from agent_framework import (
Executor,
FileCheckpointStorage,
WorkflowContext,
WorkflowBuilder,
get_checkpoint_summary,
handler,
)
class UpperCaseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
result = text.upper()
await ctx.send_message(result)
class ReverseExecutor(Executor):
@handler
async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
result = text[::-1]
await ctx.yield_output(result)
def create_workflow(checkpoint_storage: FileCheckpointStorage):
"""Create a workflow with two executors and checkpointing."""
upper_executor = UpperCaseExecutor(id="upper")
reverse_executor = ReverseExecutor(id="reverse")
return (WorkflowBuilder()
.add_edge(upper_executor, reverse_executor)
.set_start_executor(upper_executor)
.with_checkpointing(checkpoint_storage=checkpoint_storage)
.build())
# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
async def checkpoint_resume_example():
# List available checkpoints
checkpoints = await checkpoint_storage.list_checkpoints()
# Display checkpoint information
for checkpoint in checkpoints:
summary = get_checkpoint_summary(checkpoint)
print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")
# Resume from a specific checkpoint
if checkpoints:
chosen_checkpoint_id = checkpoints[0].checkpoint_id
# Create new workflow instance and resume
new_workflow = create_workflow(checkpoint_storage)
async for event in new_workflow.run_stream(
checkpoint_id=chosen_checkpoint_id,
checkpoint_storage=checkpoint_storage
):
print(f"Resumed event: {event}")
Recursos avançados de ponto de verificação
Ponto de verificação com integração humana no loop:
O ponto de verificação funciona perfeitamente com fluxos de trabalho humanos no loop, permitindo que os fluxos de trabalho sejam pausados para entrada humana e retomados posteriormente. Ao retomar de um ponto de verificação que contém solicitações pendentes, essas solicitações serão remetidas como eventos:
# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
# Resume from checkpoint - pending requests will be re-emitted
request_info_events = []
async for event in workflow.run_stream(
checkpoint_id=checkpoint_id,
checkpoint_storage=checkpoint_storage
):
if isinstance(event, RequestInfoEvent):
request_info_events.append(event)
# Handle re-emitted pending request
responses = {}
for event in request_info_events:
response = handle_request(event.data)
responses[event.request_id] = response
# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
print(f"Event: {event}")
Principais benefícios
Em comparação com o AutoGen, o ponto de verificação do Agent Framework fornece:
- Persistência automática: nenhum gerenciamento de estado manual necessário
- Recuperação granular: retomar de qualquer limite de superespasagem
- Isolamento de estado: separar o estado local e compartilhado do executor
- Integração humana no loop: pausa-retomada perfeita com entrada humana
- Tolerância a falhas: recuperação robusta de falhas ou interrupções
Exemplos práticos
Para obter exemplos abrangentes de ponto de verificação, consulte:
- Ponto de verificação com Currículo – Ponto de verificação básico e retomada interativa
- Ponto de verificação com o Human-in-the-Loop – Fluxos de trabalho persistentes com portões de aprovação humana
- Ponto de verificação de sub-fluxo de trabalho – Fluxos de trabalho aninhados de ponto de verificação
- Ponto de Verificação Magêntico – Pontos de verificação orquestrados fluxos de trabalho de vários agentes
Observability
O AutoGen e o Agent Framework fornecem recursos de observabilidade, mas com diferentes abordagens e recursos.
AutoGen Observabilidade
O AutoGen tem suporte nativo para OpenTelemetry com instrumentação para:
-
Rastreamento de runtime:
SingleThreadedAgentRuntimeeGrpcWorkerAgentRuntime -
Execução da ferramenta:
BaseToolcomexecute_toolintervalos seguindo convenções semânticas do GenAI -
Operações do agente:
BaseChatAgentcomcreate_agenteinvoke_agentintervalos
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime
# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)
# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)
Observabilidade da Estrutura do Agente
O Agent Framework fornece uma observabilidade abrangente por meio de várias abordagens:
- Configuração de código zero: instrumentação automática por meio de variáveis de ambiente
- Configuração manual: configuração programática com parâmetros personalizados
- Telemetria avançada: agentes, fluxos de trabalho e acompanhamento de execução de ferramentas
- Saída do console: registro em log e visualização do console interno
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient
# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317
# Or manual setup
setup_observability(
otlp_endpoint="http://localhost:4317"
)
# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")
async def observability_example():
# Observability is automatically applied to all agents and workflows
agent = ChatAgent(name="assistant", chat_client=client)
result = await agent.run("Hello") # Automatically traced
Principais diferenças:
- Complexidade de instalação: o Agent Framework oferece opções mais simples de instalação de código zero
- Escopo: o Agent Framework fornece uma cobertura mais ampla, incluindo a observabilidade no nível do fluxo de trabalho
- Visualização: o Agent Framework inclui saída de console interna e interface do usuário de desenvolvimento
- Configuração: o Agent Framework oferece opções de configuração mais flexíveis
Para obter exemplos detalhados de observabilidade, consulte:
- Configuração de código zero – Configuração de variável de ambiente
- Configuração Manual – Configuração programática
- Observabilidade do agente – telemetria de agente único
- Observabilidade do fluxo de trabalho – rastreamento de fluxo de trabalho de vários agentes
Conclusion
Este guia de migração fornece um mapeamento abrangente entre o AutoGen e o Microsoft Agent Framework, abrangendo tudo, desde a criação básica de agente até fluxos de trabalho complexos de vários agentes. Principais takeaways para migração:
- A migração de agente único é simples, com APIs semelhantes e funcionalidades aprimoradas no Agent Framework
- Os padrões de vários agentes exigem repensar sua abordagem de arquiteturas baseadas em fluxo de dados para eventos, mas se você já estiver familiarizado com o GraphFlow, a transição será mais fácil
- O Agent Framework oferece recursos adicionais, como middleware, ferramentas hospedadas e fluxos de trabalho tipados
Para obter exemplos adicionais e diretrizes de implementação detalhadas, consulte o diretório de exemplos do Agent Framework .
Categorias de exemplo adicionais
A Estrutura do Agente fornece exemplos em várias outras áreas importantes:
- Threads: exemplos de thread – Gerenciando o estado e o contexto da conversa
- Entrada multimodal: exemplos multimodal – Trabalhando com imagens e outros tipos de mídia
- Provedores de Contexto: exemplos do Provedor de Contexto – Padrões de integração de contexto externo