Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este documento fornece uma visão geral de como usar fluxos de trabalho como agentes no Microsoft Agent Framework.
Visão geral
Às vezes, você criou um fluxo de trabalho sofisticado com vários agentes, executores personalizados e lógica complexa , mas você deseja usá-lo como qualquer outro agente. É exatamente isso que os agentes de fluxo de trabalho permitem que você faça. Ao encapsular seu fluxo de trabalho como um Agent, você pode interagir com ele por meio da mesma API familiar que usaria para um agente de chat simples.
Principais benefícios
- Interface Unificada: interagir com fluxos de trabalho complexos usando a mesma API que agentes simples
- Compatibilidade da API: integrar fluxos de trabalho a sistemas existentes que dão suporte à interface do Agente
- Composabilidade: usar agentes de fluxo de trabalho como blocos de construção em sistemas de agentes maiores ou outros fluxos de trabalho
- Gerenciamento de threads: aproveite os threads do agente para estado de conversa, ponto de verificação e retomada
- Suporte de streaming: obter atualizações em tempo real à medida que o fluxo de trabalho é executado
Como funciona
Quando você converte um fluxo de trabalho em um agente:
- O fluxo de trabalho é validado para garantir que seu executor inicial possa aceitar mensagens de chat
- Um thread é criado para gerenciar o estado da conversa e os pontos de verificação
- As mensagens de entrada são roteadas para o executor inicial do fluxo de trabalho
- Eventos de fluxo de trabalho são convertidos em atualizações de resposta do agente
- As solicitações de entrada externas (de
RequestInfoExecutor) são exibidas como chamadas de função
Requirements
Para usar um fluxo de trabalho como um agente, o executor inicial do fluxo de trabalho deve ser capaz de lidar IEnumerable<ChatMessage> como entrada. Isso é atendido automaticamente ao usar ChatClientAgent ou outros executores baseados em agente.
Criando um agente de fluxo de trabalho
Use o AsAgent() método de extensão para converter qualquer fluxo de trabalho compatível em um agente:
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI;
using Microsoft.Extensions.AI;
// First, build your workflow
var workflow = AgentWorkflowBuilder
.CreateSequentialPipeline(researchAgent, writerAgent, reviewerAgent)
.Build();
// Convert the workflow to an agent
AIAgent workflowAgent = workflow.AsAgent(
id: "content-pipeline",
name: "Content Pipeline Agent",
description: "A multi-agent workflow that researches, writes, and reviews content"
);
AsAgent Parameters
| Parâmetro | Tipo | Description |
|---|---|---|
id |
string? |
Identificador exclusivo opcional para o agente. Gerado automaticamente se não for fornecido. |
name |
string? |
Nome de exibição opcional para o agente. |
description |
string? |
Descrição opcional da finalidade do agente. |
checkpointManager |
CheckpointManager? |
Gerenciador de ponto de verificação opcional para persistência entre sessões. |
executionEnvironment |
IWorkflowExecutionEnvironment? |
Ambiente de execução opcional. Define-se como padrão InProcessExecution.OffThread ou InProcessExecution.Concurrent com base na configuração do fluxo de trabalho. |
Usando agentes de fluxo de trabalho
Criando um thread
Cada conversa com um agente de fluxo de trabalho requer um thread para gerenciar o estado:
// Create a new thread for the conversation
AgentThread thread = workflowAgent.GetNewThread();
Execução sem streaming
Para casos de uso simples em que você deseja a resposta completa:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
AgentRunResponse response = await workflowAgent.RunAsync(messages, thread);
foreach (ChatMessage message in response.Messages)
{
Console.WriteLine($"{message.AuthorName}: {message.Text}");
}
Execução de streaming
Para atualizações em tempo real à medida que o fluxo de trabalho é executado:
var messages = new List<ChatMessage>
{
new(ChatRole.User, "Write an article about renewable energy trends in 2025")
};
await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
// Process streaming updates from each agent in the workflow
if (!string.IsNullOrEmpty(update.Text))
{
Console.Write(update.Text);
}
}
Lidando com solicitações de entrada externas
Quando um fluxo de trabalho contém executores que solicitam entrada externa (usando RequestInfoExecutor), essas solicitações são exibidas como chamadas de função na resposta do agente:
await foreach (AgentRunResponseUpdate update in workflowAgent.RunStreamingAsync(messages, thread))
{
// Check for function call requests
foreach (AIContent content in update.Contents)
{
if (content is FunctionCallContent functionCall)
{
// Handle the external input request
Console.WriteLine($"Workflow requests input: {functionCall.Name}");
Console.WriteLine($"Request data: {functionCall.Arguments}");
// Provide the response in the next message
}
}
}
Serialização e retomada de threads
Os threads do agente de fluxo de trabalho podem ser serializados para persistência e retomados posteriormente:
// Serialize the thread state
JsonElement serializedThread = thread.Serialize();
// Store serializedThread to your persistence layer...
// Later, resume the thread
AgentThread resumedThread = workflowAgent.DeserializeThread(serializedThread);
// Continue the conversation
await foreach (var update in workflowAgent.RunStreamingAsync(newMessages, resumedThread))
{
Console.Write(update.Text);
}
Ponto de verificação com agentes de fluxo de trabalho
Habilite o ponto de verificação para persistir o estado do fluxo de trabalho entre as reinicializações do processo:
// Create a checkpoint manager with your storage backend
var checkpointManager = new CheckpointManager(new FileCheckpointStorage("./checkpoints"));
// Create workflow agent with checkpointing enabled
AIAgent workflowAgent = workflow.AsAgent(
id: "persistent-workflow",
name: "Persistent Workflow Agent",
checkpointManager: checkpointManager
);
Requirements
Para usar um fluxo de trabalho como um agente, o executor inicial do fluxo de trabalho deve ser capaz de lidar list[ChatMessage] como entrada. Isso é atendido automaticamente ao usar ChatAgent ou AgentExecutor.
Criando um agente de fluxo de trabalho
Chame as_agent() em qualquer fluxo de trabalho compatível para convertê-lo em um agente:
from agent_framework import WorkflowBuilder, ChatAgent
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
# Create your chat client and agents
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
researcher = ChatAgent(
name="Researcher",
instructions="Research and gather information on the given topic.",
chat_client=chat_client,
)
writer = ChatAgent(
name="Writer",
instructions="Write clear, engaging content based on research.",
chat_client=chat_client,
)
# Build your workflow
workflow = (
WorkflowBuilder()
.set_start_executor(researcher)
.add_edge(researcher, writer)
.build()
)
# Convert the workflow to an agent
workflow_agent = workflow.as_agent(name="Content Pipeline Agent")
Parâmetros de as_agent
| Parâmetro | Tipo | Description |
|---|---|---|
name |
str | None |
Nome de exibição opcional para o agente. Gerado automaticamente se não for fornecido. |
Usando agentes de fluxo de trabalho
Criando um thread
Cada conversa com um agente de fluxo de trabalho requer um thread para gerenciar o estado:
# Create a new thread for the conversation
thread = workflow_agent.get_new_thread()
Execução sem streaming
Para casos de uso simples em que você deseja a resposta completa:
from agent_framework import ChatMessage, Role
messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]
response = await workflow_agent.run(messages, thread=thread)
for message in response.messages:
print(f"{message.author_name}: {message.text}")
Execução de streaming
Para atualizações em tempo real à medida que o fluxo de trabalho é executado:
messages = [ChatMessage(role=Role.USER, content="Write an article about AI trends")]
async for update in workflow_agent.run_stream(messages, thread=thread):
# Process streaming updates from each agent in the workflow
if update.text:
print(update.text, end="", flush=True)
Lidando com solicitações de entrada externas
Quando um fluxo de trabalho contém executores que solicitam entrada externa (usando RequestInfoExecutor), essas solicitações são exibidas como chamadas de função. O agente de fluxo de trabalho rastreia solicitações pendentes e espera respostas antes de continuar:
from agent_framework import (
FunctionCallContent,
FunctionApprovalRequestContent,
FunctionApprovalResponseContent,
)
async for update in workflow_agent.run_stream(messages, thread=thread):
for content in update.contents:
if isinstance(content, FunctionApprovalRequestContent):
# The workflow is requesting external input
request_id = content.id
function_call = content.function_call
print(f"Workflow requests input: {function_call.name}")
print(f"Request data: {function_call.arguments}")
# Store the request_id to provide a response later
# Check for pending requests
if workflow_agent.pending_requests:
print(f"Pending requests: {list(workflow_agent.pending_requests.keys())}")
Fornecendo respostas a solicitações pendentes
Para continuar a execução do fluxo de trabalho após uma solicitação de entrada externa:
# Create a response for the pending request
response_content = FunctionApprovalResponseContent(
id=request_id,
function_call=function_call,
approved=True,
)
response_message = ChatMessage(
role=Role.USER,
contents=[response_content],
)
# Continue the workflow with the response
async for update in workflow_agent.run_stream([response_message], thread=thread):
if update.text:
print(update.text, end="", flush=True)
Exemplo completo
Aqui está um exemplo completo demonstrando um agente de fluxo de trabalho com saída de streaming:
import asyncio
from agent_framework import (
ChatAgent,
ChatMessage,
Role,
)
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework._workflows import SequentialBuilder
from azure.identity import AzureCliCredential
async def main():
# Set up the chat client
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
# Create specialized agents
researcher = ChatAgent(
name="Researcher",
instructions="Research the given topic and provide key facts.",
chat_client=chat_client,
)
writer = ChatAgent(
name="Writer",
instructions="Write engaging content based on the research provided.",
chat_client=chat_client,
)
reviewer = ChatAgent(
name="Reviewer",
instructions="Review the content and provide a final polished version.",
chat_client=chat_client,
)
# Build a sequential workflow
workflow = (
SequentialBuilder()
.add_agents([researcher, writer, reviewer])
.build()
)
# Convert to a workflow agent
workflow_agent = workflow.as_agent(name="Content Creation Pipeline")
# Create a thread and run the workflow
thread = workflow_agent.get_new_thread()
messages = [ChatMessage(role=Role.USER, content="Write about quantum computing")]
print("Starting workflow...")
print("=" * 60)
current_author = None
async for update in workflow_agent.run_stream(messages, thread=thread):
# Show when different agents are responding
if update.author_name and update.author_name != current_author:
if current_author:
print("\n" + "-" * 40)
print(f"\n[{update.author_name}]:")
current_author = update.author_name
if update.text:
print(update.text, end="", flush=True)
print("\n" + "=" * 60)
print("Workflow completed!")
if __name__ == "__main__":
asyncio.run(main())
Noções básicas sobre a conversão de eventos
Quando um fluxo de trabalho é executado como um agente, os eventos de fluxo de trabalho são convertidos em respostas do agente. O tipo de resposta depende de qual método você usa:
-
run(): retorna umAgentRunResponseque contém o resultado completo após a conclusão do fluxo de trabalho -
run_stream(): produzAgentRunResponseUpdateobjetos à medida que o fluxo de trabalho é executado, fornecendo atualizações em tempo real
Durante a execução, os eventos de fluxo de trabalho internos são mapeados para respostas do agente da seguinte maneira:
| Evento de fluxo de trabalho | Resposta do agente |
|---|---|
AgentRunUpdateEvent |
Passado como AgentRunResponseUpdate (streaming) ou agregado em AgentRunResponse (não streaming) |
RequestInfoEvent |
Convertido em FunctionCallContent e FunctionApprovalRequestContent |
| Outros eventos | Incluído em raw_representation para observabilidade |
Essa conversão permite que você use a interface do agente padrão enquanto ainda tem acesso a informações detalhadas de fluxo de trabalho quando necessário.
Casos de uso
Pipelines de Agente Complexos
Encapsular um fluxo de trabalho de vários agentes como um único agente para uso em aplicativos:
User Request --> [Workflow Agent] --> Final Response
|
+-- Researcher Agent
+-- Writer Agent
+-- Reviewer Agent
2. Composição do agente
Use agentes de fluxo de trabalho como componentes em sistemas maiores:
- Um agente de fluxo de trabalho pode ser usado como uma ferramenta por outro agente
- Vários agentes de fluxo de trabalho podem ser orquestrados juntos
- Os agentes de fluxo de trabalho podem ser aninhados em outros fluxos de trabalho
3. Integração de API
Exponha fluxos de trabalho complexos por meio de APIs que esperam a interface padrão do Agente, permitindo:
- Interfaces de chat que usam fluxos de trabalho de back-end sofisticados
- Integração com sistemas existentes baseados em agente
- Migração gradual de agentes simples para fluxos de trabalho complexos
Próximas etapas
- Saiba como lidar com solicitações e respostas em fluxos de trabalho
- Saiba como gerenciar o estado em fluxos de trabalho
- Saiba como criar pontos de verificação e retomar a partir deles
- Saiba como monitorar fluxos de trabalho
- Saiba mais sobre o isolamento de estado em fluxos de trabalho
- Saiba como visualizar fluxos de trabalho