Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
O log de eventos do pipeline contém todas as informações relacionadas a um pipeline, incluindo logs de auditoria, verificações de qualidade de dados, progresso do pipeline e rastreabilidade de dados. Use o log de eventos para acompanhar, entender e monitorar o estado dos seus pipelines de dados.
Você pode exibir entradas de log de eventos na interface do usuário de monitoramento de pipeline, na API REST do Pipelines ou consultando diretamente o log de eventos. Esta seção se concentra em consultar diretamente o log de eventos.
Você também pode definir ações personalizadas a serem executadas quando os eventos são registrados, por exemplo, enviando alertas, com ganchos de evento.
Importante
Não exclua o log de eventos ou o catálogo pai ou o esquema em que o log de eventos é publicado. A exclusão do log de eventos pode resultar na falha de atualização do pipeline durante execuções futuras.
Para obter detalhes completos do esquema de log de eventos, consulte o esquema de log de eventos do Pipeline.
Consultar o log de eventos
Observação
Esta seção descreve o comportamento e a sintaxe padrão para trabalhar com logs de eventos em pipelines configurados com o Unity Catalog e no modo de publicação padrão.
- Para saber sobre o comportamento dos pipelines do Unity Catalog que usam o modo de publicação herdado, consulte Trabalhar com log de eventos para pipelines de modo de publicação herdado do Unity Catalog.
- Para comportamento e sintaxe de pipelines do metastore do Hive, consulte Trabalhar com o log de eventos para pipelines do metastore do Hive.
Por padrão, um pipeline grava o log de eventos em uma tabela Delta oculta no catálogo padrão e no esquema configurado para o pipeline. Embora oculta, a tabela ainda pode ser consultada por todos os usuários suficientemente privilegiados. Por padrão, somente o proprietário do pipeline pode consultar a tabela de log de eventos.
Para consultar o log de eventos como proprietário, use a ID do pipeline:
SELECT * FROM event_log(<pipelineId>);
Por padrão, o nome do log de eventos oculto é formatado como event_log_{pipeline_id}, em que o ID do pipeline é o UUID atribuído pelo sistema com traços substituídos por sublinhados.
Você pode publicar o log de eventos editando as configurações avançadas do pipeline. Para obter detalhes, consulte configuração de pipeline para o log de eventos. Ao publicar um log de eventos, especifique o nome do log de eventos e, opcionalmente, especifique um catálogo e um esquema, como no exemplo a seguir:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
O local do log de eventos também serve como o local do esquema para qualquer consulta do Carregador Automático no pipeline. O Databricks recomenda criar uma exibição sobre a tabela de log de eventos antes de modificar os privilégios, pois algumas configurações de computação podem permitir que os usuários obtenham acesso aos metadados de esquema se a tabela de log de eventos for compartilhada diretamente. A sintaxe de exemplo a seguir cria uma exibição em uma tabela de log de eventos e é usada nas consultas de log de eventos de exemplo incluídas neste artigo. Substitua <catalog_name>.<schema_name>.<event_log_table_name> pelo nome totalmente qualificado da tabela do log de eventos do pipeline. Se você publicou o log de eventos, use o nome especificado ao publicar. Caso contrário, use event_log(<pipelineId>) onde a pipelineId é a ID do pipeline que você deseja consultar.
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
No Catálogo do Unity, as exibições dão suporte as consultas de streaming. O exemplo a seguir usa o Streaming Estruturado para consultar uma exibição definida na parte superior de uma tabela de log de eventos:
df = spark.readStream.table("event_log_raw")
Exemplos básicos de consulta
Os exemplos a seguir mostram como consultar o log de eventos para obter informações gerais sobre pipelines e ajudar a depurar cenários comuns.
Monitorar as atualizações do pipeline consultando as atualizações anteriores
O exemplo a seguir consulta as atualizações (ou execuções) do pipeline, mostrando a ID da atualização, o status, a hora de início, a hora de conclusão e a duração. Isso oferece uma visão geral das execuções do pipeline.
Pressupõe que você criou a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.
with last_status_per_update AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY origin.update_id
ORDER BY timestamp DESC
) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
-- Capture the start of the update
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
-- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
COALESCE(
MAX(CASE
WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
THEN timestamp
END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update', 'update_progress')
AND origin.update_id IS NOT NULL
GROUP BY pipeline_id, pipeline_name, pipeline_update_id
HAVING start_time IS NOT NULL
)
SELECT
s.pipeline_id,
s.pipeline_name,
s.pipeline_update_id,
d.start_time,
d.end_time,
CASE
WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
ELSE NULL
END AS duration_seconds,
s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
ON s.pipeline_id = d.pipeline_id
AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
Depurar problemas de atualização incremental de exibição materializada
Este exemplo consulta todos os fluxos da atualização mais recente de um pipeline. Ele mostra se eles foram atualizados incrementalmente ou não, bem como outras informações de planejamento relevantes que são úteis para depurar por que uma atualização incremental não acontece.
Pressupõe que você criou a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
-- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.flow_name,
lu.latest_update_id,
from_json(
details:planning_information,
'struct<
technique_information: array<struct<
maintenance_type: string,
is_chosen: boolean,
is_applicable: boolean,
cost: double,
incrementalization_issues: array<struct<
issue_type: string,
prevent_incrementalization: boolean,
operator_name: string,
plan_not_incrementalizable_sub_type: string,
expression_name: string,
plan_not_deterministic_sub_type: string
>>
>>
>'
) AS parsed
FROM event_log_raw AS origin
JOIN latest_update lu
ON origin.update_id = lu.latest_update_id
WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
parsed.technique_information AS planning_information
FROM parsed_planning
)
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
chosen_technique.maintenance_type,
chosen_technique,
planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
Consultar o custo de uma atualização do pipeline
Este exemplo demonstra como consultar o uso de DBU para um pipeline, assim como identificar o usuário em uma execução específica do pipeline.
SELECT
sku_name,
billing_origin_product,
usage_date,
collect_set(identity_metadata.run_as) as users,
SUM(usage_quantity) AS `DBUs`
FROM
system.billing.usage
WHERE
usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
ALL;
Consultas avançadas
Os exemplos a seguir mostram como consultar o log de eventos para lidar com cenários menos comuns ou mais avançados.
Consultar métricas para todos os fluxos em um pipeline
Este exemplo mostra como consultar informações detalhadas sobre cada fluxo em um pipeline. Mostra o nome do fluxo, a duração da atualização, as métricas de qualidade dos dados e as informações sobre as linhas processadas (linhas de saída, registros excluídos, inseridos e descartados).
Pressupõe que você criou a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.
WITH flow_progress_raw AS (
SELECT
origin.pipeline_name AS pipeline_name,
origin.pipeline_id AS pipeline_id,
origin.flow_name AS table_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress.status AS status,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS num_output_rows,
TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT) AS num_upserted_rows,
TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT) AS num_deleted_rows,
TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
FROM_JSON(
details:flow_progress.data_quality.expectations,
SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
) AS expectations_array
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND origin.flow_name IS NOT NULL
AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
SELECT
pipeline_name,
pipeline_id,
update_id,
table_name,
MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
MAX_BY(status, timestamp) FILTER (
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
) AS final_status,
SUM(COALESCE(num_output_rows, 0)) AS total_output_records,
SUM(COALESCE(num_upserted_rows, 0)) AS total_upserted_records,
SUM(COALESCE(num_deleted_rows, 0)) AS total_deleted_records,
MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
MAX(expectations_array) AS total_expectations
FROM flow_progress_raw
GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
af.pipeline_name,
af.pipeline_id,
af.update_id,
af.table_name,
af.start_timestamp,
af.end_timestamp,
af.final_status,
CASE
WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
ELSE NULL
END AS duration_seconds,
af.total_output_records,
af.total_upserted_records,
af.total_deleted_records,
af.total_expectation_dropped_records,
af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
SELECT update_id
FROM aggregated_flows
ORDER BY end_timestamp DESC
LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
Consultar métricas de qualidade de dados ou expectativas
Se você definir expectativas em conjuntos de dados no seu pipeline, as métricas para o número de registros que atenderam ou falharam em relação a uma expectativa serão armazenadas no objeto details:flow_progress.data_quality.expectations. A métrica do número de registros descartados é armazenada no details:flow_progress.data_quality objeto. Eventos que contêm informações sobre a qualidade dos dados têm o tipo flow_progressde evento.
As métricas de qualidade de dados podem não estar disponíveis para alguns conjuntos de dados. Veja as limitações de expectativa.
As seguintes métricas de qualidade de dados estão disponíveis:
| Métrica | Description |
|---|---|
dropped_records |
O número de registros que foram descartados porque falharam em uma ou mais expectativas. |
passed_records |
O número de registros que passaram pelos critérios de expectativa. |
failed_records |
O número de registros que falharam nos critérios de expectativa. |
O exemplo a seguir mostra como consultar as métricas de qualidade de dados da última atualização do pipeline. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name;
Consultar informações de linhagem
Eventos que contêm informações sobre linhagem têm o tipo flow_definitionde evento. O details:flow_definition objeto contém o output_dataset e o input_datasets que definem cada relação no grafo.
Use a consulta a seguir para extrair os conjuntos de dados de entrada e saída para ver as informações de linhagem. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
details:flow_definition.output_dataset as flow_name,
details:flow_definition.input_datasets as input_flow_names,
details:flow_definition.flow_type as flow_type,
details:flow_definition.schema, -- the schema of the flow
details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
Monitorar a ingestão de arquivos de nuvem com o Carregador Automático
Os pipelines geram eventos quando o Carregador Automático processa arquivos. Para eventos do Carregador Automático, o event_type é operation_progress e o details:operation_progress:type é AUTO_LOADER_LISTING ou AUTO_LOADER_BACKFILL. O objeto details:operation_progress também inclui os campos status, duration_ms, auto_loader_details:source_path e auto_loader_details:num_files_listed.
O exemplo a seguir consulta eventos do Carregador Automático para a atualização mais recente. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest_update.id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
Monitorar a pendência de dados para otimizar a duração do streaming
Cada pipeline rastreia a quantidade de dados presentes no backlog no details:flow_progress.metrics.backlog_bytes objeto. Os eventos que contêm métricas de lista de pendências têm o tipo flow_progressde evento. O exemplo a seguir consulta as métricas da lista de pendências em busca da última atualização do pipeline. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id;
Observação
As métricas de backlog podem não estar disponíveis dependendo do tipo de fonte de dados do pipeline e da versão do Databricks Runtime.
Monitorar eventos de dimensionamento automático para otimizar a computação clássica
Para pipelines que usam computação clássica (em outras palavras, não usam computação sem servidor), o log de eventos captura redimensionamentos de cluster quando o dimensionamento automático aprimorado está habilitado em seus pipelines. Eventos que contêm informações sobre dimensionamento automático aprimorado têm o tipo autoscalede evento. As informações de solicitação de redimensionamento do cluster são armazenadas no details:autoscale objeto.
O exemplo a seguir consulta as solicitações aprimoradas de redimensionamento automático do cluster para a atualização mais recente do pipeline. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
Monitorar a utilização de recursos de computação para computação clássica
cluster_resources os eventos fornecem métricas sobre o número de slots de tarefa no cluster, o quanto esses slots de tarefa são usados e quantas tarefas estão aguardando para serem agendadas.
Quando o dimensionamento automático aprimorado é habilitado, cluster_resources os eventos também contêm métricas para o algoritmo de dimensionamento automático, incluindo latest_requested_num_executorse optimal_num_executors. Os eventos também mostram o status do algoritmo como estados diferentes, como CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSe BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION.
Essas informações podem ser exibidas em conjunto com os eventos de dimensionamento automático para fornecer uma imagem geral do dimensionamento automático aprimorado.
O exemplo a seguir consulta o histórico do tamanho da fila de tarefas, o histórico de utilização da fila, o histórico de contagem de executores, além de outras métricas e o estado do dimensionamento automático na última atualização do pipeline. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
Double(details:cluster_resources.num_executors) as current_executors,
Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id;
Monitorar métricas de streaming de pipeline
Você pode exibir métricas sobre o progresso do fluxo em um pipeline. Pesquise por eventos de stream_progress para obter eventos muito semelhantes aos das métricas StreamingQueryListener criadas pelo Structured Streaming, com as seguintes exceções:
- As seguintes métricas estão presentes em
StreamingQueryListener, mas não emstream_progress:numInputRows,inputRowsPerSecondeprocessedRowsPerSecond. - Para fluxos de Kafka e Kinesis, os campos
startOffset,endOffsetelatestOffsetpodem ser muito grandes e são truncados. Para cada um desses campos, campos adicionais,...Truncated,startOffsetTruncated,endOffsetTruncated, elatestOffsetTruncated, são adicionados com um valor booliano indicando se os dados são truncados.
Para consultar stream_progress eventos, você pode usar uma consulta como a seguinte:
SELECT
parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';
Aqui está um exemplo de um evento, no JSON:
{
"id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
"sequence": {
"control_plane_seq_no": 1234567890123456
},
"origin": {
"cloud": "<cloud>",
"region": "<region>",
"org_id": 0123456789012345,
"pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"pipeline_type": "WORKSPACE",
"pipeline_name": "<pipeline name>",
"update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
},
"timestamp": "2025-06-17T03:18:14.018Z",
"message": "Completed a streaming update of 'flow_name'."
"level": "INFO",
"details": {
"stream_progress": {
"progress": {
"id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"name": "silverTransformFromBronze",
"timestamp": "2022-11-01T18:21:29.500Z",
"batchId": 4,
"durationMs": {
"latestOffset": 62,
"triggerExecution": 62
},
"stateOperators": [],
"sources": [
{
"description": "DeltaSource[dbfs:/path/to/table]",
"startOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"endOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"latestOffset": null,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
],
"sink": {
"description": "DeltaSink[dbfs:/path/to/sink]",
"numOutputRows": -1
}
}
}
},
"event_type": "stream_progress",
"maturity_level": "EVOLVING"
}
Este exemplo mostra registros não truncados em uma origem Kafka, com os campos ...Truncated definidos como false:
{
"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffsetTruncated": false,
"startOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706380
}
},
"endOffsetTruncated": false,
"endOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"latestOffsetTruncated": false,
"latestOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"numInputRows": 292,
"inputRowsPerSecond": 13.65826278123392,
"processedRowsPerSecond": 14.479817514628582,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
}
Pipelines de auditoria
Você pode usar registros de log de eventos e outros logs de auditoria do Azure Databricks para obter uma visão completa de como os dados estão sendo atualizados em um pipeline.
O Lakeflow Spark Declarative Pipelines usa as credenciais do proprietário do pipeline para executar atualizações. Você pode alterar as credenciais usadas atualizando o proprietário do pipeline. O log de auditoria registra as ações do usuário no pipeline, incluindo a criação, edições na configuração e o acionamento de atualizações.
Confira Eventos do Catálogo do Unity para obter uma referência de eventos de auditoria do Catálogo do Unity.
Consultar ações do usuário no log de eventos
Você pode usar o log de eventos para auditar eventos, por exemplo, ações do usuário. Eventos que contêm informações sobre ações do usuário têm o tipo user_actionde evento.
Informações sobre a ação são armazenadas no user_action objeto no details campo. Use a consulta a seguir para construir um log de auditoria de eventos de usuário. Isso pressupõe que você tenha criado a visualização event_log_raw para o pipeline no qual você está interessado, conforme descrito em Consultar o log de eventos.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
|---|---|---|
| 2021-05-20T19:36:03.517+0000 | START |
user@company.com |
| 2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
| 2021-05-27T00:35:51.971+0000 | START |
user@company.com |
Informações de tempo de execução
Você pode exibir informações de runtime de uma atualização de pipeline, como a versão do Databricks Runtime utilizada na atualização. Isso pressupõe que você criou a visualização event_log_raw para o pipeline de seu interesse, conforme descrito em Consulta do log de eventos.
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
|---|
| 11.0 |