Compartilhar via


Guia de migração do AutoGen para o Microsoft Agent Framework

Um guia abrangente para migrar do AutoGen para o SDK do Python do Microsoft Agent Framework.

Sumário

Contexto

O AutoGen é uma estrutura para a criação de agentes de IA e sistemas de vários agentes usando LLMs (modelos de linguagem grande). Começou como um projeto de pesquisa na Microsoft Research e foi pioneiro em vários conceitos em orquestração de vários agentes, como GroupChat e runtime de agente controlado por eventos. O projeto tem sido uma colaboração frutífera da comunidade de software livre e muitos recursos importantes vieram de colaboradores externos.

O Microsoft Agent Framework é um novo SDK de vários idiomas para criar agentes de IA e fluxos de trabalho usando LLMs. Representa uma evolução significativa das ideias pioneiras na AutoGen e incorpora lições aprendidas com o uso do mundo real. Ele é desenvolvido pelas principais equipes de AutoGen e Kernel Semântico da Microsoft e foi projetado para ser uma nova base para a criação de aplicativos de IA daqui para frente.

Este guia descreve um caminho prático de migração: ele começa cobrindo o que permanece o mesmo e o que muda rapidamente. Em seguida, aborda a configuração do cliente de modelo, os recursos de agente único e, por fim, a orquestração de vários agentes com código concreto lado a lado. Ao longo do caminho, links para exemplos executáveis no repositório do Agent Framework ajudam você a validar cada etapa.

Principais semelhanças e diferenças

O que permanece o mesmo

As fundações são familiares. Você ainda cria agentes em torno de um cliente modelo, fornece instruções e anexa ferramentas. Ambas as bibliotecas dão suporte a ferramentas de estilo de função, streaming de token, conteúdo multimodal e E/S assíncrona.

# Both frameworks follow similar patterns
# AutoGen
agent = AssistantAgent(name="assistant", model_client=client, tools=[my_tool])
result = await agent.run(task="Help me with this task")

# Agent Framework
agent = ChatAgent(name="assistant", chat_client=client, tools=[my_tool])
result = await agent.run("Help me with this task")

Principais Diferenças

  1. Estilo de orquestração: o AutoGen emparelha um núcleo controlado por eventos com um alto nível Team. O Agent Framework centra-se em um tipo baseado em Workflow grafo que roteia dados ao longo das bordas e ativa executores quando as entradas estão prontas.

  2. Ferramentas: AutoGen encapsula funções com FunctionTool. O Agent Framework usa @ai_function, infere esquemas automaticamente e adiciona ferramentas hospedadas, como um interpretador de código e uma pesquisa na Web.

  3. Comportamento do agente: AssistantAgent é de turno único, a menos que você aumente max_tool_iterations. ChatAgent é multi turn por padrão e continua invocando ferramentas até que possa retornar uma resposta final.

  4. Runtime: o AutoGen oferece runtimes distribuídos incorporados e experimentais. O Agent Framework se concentra na composição de processo único atualmente; A execução distribuída está planejada.

Criação e configuração do cliente de modelo

Ambas as estruturas fornecem clientes de modelo para os principais provedores de IA, com APIs semelhantes, mas não idênticas.

Característica AutoGen Estrutura do Agente
Cliente OpenAI OpenAIChatCompletionClient OpenAIChatClient
Cliente de respostas OpenAI ❌ Não disponível OpenAIResponsesClient
OpenAI do Azure AzureOpenAIChatCompletionClient AzureOpenAIChatClient
Respostas do Azure OpenAI ❌ Não disponível AzureOpenAIResponsesClient
IA do Azure AzureAIChatCompletionClient AzureAIAgentClient
Anthropic AnthropicChatCompletionClient 🚧 Planeado
Ollama OllamaChatCompletionClient 🚧 Planeado
Cache ChatCompletionCache capa 🚧 Planeado

Clientes do Modelo de AutoGen

from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient

# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-5",
    api_key="your-key"
)

# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    azure_deployment="gpt-5",
    api_version="2024-12-01",
    api_key="your-key"
)

Agent Framework ChatClients

from agent_framework.openai import OpenAIChatClient
from agent_framework.azure import AzureOpenAIChatClient

# OpenAI (reads API key from environment)
client = OpenAIChatClient(model_id="gpt-5")

# Azure OpenAI (uses environment or default credentials; see samples for auth options)
client = AzureOpenAIChatClient(model_id="gpt-5")

Para obter exemplos detalhados, consulte:

Suporte à API de respostas (Agent Framework Exclusive)

O Agent Framework e AzureOpenAIResponsesClientOpenAIResponsesClient o suporte especializado para modelos de raciocínio e respostas estruturadas não estão disponíveis no AutoGen:

from agent_framework.azure import AzureOpenAIResponsesClient
from agent_framework.openai import OpenAIResponsesClient

# Azure OpenAI with Responses API
azure_responses_client = AzureOpenAIResponsesClient(model_id="gpt-5")

# OpenAI with Responses API
openai_responses_client = OpenAIResponsesClient(model_id="gpt-5")

Para obter exemplos de API de Respostas, consulte:

Mapeamento de recursos do Single-Agent

Esta seção mapeia recursos de agente único entre o AutoGen e o Agent Framework. Com um cliente em vigor, crie um agente, anexe ferramentas e escolha entre não streaming e execução de streaming.

Criação e execução básicas do agente

Depois que você tiver um cliente de modelo configurado, a próxima etapa será criar agentes. Ambas as estruturas fornecem abstrações de agente semelhantes, mas com diferentes comportamentos padrão e opções de configuração.

AutoGen AssistantAgent

from autogen_agentchat.agents import AssistantAgent

agent = AssistantAgent(
    name="assistant",
    model_client=client,
    system_message="You are a helpful assistant.",
    tools=[my_tool],
    max_tool_iterations=1  # Single-turn by default
)

# Execution
result = await agent.run(task="What's the weather?")

Agent Framework ChatAgent

from agent_framework import ChatAgent, ai_function
from agent_framework.openai import OpenAIChatClient

# Create simple tools for the example
@ai_function
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

@ai_function
def get_time() -> str:
    """Get current time."""
    return "Current time: 2:30 PM"

# Create client
client = OpenAIChatClient(model_id="gpt-5")

async def example():
    # Direct creation
    agent = ChatAgent(
        name="assistant",
        chat_client=client,
        instructions="You are a helpful assistant.",
        tools=[get_weather]  # Multi-turn by default
    )

    # Factory method (more convenient)
    agent = client.create_agent(
        name="assistant",
        instructions="You are a helpful assistant.",
        tools=[get_weather]
    )

    # Execution with runtime tool configuration
    result = await agent.run(
        "What's the weather?",
        tools=[get_time],  # Can add tools at runtime
        tool_choice="auto"
    )

