Partilhar via


Fluxos de Trabalho do Microsoft Agent Framework - Utilização de Fluxos de Trabalho como Agentes

Este documento fornece uma visão geral de como usar fluxos de trabalho como agentes no Microsoft Agent Framework.

Visão geral

Por vezes, você construiu um fluxo de trabalho sofisticado com múltiplos agentes, executores personalizados e lógica complexa, mas quer usá-lo como qualquer outro agente. É exatamente isso que os agentes de workflow permitem fazer. Ao envolver o seu fluxo de trabalho como um Agent, pode interagir com ele através da mesma API familiar que usaria para um simples agente de chat.

Principais Benefícios

  • Interface Unificada: Interagir com fluxos de trabalho complexos usando a mesma API que agentes simples
  • Compatibilidade de API: Integrar fluxos de trabalho com sistemas existentes que suportem a interface do Agente
  • Componibilidade: Utilize agentes de workflow como blocos de construção em sistemas de agentes maiores ou outros fluxos de trabalho
  • Gestão de Threads: Utilize as threads dos agentes para o estado da conversa, verificação de ponto de controle e retoma
  • Suporte a Streaming: Receba atualizações em tempo real à medida que o fluxo de trabalho é executado

Como funciona

Quando convertes um fluxo de trabalho para agente:

  1. O fluxo de trabalho é validado para garantir que o executor de início pode aceitar mensagens de chat
  2. Cria-se uma thread para gerir o estado da conversa e os pontos de verificação
  3. As mensagens de entrada são encaminhadas para o executor inicial do fluxo de trabalho
  4. Os eventos do fluxo de trabalho são convertidos em atualizações de resposta do agente
  5. Pedidos de entrada externos (de RequestInfoExecutor) são apresentados como chamadas de função

Requerimentos

Para usar um fluxo de trabalho como agente, o executor inicial do fluxo de trabalho deve ser capaz de tratar IEnumerable<ChatMessage> como entrada. Isto é automaticamente satisfeito ao utilizar ChatClientAgent ou outros executores baseados em agentes.

Criando um agente de fluxo de trabalho

Use o método de AsAgent() extensão para converter qualquer fluxo de trabalho compatível num agente:

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;

// First, build your workflow
var workflow = AgentWorkflowBuilder
    .CreateSequentialPipeline(researchAgent, writerAgent, reviewerAgent)
    .Build();

// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAgent(
    id: "content-pipeline",
    name: "Content Pipeline Agent",
    description: "A multi-agent workflow that researches, writes, and reviews content"
);

Parâmetros AsAgent

Parâmetro Tipo Description
id string? Identificador único opcional para o agente. Gerado automaticamente se não for fornecido.
name string? Nome de exibição opcional para o agente.
description string? Descrição opcional do propósito do agente.
checkpointManager CheckpointManager? Gestor opcional de checkpoints para persistência entre sessões.
executionEnvironment IWorkflowExecutionEnvironment? Ambiente de execução opcional. Por defeito, InProcessExecution.OffThread ou InProcessExecution.Concurrent com base na configuração do fluxo de trabalho.

Utilização de Agentes de Fluxo de Trabalho

Criação de um Tópico

Cada conversa com um agente de workflow requer um thread para gerir o estado.

// Create a new thread for the conversation
AgentThread thread = workflowAgent.GetNewThread();

Execução Não-Streaming

Para casos de uso simples onde quer a resposta completa:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

AgentRunResponse response = await workflowAgent.RunAsync(messages, thread);

foreach (ChatMessage message in response.Messages)
{
    Console.WriteLine($"{message.AuthorName}: {message.Text}");
}

Execução em Streaming

Para atualizações em tempo real à medida que o fluxo de trabalho é executado:

var messages = new List<ChatMessage>
{
    new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Process streaming updates from each agent in the workflow
    if (!string.IsNullOrEmpty(update.Text))
    {
        Console.Write(update.Text);
    }
}

Gestão de Pedidos de Entrada Externos

Quando um fluxo de trabalho contém executores que solicitam entrada externa (usando RequestInfoExecutor), estes pedidos são apresentados como chamadas de função na resposta do agente:

await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
    // Check for function call requests
    foreach (AIContent content in update.Contents)
    {
        if (content is FunctionCallContent functionCall)
        {
            // Handle the external input request
            Console.WriteLine($"Workflow requests input: {functionCall.Name}");
            Console.WriteLine($"Request data: {functionCall.Arguments}");

            // Provide the response in the next message
        }
    }
}

Serialização e Retomada de Threads

As threads de agentes de workflow podem ser serializadas para persistência e retomadas posteriormente:

// Serialize the thread state
JsonElement serializedThread = thread.Serialize();

// Store serializedThread to your persistence layer...

// Later, resume the thread
AgentThread resumedThread = workflowAgent.DeserializeThread(serializedThread);

// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedThread))
{
    Console.Write(update.Text);
}

Checkpointing com Agentes de Workflow

Ativar o checkpointing para manter o estado de workflow entre reinícios de processos.

// Create a checkpoint manager with your storage backend
var checkpointManager = new CheckpointManager(new FileCheckpointStorage("./checkpoints"));

// Create workflow agent with checkpointing enabled
AIAgent workflowAgent = workflow.AsAgent(
    id: "persistent-workflow",
    name: "Persistent Workflow Agent",
    checkpointManager: checkpointManager
);

Requerimentos

Para usar um fluxo de trabalho como agente, o executor inicial do fluxo de trabalho deve ser capaz de tratar list[ChatMessage] como entrada. Isto é automaticamente satisfeito ao usar ChatAgent ou AgentExecutor.

Criando um agente de fluxo de trabalho

Recorra as_agent() a qualquer fluxo de trabalho compatível para o converter num agente:

from agent_framework import WorkflowBuilder, ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

# Create your chat client and agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

researcher = ChatAgent(
    name="Researcher",
    instructions="Research and gather information on the given topic.",
    chat_client=chat_client,
)

writer = ChatAgent(
    name="Writer", 
    instructions="Write clear, engaging content based on research.",
    chat_client=chat_client,
)

# Build your workflow
workflow = (
    WorkflowBuilder()
    .set_start_executor(researcher)
    .add_edge(researcher, writer)
    .build()
)

# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")

as_agent Parâmetros

Parâmetro Tipo Description
name str | None Nome de exibição opcional para o agente. Gerado automaticamente se não for fornecido.

Utilização de Agentes de Fluxo de Trabalho

Criação de um Tópico

Cada conversação com um agente de workflow requer um thread para gerir o estado.

# Create a new thread for the conversation
thread = workflow_agent.get_new_thread()

Execução Não-Streaming

Para casos de uso simples onde quer a resposta completa:

from agent_framework import ChatMessage, Role

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

response = await workflow_agent.run(messages, thread=thread)

for message in response.messages:
    print(f"{message.author_name}: {message.text}")

Execução em Streaming

Para atualizações em tempo real à medida que o fluxo de trabalho é executado:

messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]

async for update in workflow_agent.run_stream(messages, thread=thread):
    # Process streaming updates from each agent in the workflow
    if update.text:
        print(update.text, end="", flush=True)

Gestão de Pedidos de Entrada Externos

Quando um fluxo de trabalho contém executores que solicitam entrada externa (usando RequestInfoExecutor), estes pedidos são apresentados como chamadas de função. O agente de workflow acompanha pedidos pendentes e espera respostas antes de continuar:

from agent_framework import (
    FunctionCallContent,
    FunctionApprovalRequestContent,
    FunctionApprovalResponseContent,
)

async for update in workflow_agent.run_stream(messages, thread=thread):
    for content in update.contents:
        if isinstance(content, FunctionApprovalRequestContent):
            # The workflow is requesting external input
            request_id = content.id
            function_call = content.function_call

            print(f"Workflow requests input: {function_call.name}")
            print(f"Request data: {function_call.arguments}")

            # Store the request_id to provide a response later

# Check for pending requests
if workflow_agent.pending_requests:
    print(f"Pending requests: {list(workflow_agent.pending_requests.keys())}")

Fornecer Respostas a Pedidos Pendentes

Para continuar a execução do fluxo de trabalho após um pedido de entrada externo:

# Create a response for the pending request
response_content = FunctionApprovalResponseContent(
    id=request_id,
    function_call=function_call,
    approved=True,
)

response_message = ChatMessage(
    role=Role.USER,
    contents=[response_content],
)

# Continue the workflow with the response
async for update in workflow_agent.run_stream([response_message], thread=thread):
    if update.text:
        print(update.text, end="", flush=True)

Exemplo completo

Aqui está um exemplo completo que demonstra um agente de workflow com saída de streaming:

import asyncio
from agent_framework import (
    ChatAgent,
    ChatMessage,
    Role,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import SequentialBuilder
from azure.identity import AzureCliCredential


async def main():
    # Set up the chat client
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())

    # Create specialized agents
    researcher = ChatAgent(
        name="Researcher",
        instructions="Research the given topic and provide key facts.",
        chat_client=chat_client,
    )

    writer = ChatAgent(
        name="Writer",
        instructions="Write engaging content based on the research provided.",
        chat_client=chat_client,
    )

    reviewer = ChatAgent(
        name="Reviewer",
        instructions="Review the content and provide a final polished version.",
        chat_client=chat_client,
    )

    # Build a sequential workflow
    workflow = (
        SequentialBuilder()
        .add_agents([researcher, writer, reviewer])
        .build()
    )

    # Convert to a workflow agent
    workflow_agent = workflow.as_agent(name="Content Creation Pipeline")

    # Create a thread and run the workflow
    thread = workflow_agent.get_new_thread()
    messages = [ChatMessage(role=Role.USER, content="Write about quantum computing")]

    print("Starting workflow...")
    print("=" * 60)

    current_author = None
    async for update in workflow_agent.run_stream(messages, thread=thread):
        # Show when different agents are responding
        if update.author_name and update.author_name != current_author:
            if current_author:
                print("\n" + "-" * 40)
            print(f"\n[{update.author_name}]:")
            current_author = update.author_name

        if update.text:
            print(update.text, end="", flush=True)

    print("\n" + "=" * 60)
    print("Workflow completed!")


if __name__ == "__main__":
    asyncio.run(main())

Compreender a Conversão de Eventos

Quando um fluxo de trabalho é executado como agente, os eventos do fluxo de trabalho são convertidos em respostas de agentes. O tipo de resposta depende do método que utiliza:

  • run(): Devolve um AgentRunResponse contendo o resultado completo após o fim do fluxo de trabalho
  • run_stream(): Gera AgentRunResponseUpdate objetos à medida que o fluxo de trabalho é executado, fornecendo atualizações em tempo real

Durante a execução, os eventos internos do fluxo de trabalho são mapeados para as respostas dos agentes da seguinte forma:

Evento de Fluxo de Trabalho Resposta do Agente
AgentRunUpdateEvent Transmitido como AgentRunResponseUpdate (streaming) ou agregado em AgentRunResponse (não-streaming)
RequestInfoEvent Convertido em FunctionCallContent e FunctionApprovalRequestContent
Outros eventos Incluído em raw_representation para observabilidade

Esta conversão permite-lhe usar a interface padrão do agente, mantendo ainda assim acesso a informações detalhadas do fluxo de trabalho quando necessário.

Casos de uso

1. Pipelines de Agentes Complexos

Envolva um fluxo de trabalho multi-agente como um único agente para uso em aplicações:

User Request --> [Workflow Agent] --> Final Response
                      |
                      +-- Researcher Agent
                      +-- Writer Agent  
                      +-- Reviewer Agent

2. Composição do Agente

Use agentes de fluxo de trabalho como componentes em sistemas maiores:

  • Um agente de workflow pode ser usado como ferramenta por outro agente
  • Múltiplos agentes de fluxo de trabalho podem ser orquestrados em conjunto
  • Os agentes de workflow podem ser aninhados dentro de outros fluxos de trabalho

3. Integração de API

Expor fluxos de trabalho complexos através de APIs que esperam a interface padrão do Agente, possibilitando:

  • Interfaces de chat que utilizam fluxos de trabalho sofisticados no backend
  • Integração com sistemas baseados em agentes existentes
  • Migração gradual de agentes simples para fluxos de trabalho complexos

Próximas Etapas