Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Esta página fornece uma visão geral de como o tratamento de solicitações e respostas funciona no sistema de fluxo de trabalho do Microsoft Agent Framework.
Visão geral
Executores em um fluxo de trabalho podem enviar solicitações para fora do fluxo de trabalho e aguardar respostas. Isso é útil para cenários em que um executor precisa interagir com sistemas externos, como interações humanas no loop ou qualquer outra operação assíncrona.
Habilitar o tratamento de solicitações e respostas em um fluxo de trabalho
Solicitações e respostas são tratadas por meio de um tipo especial chamado InputPort.
// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");
Adicione a porta de entrada a um fluxo de trabalho.
var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
.AddEdge(inputPort, executorA)
.AddEdge(executorA, inputPort)
.Build<CustomRequestType>();
Agora, porque no fluxo de trabalho temos executorA conectado a inputPort em ambas as direções, executorA precisa ser capaz de enviar solicitações e receber respostas por meio do inputPort. Aqui está o que precisamos fazer SomeExecutor para enviar uma solicitação e receber uma resposta.
internal sealed class SomeExecutor() : Executor<CustomResponseType>("SomeExecutor")
{
public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Como alternativa, SomeExecutor pode separar o envio de solicitações e o tratamento de resposta em dois manipuladores.
internal sealed class SomeExecutor() : Executor("SomeExecutor")
{
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
{
return routeBuilder
.AddHandler<CustomResponseType>(this.HandleCustomResponseAsync)
.AddHandler<OtherDataType>(this.HandleOtherDataAsync);
}
public async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
}
public async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
{
// Process the message...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Os executores podem enviar solicitações usando ctx.request_info() e manipular respostas com @response_handler.
from agent_framework import response_handler, WorkflowBuilder
executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()
executor_a pode enviar solicitações e receber respostas diretamente usando recursos internos.
from agent_framework import (
Executor,
WorkflowContext,
handler,
response_handler,
)
class SomeExecutor(Executor):
@handler
async def handle_data(
self,
data: OtherDataType,
context: WorkflowContext,
):
# Process the message...
...
# Send a request using the API
await context.request_info(
request_data=CustomRequestType(...),
response_type=CustomResponseType
)
@response_handler
async def handle_response(
self,
original_request: CustomRequestType,
response: CustomResponseType,
context: WorkflowContext,
):
# Process the response...
...
O @response_handler decorador registra automaticamente o método para lidar com respostas para os tipos de solicitação e resposta especificados.
Manipulando solicitações e respostas
Um InputPort emite um RequestInfoEvent quando recebe uma solicitação. Você pode assinar esses eventos para lidar com solicitações de entrada do fluxo de trabalho. Quando você receber uma resposta de um sistema externo, envie-a de volta para o fluxo de trabalho usando o mecanismo de resposta. A estrutura roteia automaticamente a resposta para o executor que enviou a solicitação original.
StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent workflowOutputEvt:
// The workflow has completed successfully
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
Os executores podem enviar solicitações diretamente sem precisar de um componente separado. Quando um executor chama ctx.request_info(), o fluxo de trabalho emite um RequestInfoEvent. Você pode assinar esses eventos para lidar com solicitações de entrada do fluxo de trabalho. Quando você receber uma resposta de um sistema externo, envie-a de volta para o fluxo de trabalho usando o mecanismo de resposta. A estrutura roteia automaticamente a resposta para o método do executor @response_handler.
from agent_framework import RequestInfoEvent
while True:
request_info_events : list[RequestInfoEvent] = []
pending_responses : dict[str, CustomResponseType] = {}
stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)
async for event in stream:
if isinstance(event, RequestInfoEvent):
# Handle `RequestInfoEvent` from the workflow
request_info_events.append(event)
if not request_info_events:
break
for request_info_event in request_info_events:
# Handle `RequestInfoEvent` from the workflow
response = CustomResponseType(...)
pending_responses[request_info_event.request_id] = response
Pontos de verificação e solicitações
Para saber mais sobre pontos de verificação, consulte esta página.
Quando um ponto de verificação é criado, as solicitações pendentes também são salvas como parte do estado do ponto de verificação. Quando você restaurar de um ponto de verificação, todas as solicitações pendentes serão transmitidas como objetos RequestInfoEvent, permitindo que você as capture e responda. Não é possível fornecer respostas diretamente durante a operação de retomada. Em vez disso, você deve escutar os eventos remetidos e responder usando o mecanismo de resposta padrão.