Principais diferenças:

  • Comportamento padrão: ChatAgent itera automaticamente por meio de chamadas de ferramenta, enquanto AssistantAgent requer configuração explícita max_tool_iterations
  • Configuração de runtime: ChatAgent.run() aceita tools e tool_choice parâmetros para personalização por invocação
  • Métodos de fábrica: o Agent Framework fornece métodos de fábrica convenientes diretamente de clientes de chat
  • Gerenciamento de estado: ChatAgent é sem estado e não mantém o histórico de conversas entre invocações, ao contrário AssistantAgent do que mantém o histórico de conversas como parte de seu estado

Gerenciando o estado de conversa com AgentThread

Para continuar as conversas com ChatAgent, use AgentThread para gerenciar o histórico de conversas:

# Assume we have an agent from previous examples
async def conversation_example():
    # Create a new thread that will be reused
    thread = agent.get_new_thread()

    # First interaction - thread is empty
    result1 = await agent.run("What's 2+2?", thread=thread)
    print(result1.text)  # "4"

    # Continue conversation - thread contains previous messages
    result2 = await agent.run("What about that number times 10?", thread=thread)
    print(result2.text)  # "40" (understands "that number" refers to 4)

    # AgentThread can use external storage, similar to ChatCompletionContext in AutoGen

Sem estado por padrão: demonstração rápida

# Without a thread (two independent invocations)
r1 = await agent.run("What's 2+2?")
print(r1.text)  # for example, "4"

r2 = await agent.run("What about that number times 10?")
print(r2.text)  # Likely ambiguous without prior context; cannot be "40"

# With a thread (shared context across calls)
thread = agent.get_new_thread()
print((await agent.run("What's 2+2?", thread=thread)).text)  # "4"
print((await agent.run("What about that number times 10?", thread=thread)).text)  # "40"

Para obter exemplos de gerenciamento de threads, consulte:

Equivalência do Agente Assistente do OpenAI

Ambas as estruturas fornecem integração à API do Assistente do OpenAI:

# AutoGen OpenAIAssistantAgent
from autogen_ext.agents.openai import OpenAIAssistantAgent
# Agent Framework has OpenAI Assistants support via OpenAIAssistantsClient
from agent_framework.openai import OpenAIAssistantsClient

Para obter exemplos do Assistente do OpenAI, consulte:

Suporte de streaming

Ambas as estruturas transmitem tokens em tempo real, de clientes e de agentes, para manter as interfaces do usuário responsivas.

AutoGen Streaming

# Model client streaming
async for chunk in client.create_stream(messages):
    if isinstance(chunk, str):
        print(chunk, end="")

# Agent streaming
async for event in agent.run_stream(task="Hello"):
    if isinstance(event, ModelClientStreamingChunkEvent):
        print(event.content, end="")
    elif isinstance(event, TaskResult):
        print("Final result received")

Agent Framework Streaming

# Assume we have client, agent, and tools from previous examples
async def streaming_example():
    # Chat client streaming
    async for chunk in client.get_streaming_response("Hello", tools=tools):
        if chunk.text:
            print(chunk.text, end="")

    # Agent streaming
    async for chunk in agent.run_stream("Hello"):
        if chunk.text:
            print(chunk.text, end="", flush=True)

Dica: no Agent Framework, clientes e agentes produzem a mesma forma de atualização; você pode ler chunk.text em ambos os casos.

Tipos e criação de mensagens

Entender como as mensagens funcionam é crucial para a comunicação efetiva do agente. Ambas as estruturas fornecem abordagens diferentes para criação e manipulação de mensagens, com o AutoGen usando classes de mensagens separadas e o Agent Framework usando um sistema de mensagens unificado.

Tipos de mensagens autogen

from autogen_agentchat.messages import TextMessage, MultiModalMessage
from autogen_core.models import UserMessage

# Text message
text_msg = TextMessage(content="Hello", source="user")

# Multi-modal message
multi_modal_msg = MultiModalMessage(
    content=["Describe this image", image_data],
    source="user"
)

# Convert to model format for use with model clients
user_message = text_msg.to_model_message()

Tipos de mensagens da Estrutura do Agente

from agent_framework import ChatMessage, TextContent, DataContent, UriContent, Role
import base64

# Text message
text_msg = ChatMessage(role=Role.USER, text="Hello")

# Supply real image bytes, or use a data: URI/URL via UriContent
image_bytes = b"<your_image_bytes>"
image_b64 = base64.b64encode(image_bytes).decode()
image_uri = f"data:image/jpeg;base64,{image_b64}"

# Multi-modal message with mixed content
multi_modal_msg = ChatMessage(
    role=Role.USER,
    contents=[
        TextContent(text="Describe this image"),
        DataContent(uri=image_uri, media_type="image/jpeg")
    ]
)

Principais diferenças:

  • O AutoGen usa classes de mensagem separadas (TextMessage, MultiModalMessage) com um source campo
  • O Agent Framework usa um unificado ChatMessage com objetos de conteúdo tipados e um role campo
  • As mensagens do Agent Framework usam Role enum (USER, ASSISTANT, SYSTEM, TOOL) em vez de fontes de cadeia de caracteres

Criação e integração de ferramentas

As ferramentas estendem os recursos do agente além da geração de texto. As estruturas assumem abordagens diferentes para a criação de ferramentas, com o Agent Framework fornecendo uma geração de esquema mais automatizada.

AutoGen FunctionTool

from autogen_core.tools import FunctionTool

async def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Manual tool creation
tool = FunctionTool(
    func=get_weather,
    description="Get weather information"
)

# Use with agent
agent = AssistantAgent(name="assistant", model_client=client, tools=[tool])

Estrutura do Agente @ai_function

from agent_framework import ai_function
from typing import Annotated
from pydantic import Field

@ai_function
def get_weather(
    location: Annotated[str, Field(description="The location to get weather for")]
) -> str:
    """Get weather for a location."""
    return f"Weather in {location}: sunny"

# Direct use with agent (automatic conversion)
agent = ChatAgent(name="assistant", chat_client=client, tools=[get_weather])

Para obter exemplos detalhados, consulte:

Ferramentas Hospedadas (Agent Framework Exclusive)

O Agent Framework fornece ferramentas hospedadas que não estão disponíveis no AutoGen:

from agent_framework import ChatAgent, HostedCodeInterpreterTool, HostedWebSearchTool
from agent_framework.azure import AzureOpenAIChatClient

# Azure OpenAI client with a model that supports hosted tools
client = AzureOpenAIChatClient(model_id="gpt-5")

# Code execution tool
code_tool = HostedCodeInterpreterTool()

# Web search tool
search_tool = HostedWebSearchTool()

agent = ChatAgent(
    name="researcher",
    chat_client=client,
    tools=[code_tool, search_tool]
)

Para obter exemplos detalhados, consulte:

Requisitos e advertências:

  • As ferramentas hospedadas só estão disponíveis em modelos/contas que dão suporte a elas. Verifique os direitos e o suporte ao modelo para seu provedor antes de habilitar essas ferramentas.
  • A configuração difere por provedor; siga os pré-requisitos em cada exemplo para instalação e permissões.
  • Nem todo modelo dá suporte a todas as ferramentas hospedadas (por exemplo, pesquisa na Web versus interpretador de código). Escolha um modelo compatível em seu ambiente.

Observação

O AutoGen dá suporte a ferramentas de execução de código local, mas esse recurso está planejado para versões futuras do Agent Framework.

Diferença de chave: o Agent Framework manipula a iteração da ferramenta automaticamente no nível do agente. Ao contrário do parâmetro do AutoGen, os agentes do max_tool_iterations Agent Framework continuam a execução da ferramenta até a conclusão por padrão, com mecanismos de segurança internos para evitar loops infinitos.

Suporte ao servidor MCP

Para integração avançada de ferramentas, ambas as estruturas dão suporte ao PROTOCOLO MCP (Model Context Protocol), permitindo que os agentes interajam com serviços externos e fontes de dados. O Agent Framework fornece suporte interno mais abrangente.

Suporte ao MCP do AutoGen

O AutoGen tem suporte básico do MCP por meio de extensões (os detalhes de implementação específicos variam de acordo com a versão).

Suporte ao MCP do Agent Framework

from agent_framework import ChatAgent, MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework.openai import OpenAIChatClient

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

# Stdio MCP server
mcp_tool = MCPStdioTool(
    name="filesystem",
    command="uvx mcp-server-filesystem",
    args=["/allowed/directory"]
)

# HTTP streaming MCP
http_mcp = MCPStreamableHTTPTool(
    name="http_mcp",
    url="http://localhost:8000/sse"
)

# WebSocket MCP
ws_mcp = MCPWebsocketTool(
    name="websocket_mcp",
    url="ws://localhost:8000/ws"
)

agent = ChatAgent(name="assistant", chat_client=client, tools=[mcp_tool])

Para obter exemplos de MCP, consulte:

Padrão de agente como ferramenta

Um padrão poderoso é usar os próprios agentes como ferramentas, habilitando arquiteturas de agente hierárquico. Ambas as estruturas dão suporte a esse padrão com implementações diferentes.

AutoGen AgentTool

from autogen_agentchat.tools import AgentTool

# Create specialized agent
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message="You are a creative writer."
)

# Wrap as tool
writer_tool = AgentTool(agent=writer)

# Use in coordinator (requires disabling parallel tool calls)
coordinator_client = OpenAIChatCompletionClient(
    model="gpt-5",
    parallel_tool_calls=False
)
coordinator = AssistantAgent(
    name="coordinator",
    model_client=coordinator_client,
    tools=[writer_tool]
)

As_tool do Agent Framework()

from agent_framework import ChatAgent

# Assume we have client from previous examples
# Create specialized agent
writer = ChatAgent(
    name="writer",
    chat_client=client,
    instructions="You are a creative writer."
)

# Convert to tool
writer_tool = writer.as_tool(
    name="creative_writer",
    description="Generate creative content",
    arg_name="request",
    arg_description="What to write"
)

# Use in coordinator
coordinator = ChatAgent(
    name="coordinator",
    chat_client=client,
    tools=[writer_tool]
)

Observação de migração explícita: no AutoGen, defina parallel_tool_calls=False no cliente de modelo do coordenador ao encapsular agentes como ferramentas para evitar problemas de simultaneidade ao invocar a mesma instância do agente. No Agent Framework, as_tool() não requer desabilitar chamadas de ferramenta paralela, pois os agentes são sem estado por padrão.

Middleware (recurso do Agent Framework)

O Agent Framework apresenta recursos de middleware que a AutoGen não tem. O Middleware permite preocupações avançadas entre cortes, como registro em log, segurança e monitoramento de desempenho.

from agent_framework import ChatAgent, AgentRunContext, FunctionInvocationContext
from typing import Callable, Awaitable

# Assume we have client from previous examples
async def logging_middleware(
    context: AgentRunContext,
    next: Callable[[AgentRunContext], Awaitable[None]]
) -> None:
    print(f"Agent {context.agent.name} starting")
    await next(context)
    print(f"Agent {context.agent.name} completed")

async def security_middleware(
    context: FunctionInvocationContext,
    next: Callable[[FunctionInvocationContext], Awaitable[None]]
) -> None:
    if "password" in str(context.arguments):
        print("Blocking function call with sensitive data")
        return  # Don't call next()
    await next(context)

agent = ChatAgent(
    name="secure_agent",
    chat_client=client,
    middleware=[logging_middleware, security_middleware]
)

Benefícios:

  • Segurança: validação de entrada e filtragem de conteúdo
  • Observabilidade: registro em log, métricas e rastreamento
  • Desempenho: Cache e limitação de taxa
  • Tratamento de erros: degradação normal e lógica de repetição

Para obter exemplos detalhados de middleware, consulte:

Agentes Personalizados

Às vezes, você não quer um agente com suporte de modelo– você deseja um agente determinístico ou com suporte de API com lógica personalizada. Ambas as estruturas dão suporte à criação de agentes personalizados, mas os padrões diferem.

AutoGen: Subclasse BaseChatAgent

from typing import Sequence
from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import BaseChatMessage, TextMessage, StopMessage
from autogen_core import CancellationToken

class StaticAgent(BaseChatAgent):
    def __init__(self, name: str = "static", description: str = "Static responder") -> None:
        super().__init__(name, description)

    @property
    def produced_message_types(self) -> Sequence[type[BaseChatMessage]]:  # Which message types this agent produces
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> Response:
        # Always return a static response
        return Response(chat_message=TextMessage(content="Hello from AutoGen custom agent", source=self.name))

