Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Azure Databricks proporciona una supervisión integrada para las aplicaciones de Structured Streaming mediante la interfaz de usuario de Spark de la pestaña Streaming.
Cómo distinguir consultas de Structured Streaming en la UI de Spark
Para distinguir las métricas pertenecen a cada secuencia de manera sencilla mediante la UI de Spark, proporcione un nombre de consulta único a los flujos. Para ello, agregue el elemento .queryName(<query-name>) al código writeStream.
Inserción de métricas de Structured Streaming en servicios externos
Las métricas de streaming se pueden insertar en servicios externos para alertas o casos de uso de paneles mediante la interfaz del cliente de escucha de consultas de streaming de Apache Spark. En Databricks Runtime 11.3 LTS y versiones posteriores, StreamingQueryListener está disponible en Python y Scala.
Important
Las siguientes limitaciones se aplican a las cargas de trabajo mediante los modos de acceso a la computación habilitados para Unity Catalog:
-
StreamingQueryListenerrequiere Databricks Runtime 15.1 o superior para usar credenciales o interactuar con objetos gestionados por Unity Catalog en cómputo con el modo de acceso dedicado. -
StreamingQueryListenerrequiere Databricks Runtime 16.1 o superior para cargas de trabajo de Scala configuradas con el modo de acceso estándar (anteriormente modo de acceso compartido).
Note
La latencia de procesamiento con agentes de escucha puede afectar significativamente a las velocidades de procesamiento de consultas. Se recomienda limitar la lógica de procesamiento en estos agentes de escucha y optar por escribir en sistemas de respuesta rápida como Kafka para mejorar la eficacia.
Si la consulta no tiene datos disponibles en el origen y está esperando nuevos datos, se entrega un onQueryIdle mensaje al agente de escucha de consulta de streaming. Un onQueryProgress mensaje solo se entrega al final del lote de consulta de streaming. Si la consulta ha estado procesando datos durante mucho tiempo, es posible que no se envíen ni los onQueryIdleonQueryProgress eventos, pero la consulta sigue en buen estado y continúa procesando los datos.
En el código siguiente se proporcionan ejemplos básicos de la sintaxis para implementar un cliente de escucha:
Scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
* `onQueryStart` calls on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block this method, as it blocks your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] returns the
* latest status, regardless of when this method is called. The status of [[StreamingQuery]]
* may change before or when you process the event. For example, you may find [[StreamingQuery]]
* terminates when processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when the query is idle and waiting for new data to process.
*/
def onQueryIdle(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
Python
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
"""
Called when a query is started.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
The properties are available as the same as Scala API.
Notes
-----
This is called synchronously with
meth:`pyspark.sql.streaming.DataStreamWriter.start`,
that is, ``onQueryStart`` will be called on all listeners before
``DataStreamWriter.start()`` returns the corresponding
:class:`pyspark.sql.streaming.StreamingQuery`.
Do not block in this method as it will block your query.
"""
pass
def onQueryProgress(self, event):
"""
Called when there is some status update (ingestion rate updated, etc.)
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
The properties are available as the same as Scala API.
Notes
-----
This method is asynchronous. The status in
:class:`pyspark.sql.streaming.StreamingQuery` returns the
most recent status, regardless of when this method is called. The status
of :class:`pyspark.sql.streaming.StreamingQuery`.
may change before or when you process the event.
For example, you may find :class:`StreamingQuery`
terminates when processing `QueryProgressEvent`.
"""
pass
def onQueryIdle(self, event):
"""
Called when the query is idle and waiting for new data to process.
"""
pass
def onQueryTerminated(self, event):
"""
Called when a query is stopped, with or without error.
Parameters
----------
event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
The properties are available as the same as Scala API.
"""
pass
my_listener = MyListener()
Definición de métricas observables en Structured Streaming
Las métricas observables son funciones de agregado arbitrarias con nombre que se pueden definir en una consulta (DataFrame). En cuanto la ejecución de un DataFrame alcanza un punto de finalización (es decir, finaliza una consulta por lotes o llega a una época de streaming), se emite un evento con nombre que contiene las métricas de los datos procesados desde el último punto de finalización.
Para observar estas métricas, adjunte un cliente de escucha a la sesión de Spark. El cliente de escucha que debe usar dependerá del modo de ejecución:
Modo por lotes: use el cliente
QueryExecutionListener.El agente
QueryExecutionListenerrecibe una llamada cuando se completa una consulta. Acceda a las métricas mediante el mapaQueryExecution.observedMetrics.Streaming o microlote: use el agente
StreamingQueryListener.El agente
StreamingQueryListenerrecibe una llamada cuando la consulta de streaming completa una época. Acceda a las métricas mediante el mapaStreamingQueryProgress.observedMetrics. Azure Databricks no admite el modo de desencadenadorcontinuouspara streaming.
Por ejemplo:
Scala
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
Python
# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()
# Define my listener.
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print(f"'{event.name}' [{event.id}] got started!")
def onQueryProgress(self, event):
row = event.progress.observedMetrics.get("metric")
if row is not None:
if row.malformed / row.cnt > 0.5:
print("ALERT! Ouch! there are too many malformed "
f"records {row.malformed} out of {row.cnt}!")
else:
print(f"{row.cnt} rows processed!")
def onQueryTerminated(self, event):
print(f"{event.id} got terminated!")
# Add my listener.
spark.streams.addListener(MyListener())
Asignar identificadores de tabla de métricas de Unity Catalog, Delta Lake y Structured Streaming
Las métricas de Structured Streaming usan el reservoirId campo en varios lugares para la identidad única de una tabla Delta que se usa como origen para una consulta de streaming.
El campo reservoirId asigna el identificador único almacenado por la tabla Delta en el registro de transacciones de Delta. Este identificador no corresponde al tableId valor asignado por Unity Catalog y mostrado en Catalog Explorer.
Use la sintaxis siguiente para revisar el identificador de tabla de una tabla Delta. Esto funciona para las tablas administradas por Unity Catalog, las tablas externas de Unity Catalog y todas las tablas Delta del metastore de Hive.
DESCRIBE DETAIL <table-name>
El campo id que se muestra en los resultados es el identificador que se asigna a la reservoirId en las métricas de streaming.
Métricas del objeto StreamingQueryListener
| Fields | Description |
|---|---|
id |
Una id. de consulta único que persiste en los reinicios. |
runId |
Identificador de consulta único para cada inicio o reinicio. Consulte StreamingQuery.runId(). |
name |
El nombre especificado por el usuario de la consulta. El nombre es NULL si no se especifica ningún nombre. |
timestamp |
La marca de tiempo para la ejecución del microlote. |
batchId |
Una iId. único del lote actual de datos que se está procesando. En el caso de reintentos tras un error, se puede ejecutar un id. de lote determinado más de una vez. Del mismo modo, cuando no hay datos que se van a procesar, el id. de lote no se incrementa. |
batchDuration |
Duración del procesamiento de una operación por lotes, en milisegundos. |
numInputRows |
El número agregado (en todos los orígenes) de registros procesados en un desencadenador. |
inputRowsPerSecond |
La tasa de agregado (en todos los orígenes) de los datos que llegan. |
processedRowsPerSecond |
La tasa de agregado (en todos los orígenes) a la que Spark está procesando datos. |
StreamingQueryListener también define los siguientes campos que contienen objetos que puede examinar para ver las métricas del cliente y los detalles del progreso del origen:
| Fields | Description |
|---|---|
durationMs |
Escriba: ju.Map[String, JLong]. Vea el objeto durationMs. |
eventTime |
Escriba: ju.Map[String, String]. Vea el objeto eventTime. |
stateOperators |
Escriba: Array[StateOperatorProgress]. Vea el objeto stateOperators. |
sources |
Escriba: Array[SourceProgress]. Vea el objeto sources. |
sink |
Escriba: SinkProgress. Vea el objeto sink. |
observedMetrics |
Escriba: ju.Map[String, Row]. Funciones de agregado arbitrarias con nombre que se pueden definir en un dataframe o consulta (como df.observe). |
objeto durationMs
Tipo de objeto: ju.Map[String, JLong]
Información sobre el tiempo necesario para completar varias fases del proceso de ejecución de microlote.
| Fields | Description |
|---|---|
durationMs.addBatch |
El tiempo necesario para ejecutar el microlote. Esto excluye el tiempo que tarda Spark en planear el microlote. |
durationMs.getBatch |
El tiempo necesario para recuperar los metadatos sobre los desplazamientos del origen. |
durationMs.latestOffset |
El desplazamiento más reciente consumido para el microlote. Este objeto de progreso hace referencia al tiempo necesario para recuperar el desplazamiento más reciente de los orígenes. |
durationMs.queryPlanning |
El tiempo necesario para generar el plan de ejecución. |
durationMs.triggerExecution |
El tiempo necesario para planear y ejecutar el microlote. |
durationMs.walCommit |
El tiempo necesario para confirmar los nuevos desplazamientos disponibles. |
durationMs.commitBatch |
Tiempo necesario para confirmar los datos escritos en el receptor durante addBatch. Solo está presente para los receptores que admiten la confirmación. |
durationMs.commitOffsets |
Tiempo necesario para confirmar el lote en el registro de confirmación. |
objeto eventTime
Tipo de objeto: ju.Map[String, String]
Información sobre el valor del tiempo del evento visto dentro de los datos que se procesan en el microlote. Esta marca de agua usa estos datos para averiguar cómo recortar el estado para procesar agregaciones con estado definidas en el trabajo de flujo estructurado.
| Fields | Description |
|---|---|
eventTime.avg |
El promedio de tiempo del evento visto en el desencadenador. |
eventTime.max |
El tiempo máximo del evento visto en el desencadenador. |
eventTime.min |
El tiempo mínimo del evento visto en el desencadenador. |
eventTime.watermark |
El valor de la marca de agua utilizada en el desencadenador. |
objeto stateOperators
Tipo de objeto: Array[StateOperatorProgress] el stateOperators objeto contiene información sobre las operaciones con estado definidas en el trabajo Structured Streaming y las agregaciones que se generan a partir de ellas.
Para obtener más información sobre los operadores de estado de transmisión, consulte ¿Qué es el streaming con estado?.
| Fields | Description |
|---|---|
stateOperators.operatorName |
Nombre del operador con estado al que se relacionan las métricas, como symmetricHashJoin, dedupeo stateStoreSave. |
stateOperators.numRowsTotal |
El número total de filas en el estado como resultado del operador o agregación con estado. |
stateOperators.numRowsUpdated |
El número total de filas actualizadas en el estado como resultado del operador o agregación con estado. |
stateOperators.allUpdatesTimeMs |
Actualmente, Spark no puede medir esta métrica y está previsto quitarla en futuras actualizaciones. |
stateOperators.numRowsRemoved |
El número total de filas quitadas del estado como resultado del operador o agregación con estado. |
stateOperators.allRemovalsTimeMs |
Actualmente, Spark no puede medir esta métrica y está previsto quitarla en futuras actualizaciones. |
stateOperators.commitTimeMs |
El tiempo necesario para confirmar todas las actualizaciones (colocaciones y eliminaciones) y devolver una nueva versión. |
stateOperators.memoryUsedBytes |
Memoria usada por el almacén de estado. |
stateOperators.numRowsDroppedByWatermark |
El número de filas que se consideran demasiado tardías para incluirse en una agregación con estado. Agregaciones de streaming solo: el número de filas eliminadas después de la agregación (y no las filas de entrada sin procesar). Este número no es preciso, pero proporciona una indicación de que hay datos en tiempo de espera que se quiten. |
stateOperators.numShufflePartitions |
El número de particiones aleatorias para este operador con estado. |
stateOperators.numStateStoreInstances |
La instancia de almacén de estado real que el operador ha inicializado y mantenido. Para muchos operadores con estado, este es el mismo que el número de particiones. Sin embargo, las combinaciones stream-stream inicializan cuatro instancias de almacén de estado por partición. |
stateOperators.customMetrics |
Consulte stateOperators.customMetrics en este tema para obtener más detalles. |
Objeto StateOperatorProgress.customMetrics
Tipo de objeto: ju.Map[String, JLong]
StateOperatorProgress tiene un campo , , customMetricsque contiene las métricas específicas de la característica que usa al recopilar esas métricas.
| Feature | Description |
|---|---|
| Almacén de estado de RocksDB | Métricas del almacén de estado de RocksDB. |
| Almacén de estado de HDFS | Métricas para el almacén de estado de HDFS. |
| Desduplicación de flujos | Métricas para la desduplicación de filas. |
| Agregación de flujos | Métricas para la agregación de filas. |
| Operador stream join | Métricas para el operador de combinación de flujos. |
transformWithState |
Métricas para el operador transformWithState. |
Métricas personalizadas del almacén de estado de RocksDB
Información recopilada de RocksDB que captura métricas sobre su rendimiento y operaciones con respecto a los valores con estado que mantiene para el trabajo de flujo estructurado. Para más información, consulte Configuración del almacén de estado de RocksDB en Azure Databricks.
| Fields | Description |
|---|---|
customMetrics.rocksdbBytesCopied |
El número de bytes copiados según el seguimiento del administrador de archivos de RocksDB. |
customMetrics.rocksdbCommitCheckpointLatency |
El tiempo en milisegundos para tomar una instantánea de RocksDB nativo y escribirlo en un directorio local. |
customMetrics.rocksdbCompactLatency |
El tiempo en milisegundos para realizar la compactación (opcional) durante la confirmación del punto de comprobación. |
customMetrics.rocksdbCommitCompactLatency |
Tiempo de compactación durante la confirmación, en milisegundos. |
customMetrics.rocksdbCommitFileSyncLatencyMs |
El tiempo en milisegundos que sincroniza la instantánea nativa de RocksDB con el almacenamiento externo (la ubicación del punto de control). |
customMetrics.rocksdbCommitFlushLatency |
El tiempo en milisegundos que vacía los cambios en memoria de RocksDB en memoria en el disco local. |
customMetrics.rocksdbCommitPauseLatency |
Tiempo en milisegundos que detiene los subprocesos de trabajo en segundo plano como parte de la confirmación del punto de comprobación, como la compactación. |
customMetrics.rocksdbCommitWriteBatchLatency |
El tiempo en milisegundos aplicando las escrituras almacenadas provisionalmente en la estructura en memoria (WriteBatch) a RocksDB nativa. |
customMetrics.rocksdbFilesCopied |
El número de archivos copiados según el seguimiento del administrador de archivos de RocksDB. |
customMetrics.rocksdbFilesReused |
El número de archivos reutilizados según el seguimiento del administrador de archivos de RocksDB. |
customMetrics.rocksdbGetCount |
El número de llamadas get (no incluye gets de WriteBatch: lote en memoria que se usa para escrituras de almacenamiento provisional). |
customMetrics.rocksdbGetLatency |
El promedio de tiempo en nanosegundos para la llamada nativa RocksDB::Get subyacente. |
customMetrics.rocksdbReadBlockCacheHitCount |
Recuento de aciertos de caché de la caché de bloques en RocksDB. |
customMetrics.rocksdbReadBlockCacheMissCount |
Recuento de errores de caché de bloques en RocksDB. |
customMetrics.rocksdbSstFileSize |
Tamaño de todos los archivos de tabla ordenada estática (SST) en la instancia de RocksDB. |
customMetrics.rocksdbTotalBytesRead |
El número de bytes sin comprimir leídos por operaciones de get. |
customMetrics.rocksdbTotalBytesWritten |
El número total de bytes sin comprimir escritos por operaciones de put. |
customMetrics.rocksdbTotalBytesReadThroughIterator |
El número de bytes totales de los datos sin comprimir leídos mediante un iterador. Algunas operaciones con estado (por ejemplo, el procesamiento de tiempo de espera en FlatMapGroupsWithState y la marca de agua) requieren leer datos en Azure Databricks a través de un iterador. |
customMetrics.rocksdbTotalBytesReadByCompaction |
El número de bytes de lectura del proceso de compactación del disco. |
customMetrics.rocksdbTotalBytesWrittenByCompaction |
El número total de bytes que escribe el proceso de compactación en el disco. |
customMetrics.rocksdbTotalCompactionLatencyMs |
El tiempo en milisegundos para compactaciones de RocksDB, incluidas las compactaciones en segundo plano y la compactación opcional iniciada durante la confirmación. |
customMetrics.rocksdbTotalFlushLatencyMs |
El tiempo total de vaciado, incluido el vaciado en segundo plano. Las operaciones de vaciado son procesos por los que MemTable se vacía en el almacenamiento una vez que está lleno.
MemTables son el primer nivel donde se almacenan los datos en RocksDB. |
customMetrics.rocksdbZipFileBytesUncompressed |
Tamaño en bytes de los archivos ZIP sin comprimir, tal como lo informa el Administrador de archivos. El administrador de archivos administra el uso y eliminación del espacio en disco del archivo SST físico. |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
La versión más reciente de la instantánea de RocksDB guardada en la ubicación del punto de comprobación. Un valor de "-1" indica que nunca se ha guardado ninguna instantánea. Dado que las instantáneas son específicas de cada instancia de almacén de estado, esta métrica se aplica a un identificador de partición y un nombre de almacén de estado concretos. |
customMetrics.rocksdbPutLatency |
Latencia total de llamadas put. |
customMetrics.rocksdbPutCount |
Número de llamadas put. |
customMetrics.rocksdbWriterStallLatencyMs |
Tiempo de espera del escritor para que finalice la compactación o vaciado. |
customMetrics.rocksdbTotalBytesWrittenByFlush |
Bytes totales escritos por vaciado |
customMetrics.rocksdbPinnedBlocksMemoryUsage |
Uso de memoria para bloques anclados |
customMetrics.rocksdbNumInternalColFamiliesKeys |
Número de claves internas para familias de columnas internas |
customMetrics.rocksdbNumExternalColumnFamilies |
Número de familias de columnas externas |
customMetrics.rocksdbNumInternalColumnFamilies |
Número de familias de columnas internas |
Métricas personalizadas del almacén de estado de HDFS
Información recopilada sobre los comportamientos y las operaciones del proveedor del almacén de estado de HDFS.
| Fields | Description |
|---|---|
customMetrics.stateOnCurrentVersionSizeBytes |
Tamaño estimado de estado solo en la versión actual. |
customMetrics.loadedMapCacheHitCount |
Recuento de aciertos de caché en estados almacenados en caché en el proveedor. |
customMetrics.loadedMapCacheMissCount |
Recuento de fallos de caché en estados almacenados en caché en el proveedor. |
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> |
La última versión cargada de la instantánea para una instancia de almacén de estado específica. |
Métricas personalizadas de desduplicación
Información recopilada sobre los comportamientos y las operaciones de desduplicación.
| Fields | Description |
|---|---|
customMetrics.numDroppedDuplicateRows |
Número de filas duplicadas eliminadas. |
customMetrics.numRowsReadDuringEviction |
Número de filas de estado leídas durante la expulsión de estado. |
Métricas personalizadas de agregación
Información recopilada sobre los comportamientos y las operaciones de agregación.
| Fields | Description |
|---|---|
customMetrics.numRowsReadDuringEviction |
Número de filas de estado leídas durante la expulsión de estado. |
Métricas personalizadas de unión a secuencias
Información recopilada sobre los comportamientos y las operaciones de unión de secuencias.
| Fields | Description |
|---|---|
customMetrics.skippedNullValueCount |
Número de valores null omitidos, cuando spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled se establece en true. |
métricas personalizadas de transformWithState
Información recopilada sobre los comportamientos y las operaciones de transformWithState (TWS). Para obtener más información sobre transformWithState, consulte Compilación de una aplicación con estado personalizada.
| Fields | Description |
|---|---|
customMetrics.initialStateProcessingTimeMs |
Número de milisegundos realizados para procesar todo el estado inicial. |
customMetrics.numValueStateVars |
Número de variables de estado de valor. También está presente para transformWithStateInPandas. |
customMetrics.numListStateVars |
Número de variables de estado de lista. También está presente para transformWithStateInPandas. |
customMetrics.numMapStateVars |
Número de variables de estado de mapa. También está presente para transformWithStateInPandas. |
customMetrics.numDeletedStateVars |
Número de variables de estado eliminadas. También está presente para transformWithStateInPandas. |
customMetrics.timerProcessingTimeMs |
Número de milisegundos que se tardan en procesar todos los temporizadores |
customMetrics.numRegisteredTimers |
Número de temporizadores registrados. También está presente para transformWithStateInPandas. |
customMetrics.numDeletedTimers |
Número de temporizadores eliminados. También está presente para transformWithStateInPandas. |
customMetrics.numExpiredTimers |
Número de temporizadores expirados. También está presente para transformWithStateInPandas. |
customMetrics.numValueStateWithTTLVars |
Número de variables de estado de valor con TTL. También está presente para transformWithStateInPandas. |
customMetrics.numListStateWithTTLVars |
Número de variables de estado de lista con TTL. También está presente para transformWithStateInPandas. |
customMetrics.numMapStateWithTTLVars |
Número de variables de estado de mapa con TTL. También está presente para transformWithStateInPandas. |
customMetrics.numValuesRemovedDueToTTLExpiry |
Número de valores eliminados debido a la expiración de TTL. También está presente para transformWithStateInPandas. |
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry |
Número de valores eliminados de forma incremental debido a la expiración de TTL. |
objeto sources
Tipo de objeto: Array[SourceProgress]
El objeto sources contiene información y métricas para los orígenes de datos de streaming.
| Fields | Description |
|---|---|
description |
Descripción detallada de la tabla de origen de datos de streaming. |
startOffset |
Número de desplazamiento inicial dentro de la tabla de origen de datos en la que se inició el trabajo de streaming. |
endOffset |
El desplazamiento más reciente procesado por el microlote. |
latestOffset |
Desplazamiento más reciente procesado por el microlote. |
numInputRows |
El número de filas de entrada procesadas desde este origen. |
inputRowsPerSecond |
Velocidad, en segundos, a la que llegan los datos para su procesamiento desde este origen. |
processedRowsPerSecond |
La tasa a la que Spark está procesando datos desde este origen. |
metrics |
Escriba: ju.Map[String, String]. Contiene métricas personalizadas para un origen de datos específico. |
Azure Databricks proporciona la siguiente implementación de objetos de orígenes:
Note
En el caso de los campos definidos en el formulario sources.<startOffset / endOffset / latestOffset>.* (o alguna variación), interprete como uno de los campos posibles (hasta estos) 3 posibles, que contienen el campo secundario indicado:
sources.startOffset.<child-field>sources.endOffset.<child-field>sources.latestOffset.<child-field>
Objeto de orígenes de Delta Lake
Definiciones de métricas personalizadas usadas para orígenes de datos de streaming de tablas delta.
| Fields | Description |
|---|---|
sources.description |
La descripción del origen desde el que se lee la consulta de streaming. Por ejemplo: “DeltaSource[table]”. |
sources.<startOffset / endOffset>.sourceVersion |
La versión de la serialización con la que se codifica este desplazamiento. |
sources.<startOffset / endOffset>.reservoirId |
El id. de la tabla que se leerá. Se usa para detectar errores de configuración al reiniciar una consulta. Consulte Asignar identificadores de tabla de métricas de Unity Catalog, Delta Lake y Structured Streaming. |
sources.<startOffset / endOffset>.reservoirVersion |
La versión de la tabla que está procesando actualmente. |
sources.<startOffset / endOffset>.index |
El índice en la secuencia de AddFiles en esta versión. Esto se usa para dividir las confirmaciones grandes en varios lotes. Este índice se crea ordenando en modificationTimestamp y path. |
sources.<startOffset / endOffset>.isStartingVersion |
Identifica si el desplazamiento actual marca el inicio de una nueva consulta de streaming en lugar del procesamiento de los cambios que se produjeron después de procesar los datos iniciales. Al iniciar una nueva consulta, se procesan todos los datos presentes en la tabla al principio y, a continuación, los nuevos datos que hayan llegado. |
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis |
Tiempo de evento registrado para la ordenación de la hora del evento. Hora de evento de los datos de instantánea iniciales que están pendientes de procesarse. Se usa al procesar una instantánea inicial con el orden de tiempo del evento. |
sources.latestOffset |
El desplazamiento más reciente procesado por la consulta del microlote. |
sources.numInputRows |
El número de filas de entrada procesadas desde este origen. |
sources.inputRowsPerSecond |
La tasa a la que llegan los datos para su procesamiento desde este origen. |
sources.processedRowsPerSecond |
La tasa a la que Spark está procesando datos desde este origen. |
sources.metrics.numBytesOutstanding |
El tamaño combinado de los archivos pendientes (archivos rastreados por RocksDB). Esta es la métrica de trabajo pendiente para Delta y el cargador automático como origen de streaming. |
sources.metrics.numFilesOutstanding |
El número de archivos pendientes que se van a procesar. Esta es la métrica de trabajo pendiente para Delta y el cargador automático como origen de streaming. |
Objeto de orígenes de Apache Kafka
Definiciones de métricas personalizadas usadas para orígenes de datos de streaming de Apache Kafka.
| Fields | Description |
|---|---|
sources.description |
Descripción detallada del origen de Kafka, especificando el tema exacto de Kafka que se lee. Por ejemplo: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”. |
sources.startOffset |
El número de desplazamiento inicial en el tema de Kafka en el que se inició el trabajo de streaming. |
sources.endOffset |
El desplazamiento más reciente procesado por el microlote. Esto podría ser igual a latestOffset para una ejecución de microlote en curso. |
sources.latestOffset |
El desplazamiento más reciente calculado por el microlote. Es posible que el proceso de creación de microlotes no procese todos los desplazamientos cuando hay limitación, lo que da como resultado endOffset y latestOffset difiere. |
sources.numInputRows |
El número de filas de entrada procesadas desde este origen. |
sources.inputRowsPerSecond |
La tasa a la que llegan los datos para su procesamiento desde este origen. |
sources.processedRowsPerSecond |
La tasa a la que Spark está procesando datos desde este origen. |
sources.metrics.avgOffsetsBehindLatest |
El promedio del número de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos. |
sources.metrics.estimatedTotalBytesBehindLatest |
El número estimado de bytes que el proceso de consulta no ha consumido de los temas suscritos. |
sources.metrics.maxOffsetsBehindLatest |
El número máximo de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos. |
sources.metrics.minOffsetsBehindLatest |
El número mínimo de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos. |
Métricas de origen del cargador automático
Definiciones de métricas personalizadas usadas para orígenes de datos de streaming de Auto Loader.
| Fields | Description |
|---|---|
sources.<startOffset / endOffset / latestOffset>.seqNum |
Posición actual en la secuencia de archivos que se están procesando en el orden en que se detectaron los archivos. |
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
Versión de implementación del origen cloudFiles. |
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs |
Hora de inicio de la operación de reposición más reciente. |
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs |
Hora de finalización de la operación de reposición más reciente. |
sources.<startOffset / endOffset / latestOffset>.lastInputPath |
La última ruta de acceso de entrada proporcionada por el usuario de la secuencia antes de reiniciar la secuencia. |
sources.metrics.numFilesOutstanding |
Número de archivos en la retrocesión. |
sources.metrics.numBytesOutstanding |
Tamaño (bytes) de los archivos del trabajo pendiente |
sources.metrics.approximateQueueSize |
Tamaño aproximado de la cola de mensajes. Solo cuando la opción cloudFiles.useNotifications está habilitada. |
sources.numInputRows |
El número de filas de entrada procesadas desde este origen. Para el binaryFile formato de origen, numInputRows es igual al número de archivos. |
Métricas de orígenes de PubSub
Definiciones de métricas personalizadas usadas para orígenes de datos de streaming de PubSub. Para más información sobre la supervisión de orígenes de streaming de PubSub, consulte Supervisión de métricas de streaming.
| Fields | Description |
|---|---|
sources.<startOffset / endOffset / latestOffset>.sourceVersion |
La versión de implementación con la que se codifica este desplazamiento. |
sources.<startOffset / endOffset / latestOffset>.seqNum |
Número de secuencia persistente que se está procesando. |
sources.<startOffset / endOffset / latestOffset>.fetchEpoch |
La época de captura más grande que se está procesando. |
sources.metrics.numRecordsReadyToProcess |
Número de registros disponibles para su procesamiento en el trabajo pendiente actual. |
sources.metrics.sizeOfRecordsReadyToProcess |
Tamaño total en bytes, de datos no procesados en el trabajo pendiente actual. |
sources.metrics.numDuplicatesSinceStreamStart |
Recuento total de registros duplicados procesados por la secuencia desde que se inició. |
Métricas de orígenes de Pulsar
Definiciones de métricas personalizadas usadas para orígenes de datos de streaming de Pulsar.
| Fields | Description |
|---|---|
sources.metrics.numInputRows |
Número de filas procesadas en el microproceso actual. |
sources.metrics.numInputBytes |
Número total de bytes procesados en el microproceso actual. |
objeto sink
Tipo de objeto: SinkProgress
| Fields | Description |
|---|---|
sink.description |
Descripción del receptor, que detalla la implementación específica del receptor que se usa. |
sink.numOutputRows |
Número de filas de salida. Los distintos tipos de receptor pueden tener comportamientos o restricciones diferentes para los valores. Consulte los tipos admitidos específicos. |
sink.metrics |
ju.Map[String, String] de métricas de receptor. |
Actualmente, Azure Databricks proporciona dos implementaciones de objetos específicas sink :
| Tipo de sumidero | Details |
|---|---|
| Tabla delta | Vea Objeto receptor delta. |
| Tema de Apache Kafka | Consulte Objeto receptor Kafka. |
El campo sink.metrics se comporta igual para ambas variantes del objeto sink.
Objeto receptor de Delta Lake
| Fields | Description |
|---|---|
sink.description |
Descripción del receptor Delta, que detalla la implementación específica del receptor delta que se usa. Por ejemplo: “DeltaSink[table]”. |
sink.numOutputRows |
El número de filas es siempre -1 porque Spark no puede inferir filas de salida para receptores DSv1, que es la clasificación para el receptor de Delta Lake. |
Objeto receptor de Apache Kafka
| Fields | Description |
|---|---|
sink.description |
La descripción del receptor de Kafka en el que está escribiendo la consulta de streaming, detallando la implementación específica del receptor de Kafka que se está usando. Por ejemplo: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”. |
sink.numOutputRows |
El número de filas escritas en la tabla o receptor de salida como parte del microlote. En algunas situaciones, este valor puede ser "-1" y, por lo general, se puede interpretar como "desconocido". |
Examples
Ejemplo de evento StreamingQueryListener de Kafka a Kafka
{
"id" : "3574feba-646d-4735-83c4-66f657e52517",
"runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
"name" : "STREAMING_QUERY_NAME_UNIQUE",
"timestamp" : "2022-10-31T20:09:30.455Z",
"batchId" : 1377,
"numInputRows" : 687,
"inputRowsPerSecond" : 32.13433743393049,
"processedRowsPerSecond" : 34.067241892293964,
"durationMs" : {
"addBatch" : 18352,
"getBatch" : 0,
"latestOffset" : 31,
"queryPlanning" : 977,
"triggerExecution" : 20165,
"walCommit" : 342
},
"eventTime" : {
"avg" : "2022-10-31T20:09:18.070Z",
"max" : "2022-10-31T20:09:30.125Z",
"min" : "2022-10-31T20:09:09.793Z",
"watermark" : "2022-10-31T20:08:46.355Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 208,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 434,
"numRowsRemoved" : 76,
"allRemovalsTimeMs" : 515,
"commitTimeMs" : 0,
"memoryUsedBytes" : 167069743,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1370,
"SnapshotLastUploaded.partition_1_default" : 1370,
"SnapshotLastUploaded.partition_2_default" : 1362,
"SnapshotLastUploaded.partition_3_default" : 1370,
"SnapshotLastUploaded.partition_4_default" : 1356,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 222,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 165,
"rocksdbReadBlockCacheMissCount" : 41,
"rocksdbSstFileSize" : 232729,
"rocksdbTotalBytesRead" : 12844,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 161238,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "dedupe",
"numRowsTotal" : 2454744,
"numRowsUpdated" : 73,
"allUpdatesTimeMs" : 4155,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 0,
"memoryUsedBytes" : 137765341,
"numRowsDroppedByWatermark" : 34,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 20,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_default" : 1360,
"SnapshotLastUploaded.partition_1_default" : 1360,
"SnapshotLastUploaded.partition_2_default" : 1352,
"SnapshotLastUploaded.partition_3_default" : 1360,
"SnapshotLastUploaded.partition_4_default" : 1346,
"numDroppedDuplicateRows" : 193,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 146,
"rocksdbGetLatency" : 0,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3,
"rocksdbReadBlockCacheMissCount" : 3,
"rocksdbSstFileSize" : 78959140,
"rocksdbTotalBytesRead" : 0,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
}, {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 2583,
"numRowsUpdated" : 682,
"allUpdatesTimeMs" : 9645,
"numRowsRemoved" : 508,
"allRemovalsTimeMs" : 46,
"commitTimeMs" : 21,
"memoryUsedBytes" : 668544484,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 20,
"numStateStoreInstances" : 80,
"customMetrics" : {
"SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
"SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
"SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
"SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
"SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
"rocksdbBytesCopied" : 0,
"rocksdbCommitCheckpointLatency" : 0,
"rocksdbCommitCompactLatency" : 0,
"rocksdbCommitFileSyncLatencyMs" : 0,
"rocksdbCommitFlushLatency" : 0,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 0,
"rocksdbFilesCopied" : 0,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 4218,
"rocksdbGetLatency" : 3,
"rocksdbPutCount" : 0,
"rocksdbPutLatency" : 0,
"rocksdbReadBlockCacheHitCount" : 3425,
"rocksdbReadBlockCacheMissCount" : 149,
"rocksdbSstFileSize" : 742827,
"rocksdbTotalBytesRead" : 866864,
"rocksdbTotalBytesReadByCompaction" : 0,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWritten" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 0,
"rocksdbTotalCompactionLatencyMs" : 0,
"rocksdbTotalFlushLatencyMs" : 0,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 0
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706380
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_A" : {
"0" : 349706672
}
},
"numInputRows" : 292,
"inputRowsPerSecond" : 13.65826278123392,
"processedRowsPerSecond" : 14.479817514628582,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
"startOffset" : {
KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"endOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"latestOffset" : {
"KAFKA_TOPIC_NAME_INPUT_B" : {
"2" : 143147812,
"1" : 129288266,
"0" : 138102966
}
},
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
} ],
"sink" : {
"description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
"numOutputRows" : 76
}
}
Ejemplo de evento StreamingQueryListener de Delta Lake a Delta Lake
{
"id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
"runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
"name" : "silverTransformFromBronze",
"timestamp" : "2022-11-01T18:21:29.500Z",
"batchId" : 4,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"latestOffset" : 62,
"triggerExecution" : 62
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
"startOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
"reservoirVersion" : 3216,
"index" : 3214,
"isStartingVersion" : true
},
"latestOffset" : null,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
"numOutputRows" : -1
}
}
Ejemplo de evento StreamingQueryListener de Kinesis a Delta Lake
{
"id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
"runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
"name" : null,
"timestamp" : "2024-05-14T02:09:20.846Z",
"batchId" : 0,
"batchDuration" : 59322,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"durationMs" : {
"addBatch" : 5397,
"commitBatch" : 4429,
"commitOffsets" : 211,
"getBatch" : 5,
"latestOffset" : 21998,
"queryPlanning" : 12128,
"triggerExecution" : 59313,
"walCommit" : 220
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
"startOffset" : null,
"endOffset" : [ {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000000"
},
"firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
"lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
}, {
"shard" : {
"stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
"shardId" : "shardId-000000000001"
},
"firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
"lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
"closed" : false,
"msBehindLatest" : "0",
"lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
} ],
"latestOffset" : null,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.33714304979602844,
"metrics" : {
"avgMsBehindLatest" : "0.0",
"maxMsBehindLatest" : "0",
"minMsBehindLatest" : "0",
"mode" : "efo",
"numClosedShards" : "0",
"numProcessedBytes" : "30",
"numProcessedRecords" : "18",
"numRegisteredConsumers" : "1",
"numStreams" : "1",
"numTotalShards" : "2",
"totalPrefetchedBytes" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
"numOutputRows" : -1
}
}
Ejemplo de evento StreamingQueryListener de Kafka a Delta Lake
{
"id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"name" : null,
"timestamp" : "2024-05-15T21:57:50.782Z",
"batchId" : 0,
"batchDuration" : 3601,
"numInputRows" : 20,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 5.55401277422938,
"durationMs" : {
"addBatch" : 1544,
"commitBatch" : 686,
"commitOffsets" : 27,
"getBatch" : 12,
"latestOffset" : 577,
"queryPlanning" : 105,
"triggerExecution" : 3600,
"walCommit" : 34
},
"stateOperators" : [ {
"operatorName" : "symmetricHashJoin",
"numRowsTotal" : 20,
"numRowsUpdated" : 20,
"allUpdatesTimeMs" : 473,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 0,
"commitTimeMs" : 277,
"memoryUsedBytes" : 13120,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 5,
"numStateStoreInstances" : 20,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 5280
}
} ],
"sources" : [ {
"description" : "KafkaV2[Subscribe[topic-1]]",
"startOffset" : null,
"endOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"latestOffset" : {
"topic-1" : {
"1" : 5,
"0" : 5
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"avgOffsetsBehindLatest" : "0.0",
"estimatedTotalBytesBehindLatest" : "0.0",
"maxOffsetsBehindLatest" : "0",
"minOffsetsBehindLatest" : "0"
}
}, {
"description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"startOffset" : null,
"endOffset" : {
"sourceVersion" : 1,
"reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
"reservoirVersion" : 1,
"index" : -1,
"isStartingVersion" : false
},
"latestOffset" : null,
"numInputRows" : 10,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 2.77700638711469,
"metrics" : {
"numBytesOutstanding" : "0",
"numFilesOutstanding" : "0"
}
} ],
"sink" : {
"description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
"numOutputRows" : -1
}
}
Ejemplo de origen de tasa para el evento StreamingQueryListener de Delta Lake
{
"id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
"runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
"name" : "dataGen",
"timestamp" : "2022-11-01T18:28:20.332Z",
"batchId" : 279,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884,
"durationMs" : {
"addBatch" : 1771,
"commitOffsets" : 54,
"getBatch" : 0,
"latestOffset" : 0,
"queryPlanning" : 4,
"triggerExecution" : 1887,
"walCommit" : 58
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
"startOffset" : 560,
"endOffset" : 563,
"latestOffset" : 563,
"numInputRows" : 300,
"inputRowsPerSecond" : 114.15525114155251,
"processedRowsPerSecond" : 158.9825119236884
} ],
"sink" : {
"description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
"numOutputRows" : -1
}
}