Compartilhar via


Log de eventos do pipeline

O log de eventos do pipeline contém todas as informações relacionadas a um pipeline, incluindo logs de auditoria, verificações de qualidade de dados, progresso do pipeline e rastreabilidade de dados. Use o log de eventos para acompanhar, entender e monitorar o estado dos seus pipelines de dados.

Você pode exibir entradas de log de eventos na interface do usuário de monitoramento de pipeline, na API REST do Pipelines ou consultando diretamente o log de eventos. Esta seção se concentra em consultar diretamente o log de eventos.

Você também pode definir ações personalizadas a serem executadas quando os eventos são registrados, por exemplo, enviando alertas, com ganchos de evento.

Importante

Não exclua o log de eventos ou o catálogo pai ou o esquema em que o log de eventos é publicado. A exclusão do log de eventos pode resultar na falha de atualização do pipeline durante execuções futuras.

Para obter detalhes completos do esquema de log de eventos, consulte o esquema de log de eventos do Pipeline.

Consultar o log de eventos

Observação

Esta seção descreve o comportamento e a sintaxe padrão para trabalhar com logs de eventos em pipelines configurados com o Unity Catalog e no modo de publicação padrão.

Por padrão, um pipeline grava o log de eventos em uma tabela Delta oculta no catálogo padrão e no esquema configurado para o pipeline. Embora oculta, a tabela ainda pode ser consultada por todos os usuários suficientemente privilegiados. Por padrão, somente o proprietário do pipeline pode consultar a tabela de log de eventos.

Para consultar o log de eventos como proprietário, use a ID do pipeline:

SELECT * FROM event_log(<pipelineId>);

Por padrão, o nome do log de eventos oculto é formatado como event_log_{pipeline_id}, em que o ID do pipeline é o UUID atribuído pelo sistema com traços substituídos por sublinhados.

Você pode publicar o log de eventos editando as configurações avançadas do pipeline. Para obter detalhes, consulte configuração de pipeline para o log de eventos. Ao publicar um log de eventos, especifique o nome do log de eventos e, opcionalmente, especifique um catálogo e um esquema, como no exemplo a seguir:

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}

O local do log de eventos também serve como o local do esquema para qualquer consulta do Carregador Automático no pipeline. O Databricks recomenda criar uma exibição sobre a tabela de log de eventos antes de modificar os privilégios, pois algumas configurações de computação podem permitir que os usuários obtenham acesso aos metadados de esquema se a tabela de log de eventos for compartilhada diretamente. A sintaxe de exemplo a seguir cria uma exibição em uma tabela de log de eventos e é usada nas consultas de log de eventos de exemplo incluídas neste artigo. Substitua <catalog_name>.<schema_name>.<event_log_table_name> pelo nome totalmente qualificado da tabela do log de eventos do pipeline. Se você publicou o log de eventos, use o nome especificado ao publicar. Caso contrário, use event_log(<pipelineId>) onde a pipelineId é a ID do pipeline que você deseja consultar.

CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;

No Catálogo do Unity, as exibições dão suporte as consultas de streaming. O exemplo a seguir usa o Streaming Estruturado para consultar uma exibição definida na parte superior de uma tabela de log de eventos:

df = spark.readStream.table("event_log_raw")

Exemplos básicos de consulta

Os exemplos a seguir mostram como consultar o log de eventos para obter informações gerais sobre pipelines e ajudar a depurar cenários comuns.

Monitorar as atualizações do pipeline consultando as atualizações anteriores

O exemplo a seguir consulta as atualizações (ou execuções) do pipeline, mostrando a ID da atualização, o status, a hora de início, a hora de conclusão e a duração. Isso oferece uma visão geral das execuções do pipeline.

Pressupõe que você criou a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.

with last_status_per_update AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
        timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY origin.update_id
            ORDER BY timestamp DESC
        ) AS rn
    FROM event_log_raw
    WHERE event_type = 'update_progress'
    QUALIFY rn = 1
),
update_durations AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        -- Capture the start of the update
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,

        -- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
        COALESCE(
            MAX(CASE
                WHEN event_type = 'update_progress'
                 AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
                THEN timestamp
            END),
            current_timestamp()
        ) AS end_time
    FROM event_log_raw
    WHERE event_type IN ('create_update', 'update_progress')
      AND origin.update_id IS NOT NULL
    GROUP BY pipeline_id, pipeline_name, pipeline_update_id
    HAVING start_time IS NOT NULL
)
SELECT
    s.pipeline_id,
    s.pipeline_name,
    s.pipeline_update_id,
    d.start_time,
    d.end_time,
    CASE
        WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
            ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
        ELSE NULL
    END AS duration_seconds,
    s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
  ON s.pipeline_id = d.pipeline_id
 AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;