Notes:

  • Implemente on_messages(...) e retorne um Response com uma mensagem de chat.
  • Opcionalmente, implemente on_reset(...) para limpar o estado interno entre as execuções.

Estrutura do Agente: Estender BaseAgent (com reconhecimento de thread)

from collections.abc import AsyncIterable
from typing import Any
from agent_framework import (
    AgentRunResponse,
    AgentRunResponseUpdate,
    AgentThread,
    BaseAgent,
    ChatMessage,
    Role,
    TextContent,
)

class StaticAgent(BaseAgent):
    async def run(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AgentRunResponse:
        # Build a static reply
        reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])

        # Persist conversation to the provided AgentThread (if any)
        if thread is not None:
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

        return AgentRunResponse(messages=[reply])

    async def run_stream(
        self,
        messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
        *,
        thread: AgentThread | None = None,
        **kwargs: Any,
    ) -> AsyncIterable[AgentRunResponseUpdate]:
        # Stream the same static response in a single chunk for simplicity
        yield AgentRunResponseUpdate(contents=[TextContent(text="Hello from AF custom agent")], role=Role.ASSISTANT)

        # Notify thread of input and the complete response once streaming ends
        if thread is not None:
            reply = ChatMessage(role=Role.ASSISTANT, contents=[TextContent(text="Hello from AF custom agent")])
            normalized = self._normalize_messages(messages)
            await self._notify_thread_of_new_messages(thread, normalized, reply)

Notes:

  • AgentThreadmantém o estado da conversa externamente; use agent.get_new_thread() e passe pararun/run_stream .
  • Chame self._notify_thread_of_new_messages(thread, input_messages, response_messages) para que o thread tenha ambos os lados da troca.
  • Veja o exemplo completo: Agente Personalizado

Em seguida, vamos examinar a orquestração de vários agentes, a área em que as estruturas diferem mais.

Mapeamento de recursos de vários agentes

Visão geral do modelo de programação

Os modelos de programação de vários agentes representam a diferença mais significativa entre as duas estruturas.

Abordagem de modelo duplo da AutoGen

O AutoGen fornece dois modelos de programação:

  1. autogen-core: programação controlada por eventos de baixo nível com RoutedAgent assinaturas e mensagens
  2. Team abstração: modelo centrado em execução de alto nível criado com base em autogen-core
# Low-level autogen-core (complex)
class MyAgent(RoutedAgent):
    @message_handler
    async def handle_message(self, message: TextMessage, ctx: MessageContext) -> None:
        # Handle specific message types
        pass

# High-level Team (easier but limited)
team = RoundRobinGroupChat(
    participants=[agent1, agent2],
    termination_condition=StopAfterNMessages(5)
)
result = await team.run(task="Collaborate on this task")

Desafios:

  • O modelo de baixo nível é muito complexo para a maioria dos usuários
  • O modelo de alto nível pode se tornar limitador para comportamentos complexos
  • A ponte entre os dois modelos adiciona complexidade de implementação

Modelo de fluxo de trabalho unificado do Agent Framework

O Agent Framework fornece uma única Workflow abstração que combina o melhor de ambas as abordagens:

from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

# Assume we have agent1 and agent2 from previous examples
@executor(id="agent1")
async def agent1_executor(input_msg: str, ctx: WorkflowContext[str]) -> None:
    response = await agent1.run(input_msg)
    await ctx.send_message(response.text)

@executor(id="agent2")
async def agent2_executor(input_msg: str, ctx: WorkflowContext[Never, str]) -> None:
    response = await agent2.run(input_msg)
    await ctx.yield_output(response.text)  # Final output

# Build typed data flow graph
workflow = (WorkflowBuilder()
           .add_edge(agent1_executor, agent2_executor)
           .set_start_executor(agent1_executor)
           .build())

# Example usage (would be in async context)
# result = await workflow.run("Initial input")

Para obter exemplos detalhados de fluxo de trabalho, consulte:

Benefícios:

  • Modelo unificado: abstração única para todos os níveis de complexidade
  • Segurança do tipo: entradas e saídas fortemente tipada
  • Visualização do grafo: Limpar representação de fluxo de dados
  • Composição flexível: misturar agentes, funções e sub-fluxos de trabalho

Fluxo de trabalho vs GraphFlow

A abstração do Workflow Agent Framework é inspirada no recurso experimental GraphFlow da AutoGen, mas representa uma evolução significativa na filosofia de design:

  • GraphFlow: fluxo de controle baseado em onde as bordas são transições e as mensagens são transmitidas para todos os agentes; as transições são condicionadas no conteúdo da mensagem transmitida
  • Fluxo de trabalho: o fluxo de dados baseado em onde as mensagens são roteadas por meio de bordas e executores específicos são ativados por bordas, com suporte para execução simultânea.

Visão geral do visual

O diagrama a seguir contrasta o Fluxo de Grafo de fluxo de controle da AutoGen (à esquerda) com o fluxo de trabalho de fluxo de dados do Agent Framework (à direita). O GraphFlow modela agentes como nós com transições e transmissões condicionais. Executores de modelos de fluxo de trabalho (agentes, funções ou sub-fluxos de trabalho) conectados por bordas tipados; ele também dá suporte a pausas de solicitação/resposta e ponto de verificação.

flowchart LR

  subgraph AutoGenGraphFlow
    direction TB
    U[User / Task] --> A[Agent A]
    A -->|success| B[Agent B]
    A -->|retry| C[Agent C]
    A -. broadcast .- B
    A -. broadcast .- C
  end

  subgraph AgentFrameworkWorkflow
    direction TB
    I[Input] --> E1[Executor 1]
    E1 -->|"str"| E2[Executor 2]
    E1 -->|"image"| E3[Executor 3]
    E3 -->|"str"| E2
    E2 --> OUT[(Final Output)]
  end

  R[Request / Response Gate]
  E2 -. request .-> R
  R -. resume .-> E2

  CP[Checkpoint]
  E1 -. save .-> CP
  CP -. load .-> E1

Na prática:

  • O GraphFlow usa agentes como nós e transmite mensagens; As bordas representam transições condicionais.
  • O fluxo de trabalho roteia mensagens digitadas ao longo das bordas. Nós (executores) podem ser agentes, funções puras ou sub-fluxos de trabalho.
  • A solicitação/resposta permite uma pausa de fluxo de trabalho para entrada externa; O ponto de verificação persiste o progresso e habilita a retomada.

Comparação de código

1) Sequencial + Condicional
# AutoGen GraphFlow (fluent builder) — writer → reviewer → editor (conditional)
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow

writer = AssistantAgent(name="writer", description="Writes a draft", model_client=client)
reviewer = AssistantAgent(name="reviewer", description="Reviews the draft", model_client=client)
editor = AssistantAgent(name="editor", description="Finalizes the draft", model_client=client)

graph = (
    DiGraphBuilder()
    .add_node(writer).add_node(reviewer).add_node(editor)
    .add_edge(writer, reviewer)  # always
    .add_edge(reviewer, editor, condition=lambda msg: "approve" in msg.to_model_text())
    .set_entry_point(writer)
).build()

team = GraphFlow(participants=[writer, reviewer, editor], graph=graph)
result = await team.run(task="Draft a short paragraph about solar power")
# Agent Framework Workflow — sequential executors with conditional logic
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="writer")
async def writer_exec(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"Draft: {task}")

@executor(id="reviewer")
async def reviewer_exec(draft: str, ctx: WorkflowContext[str]) -> None:
    decision = "approve" if "solar" in draft.lower() else "revise"
    await ctx.send_message(f"{decision}:{draft}")

@executor(id="editor")
async def editor_exec(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    if msg.startswith("approve:"):
        await ctx.yield_output(msg.split(":", 1)[1])
    else:
        await ctx.yield_output("Needs revision")

workflow_seq = (
    WorkflowBuilder()
    .add_edge(writer_exec, reviewer_exec)
    .add_edge(reviewer_exec, editor_exec)
    .set_start_executor(writer_exec)
    .build()
)
2) Fan-out + Join (ALL vs ANY)
# AutoGen GraphFlow — A → (B, C) → D with ALL/ANY join
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
A, B, C, D = agent_a, agent_b, agent_c, agent_d

# ALL (default): D runs after both B and C
g_all = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D).add_edge(C, D)
    .set_entry_point(A)
).build()

# ANY: D runs when either B or C completes
g_any = (
    DiGraphBuilder()
    .add_node(A).add_node(B).add_node(C).add_node(D)
    .add_edge(A, B).add_edge(A, C)
    .add_edge(B, D, activation_group="join_d", activation_condition="any")
    .add_edge(C, D, activation_group="join_d", activation_condition="any")
    .set_entry_point(A)
).build()
# Agent Framework Workflow — A → (B, C) → aggregator (ALL vs ANY)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="A")
async def start(task: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B:{task}", target_id="B")
    await ctx.send_message(f"C:{task}", target_id="C")

@executor(id="B")
async def branch_b(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"B_done:{text}")

@executor(id="C")
async def branch_c(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(f"C_done:{text}")

@executor(id="join_any")
async def join_any(msg: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"First: {msg}")  # ANY join (first arrival)

@executor(id="join_all")
async def join_all(msg: str, ctx: WorkflowContext[str, str]) -> None:
    state = await ctx.get_executor_state() or {"items": []}
    state["items"].append(msg)
    await ctx.set_executor_state(state)
    if len(state["items"]) >= 2:
        await ctx.yield_output(" | ".join(state["items"]))  # ALL join

wf_any = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_any).add_edge(branch_c, join_any)
    .set_start_executor(start)
    .build()
)

wf_all = (
    WorkflowBuilder()
    .add_edge(start, branch_b).add_edge(start, branch_c)
    .add_edge(branch_b, join_all).add_edge(branch_c, join_all)
    .set_start_executor(start)
    .build()
)
3) Roteamento direcionado (sem transmissão)
from agent_framework import WorkflowBuilder, executor, WorkflowContext
from typing_extensions import Never

@executor(id="ingest")
async def ingest(task: str, ctx: WorkflowContext[str]) -> None:
    # Route selectively using target_id
    if task.startswith("image:"):
        await ctx.send_message(task.removeprefix("image:"), target_id="vision")
    else:
        await ctx.send_message(task, target_id="writer")

