Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este tutorial demonstra como lidar com solicitações e respostas em fluxos de trabalho usando fluxos de trabalho do Agent Framework. Você aprenderá a criar fluxos de trabalho interativos que podem pausar a execução para solicitar entrada de fontes externas (como humanos ou outros sistemas) e, em seguida, retomar quando uma resposta for fornecida.
Conceitos abordados
No .NET, fluxos de trabalho com intervenção humana utilizam RequestPort e o processamento de solicitações externas para suspender a execução e coletar entradas dos usuários. Esse padrão permite fluxos de trabalho interativos em que o sistema pode solicitar informações de fontes externas durante a execução.
Pré-requisitos
- SDK do .NET 8.0 ou posterior.
- Ponto de extremidade de serviço e implantação do Azure OpenAI configurado.
- CLI do Azure instalada e autenticada (para autenticação de credenciais do Azure).
- Compreensão básica de C# e programação assíncrona.
- Um novo aplicativo de console.
Instalar os pacotes NuGet
Primeiro, instale os pacotes necessários para seu projeto do .NET:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
Componentes principais
RequestPort e Solicitações Externas
Um RequestPort atua como uma ponte entre o fluxo de trabalho e fontes de entrada externas. Quando o fluxo de trabalho precisa de entrada, ele gera um RequestInfoEvent que seu aplicativo manipula:
// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Tipos de sinal
Defina tipos de sinal para comunicar diferentes tipos de solicitação:
/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
Init, // Initial guess request
Above, // Previous guess was too high
Below, // Previous guess was too low
}
Executor de fluxo de trabalho
Crie executores que processam a entrada do usuário e forneçam comentários:
/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
_targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
{
_tries++;
if (message == _targetNumber)
{
await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
.ConfigureAwait(false);
}
else if (message < _targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
}
}
}
Compilando o fluxo de trabalho
Conecte o RequestPort e o executor em um loop de feedback.
internal static class WorkflowHelper
{
internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
{
// Create the executors
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
JudgeExecutor judgeExecutor = new(42);
// Build the workflow by connecting executors in a loop
return new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.BuildAsync<NumberSignal>();
}
}
Executando o fluxo de trabalho interativo
Manipular solicitações externas durante a execução do fluxo de trabalho:
private static async Task Main()
{
// Create the workflow
var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);
// Execute the workflow
await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle human input request from the workflow
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
}
Tratamento de solicitações
Processar diferentes tipos de solicitações de entrada:
private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
switch (request.DataAs<NumberSignal?>())
{
case NumberSignal.Init:
int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
return request.CreateResponse(initialGuess);
case NumberSignal.Above:
int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
return request.CreateResponse(lowerGuess);
case NumberSignal.Below:
int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
return request.CreateResponse(higherGuess);
default:
throw new ArgumentException("Unexpected request type.");
}
}
private static int ReadIntegerFromConsole(string prompt)
{
while (true)
{
Console.Write(prompt);
string? input = Console.ReadLine();
if (int.TryParse(input, out int value))
{
return value;
}
Console.WriteLine("Invalid input. Please enter a valid integer.");
}
}
Conceitos de implementação
Fluxo RequestInfoEvent
- Execução do fluxo de trabalho: o fluxo de trabalho é processado até precisar de entrada externa
-
Geração de solicitação: RequestPort gera um
RequestInfoEventcom os detalhes da solicitação - Tratamento Externo: seu aplicativo captura o evento e coleta a entrada do usuário
-
Envio de resposta: enviar um
ExternalResponsede volta para continuar o fluxo de trabalho - Retomada do fluxo de trabalho: o fluxo de trabalho continua o processamento com a entrada fornecida
Ciclo de vida do fluxo de trabalho
-
Execução de streaming: use
StreamAsyncpara monitorar eventos em tempo real -
Tratamento de eventos: processo
RequestInfoEventpara solicitações de entrada eWorkflowOutputEventpara conclusão - Coordenação de Resposta: Corresponder respostas a solicitações usando o mecanismo de tratamento de resposta do fluxo de trabalho
Fluxo de Implementação
Inicialização do fluxo de trabalho: o fluxo de trabalho começa enviando um
NumberSignal.Initpara o RequestPort.Geração de solicitação: o RequestPort gera uma
RequestInfoEventsolicitação de um palpite inicial do usuário.Pausa de fluxo de trabalho: o fluxo de trabalho pausa e aguarda a entrada externa enquanto o aplicativo manipula a solicitação.
Resposta humana: o aplicativo externo coleta a entrada do usuário e envia um
ExternalResponsede volta para o fluxo de trabalho.Processamento e Comentários: O
JudgeExecutorprocessa o palpite e conclui o processo ou envia um novo sinal (Acima/Abaixo) para solicitar outro palpite.Continuação de loop: o processo é repetido até que o número correto seja adivinhado.
Benefícios do Framework
- Segurança de Tipo: digitação forte garante que os contratos de solicitação-resposta sejam mantidos
- Orientado a eventos: O sistema de eventos rico oferece visibilidade da execução de fluxos de trabalho
- Execução pausada: os fluxos de trabalho podem pausar indefinidamente enquanto aguardam a entrada externa
- Gerenciamento de Estado: o estado do fluxo de trabalho é preservado entre ciclos de pausa e retomada
- Integração flexível: RequestPorts pode se integrar a qualquer fonte de entrada externa (interface do usuário, API, console etc.)
Exemplo completo
Para obter a implementação funcional completa, consulte o Human-in-the-Loop - exemplo básico.
Esse padrão permite a criação de aplicativos interativos sofisticados em que os usuários podem fornecer entrada nos principais pontos de decisão em fluxos de trabalho automatizados.
O que você criará
Você criará um fluxo de trabalho de jogo de adivinhação de número interativo que demonstra padrões de solicitação-resposta:
- Um agente de IA que faz suposições inteligentes
- Executores que podem enviar solicitações diretamente usando a
request_infoAPI - Um gerenciador de turnos que coordena entre o agente e as interações humanas usando
@response_handler - Entrada/saída interativa do console para comentários em tempo real
Pré-requisitos
- Python 3.10 ou posterior
- Implantação do Azure OpenAI configurada
- Autenticação da CLI do Azure configurada (
az login) - Compreensão básica da programação assíncrona do Python
Conceitos Principais
Capacidades de Solicitações-e-Respostas
Os executores têm funções embutidas de solicitações e respostas que permitem interações humanas no circuito.
- Ligue
ctx.request_info(request_data=request_data, response_type=response_type)para enviar solicitações - Usar o
@response_handlerdecorador para lidar com respostas - Definir tipos personalizados de solicitação/resposta sem requisitos de herança
Fluxo de Requisição-Resposta
Os executores podem enviar solicitações diretamente usando ctx.request_info() e manipular respostas usando o @response_handler decorador:
- Chamadas do executor
ctx.request_info(request_data=request_data, response_type=response_type) - O fluxo de trabalho emite um
RequestInfoEventcom os dados da solicitação - O sistema externo (humano, API etc.) processa a solicitação
- A resposta é enviada de volta por meio de
send_responses_streaming() - O fluxo de trabalho é retomado e a resposta é entregue ao método do executor
@response_handler.
Configurando o ambiente
Primeiro, instale os pacotes necessários:
pip install agent-framework-core --pre
pip install azure-identity
Definir modelos de solicitação e resposta
Comece definindo as estruturas de dados para comunicação solicitação-resposta:
import asyncio
from dataclasses import dataclass
from pydantic import BaseModel
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatMessage,
Executor,
RequestInfoEvent,
Role,
WorkflowBuilder,
WorkflowContext,
WorkflowOutputEvent,
WorkflowRunState,
WorkflowStatusEvent,
handler,
response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
@dataclass
class HumanFeedbackRequest:
"""Request message for human feedback in the guessing game."""
prompt: str = ""
guess: int | None = None
class GuessOutput(BaseModel):
"""Structured output from the AI agent with response_format enforcement."""
guess: int
É HumanFeedbackRequest uma classe de dados simples para cargas de solicitação estruturadas:
- Digitação forte para conteúdos de solicitação
- Validação de compatibilidade futura
- Esclarecer a semântica de correlação com respostas
- Campos contextuais (como o palpite anterior) para prompts de interface do usuário avançados
Criar o Gerenciador de Turnos
O gerenciador de turnos coordena o fluxo entre o agente de IA e o humano:
class TurnManager(Executor):
"""Coordinates turns between the AI agent and human player.
Responsibilities:
- Start the game by requesting the agent's first guess
- Process agent responses and request human feedback
- Handle human feedback and continue the game or finish
"""
def __init__(self, id: str | None = None):
super().__init__(id=id or "turn_manager")
@handler
async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Start the game by asking the agent for an initial guess."""
user = ChatMessage(Role.USER, text="Start by making your first guess.")
await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))
@handler
async def on_agent_response(
self,
result: AgentExecutorResponse,
ctx: WorkflowContext,
) -> None:
"""Handle the agent's guess and request human guidance."""
# Parse structured model output (defensive default if agent didn't reply)
text = result.agent_run_response.text or ""
last_guess = GuessOutput.model_validate_json(text).guess if text else None
# Craft a clear human prompt that defines higher/lower relative to agent's guess
prompt = (
f"The agent guessed: {last_guess if last_guess is not None else text}. "
"Type one of: higher (your number is higher than this guess), "
"lower (your number is lower than this guess), correct, or exit."
)
# Send a request using the request_info API
await ctx.request_info(
request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
response_type=str
)
@response_handler
async def on_human_feedback(
self,
original_request: HumanFeedbackRequest,
feedback: str,
ctx: WorkflowContext[AgentExecutorRequest, str],
) -> None:
"""Continue the game or finish based on human feedback."""
reply = feedback.strip().lower()
# Use the correlated request's guess to avoid extra state reads
last_guess = original_request.guess
if reply == "correct":
await ctx.yield_output(f"Guessed correctly: {last_guess}")
return
# Provide feedback to the agent for the next guess
user_msg = ChatMessage(
Role.USER,
text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
)
await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
Criar o fluxo de trabalho
Crie o fluxo de trabalho principal que conecta todos os componentes:
async def main() -> None:
# Create the chat agent with structured output enforcement
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
agent = chat_client.create_agent(
instructions=(
"You guess a number between 1 and 10. "
"If the user says 'higher' or 'lower', adjust your next guess. "
'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
"No explanations or additional text."
),
response_format=GuessOutput,
)
# Create workflow components
turn_manager = TurnManager(id="turn_manager")
agent_exec = AgentExecutor(agent=agent, id="agent")
# Build the workflow graph
workflow = (
WorkflowBuilder()
.set_start_executor(turn_manager)
.add_edge(turn_manager, agent_exec) # Ask agent to make/adjust a guess
.add_edge(agent_exec, turn_manager) # Agent's response goes back to coordinator
.build()
)
# Execute the interactive workflow
await run_interactive_workflow(workflow)
async def run_interactive_workflow(workflow):
"""Run the workflow with human-in-the-loop interaction."""
pending_responses: dict[str, str] | None = None
completed = False
workflow_output: str | None = None
print("🎯 Number Guessing Game")
print("Think of a number between 1 and 10, and I'll try to guess it!")
print("-" * 50)
while not completed:
# First iteration uses run_stream("start")
# Subsequent iterations use send_responses_streaming with pending responses
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("start")
)
# Collect events for this turn
events = [event async for event in stream]
pending_responses = None
# Process events to collect requests and detect completion
requests: list[tuple[str, str]] = [] # (request_id, prompt)
for event in events:
if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
# RequestInfoEvent for our HumanFeedbackRequest
requests.append((event.request_id, event.data.prompt))
elif isinstance(event, WorkflowOutputEvent):
# Capture workflow output when yielded
workflow_output = str(event.data)
completed = True
# Check workflow status
pending_status = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
for e in events
)
idle_with_requests = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
for e in events
)
if pending_status:
print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
if idle_with_requests:
print("⏸️ State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")
# Handle human requests if any
if requests and not completed:
responses: dict[str, str] = {}
for req_id, prompt in requests:
print(f"\n🤖 {prompt}")
answer = input("👤 Enter higher/lower/correct/exit: ").lower()
if answer == "exit":
print("👋 Exiting...")
return
responses[req_id] = answer
pending_responses = responses
# Show final result
print(f"\n🎉 {workflow_output}")
Executando o exemplo
Para obter a implementação funcional completa, consulte o exemplo de Jogo de Adivinhação com Humano no Loop.
Como funciona
Inicialização do fluxo de trabalho: o fluxo de trabalho começa com a
TurnManagersolicitação de uma estimativa inicial do agente de IA.Resposta do agente: o agente de IA faz um palpite e retorna JSON estruturado, que flui de volta para o
TurnManager.Solicitação Humana:
TurnManagerprocessa a suposição do agente e chamactx.request_info()com umHumanFeedbackRequest.Pausa do Fluxo de Trabalho: o fluxo de trabalho emite um
RequestInfoEvente continua até que nenhuma outra ação possa ser tomada, e então aguarda a entrada humana.Resposta humana: o aplicativo externo coleta a entrada humana e envia respostas de volta usando
send_responses_streaming().Retomar e Continuar: O fluxo de trabalho é retomado, o método
TurnManagerdo@response_handlerprocessa o feedback humano e encerra o jogo ou envia outra solicitação ao agente.
Principais benefícios
- Comunicação Estruturada: modelos de solicitação e resposta com segurança de tipo impedem erros de runtime
- Correlação: IDs de solicitação garantem que as respostas sejam correspondidas às solicitações corretas
- Execução pausada: os fluxos de trabalho podem pausar indefinidamente enquanto aguardam a entrada externa
- Preservação do Estado: o estado do fluxo de trabalho é mantido entre ciclos de pausa e retomada
- Orientado a eventos: o sistema de eventos avançado fornece visibilidade sobre o status e as transições do fluxo de trabalho
Esse padrão permite a criação de aplicativos interativos sofisticados em que agentes de IA e humanos colaboram perfeitamente em fluxos de trabalho estruturados.