Depurar problemas de atualização incremental de exibição materializada

Este exemplo consulta todos os fluxos da atualização mais recente de um pipeline. Ele mostra se eles foram atualizados incrementalmente ou não, bem como outras informações de planejamento relevantes que são úteis para depurar por que uma atualização incremental não acontece.

Pressupõe que você criou a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  -- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
  SELECT
    origin.pipeline_name,
    origin.pipeline_id,
    origin.flow_name,
    lu.latest_update_id,
    from_json(
      details:planning_information,
      'struct<
        technique_information: array<struct<
          maintenance_type: string,
          is_chosen: boolean,
          is_applicable: boolean,
          cost: double,
          incrementalization_issues: array<struct<
            issue_type: string,
            prevent_incrementalization: boolean,
            operator_name: string,
            plan_not_incrementalizable_sub_type: string,
            expression_name: string,
            plan_not_deterministic_sub_type: string
          >>
        >>
      >'
    ) AS parsed
  FROM event_log_raw AS origin
  JOIN latest_update lu
    ON origin.update_id = lu.latest_update_id
  WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
  SELECT
    pipeline_name,
    pipeline_id,
    flow_name,
    latest_update_id,
    FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
    parsed.technique_information AS planning_information
  FROM parsed_planning
)
SELECT
  pipeline_name,
  pipeline_id,
  flow_name,
  latest_update_id,
  chosen_technique.maintenance_type,
  chosen_technique,
  planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;

Consultar o custo de uma atualização do pipeline

Este exemplo demonstra como consultar o uso de DBU para um pipeline, assim como identificar o usuário em uma execução específica do pipeline.

SELECT
  sku_name,
  billing_origin_product,
  usage_date,
  collect_set(identity_metadata.run_as) as users,
  SUM(usage_quantity) AS `DBUs`
FROM
  system.billing.usage
WHERE
  usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
  ALL;

Consultas avançadas

Os exemplos a seguir mostram como consultar o log de eventos para lidar com cenários menos comuns ou mais avançados.

Consultar métricas para todos os fluxos em um pipeline

Este exemplo mostra como consultar informações detalhadas sobre cada fluxo em um pipeline. Mostra o nome do fluxo, a duração da atualização, as métricas de qualidade dos dados e as informações sobre as linhas processadas (linhas de saída, registros excluídos, inseridos e descartados).

Pressupõe que você criou a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.

WITH flow_progress_raw AS (
  SELECT
    origin.pipeline_name         AS pipeline_name,
    origin.pipeline_id           AS pipeline_id,
    origin.flow_name             AS table_name,
    origin.update_id             AS update_id,
    timestamp,
    details:flow_progress.status AS status,
    TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT)      AS num_output_rows,
    TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT)    AS num_upserted_rows,
    TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT)     AS num_deleted_rows,
    TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
    FROM_JSON(
      details:flow_progress.data_quality.expectations,
      SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
    ) AS expectations_array

  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND origin.flow_name IS NOT NULL
    AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),