@executor(id="writer")
async def write(text: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Draft: {text}")

@executor(id="vision")
async def caption(image_ref: str, ctx: WorkflowContext[Never, str]) -> None:
    await ctx.yield_output(f"Caption: {image_ref}")

workflow = (
    WorkflowBuilder()
    .add_edge(ingest, write)
    .add_edge(ingest, caption)
    .set_start_executor(ingest)
    .build()
)

# Example usage (async):
# await workflow.run("Summarize the benefits of solar power")
# await workflow.run("image:https://example.com/panel.jpg")

O que observar:

  • O GraphFlow transmite mensagens e usa transições condicionais. O comportamento de junção é configurado por meio do lado activation do destino e por borda activation_group/activation_condition(por exemplo, agrupe ambas as bordas join_d com ).activation_condition="any"
  • O fluxo de trabalho roteia dados explicitamente; use target_id para selecionar executores downstream. O comportamento de junção reside no executor de recebimento (por exemplo, produza na primeira entrada versus aguarde por todos) ou por meio de construtores/agregadores de orquestração.
  • Executores no fluxo de trabalho são de forma livre: encapsular um ChatAgent, uma função ou um sub-fluxo de trabalho e misturá-los no mesmo grafo.

Principais Diferenças

A tabela a seguir resume as diferenças fundamentais entre o GraphFlow da AutoGen e o fluxo de trabalho do Agent Framework:

Aspecto Fluxo de Grafo do AutoGen Fluxo de trabalho do Agent Framework
Tipo de fluxo Fluxo de controle (as bordas são transições) Fluxo de dados (mensagens de rota de bordas)
Tipos de nó Somente agentes Agentes, funções, sub-fluxos de trabalho
Ativação Transmissão de mensagem Ativação baseada em borda
Segurança de tipo Limited Digitação forte por toda parte
Capacidade de composição Limited Altamente composável

Padrões de aninhamento

Aninhamento de equipe do AutoGen

# Inner team
inner_team = RoundRobinGroupChat(
    participants=[specialist1, specialist2],
    termination_condition=StopAfterNMessages(3)
)

# Outer team with nested team as participant
outer_team = RoundRobinGroupChat(
    participants=[coordinator, inner_team, reviewer],  # Team as participant
    termination_condition=StopAfterNMessages(10)
)

# Messages are broadcasted to all participants including nested team
result = await outer_team.run("Complex task requiring collaboration")

Características de aninhamento do AutoGen:

  • A equipe aninhada recebe todas as mensagens da equipe externa
  • Mensagens de equipe aninhadas são transmitidas para todos os participantes externos da equipe
  • Contexto de mensagem compartilhada em todos os níveis

Aninhamento de fluxo de trabalho do Agent Framework

from agent_framework import WorkflowExecutor, WorkflowBuilder

# Assume we have executors from previous examples
# specialist1_executor, specialist2_executor, coordinator_executor, reviewer_executor

# Create sub-workflow
sub_workflow = (WorkflowBuilder()
               .add_edge(specialist1_executor, specialist2_executor)
               .set_start_executor(specialist1_executor)
               .build())

# Wrap as executor
sub_workflow_executor = WorkflowExecutor(
    workflow=sub_workflow,
    id="sub_process"
)

# Use in parent workflow
parent_workflow = (WorkflowBuilder()
                  .add_edge(coordinator_executor, sub_workflow_executor)
                  .add_edge(sub_workflow_executor, reviewer_executor)
                  .set_start_executor(coordinator_executor)
                  .build())

Características de aninhamento do Agent Framework:

  • Entrada/saída isolada por meio de WorkflowExecutor
  • Nenhuma transmissão de mensagens – os dados fluem por meio de conexões específicas
  • Gerenciamento de estado independente para cada nível de fluxo de trabalho

Padrões de chat em grupo

Os padrões de chat em grupo permitem que vários agentes colaborem em tarefas complexas. Veja como os padrões comuns se traduzem entre estruturas.

Padrão RoundRobinGroupChat

Implementação do AutoGen:

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import StopAfterNMessages

team = RoundRobinGroupChat(
    participants=[agent1, agent2, agent3],
    termination_condition=StopAfterNMessages(10)
)
result = await team.run("Discuss this topic")

Implementação da Estrutura do Agente:

from agent_framework import SequentialBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Sequential workflow through participants
workflow = SequentialBuilder().participants([agent1, agent2, agent3]).build()

# Example usage (would be in async context)
async def sequential_example():
    # Each agent appends to shared conversation
    async for event in workflow.run_stream("Discuss this topic"):
        if isinstance(event, WorkflowOutputEvent):
            conversation_history = event.data  # list[ChatMessage]

Para obter exemplos de orquestração detalhados, consulte:

Para padrões de execução simultâneos, o Agent Framework também fornece:

from agent_framework import ConcurrentBuilder, WorkflowOutputEvent

# Assume we have agent1, agent2, agent3 from previous examples
# Concurrent workflow for parallel processing
workflow = (ConcurrentBuilder()
           .participants([agent1, agent2, agent3])
           .build())

# Example usage (would be in async context)
async def concurrent_example():
    # All agents process the input concurrently
    async for event in workflow.run_stream("Process this in parallel"):
        if isinstance(event, WorkflowOutputEvent):
            results = event.data  # Combined results from all agents

Para obter exemplos de execução simultâneos, consulte:

Padrão MagenticOneGroupChat

Implementação do AutoGen:

from autogen_agentchat.teams import MagenticOneGroupChat

team = MagenticOneGroupChat(
    participants=[researcher, coder, executor],
    model_client=coordinator_client,
    termination_condition=StopAfterNMessages(20)
)
result = await team.run("Complex research and analysis task")

Implementação da Estrutura do Agente:

from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    ChatMessage,
    MagenticBuilder,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create a manager agent for orchestration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator that coordinates the workflow",
    instructions="You coordinate a team to complete complex tasks efficiently.",
    chat_client=OpenAIChatClient(),
)

workflow = (
    MagenticBuilder()
    .participants(researcher=researcher, coder=coder)
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=20,
        max_stall_count=3,
        max_reset_count=2,
    )
    .build()
)

# Example usage (would be in async context)
async def magentic_example():
    output: str | None = None
    async for event in workflow.run_stream("Complex research task"):
        if isinstance(event, AgentRunUpdateEvent):
            props = event.data.additional_properties if event.data else None
            event_type = props.get("magentic_event_type") if props else None

            if event_type == MAGENTIC_EVENT_TYPE_ORCHESTRATOR:
                text = event.data.text if event.data else ""
                print(f"[ORCHESTRATOR]: {text}")
            elif event_type == MAGENTIC_EVENT_TYPE_AGENT_DELTA:
                agent_id = props.get("agent_id", event.executor_id) if props else event.executor_id
                if event.data and event.data.text:
                    print(f"[{agent_id}]: {event.data.text}", end="")

        elif isinstance(event, WorkflowOutputEvent):
            output_messages = cast(list[ChatMessage], event.data)
            if output_messages:
                output = output_messages[-1].text

Opções de personalização do Agent Framework:

O fluxo de trabalho magêntico fornece opções abrangentes de personalização:

  • Configuração do gerenciador: use um ChatAgent com instruções personalizadas e configurações de modelo
  • Limites redondos: max_round_count, , max_stall_countmax_reset_count
  • Streaming de eventos: usar AgentRunUpdateEvent com magentic_event_type metadados
  • Especialização do agente: instruções e ferramentas personalizadas por agente
  • Human-in-the-loop: Revisão do plano, aprovação da ferramenta e intervenção de parada
# Advanced customization example with human-in-the-loop
from typing import cast
from agent_framework import (
    MAGENTIC_EVENT_TYPE_AGENT_DELTA,
    MAGENTIC_EVENT_TYPE_ORCHESTRATOR,
    AgentRunUpdateEvent,
    ChatAgent,
    MagenticBuilder,
    MagenticHumanInterventionDecision,
    MagenticHumanInterventionKind,
    MagenticHumanInterventionReply,
    MagenticHumanInterventionRequest,
    RequestInfoEvent,
    WorkflowOutputEvent,
)
from agent_framework.openai import OpenAIChatClient

# Create manager agent with custom configuration
manager_agent = ChatAgent(
    name="MagenticManager",
    description="Orchestrator for complex tasks",
    instructions="Custom orchestration instructions...",
    chat_client=OpenAIChatClient(model_id="gpt-4o"),
)

workflow = (
    MagenticBuilder()
    .participants(
        researcher=researcher_agent,
        coder=coder_agent,
        analyst=analyst_agent,
    )
    .with_standard_manager(
        agent=manager_agent,
        max_round_count=15,      # Limit total rounds
        max_stall_count=2,       # Trigger stall handling
        max_reset_count=1,       # Allow one reset on failure
    )
    .with_plan_review()           # Enable human plan review
    .with_human_input_on_stall()  # Enable human intervention on stalls
    .build()
)

# Handle human intervention requests during execution
async for event in workflow.run_stream("Complex task"):
    if isinstance(event, RequestInfoEvent) and event.request_type is MagenticHumanInterventionRequest:
        req = cast(MagenticHumanInterventionRequest, event.data)
        if req.kind == MagenticHumanInterventionKind.PLAN_REVIEW:
            # Review and approve the plan
            reply = MagenticHumanInterventionReply(
                decision=MagenticHumanInterventionDecision.APPROVE
            )
            async for ev in workflow.send_responses_streaming({event.request_id: reply}):
                pass  # Handle continuation

