Compartilhar via


Fluxos de trabalho do Microsoft Agent Framework – Usando fluxos de trabalho como agentes

Este documento fornece uma visão geral de como usar fluxos de trabalho como agentes no Microsoft Agent Framework.

Visão geral

Às vezes, você criou um fluxo de trabalho sofisticado com vários agentes, executores personalizados e lógica complexa , mas você deseja usá-lo como qualquer outro agente. É exatamente isso que os agentes de fluxo de trabalho permitem que você faça. Ao encapsular seu fluxo de trabalho como um Agent, você pode interagir com ele por meio da mesma API familiar que usaria para um agente de chat simples.

Principais benefícios

  • Interface Unificada: interagir com fluxos de trabalho complexos usando a mesma API que agentes simples
  • Compatibilidade da API: integrar fluxos de trabalho a sistemas existentes que dão suporte à interface do Agente
  • Composabilidade: usar agentes de fluxo de trabalho como blocos de construção em sistemas de agentes maiores ou outros fluxos de trabalho
  • Gerenciamento de threads: aproveite os threads do agente para estado de conversa, ponto de verificação e retomada
  • Suporte de streaming: obter atualizações em tempo real à medida que o fluxo de trabalho é executado

Como funciona

Quando você converte um fluxo de trabalho em um agente:

  1. O fluxo de trabalho é validado para garantir que seu executor inicial possa aceitar mensagens de chat
  2. Um thread é criado para gerenciar o estado da conversa e os pontos de verificação
  3. As mensagens de entrada são roteadas para o executor inicial do fluxo de trabalho
  4. Eventos de fluxo de trabalho são convertidos em atualizações de resposta do agente
  5. As solicitações de entrada externas (de RequestInfoExecutor) são exibidas como chamadas de função

Requirements

Para usar um fluxo de trabalho como um agente, o executor inicial do fluxo de trabalho deve ser capaz de lidar IEnumerable<ChatMessage> como entrada. Isso é atendido automaticamente ao usar ChatClientAgent ou outros executores baseados em agente.

Criando um agente de fluxo de trabalho

Use o AsAgent() método de extensão para converter qualquer fluxo de trabalho compatível em um agente:

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

// First, build your workflow
var workflow = AgentWorkflowBuilder
    .CreateSequentialPipeline(researchAgent, writerAgent, reviewerAgent)
    .Build();

// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAgent(
    id: "content-pipeline",
    name: "Content Pipeline Agent",
    description: "A multi-agent workflow that researches, writes, and reviews content"
);

AsAgent Parameters

Parâmetro Tipo Description
id string? Identificador exclusivo opcional para o agente. Gerado automaticamente se não for fornecido.
name string? Nome de exibição opcional para o agente.
description string? Descrição opcional da finalidade do agente.
checkpointManager CheckpointManager? Gerenciador de ponto de verificação opcional para persistência entre sessões.
executionEnvironment IWorkflowExecutionEnvironment? Ambiente de execução opcional. Define-se como padrão InProcessExecution.OffThread ou InProcessExecution.Concurrent com base na configuração do fluxo de trabalho.

Usando agentes de fluxo de trabalho

Criando um thread

Cada conversa com um agente de fluxo de trabalho requer um thread para gerenciar o estado:

// Create a new thread for the conversation
AgentThread thread = workflowAgent.GetNewThread();

Execução sem streaming

Para casos de uso simples em que você deseja a resposta completa:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

AgentRunResponse response = await workflowAgent.RunAsync(messages, thread);

foreach (ChatMessage message in response.Messages)
{
    Console.WriteLine($"{message.AuthorName}: {message.Text}");
}

Execução de streaming

Para atualizações em tempo real à medida que o fluxo de trabalho é executado:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Process streaming updates from each agent in the workflow
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

Lidando com solicitações de entrada externas

Quando um fluxo de trabalho contém executores que solicitam entrada externa (usando RequestInfoExecutor), essas solicitações são exibidas como chamadas de função na resposta do agente:

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Check for function call requests
    foreach (AIContent content in update.Contents)
    {
        if (content is FunctionCallContent functionCall)
        {
            // Handle the external input request
            Console.WriteLine($"Workflow requests input: {functionCall.Name}");
            Console.WriteLine($"Request data: {functionCall.Arguments}");

            // Provide the response in the next message
        }
    }
}