aggregated_flows AS (
  SELECT
    pipeline_name,
    pipeline_id,
    update_id,
    table_name,
    MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
    MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
    MAX_BY(status, timestamp) FILTER (
      WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
    ) AS final_status,
    SUM(COALESCE(num_output_rows, 0))              AS total_output_records,
    SUM(COALESCE(num_upserted_rows, 0))            AS total_upserted_records,
    SUM(COALESCE(num_deleted_rows, 0))             AS total_deleted_records,
    MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
    MAX(expectations_array)                        AS total_expectations

  FROM flow_progress_raw
  GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
  af.pipeline_name,
  af.pipeline_id,
  af.update_id,
  af.table_name,
  af.start_timestamp,
  af.end_timestamp,
  af.final_status,
  CASE
    WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
      ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
    ELSE NULL
  END AS duration_seconds,

  af.total_output_records,
  af.total_upserted_records,
  af.total_deleted_records,
  af.total_expectation_dropped_records,
  af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
  SELECT update_id
  FROM aggregated_flows
  ORDER BY end_timestamp DESC
  LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;

Consultar métricas de qualidade de dados ou expectativas

Se você definir expectativas em conjuntos de dados no seu pipeline, as métricas para o número de registros que atenderam ou falharam em relação a uma expectativa serão armazenadas no objeto details:flow_progress.data_quality.expectations. A métrica do número de registros descartados é armazenada no details:flow_progress.data_quality objeto. Eventos que contêm informações sobre a qualidade dos dados têm o tipo flow_progressde evento.

As métricas de qualidade de dados podem não estar disponíveis para alguns conjuntos de dados. Veja as limitações de expectativa.

As seguintes métricas de qualidade de dados estão disponíveis:

Métrica Description
dropped_records O número de registros que foram descartados porque falharam em uma ou mais expectativas.
passed_records O número de registros que passaram pelos critérios de expectativa.
failed_records O número de registros que falharam nos critérios de expectativa.

O exemplo a seguir mostra como consultar as métricas de qualidade de dados da última atualização do pipeline. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;

Consultar informações de linhagem

Eventos que contêm informações sobre linhagem têm o tipo flow_definitionde evento. O details:flow_definition objeto contém o output_dataset e o input_datasets que definem cada relação no grafo.

Use a consulta a seguir para extrair os conjuntos de dados de entrada e saída para ver as informações de linhagem. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  details:flow_definition.output_dataset as flow_name,
  details:flow_definition.input_datasets as input_flow_names,
  details:flow_definition.flow_type as flow_type,
  details:flow_definition.schema, -- the schema of the flow
  details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;

Monitorar a ingestão de arquivos de nuvem com o Carregador Automático

Os pipelines geram eventos quando o Carregador Automático processa arquivos. Para eventos do Carregador Automático, o event_type é operation_progress e o details:operation_progress:type é AUTO_LOADER_LISTING ou AUTO_LOADER_BACKFILL. O objeto details:operation_progress também inclui os campos status, duration_ms, auto_loader_details:source_path e auto_loader_details:num_files_listed.

O exemplo a seguir consulta eventos do Carregador Automático para a atualização mais recente. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest_update.id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');

Monitorar a pendência de dados para otimizar a duração do streaming

Cada pipeline rastreia a quantidade de dados presentes no backlog no details:flow_progress.metrics.backlog_bytes objeto. Os eventos que contêm métricas de lista de pendências têm o tipo flow_progressde evento. O exemplo a seguir consulta as métricas da lista de pendências em busca da última atualização do pipeline. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id;

Observação

As métricas de backlog podem não estar disponíveis dependendo do tipo de fonte de dados do pipeline e da versão do Databricks Runtime.

Monitorar eventos de dimensionamento automático para otimizar a computação clássica

Para pipelines que usam computação clássica (em outras palavras, não usam computação sem servidor), o log de eventos captura redimensionamentos de cluster quando o dimensionamento automático aprimorado está habilitado em seus pipelines. Eventos que contêm informações sobre dimensionamento automático aprimorado têm o tipo autoscalede evento. As informações de solicitação de redimensionamento do cluster são armazenadas no details:autoscale objeto.

O exemplo a seguir consulta as solicitações aprimoradas de redimensionamento automático do cluster para a atualização mais recente do pipeline. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Monitorar a utilização de recursos de computação para computação clássica

cluster_resources os eventos fornecem métricas sobre o número de slots de tarefa no cluster, o quanto esses slots de tarefa são usados e quantas tarefas estão aguardando para serem agendadas.

Quando o dimensionamento automático aprimorado é habilitado, cluster_resources os eventos também contêm métricas para o algoritmo de dimensionamento automático, incluindo latest_requested_num_executorse optimal_num_executors. Os eventos também mostram o status do algoritmo como estados diferentes, como CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSe BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Essas informações podem ser exibidas em conjunto com os eventos de dimensionamento automático para fornecer uma imagem geral do dimensionamento automático aprimorado.

O exemplo a seguir consulta o histórico do tamanho da fila de tarefas, o histórico de utilização da fila, o histórico de contagem de executores, além de outras métricas e o estado do dimensionamento automático na última atualização do pipeline. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
  Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
  Double(details:cluster_resources.num_executors) as current_executors,
  Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id;

Monitorar métricas de streaming de pipeline

Você pode exibir métricas sobre o progresso do fluxo em um pipeline. Pesquise por eventos de stream_progress para obter eventos muito semelhantes aos das métricas StreamingQueryListener criadas pelo Structured Streaming, com as seguintes exceções:

  • As seguintes métricas estão presentes em StreamingQueryListener, mas não em stream_progress: numInputRows, inputRowsPerSeconde processedRowsPerSecond.
  • Para fluxos de Kafka e Kinesis, os campos startOffset, endOffset e latestOffset podem ser muito grandes e são truncados. Para cada um desses campos, campos adicionais, ...Truncated, startOffsetTruncated, endOffsetTruncated, e latestOffsetTruncated, são adicionados com um valor booliano indicando se os dados são truncados.

Para consultar stream_progress eventos, você pode usar uma consulta como a seguinte:

SELECT
  parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';

Aqui está um exemplo de um evento, no JSON:

{
  "id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
  "sequence": {
    "control_plane_seq_no": 1234567890123456
  },
  "origin": {
    "cloud": "<cloud>",
    "region": "<region>",
    "org_id": 0123456789012345,
    "pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
    "pipeline_type": "WORKSPACE",
    "pipeline_name": "<pipeline name>",
    "update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
    "request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
  },
  "timestamp": "2025-06-17T03:18:14.018Z",
  "message": "Completed a streaming update of 'flow_name'."
  "level": "INFO",
  "details": {
    "stream_progress": {
      "progress": {
        "id": "abcdef12-abcd-3456-7890-abcd1234ef56",
        "runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
        "name": "silverTransformFromBronze",
        "timestamp": "2022-11-01T18:21:29.500Z",
        "batchId": 4,
        "durationMs": {
          "latestOffset": 62,
          "triggerExecution": 62
        },
        "stateOperators": [],
        "sources": [
          {
            "description": "DeltaSource[dbfs:/path/to/table]",
            "startOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "endOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "latestOffset": null,
            "metrics": {
              "numBytesOutstanding": "0",
              "numFilesOutstanding": "0"
            }
          }
        ],
        "sink": {
          "description": "DeltaSink[dbfs:/path/to/sink]",
          "numOutputRows": -1
        }
      }
    }
  },
  "event_type": "stream_progress",
  "maturity_level": "EVOLVING"
}

Este exemplo mostra registros não truncados em uma origem Kafka, com os campos ...Truncated definidos como false:

{
  "description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
  "startOffsetTruncated": false,
  "startOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706380
    }
  },
  "endOffsetTruncated": false,
  "endOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "latestOffsetTruncated": false,
  "latestOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "numInputRows": 292,
  "inputRowsPerSecond": 13.65826278123392,
  "processedRowsPerSecond": 14.479817514628582,
  "metrics": {
    "avgOffsetsBehindLatest": "0.0",
    "estimatedTotalBytesBehindLatest": "0.0",
    "maxOffsetsBehindLatest": "0",
    "minOffsetsBehindLatest": "0"
  }
}

Pipelines de auditoria

Você pode usar registros de log de eventos e outros logs de auditoria do Azure Databricks para obter uma visão completa de como os dados estão sendo atualizados em um pipeline.

O Lakeflow Spark Declarative Pipelines usa as credenciais do proprietário do pipeline para executar atualizações. Você pode alterar as credenciais usadas atualizando o proprietário do pipeline. O log de auditoria registra as ações do usuário no pipeline, incluindo a criação, edições na configuração e o acionamento de atualizações.

Confira Eventos do Catálogo do Unity para obter uma referência de eventos de auditoria do Catálogo do Unity.

Consultar ações do usuário no log de eventos

Você pode usar o log de eventos para auditar eventos, por exemplo, ações do usuário. Eventos que contêm informações sobre ações do usuário têm o tipo user_actionde evento.

Informações sobre a ação são armazenadas no user_action objeto no details campo. Use a consulta a seguir para construir um log de auditoria de eventos de usuário. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

Informações de tempo de execução

Você pode exibir informações de runtime de uma atualização de pipeline, como a versão do Databricks Runtime utilizada na atualização. Isso pressupõe que você criou a visualização event_log_raw para o pipeline de seu interesse, conforme descrito em Consulta do log de eventos.

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0