Para obter exemplos magênticos detalhados, consulte:

Padrões futuros

O roteiro do Agent Framework inclui vários padrões de AutoGen atualmente em desenvolvimento:

  • Padrão swarm: coordenação de agente baseada em entrega
  • SelectorGroupChat: seleção de alto-falante orientada por LLM

Human-in-the-Loop com resposta de solicitação

Um novo recurso importante no Agent Framework Workflow é o conceito de solicitação e resposta, que permite que os fluxos de trabalho pausem a execução e aguardem a entrada externa antes de continuar. Essa funcionalidade não está presente na abstração da Team AutoGen e permite padrões sofisticados de humanos no loop.

Limitações de AutoGen

A abstração do Team AutoGen é executada continuamente uma vez iniciada e não fornece mecanismos internos para pausar a execução para entrada humana. Qualquer funcionalidade humana no loop requer implementações personalizadas fora da estrutura.

Agent Framework Request-Response API

O Agent Framework fornece recursos internos de solicitação-resposta em que qualquer executor pode enviar solicitações usando ctx.request_info() e manipular respostas com o @response_handler decorador.

from agent_framework import (
    RequestInfoEvent, WorkflowBuilder, WorkflowContext, 
    Executor, handler, response_handler
)
from dataclasses import dataclass

# Assume we have agent_executor defined elsewhere

# Define typed request payload
@dataclass
class ApprovalRequest:
    """Request human approval for agent output."""
    content: str = ""
    agent_name: str = ""

# Workflow executor that requests human approval
class ReviewerExecutor(Executor):
    
    @handler
    async def review_content(
        self,
        agent_response: str,
        ctx: WorkflowContext
    ) -> None:
        # Request human input with structured data
        approval_request = ApprovalRequest(
            content=agent_response,
            agent_name="writer_agent"
        )
        await ctx.request_info(request_data=approval_request, response_type=str)
    
    @response_handler
    async def handle_approval_response(
        self,
        original_request: ApprovalRequest,
        decision: str,
        ctx: WorkflowContext
    ) -> None:
        decision_lower = decision.strip().lower()
        original_content = original_request.content

        if decision_lower == "approved":
            await ctx.yield_output(f"APPROVED: {original_content}")
        else:
            await ctx.yield_output(f"REVISION NEEDED: {decision}")

# Build workflow with human-in-the-loop
reviewer = ReviewerExecutor(id="reviewer")

workflow = (WorkflowBuilder()
           .add_edge(agent_executor, reviewer)
           .set_start_executor(agent_executor)
           .build())

Executando fluxos de trabalho humanos no loop

O Agent Framework fornece APIs de streaming para lidar com o ciclo de pausa e retomada:

from agent_framework import RequestInfoEvent, WorkflowOutputEvent

# Assume we have workflow defined from previous examples
async def run_with_human_input():
    pending_responses = None
    completed = False

    while not completed:
        # First iteration uses run_stream, subsequent use send_responses_streaming
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("initial input")
        )

        events = [event async for event in stream]
        pending_responses = None

        # Collect human requests and outputs
        for event in events:
            if isinstance(event, RequestInfoEvent):
                # Display request to human and collect response
                request_data = event.data  # ApprovalRequest instance
                print(f"Review needed: {request_data.content}")

                human_response = input("Enter 'approved' or revision notes: ")
                pending_responses = {event.request_id: human_response}

            elif isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")
                completed = True

Para obter exemplos de fluxo de trabalho human-in-the-loop, consulte:

Fluxos de trabalho de ponto de verificação e retomada

Outra vantagem fundamental do Agent Framework Workflow sobre a abstração da Team AutoGen é o suporte interno para o ponto de verificação e a retomada da execução. Isso permite que os fluxos de trabalho sejam pausados, persistentes e retomados posteriormente de qualquer ponto de verificação, fornecendo tolerância a falhas e habilitando fluxos de trabalho de execução longa ou assíncrona.

Limitações de AutoGen

A abstração do Team AutoGen não fornece funcionalidades internas de ponto de verificação. Qualquer mecanismo de persistência ou recuperação deve ser implementado externamente, muitas vezes exigindo lógica complexa de gerenciamento de estado e serialização.

Ponto de Verificação da Estrutura do Agente

O Agent Framework fornece um ponto de verificação FileCheckpointStorage abrangente e o with_checkpointing() método em WorkflowBuilder. Captura de pontos de verificação:

  • Estado do executor: estado local para cada executor usando ctx.set_executor_state()
  • Estado compartilhado: estado entre executores usando ctx.set_shared_state()
  • Filas de mensagens: Mensagens pendentes entre executores
  • Posição do fluxo de trabalho: Progresso da execução atual e próximas etapas
from agent_framework import (
    FileCheckpointStorage, WorkflowBuilder, WorkflowContext,
    Executor, handler
)
from typing_extensions import Never

class ProcessingExecutor(Executor):
    @handler
    async def process(self, data: str, ctx: WorkflowContext[str]) -> None:
        # Process the data
        result = f"Processed: {data.upper()}"
        print(f"Processing: '{data}' -> '{result}'")

        # Persist executor-local state
        prev_state = await ctx.get_executor_state() or {}
        count = prev_state.get("count", 0) + 1
        await ctx.set_executor_state({
            "count": count,
            "last_input": data,
            "last_output": result
        })

        # Persist shared state for other executors
        await ctx.set_shared_state("original_input", data)
        await ctx.set_shared_state("processed_output", result)

        await ctx.send_message(result)

class FinalizeExecutor(Executor):
    @handler
    async def finalize(self, data: str, ctx: WorkflowContext[Never, str]) -> None:
        result = f"Final: {data}"
        await ctx.yield_output(result)

# Configure checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
processing_executor = ProcessingExecutor(id="processing")
finalize_executor = FinalizeExecutor(id="finalize")

# Build workflow with checkpointing enabled
workflow = (WorkflowBuilder()
           .add_edge(processing_executor, finalize_executor)
           .set_start_executor(processing_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
           .build())

# Example usage (would be in async context)
async def checkpoint_example():
    # Run workflow - checkpoints are created automatically
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

Retomada de Pontos de Verificação

O Agent Framework fornece APIs para listar, inspecionar e retomar de pontos de verificação específicos:

from typing_extensions import Never

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowContext,
    WorkflowBuilder,
    get_checkpoint_summary,
    handler,
)

class UpperCaseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[str]) -> None:
        result = text.upper()
        await ctx.send_message(result)

class ReverseExecutor(Executor):
    @handler
    async def process(self, text: str, ctx: WorkflowContext[Never, str]) -> None:
        result = text[::-1]
        await ctx.yield_output(result)

def create_workflow(checkpoint_storage: FileCheckpointStorage):
    """Create a workflow with two executors and checkpointing."""
    upper_executor = UpperCaseExecutor(id="upper")
    reverse_executor = ReverseExecutor(id="reverse")

    return (WorkflowBuilder()
           .add_edge(upper_executor, reverse_executor)
           .set_start_executor(upper_executor)
           .with_checkpointing(checkpoint_storage=checkpoint_storage)
           .build())

# Assume we have checkpoint_storage from previous examples
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

async def checkpoint_resume_example():
    # List available checkpoints
    checkpoints = await checkpoint_storage.list_checkpoints()

    # Display checkpoint information
    for checkpoint in checkpoints:
        summary = get_checkpoint_summary(checkpoint)
        print(f"Checkpoint {summary.checkpoint_id}: iteration={summary.iteration_count}")

    # Resume from a specific checkpoint
    if checkpoints:
        chosen_checkpoint_id = checkpoints[0].checkpoint_id

        # Create new workflow instance and resume
        new_workflow = create_workflow(checkpoint_storage)
        async for event in new_workflow.run_stream(
            checkpoint_id=chosen_checkpoint_id,
            checkpoint_storage=checkpoint_storage
        ):
            print(f"Resumed event: {event}")

Recursos avançados de ponto de verificação

Ponto de verificação com integração humana no loop:

O ponto de verificação funciona perfeitamente com fluxos de trabalho humanos no loop, permitindo que os fluxos de trabalho sejam pausados para entrada humana e retomados posteriormente. Ao retomar de um ponto de verificação que contém solicitações pendentes, essas solicitações serão remetidas como eventos:

# Assume we have workflow, checkpoint_id, and checkpoint_storage from previous examples
async def resume_with_pending_requests_example():
    # Resume from checkpoint - pending requests will be re-emitted
    request_info_events = []
    async for event in workflow.run_stream(
        checkpoint_id=checkpoint_id,
        checkpoint_storage=checkpoint_storage
    ):
        if isinstance(event, RequestInfoEvent):
            request_info_events.append(event)

    # Handle re-emitted pending request
    responses = {}
    for event in request_info_events:
        response = handle_request(event.data)
        responses[event.request_id] = response

    # Send response back to workflow
    async for event in workflow.send_responses_streaming(responses):
        print(f"Event: {event}")

Principais benefícios

Em comparação com o AutoGen, o ponto de verificação do Agent Framework fornece:

  • Persistência automática: nenhum gerenciamento de estado manual necessário
  • Recuperação granular: retomar de qualquer limite de superespasagem
  • Isolamento de estado: separar o estado local e compartilhado do executor
  • Integração humana no loop: pausa-retomada perfeita com entrada humana
  • Tolerância a falhas: recuperação robusta de falhas ou interrupções

Exemplos práticos

Para obter exemplos abrangentes de ponto de verificação, consulte:


Observability

O AutoGen e o Agent Framework fornecem recursos de observabilidade, mas com diferentes abordagens e recursos.

AutoGen Observabilidade

O AutoGen tem suporte nativo para OpenTelemetry com instrumentação para:

  • Rastreamento de runtime: SingleThreadedAgentRuntime e GrpcWorkerAgentRuntime
  • Execução da ferramenta: BaseTool com execute_tool intervalos seguindo convenções semânticas do GenAI
  • Operações do agente: BaseChatAgent com create_agent e invoke_agent intervalos
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from autogen_core import SingleThreadedAgentRuntime

# Configure OpenTelemetry
tracer_provider = TracerProvider()
trace.set_tracer_provider(tracer_provider)

# Pass to runtime
runtime = SingleThreadedAgentRuntime(tracer_provider=tracer_provider)

Observabilidade da Estrutura do Agente

O Agent Framework fornece uma observabilidade abrangente por meio de várias abordagens:

  • Configuração de código zero: instrumentação automática por meio de variáveis de ambiente
  • Configuração manual: configuração programática com parâmetros personalizados
  • Telemetria avançada: agentes, fluxos de trabalho e acompanhamento de execução de ferramentas
  • Saída do console: registro em log e visualização do console interno
from agent_framework import ChatAgent
from agent_framework.observability import setup_observability
from agent_framework.openai import OpenAIChatClient

# Zero-code setup via environment variables
# Set ENABLE_OTEL=true
# Set OTLP_ENDPOINT=http://localhost:4317

# Or manual setup
setup_observability(
    otlp_endpoint="http://localhost:4317"
)

# Create client for the example
client = OpenAIChatClient(model_id="gpt-5")

async def observability_example():
    # Observability is automatically applied to all agents and workflows
    agent = ChatAgent(name="assistant", chat_client=client)
    result = await agent.run("Hello")  # Automatically traced

Principais diferenças:

  • Complexidade de instalação: o Agent Framework oferece opções mais simples de instalação de código zero
  • Escopo: o Agent Framework fornece uma cobertura mais ampla, incluindo a observabilidade no nível do fluxo de trabalho
  • Visualização: o Agent Framework inclui saída de console interna e interface do usuário de desenvolvimento
  • Configuração: o Agent Framework oferece opções de configuração mais flexíveis

Para obter exemplos detalhados de observabilidade, consulte:


Conclusion

Este guia de migração fornece um mapeamento abrangente entre o AutoGen e o Microsoft Agent Framework, abrangendo tudo, desde a criação básica de agente até fluxos de trabalho complexos de vários agentes. Principais takeaways para migração:

  • A migração de agente único é simples, com APIs semelhantes e funcionalidades aprimoradas no Agent Framework
  • Os padrões de vários agentes exigem repensar sua abordagem de arquiteturas baseadas em fluxo de dados para eventos, mas se você já estiver familiarizado com o GraphFlow, a transição será mais fácil
  • O Agent Framework oferece recursos adicionais, como middleware, ferramentas hospedadas e fluxos de trabalho tipados

Para obter exemplos adicionais e diretrizes de implementação detalhadas, consulte o diretório de exemplos do Agent Framework .

Categorias de exemplo adicionais

A Estrutura do Agente fornece exemplos em várias outras áreas importantes:

Próximas etapas