Serialização e retomada de threads

Os threads do agente de fluxo de trabalho podem ser serializados para persistência e retomados posteriormente:

// Serialize the thread state
JsonElement serializedThread = thread.Serialize();

// Store serializedThread to your persistence layer...

// Later, resume the thread
AgentThread resumedThread = workflowAgent.DeserializeThread(serializedThread);

// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedThread))
{
    Console.Write(update.Text);
}

Ponto de verificação com agentes de fluxo de trabalho

Habilite o ponto de verificação para persistir o estado do fluxo de trabalho entre as reinicializações do processo:

// Create a checkpoint manager with your storage backend
var checkpointManager = new CheckpointManager(new FileCheckpointStorage("./checkpoints"));

// Create workflow agent with checkpointing enabled
AIAgent workflowAgent = workflow.AsAgent(
    id: "persistent-workflow",
    name: "Persistent Workflow Agent",
    checkpointManager: checkpointManager
);

Requirements

Para usar um fluxo de trabalho como um agente, o executor inicial do fluxo de trabalho deve ser capaz de lidar list[ChatMessage] como entrada. Isso é atendido automaticamente ao usar ChatAgent ou AgentExecutor.

Criando um agente de fluxo de trabalho

Chame as_agent() em qualquer fluxo de trabalho compatível para convertê-lo em um agente:

from agent_framework import WorkflowBuilder, ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

# Create your chat client and agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

researcher = ChatAgent(
    name="Researcher",
    instructions="Research and gather information on the given topic.",
    chat_client=chat_client,
)

writer = ChatAgent(
    name="Writer", 
    instructions="Write clear, engaging content based on research.",
    chat_client=chat_client,
)

# Build your workflow
workflow = (
    WorkflowBuilder()
    .set_start_executor(researcher)
    .add_edge(researcher, writer)
    .build()
)

# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")

Parâmetros de as_agent

Parâmetro Tipo Description
name str | None Nome de exibição opcional para o agente. Gerado automaticamente se não for fornecido.

Usando agentes de fluxo de trabalho

Criando um thread

Cada conversa com um agente de fluxo de trabalho requer um thread para gerenciar o estado:

# Create a new thread for the conversation
thread = workflow_agent.get_new_thread()

Execução sem streaming

Para casos de uso simples em que você deseja a resposta completa:

from agent_framework import ChatMessage, Role

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

response = await workflow_agent.run(messages, thread=thread)

for message in response.messages:
    print(f"{message.author_name}: {message.text}")

Execução de streaming

Para atualizações em tempo real à medida que o fluxo de trabalho é executado:

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

async for update in workflow_agent.run_stream(messages, thread=thread):
    # Process streaming updates from each agent in the workflow
    if update.text:
        print(update.text, end="", flush=True)

Lidando com solicitações de entrada externas

Quando um fluxo de trabalho contém executores que solicitam entrada externa (usando RequestInfoExecutor), essas solicitações são exibidas como chamadas de função. O agente de fluxo de trabalho rastreia solicitações pendentes e espera respostas antes de continuar:

from agent_framework import (
    FunctionCallContent,
    FunctionApprovalRequestContent,
    FunctionApprovalResponseContent,
)

async for update in workflow_agent.run_stream(messages, thread=thread):
    for content in update.contents:
        if isinstance(content, FunctionApprovalRequestContent):
            # The workflow is requesting external input
            request_id = content.id
            function_call = content.function_call

            print(f"Workflow requests input: {function_call.name}")
            print(f"Request data: {function_call.arguments}")

            # Store the request_id to provide a response later

# Check for pending requests
if workflow_agent.pending_requests:
    print(f"Pending requests: {list(workflow_agent.pending_requests.keys())}")

Fornecendo respostas a solicitações pendentes

Para continuar a execução do fluxo de trabalho após uma solicitação de entrada externa:

# Create a response for the pending request
response_content = FunctionApprovalResponseContent(
    id=request_id,
    function_call=function_call,
    approved=True,
)

response_message = ChatMessage(
    role=Role.USER,
    contents=[response_content],
)

# Continue the workflow with the response
async for update in workflow_agent.run_stream([response_message], thread=thread):
    if update.text:
        print(update.text, end="", flush=True)

Exemplo completo

Aqui está um exemplo completo demonstrando um agente de fluxo de trabalho com saída de streaming:

import asyncio
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Role,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import SequentialBuilder
from azure.identity import AzureCliCredential


async def main():
    # Set up the chat client
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Create specialized agents
    researcher = ChatAgent(
        name="Researcher",
        instructions="Research the given topic and provide key facts.",
        chat_client=chat_client,
    )

    writer = ChatAgent(
        name="Writer",
        instructions="Write engaging content based on the research provided.",
        chat_client=chat_client,
    )

    reviewer = ChatAgent(
        name="Reviewer",
        instructions="Review the content and provide a final polished version.",
        chat_client=chat_client,
    )

    # Build a sequential workflow
    workflow = (
        SequentialBuilder()
        .add_agents([researcher, writer, reviewer])
        .build()
    )

    # Convert to a workflow agent
    workflow_agent = workflow.as_agent(name="Content Creation Pipeline")

    # Create a thread and run the workflow
    thread = workflow_agent.get_new_thread()
    messages = [ChatMessage(role=Role.USER, content="Write about quantum computing")]

    print("Starting workflow...")
    print("=" * 60)

    current_author = None
    async for update in workflow_agent.run_stream(messages, thread=thread):
        # Show when different agents are responding
        if update.author_name and update.author_name != current_author:
            if current_author:
                print("\n" + "-" * 40)
            print(f"\n[{update.author_name}]:")
            current_author = update.author_name

        if update.text:
            print(update.text, end="", flush=True)

    print("\n" + "=" * 60)
    print("Workflow completed!")


if __name__ == "__main__":
    asyncio.run(main())

Noções básicas sobre a conversão de eventos

Quando um fluxo de trabalho é executado como um agente, os eventos de fluxo de trabalho são convertidos em respostas do agente. O tipo de resposta depende de qual método você usa:

  • run(): retorna um AgentRunResponse que contém o resultado completo após a conclusão do fluxo de trabalho
  • run_stream(): produz AgentRunResponseUpdate objetos à medida que o fluxo de trabalho é executado, fornecendo atualizações em tempo real

Durante a execução, os eventos de fluxo de trabalho internos são mapeados para respostas do agente da seguinte maneira:

Evento de fluxo de trabalho Resposta do agente
AgentRunUpdateEvent Passado como AgentRunResponseUpdate (streaming) ou agregado em AgentRunResponse (não streaming)
RequestInfoEvent Convertido em FunctionCallContent e FunctionApprovalRequestContent
Outros eventos Incluído em raw_representation para observabilidade

Essa conversão permite que você use a interface do agente padrão enquanto ainda tem acesso a informações detalhadas de fluxo de trabalho quando necessário.

Casos de uso

Pipelines de Agente Complexos

Encapsular um fluxo de trabalho de vários agentes como um único agente para uso em aplicativos:

User Request --> [Workflow Agent] --> Final Response
                      |
                      +-- Researcher Agent
                      +-- Writer Agent  
                      +-- Reviewer Agent

2. Composição do agente

Use agentes de fluxo de trabalho como componentes em sistemas maiores:

  • Um agente de fluxo de trabalho pode ser usado como uma ferramenta por outro agente
  • Vários agentes de fluxo de trabalho podem ser orquestrados juntos
  • Os agentes de fluxo de trabalho podem ser aninhados em outros fluxos de trabalho

3. Integração de API

Exponha fluxos de trabalho complexos por meio de APIs que esperam a interface padrão do Agente, permitindo:

  • Interfaces de chat que usam fluxos de trabalho de back-end sofisticados
  • Integração com sistemas existentes baseados em agente
  • Migração gradual de agentes simples para fluxos de trabalho complexos

Próximas etapas