Compartilhar via


Definir o monitoramento personalizado de pipelines com ganchos de evento

Importante

O suporte para ganchos de evento está em Visualização Pública.

Você pode usar ganchos de evento para adicionar funções de retorno de chamada personalizadas do Python que são executadas quando os eventos são persistidos no log de eventos de um pipeline. Você pode usar ganchos de evento para implementar soluções personalizadas de monitoramento e alertas. Por exemplo, você pode usar ganchos de evento para enviar emails ou gravar em um log quando ocorrem eventos específicos ou para se integrar a soluções de terceiros para monitorar eventos de pipeline.

Defina um gancho de evento com uma função python que aceita um único argumento, em que o argumento é um dicionário que representa um evento. Em seguida, inclua os ganchos de evento no código-fonte de um pipeline. Todos os ganchos de evento definidos em um pipeline tentarão processar todos os eventos gerados durante cada atualização de pipeline. Se o pipeline for composto por vários arquivos de código-fonte, todos os ganchos de evento definidos serão aplicados a todo o pipeline. Embora os ganchos de evento estejam incluídos no código-fonte do pipeline, eles não estão incluídos no gráfico do pipeline.

Você pode usar os ganchos de eventos com pipelines que são publicados no metastore do Hive ou no Catálogo do Unity.

Observação

  • O Python é o único idioma com suporte para definir ganchos de evento. Para definir funções personalizadas do Python que processam eventos em um pipeline implementado usando a interface SQL, adicione as funções personalizadas em um arquivo de origem python separado que é executado como parte do pipeline. As funções do Python são aplicadas a todo o pipeline quando o pipeline é executado.
  • Os ganchos de eventos são acionados apenas para eventos nos quais o nível de maturidade é STABLE.
  • Os ganchos de evento são executados de forma assíncrona a partir das atualizações do pipeline, mas de forma síncrona com outros ganchos de evento. Isso significa que apenas um único gancho de evento é executado por vez, enquanto outros ganchos de evento aguardam para serem executados até que o gancho de evento em execução seja concluído. Se um gancho de evento for executado indefinidamente, ele bloqueará todos os outros ganchos de evento.
  • O Lakeflow Spark Declarative Pipelines (SDP) tenta executar cada gancho de evento em cada evento emitido durante uma atualização de pipeline. Para ajudar a garantir que os ganchos de evento com atraso tenham tempo para processar todos os eventos na fila, o SDP aguarda um período fixo não configurável antes de encerrar a computação que executa o pipeline. Porém, não é garantido que todos os ganchos sejam disparados em todos os eventos antes que a computação seja encerrada.

Monitorar o processamento de ganchos de eventos

Use o hook_progress tipo de evento no log de eventos do pipeline para monitorar o estado dos ganchos de evento de uma atualização. Para evitar dependências circulares, os ganchos de eventos não são acionados nos eventos hook_progress.

Definir um gancho de evento

Para definir um gancho de evento, use o decorador on_event_hook:

@dp.on_event_hook(max_allowable_consecutive_failures=None)
def user_event_hook(event):
  # Python code defining the event hook

O max_allowable_consecutive_failures descreve o número máximo de vezes consecutivas que um gancho de evento pode falhar antes de ser desativado. Uma falha no gancho de evento é definida sempre que o gancho de evento lança uma exceção. Se um gancho de evento estiver desabilitado, ele não processará novos eventos até que o pipeline seja reiniciado.

max_allowable_consecutive_failures deve ser um inteiro maior ou igual a 0 ou None. Um valor de None (atribuído por padrão) significa que não há limite para o número de falhas consecutivas permitidas para o gancho de evento, e o gancho de evento nunca é desabilitado.

Falhas de gancho de evento e desabilitação de ganchos de evento podem ser monitoradas no log de eventos como eventos hook_progress.

A função de gancho de evento deve ser uma função Python que aceita exatamente um parâmetro: uma representação em dicionário do evento que disparou este gancho de evento. Qualquer valor de retorno da função de gancho de evento é ignorado.

Exemplo: selecionar eventos específicos para processamento

O exemplo a seguir demonstra um gancho de evento que seleciona eventos específicos para processamento. Especificamente, este exemplo aguarda até que os eventos de pipeline STOPPING sejam recebidos e, em seguida, gera uma mensagem para os logs stdoutde driver.

@dp.on_event_hook
def my_event_hook(event):
  if (
    event['event_type'] == 'update_progress' and
    event['details']['update_progress']['state'] == 'STOPPING'
  ):
    print('Received notification that update is stopping: ', event)

Exemplo: enviar todos os eventos para um canal do Slack

O exemplo a seguir implementa um gancho de evento que envia todos os eventos recebidos para um canal do Slack usando a API do Slack.

Este exemplo usa um segredo do Databricks para armazenar com segurança um token necessário para autenticar na API do Slack.

from pyspark import pipelines as dp
import requests

# Get a Slack API token from a Databricks secret scope.
API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")

@dp.on_event_hook
def write_events_to_slack(event):
  res = requests.post(
    url='https://slack.com/api/chat.postMessage',
    headers={
      'Content-Type': 'application/json',
      'Authorization': 'Bearer ' + API_TOKEN,
    },
    json={
      'channel': '<channel-id>',
      'text': 'Received event:\n' + event,
    }
  )

Exemplo: configurar um gancho de evento para desabilitar após quatro falhas consecutivas

O exemplo a seguir demonstra como configurar um gancho de evento que é desabilitado se ele falhar consecutivamente quatro vezes.

from pyspark import pipelines as dp
import random

def run_failing_operation():
   raise Exception('Operation has failed')

# Allow up to 3 consecutive failures. After a 4th consecutive
# failure, this hook is disabled.
@dp.on_event_hook(max_allowable_consecutive_failures=3)
def non_reliable_event_hook(event):
  run_failing_operation()

Exemplo: Pipeline com um gancho de evento

O exemplo a seguir demonstra a adição de um gancho de evento ao código-fonte de um pipeline. Este é um exemplo simples, mas completo, de como utilizar ganchos de eventos em um pipeline.

from pyspark import pipelines as dp
import requests
import json
import time

API_TOKEN = dbutils.secrets.get(scope="<secret-scope>", key="<token-key>")
SLACK_POST_MESSAGE_URL = 'https://slack.com/api/chat.postMessage'
DEV_CHANNEL = 'CHANNEL'
SLACK_HTTPS_HEADER_COMMON = {
 'Content-Type': 'application/json',
 'Authorization': 'Bearer ' + API_TOKEN
}

# Create a single dataset.
@dp.table
def test_dataset():
 return spark.range(5)

# Definition of event hook to send events to a Slack channel.
@dp.on_event_hook
def write_events_to_slack(event):
  res = requests.post(url=SLACK_POST_MESSAGE_URL, headers=SLACK_HTTPS_HEADER_COMMON, json = {
    'channel': DEV_CHANNEL,
    'text': 'Event hook triggered by event: ' + event['event_type'] + ' event.'
  })