Compartir a través de


Supervisión de consultas de Structured Streaming en Azure Databricks

Azure Databricks proporciona una supervisión integrada para las aplicaciones de Structured Streaming mediante la interfaz de usuario de Spark de la pestaña Streaming.

Cómo distinguir consultas de Structured Streaming en la UI de Spark

Para distinguir las métricas pertenecen a cada secuencia de manera sencilla mediante la UI de Spark, proporcione un nombre de consulta único a los flujos. Para ello, agregue el elemento .queryName(<query-name>) al código writeStream.

Inserción de métricas de Structured Streaming en servicios externos

Las métricas de streaming se pueden insertar en servicios externos para alertas o casos de uso de paneles mediante la interfaz del cliente de escucha de consultas de streaming de Apache Spark. En Databricks Runtime 11.3 LTS y versiones posteriores, StreamingQueryListener está disponible en Python y Scala.

Important

Las siguientes limitaciones se aplican a las cargas de trabajo mediante los modos de acceso a la computación habilitados para Unity Catalog:

  • StreamingQueryListener requiere Databricks Runtime 15.1 o superior para usar credenciales o interactuar con objetos gestionados por Unity Catalog en cómputo con el modo de acceso dedicado.
  • StreamingQueryListener requiere Databricks Runtime 16.1 o superior para cargas de trabajo de Scala configuradas con el modo de acceso estándar (anteriormente modo de acceso compartido).

Note

La latencia de procesamiento con agentes de escucha puede afectar significativamente a las velocidades de procesamiento de consultas. Se recomienda limitar la lógica de procesamiento en estos agentes de escucha y optar por escribir en sistemas de respuesta rápida como Kafka para mejorar la eficacia.

Si la consulta no tiene datos disponibles en el origen y está esperando nuevos datos, se entrega un onQueryIdle mensaje al agente de escucha de consulta de streaming. Un onQueryProgress mensaje solo se entrega al final del lote de consulta de streaming. Si la consulta ha estado procesando datos durante mucho tiempo, es posible que no se envíen ni los onQueryIdleonQueryProgress eventos, pero la consulta sigue en buen estado y continúa procesando los datos.

En el código siguiente se proporcionan ejemplos básicos de la sintaxis para implementar un cliente de escucha:

Scala

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val myListener = new StreamingQueryListener {

  /**
    * Called when a query is started.
    * @note This is called synchronously with
    *       [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]].
    *       `onQueryStart` calls on all listeners before
    *       `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
    *        Do not block this method, as it blocks your query.
    */
  def onQueryStarted(event: QueryStartedEvent): Unit = {}

  /**
    * Called when there is some status update (ingestion rate updated, etc.)
    *
    * @note This method is asynchronous. The status in [[StreamingQuery]] returns the
    *       latest status, regardless of when this method is called. The status of [[StreamingQuery]]
    *       may change before or when you process the event. For example, you may find [[StreamingQuery]]
    *       terminates when processing `QueryProgressEvent`.
    */
  def onQueryProgress(event: QueryProgressEvent): Unit = {}

  /**
    * Called when the query is idle and waiting for new data to process.
    */
  def onQueryIdle(event: QueryProgressEvent): Unit = {}

  /**
    * Called when a query is stopped, with or without error.
    */
  def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

Python

class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        """
        Called when a query is started.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryStartedEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This is called synchronously with
        meth:`pyspark.sql.streaming.DataStreamWriter.start`,
        that is, ``onQueryStart`` will be called on all listeners before
        ``DataStreamWriter.start()`` returns the corresponding
        :class:`pyspark.sql.streaming.StreamingQuery`.
        Do not block in this method as it will block your query.
        """
        pass

    def onQueryProgress(self, event):
        """
        Called when there is some status update (ingestion rate updated, etc.)

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryProgressEvent`
            The properties are available as the same as Scala API.

        Notes
        -----
        This method is asynchronous. The status in
        :class:`pyspark.sql.streaming.StreamingQuery` returns the
        most recent status, regardless of when this method is called. The status
        of :class:`pyspark.sql.streaming.StreamingQuery`.
        may change before or when you process the event.
        For example, you may find :class:`StreamingQuery`
        terminates when processing `QueryProgressEvent`.
        """
        pass

    def onQueryIdle(self, event):
        """
        Called when the query is idle and waiting for new data to process.
        """
        pass

    def onQueryTerminated(self, event):
        """
        Called when a query is stopped, with or without error.

        Parameters
        ----------
        event: :class:`pyspark.sql.streaming.listener.QueryTerminatedEvent`
            The properties are available as the same as Scala API.
        """
        pass

my_listener = MyListener()

Definición de métricas observables en Structured Streaming

Las métricas observables son funciones de agregado arbitrarias con nombre que se pueden definir en una consulta (DataFrame). En cuanto la ejecución de un DataFrame alcanza un punto de finalización (es decir, finaliza una consulta por lotes o llega a una época de streaming), se emite un evento con nombre que contiene las métricas de los datos procesados desde el último punto de finalización.

Para observar estas métricas, adjunte un cliente de escucha a la sesión de Spark. El cliente de escucha que debe usar dependerá del modo de ejecución:

  • Modo por lotes: use el cliente QueryExecutionListener.

    El agente QueryExecutionListener recibe una llamada cuando se completa una consulta. Acceda a las métricas mediante el mapa QueryExecution.observedMetrics.

  • Streaming o microlote: use el agente StreamingQueryListener.

    El agente StreamingQueryListener recibe una llamada cuando la consulta de streaming completa una época. Acceda a las métricas mediante el mapa StreamingQueryProgress.observedMetrics. Azure Databricks no admite el modo de desencadenador continuous para streaming.

Por ejemplo:

Scala

// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()

// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.observedMetrics.get("my_event").foreach { row =>
      // Trigger if the number of errors exceeds 5 percent
      val num_rows = row.getAs[Long]("rc")
      val num_error_rows = row.getAs[Long]("erc")
      val ratio = num_error_rows.toDouble / num_rows
      if (ratio > 0.05) {
        // Trigger alert
      }
    }
  }
})

Python

# Observe metric
observed_df = df.observe("metric", count(lit(1)).as("cnt"), count(col("error")).as("malformed"))
observed_df.writeStream.format("...").start()

# Define my listener.
class MyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"'{event.name}' [{event.id}] got started!")
    def onQueryProgress(self, event):
        row = event.progress.observedMetrics.get("metric")
        if row is not None:
            if row.malformed / row.cnt > 0.5:
                print("ALERT! Ouch! there are too many malformed "
                      f"records {row.malformed} out of {row.cnt}!")
            else:
                print(f"{row.cnt} rows processed!")
    def onQueryTerminated(self, event):
        print(f"{event.id} got terminated!")

# Add my listener.
spark.streams.addListener(MyListener())

Asignar identificadores de tabla de métricas de Unity Catalog, Delta Lake y Structured Streaming

Las métricas de Structured Streaming usan el reservoirId campo en varios lugares para la identidad única de una tabla Delta que se usa como origen para una consulta de streaming.

El campo reservoirId asigna el identificador único almacenado por la tabla Delta en el registro de transacciones de Delta. Este identificador no corresponde al tableId valor asignado por Unity Catalog y mostrado en Catalog Explorer.

Use la sintaxis siguiente para revisar el identificador de tabla de una tabla Delta. Esto funciona para las tablas administradas por Unity Catalog, las tablas externas de Unity Catalog y todas las tablas Delta del metastore de Hive.

DESCRIBE DETAIL <table-name>

El campo id que se muestra en los resultados es el identificador que se asigna a la reservoirId en las métricas de streaming.

Métricas del objeto StreamingQueryListener

Fields Description
id Una id. de consulta único que persiste en los reinicios.
runId Identificador de consulta único para cada inicio o reinicio. Consulte StreamingQuery.runId().
name El nombre especificado por el usuario de la consulta. El nombre es NULL si no se especifica ningún nombre.
timestamp La marca de tiempo para la ejecución del microlote.
batchId Una iId. único del lote actual de datos que se está procesando. En el caso de reintentos tras un error, se puede ejecutar un id. de lote determinado más de una vez. Del mismo modo, cuando no hay datos que se van a procesar, el id. de lote no se incrementa.
batchDuration Duración del procesamiento de una operación por lotes, en milisegundos.
numInputRows El número agregado (en todos los orígenes) de registros procesados en un desencadenador.
inputRowsPerSecond La tasa de agregado (en todos los orígenes) de los datos que llegan.
processedRowsPerSecond La tasa de agregado (en todos los orígenes) a la que Spark está procesando datos.

StreamingQueryListener también define los siguientes campos que contienen objetos que puede examinar para ver las métricas del cliente y los detalles del progreso del origen:

Fields Description
durationMs Escriba: ju.Map[String, JLong]. Vea el objeto durationMs.
eventTime Escriba: ju.Map[String, String]. Vea el objeto eventTime.
stateOperators Escriba: Array[StateOperatorProgress]. Vea el objeto stateOperators.
sources Escriba: Array[SourceProgress]. Vea el objeto sources.
sink Escriba: SinkProgress. Vea el objeto sink.
observedMetrics Escriba: ju.Map[String, Row]. Funciones de agregado arbitrarias con nombre que se pueden definir en un dataframe o consulta (como df.observe).

objeto durationMs

Tipo de objeto: ju.Map[String, JLong]

Información sobre el tiempo necesario para completar varias fases del proceso de ejecución de microlote.

Fields Description
durationMs.addBatch El tiempo necesario para ejecutar el microlote. Esto excluye el tiempo que tarda Spark en planear el microlote.
durationMs.getBatch El tiempo necesario para recuperar los metadatos sobre los desplazamientos del origen.
durationMs.latestOffset El desplazamiento más reciente consumido para el microlote. Este objeto de progreso hace referencia al tiempo necesario para recuperar el desplazamiento más reciente de los orígenes.
durationMs.queryPlanning El tiempo necesario para generar el plan de ejecución.
durationMs.triggerExecution El tiempo necesario para planear y ejecutar el microlote.
durationMs.walCommit El tiempo necesario para confirmar los nuevos desplazamientos disponibles.
durationMs.commitBatch Tiempo necesario para confirmar los datos escritos en el receptor durante addBatch. Solo está presente para los receptores que admiten la confirmación.
durationMs.commitOffsets Tiempo necesario para confirmar el lote en el registro de confirmación.

objeto eventTime

Tipo de objeto: ju.Map[String, String]

Información sobre el valor del tiempo del evento visto dentro de los datos que se procesan en el microlote. Esta marca de agua usa estos datos para averiguar cómo recortar el estado para procesar agregaciones con estado definidas en el trabajo de flujo estructurado.

Fields Description
eventTime.avg El promedio de tiempo del evento visto en el desencadenador.
eventTime.max El tiempo máximo del evento visto en el desencadenador.
eventTime.min El tiempo mínimo del evento visto en el desencadenador.
eventTime.watermark El valor de la marca de agua utilizada en el desencadenador.

objeto stateOperators

Tipo de objeto: Array[StateOperatorProgress] el stateOperators objeto contiene información sobre las operaciones con estado definidas en el trabajo Structured Streaming y las agregaciones que se generan a partir de ellas.

Para obtener más información sobre los operadores de estado de transmisión, consulte ¿Qué es el streaming con estado?.

Fields Description
stateOperators.operatorName Nombre del operador con estado al que se relacionan las métricas, como symmetricHashJoin, dedupeo stateStoreSave.
stateOperators.numRowsTotal El número total de filas en el estado como resultado del operador o agregación con estado.
stateOperators.numRowsUpdated El número total de filas actualizadas en el estado como resultado del operador o agregación con estado.
stateOperators.allUpdatesTimeMs Actualmente, Spark no puede medir esta métrica y está previsto quitarla en futuras actualizaciones.
stateOperators.numRowsRemoved El número total de filas quitadas del estado como resultado del operador o agregación con estado.
stateOperators.allRemovalsTimeMs Actualmente, Spark no puede medir esta métrica y está previsto quitarla en futuras actualizaciones.
stateOperators.commitTimeMs El tiempo necesario para confirmar todas las actualizaciones (colocaciones y eliminaciones) y devolver una nueva versión.
stateOperators.memoryUsedBytes Memoria usada por el almacén de estado.
stateOperators.numRowsDroppedByWatermark El número de filas que se consideran demasiado tardías para incluirse en una agregación con estado. Agregaciones de streaming solo: el número de filas eliminadas después de la agregación (y no las filas de entrada sin procesar). Este número no es preciso, pero proporciona una indicación de que hay datos en tiempo de espera que se quiten.
stateOperators.numShufflePartitions El número de particiones aleatorias para este operador con estado.
stateOperators.numStateStoreInstances La instancia de almacén de estado real que el operador ha inicializado y mantenido. Para muchos operadores con estado, este es el mismo que el número de particiones. Sin embargo, las combinaciones stream-stream inicializan cuatro instancias de almacén de estado por partición.
stateOperators.customMetrics Consulte stateOperators.customMetrics en este tema para obtener más detalles.

Objeto StateOperatorProgress.customMetrics

Tipo de objeto: ju.Map[String, JLong]

StateOperatorProgress tiene un campo , , customMetricsque contiene las métricas específicas de la característica que usa al recopilar esas métricas.

Feature Description
Almacén de estado de RocksDB Métricas del almacén de estado de RocksDB.
Almacén de estado de HDFS Métricas para el almacén de estado de HDFS.
Desduplicación de flujos Métricas para la desduplicación de filas.
Agregación de flujos Métricas para la agregación de filas.
Operador stream join Métricas para el operador de combinación de flujos.
transformWithState Métricas para el operador transformWithState.

Métricas personalizadas del almacén de estado de RocksDB

Información recopilada de RocksDB que captura métricas sobre su rendimiento y operaciones con respecto a los valores con estado que mantiene para el trabajo de flujo estructurado. Para más información, consulte Configuración del almacén de estado de RocksDB en Azure Databricks.

Fields Description
customMetrics.rocksdbBytesCopied El número de bytes copiados según el seguimiento del administrador de archivos de RocksDB.
customMetrics.rocksdbCommitCheckpointLatency El tiempo en milisegundos para tomar una instantánea de RocksDB nativo y escribirlo en un directorio local.
customMetrics.rocksdbCompactLatency El tiempo en milisegundos para realizar la compactación (opcional) durante la confirmación del punto de comprobación.
customMetrics.rocksdbCommitCompactLatency Tiempo de compactación durante la confirmación, en milisegundos.
customMetrics.rocksdbCommitFileSyncLatencyMs El tiempo en milisegundos que sincroniza la instantánea nativa de RocksDB con el almacenamiento externo (la ubicación del punto de control).
customMetrics.rocksdbCommitFlushLatency El tiempo en milisegundos que vacía los cambios en memoria de RocksDB en memoria en el disco local.
customMetrics.rocksdbCommitPauseLatency Tiempo en milisegundos que detiene los subprocesos de trabajo en segundo plano como parte de la confirmación del punto de comprobación, como la compactación.
customMetrics.rocksdbCommitWriteBatchLatency El tiempo en milisegundos aplicando las escrituras almacenadas provisionalmente en la estructura en memoria (WriteBatch) a RocksDB nativa.
customMetrics.rocksdbFilesCopied El número de archivos copiados según el seguimiento del administrador de archivos de RocksDB.
customMetrics.rocksdbFilesReused El número de archivos reutilizados según el seguimiento del administrador de archivos de RocksDB.
customMetrics.rocksdbGetCount El número de llamadas get (no incluye gets de WriteBatch: lote en memoria que se usa para escrituras de almacenamiento provisional).
customMetrics.rocksdbGetLatency El promedio de tiempo en nanosegundos para la llamada nativa RocksDB::Get subyacente.
customMetrics.rocksdbReadBlockCacheHitCount Recuento de aciertos de caché de la caché de bloques en RocksDB.
customMetrics.rocksdbReadBlockCacheMissCount Recuento de errores de caché de bloques en RocksDB.
customMetrics.rocksdbSstFileSize Tamaño de todos los archivos de tabla ordenada estática (SST) en la instancia de RocksDB.
customMetrics.rocksdbTotalBytesRead El número de bytes sin comprimir leídos por operaciones de get.
customMetrics.rocksdbTotalBytesWritten El número total de bytes sin comprimir escritos por operaciones de put.
customMetrics.rocksdbTotalBytesReadThroughIterator El número de bytes totales de los datos sin comprimir leídos mediante un iterador. Algunas operaciones con estado (por ejemplo, el procesamiento de tiempo de espera en FlatMapGroupsWithState y la marca de agua) requieren leer datos en Azure Databricks a través de un iterador.
customMetrics.rocksdbTotalBytesReadByCompaction El número de bytes de lectura del proceso de compactación del disco.
customMetrics.rocksdbTotalBytesWrittenByCompaction El número total de bytes que escribe el proceso de compactación en el disco.
customMetrics.rocksdbTotalCompactionLatencyMs El tiempo en milisegundos para compactaciones de RocksDB, incluidas las compactaciones en segundo plano y la compactación opcional iniciada durante la confirmación.
customMetrics.rocksdbTotalFlushLatencyMs El tiempo total de vaciado, incluido el vaciado en segundo plano. Las operaciones de vaciado son procesos por los que MemTable se vacía en el almacenamiento una vez que está lleno. MemTables son el primer nivel donde se almacenan los datos en RocksDB.
customMetrics.rocksdbZipFileBytesUncompressed Tamaño en bytes de los archivos ZIP sin comprimir, tal como lo informa el Administrador de archivos. El administrador de archivos administra el uso y eliminación del espacio en disco del archivo SST físico.
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> La versión más reciente de la instantánea de RocksDB guardada en la ubicación del punto de comprobación. Un valor de "-1" indica que nunca se ha guardado ninguna instantánea. Dado que las instantáneas son específicas de cada instancia de almacén de estado, esta métrica se aplica a un identificador de partición y un nombre de almacén de estado concretos.
customMetrics.rocksdbPutLatency Latencia total de llamadas put.
customMetrics.rocksdbPutCount Número de llamadas put.
customMetrics.rocksdbWriterStallLatencyMs Tiempo de espera del escritor para que finalice la compactación o vaciado.
customMetrics.rocksdbTotalBytesWrittenByFlush Bytes totales escritos por vaciado
customMetrics.rocksdbPinnedBlocksMemoryUsage Uso de memoria para bloques anclados
customMetrics.rocksdbNumInternalColFamiliesKeys Número de claves internas para familias de columnas internas
customMetrics.rocksdbNumExternalColumnFamilies Número de familias de columnas externas
customMetrics.rocksdbNumInternalColumnFamilies Número de familias de columnas internas

Métricas personalizadas del almacén de estado de HDFS

Información recopilada sobre los comportamientos y las operaciones del proveedor del almacén de estado de HDFS.

Fields Description
customMetrics.stateOnCurrentVersionSizeBytes Tamaño estimado de estado solo en la versión actual.
customMetrics.loadedMapCacheHitCount Recuento de aciertos de caché en estados almacenados en caché en el proveedor.
customMetrics.loadedMapCacheMissCount Recuento de fallos de caché en estados almacenados en caché en el proveedor.
customMetrics.SnapshotLastUploaded.partition_<partition-id>_<state-store-name> La última versión cargada de la instantánea para una instancia de almacén de estado específica.

Métricas personalizadas de desduplicación

Información recopilada sobre los comportamientos y las operaciones de desduplicación.

Fields Description
customMetrics.numDroppedDuplicateRows Número de filas duplicadas eliminadas.
customMetrics.numRowsReadDuringEviction Número de filas de estado leídas durante la expulsión de estado.

Métricas personalizadas de agregación

Información recopilada sobre los comportamientos y las operaciones de agregación.

Fields Description
customMetrics.numRowsReadDuringEviction Número de filas de estado leídas durante la expulsión de estado.

Métricas personalizadas de unión a secuencias

Información recopilada sobre los comportamientos y las operaciones de unión de secuencias.

Fields Description
customMetrics.skippedNullValueCount Número de valores null omitidos, cuando spark.sql.streaming.stateStore.skipNullsForStreamStreamJoins.enabled se establece en true.

métricas personalizadas de transformWithState

Información recopilada sobre los comportamientos y las operaciones de transformWithState (TWS). Para obtener más información sobre transformWithState, consulte Compilación de una aplicación con estado personalizada.

Fields Description
customMetrics.initialStateProcessingTimeMs Número de milisegundos realizados para procesar todo el estado inicial.
customMetrics.numValueStateVars Número de variables de estado de valor. También está presente para transformWithStateInPandas.
customMetrics.numListStateVars Número de variables de estado de lista. También está presente para transformWithStateInPandas.
customMetrics.numMapStateVars Número de variables de estado de mapa. También está presente para transformWithStateInPandas.
customMetrics.numDeletedStateVars Número de variables de estado eliminadas. También está presente para transformWithStateInPandas.
customMetrics.timerProcessingTimeMs Número de milisegundos que se tardan en procesar todos los temporizadores
customMetrics.numRegisteredTimers Número de temporizadores registrados. También está presente para transformWithStateInPandas.
customMetrics.numDeletedTimers Número de temporizadores eliminados. También está presente para transformWithStateInPandas.
customMetrics.numExpiredTimers Número de temporizadores expirados. También está presente para transformWithStateInPandas.
customMetrics.numValueStateWithTTLVars Número de variables de estado de valor con TTL. También está presente para transformWithStateInPandas.
customMetrics.numListStateWithTTLVars Número de variables de estado de lista con TTL. También está presente para transformWithStateInPandas.
customMetrics.numMapStateWithTTLVars Número de variables de estado de mapa con TTL. También está presente para transformWithStateInPandas.
customMetrics.numValuesRemovedDueToTTLExpiry Número de valores eliminados debido a la expiración de TTL. También está presente para transformWithStateInPandas.
customMetrics.numValuesIncrementallyRemovedDueToTTLExpiry Número de valores eliminados de forma incremental debido a la expiración de TTL.

objeto sources

Tipo de objeto: Array[SourceProgress]

El objeto sources contiene información y métricas para los orígenes de datos de streaming.

Fields Description
description Descripción detallada de la tabla de origen de datos de streaming.
startOffset Número de desplazamiento inicial dentro de la tabla de origen de datos en la que se inició el trabajo de streaming.
endOffset El desplazamiento más reciente procesado por el microlote.
latestOffset Desplazamiento más reciente procesado por el microlote.
numInputRows El número de filas de entrada procesadas desde este origen.
inputRowsPerSecond Velocidad, en segundos, a la que llegan los datos para su procesamiento desde este origen.
processedRowsPerSecond La tasa a la que Spark está procesando datos desde este origen.
metrics Escriba: ju.Map[String, String]. Contiene métricas personalizadas para un origen de datos específico.

Azure Databricks proporciona la siguiente implementación de objetos de orígenes:

Note

En el caso de los campos definidos en el formulario sources.<startOffset / endOffset / latestOffset>.* (o alguna variación), interprete como uno de los campos posibles (hasta estos) 3 posibles, que contienen el campo secundario indicado:

  • sources.startOffset.<child-field>
  • sources.endOffset.<child-field>
  • sources.latestOffset.<child-field>

Objeto de orígenes de Delta Lake

Definiciones de métricas personalizadas usadas para orígenes de datos de streaming de tablas delta.

Fields Description
sources.description La descripción del origen desde el que se lee la consulta de streaming. Por ejemplo: “DeltaSource[table]”.
sources.<startOffset / endOffset>.sourceVersion La versión de la serialización con la que se codifica este desplazamiento.
sources.<startOffset / endOffset>.reservoirId El id. de la tabla que se leerá. Se usa para detectar errores de configuración al reiniciar una consulta. Consulte Asignar identificadores de tabla de métricas de Unity Catalog, Delta Lake y Structured Streaming.
sources.<startOffset / endOffset>.reservoirVersion La versión de la tabla que está procesando actualmente.
sources.<startOffset / endOffset>.index El índice en la secuencia de AddFiles en esta versión. Esto se usa para dividir las confirmaciones grandes en varios lotes. Este índice se crea ordenando en modificationTimestamp y path.
sources.<startOffset / endOffset>.isStartingVersion Identifica si el desplazamiento actual marca el inicio de una nueva consulta de streaming en lugar del procesamiento de los cambios que se produjeron después de procesar los datos iniciales. Al iniciar una nueva consulta, se procesan todos los datos presentes en la tabla al principio y, a continuación, los nuevos datos que hayan llegado.
sources.<startOffset / endOffset / latestOffset>.eventTimeMillis Tiempo de evento registrado para la ordenación de la hora del evento. Hora de evento de los datos de instantánea iniciales que están pendientes de procesarse. Se usa al procesar una instantánea inicial con el orden de tiempo del evento.
sources.latestOffset El desplazamiento más reciente procesado por la consulta del microlote.
sources.numInputRows El número de filas de entrada procesadas desde este origen.
sources.inputRowsPerSecond La tasa a la que llegan los datos para su procesamiento desde este origen.
sources.processedRowsPerSecond La tasa a la que Spark está procesando datos desde este origen.
sources.metrics.numBytesOutstanding El tamaño combinado de los archivos pendientes (archivos rastreados por RocksDB). Esta es la métrica de trabajo pendiente para Delta y el cargador automático como origen de streaming.
sources.metrics.numFilesOutstanding El número de archivos pendientes que se van a procesar. Esta es la métrica de trabajo pendiente para Delta y el cargador automático como origen de streaming.

Objeto de orígenes de Apache Kafka

Definiciones de métricas personalizadas usadas para orígenes de datos de streaming de Apache Kafka.

Fields Description
sources.description Descripción detallada del origen de Kafka, especificando el tema exacto de Kafka que se lee. Por ejemplo: “KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]”.
sources.startOffset El número de desplazamiento inicial en el tema de Kafka en el que se inició el trabajo de streaming.
sources.endOffset El desplazamiento más reciente procesado por el microlote. Esto podría ser igual a latestOffset para una ejecución de microlote en curso.
sources.latestOffset El desplazamiento más reciente calculado por el microlote. Es posible que el proceso de creación de microlotes no procese todos los desplazamientos cuando hay limitación, lo que da como resultado endOffset y latestOffset difiere.
sources.numInputRows El número de filas de entrada procesadas desde este origen.
sources.inputRowsPerSecond La tasa a la que llegan los datos para su procesamiento desde este origen.
sources.processedRowsPerSecond La tasa a la que Spark está procesando datos desde este origen.
sources.metrics.avgOffsetsBehindLatest El promedio del número de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos.
sources.metrics.estimatedTotalBytesBehindLatest El número estimado de bytes que el proceso de consulta no ha consumido de los temas suscritos.
sources.metrics.maxOffsetsBehindLatest El número máximo de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos.
sources.metrics.minOffsetsBehindLatest El número mínimo de desplazamientos que la consulta de streaming está detrás del desplazamiento disponible más reciente entre todos los temas suscritos.

Métricas de origen del cargador automático

Definiciones de métricas personalizadas usadas para orígenes de datos de streaming de Auto Loader.

Fields Description
sources.<startOffset / endOffset / latestOffset>.seqNum Posición actual en la secuencia de archivos que se están procesando en el orden en que se detectaron los archivos.
sources.<startOffset / endOffset / latestOffset>.sourceVersion Versión de implementación del origen cloudFiles.
sources.<startOffset / endOffset / latestOffset>.lastBackfillStartTimeMs Hora de inicio de la operación de reposición más reciente.
sources.<startOffset / endOffset / latestOffset>.lastBackfillFinishTimeMs Hora de finalización de la operación de reposición más reciente.
sources.<startOffset / endOffset / latestOffset>.lastInputPath La última ruta de acceso de entrada proporcionada por el usuario de la secuencia antes de reiniciar la secuencia.
sources.metrics.numFilesOutstanding Número de archivos en la retrocesión.
sources.metrics.numBytesOutstanding Tamaño (bytes) de los archivos del trabajo pendiente
sources.metrics.approximateQueueSize Tamaño aproximado de la cola de mensajes. Solo cuando la opción cloudFiles.useNotifications está habilitada.
sources.numInputRows El número de filas de entrada procesadas desde este origen. Para el binaryFile formato de origen, numInputRows es igual al número de archivos.

Métricas de orígenes de PubSub

Definiciones de métricas personalizadas usadas para orígenes de datos de streaming de PubSub. Para más información sobre la supervisión de orígenes de streaming de PubSub, consulte Supervisión de métricas de streaming.

Fields Description
sources.<startOffset / endOffset / latestOffset>.sourceVersion La versión de implementación con la que se codifica este desplazamiento.
sources.<startOffset / endOffset / latestOffset>.seqNum Número de secuencia persistente que se está procesando.
sources.<startOffset / endOffset / latestOffset>.fetchEpoch La época de captura más grande que se está procesando.
sources.metrics.numRecordsReadyToProcess Número de registros disponibles para su procesamiento en el trabajo pendiente actual.
sources.metrics.sizeOfRecordsReadyToProcess Tamaño total en bytes, de datos no procesados en el trabajo pendiente actual.
sources.metrics.numDuplicatesSinceStreamStart Recuento total de registros duplicados procesados por la secuencia desde que se inició.

Métricas de orígenes de Pulsar

Definiciones de métricas personalizadas usadas para orígenes de datos de streaming de Pulsar.

Fields Description
sources.metrics.numInputRows Número de filas procesadas en el microproceso actual.
sources.metrics.numInputBytes Número total de bytes procesados en el microproceso actual.

objeto sink

Tipo de objeto: SinkProgress

Fields Description
sink.description Descripción del receptor, que detalla la implementación específica del receptor que se usa.
sink.numOutputRows Número de filas de salida. Los distintos tipos de receptor pueden tener comportamientos o restricciones diferentes para los valores. Consulte los tipos admitidos específicos.
sink.metrics ju.Map[String, String] de métricas de receptor.

Actualmente, Azure Databricks proporciona dos implementaciones de objetos específicas sink :

Tipo de sumidero Details
Tabla delta Vea Objeto receptor delta.
Tema de Apache Kafka Consulte Objeto receptor Kafka.

El campo sink.metrics se comporta igual para ambas variantes del objeto sink.

Objeto receptor de Delta Lake

Fields Description
sink.description Descripción del receptor Delta, que detalla la implementación específica del receptor delta que se usa. Por ejemplo: “DeltaSink[table]”.
sink.numOutputRows El número de filas es siempre -1 porque Spark no puede inferir filas de salida para receptores DSv1, que es la clasificación para el receptor de Delta Lake.

Objeto receptor de Apache Kafka

Fields Description
sink.description La descripción del receptor de Kafka en el que está escribiendo la consulta de streaming, detallando la implementación específica del receptor de Kafka que se está usando. Por ejemplo: “org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100”.
sink.numOutputRows El número de filas escritas en la tabla o receptor de salida como parte del microlote. En algunas situaciones, este valor puede ser "-1" y, por lo general, se puede interpretar como "desconocido".

Examples

Ejemplo de evento StreamingQueryListener de Kafka a Kafka

{
  "id" : "3574feba-646d-4735-83c4-66f657e52517",
  "runId" : "38a78903-9e55-4440-ad81-50b591e4746c",
  "name" : "STREAMING_QUERY_NAME_UNIQUE",
  "timestamp" : "2022-10-31T20:09:30.455Z",
  "batchId" : 1377,
  "numInputRows" : 687,
  "inputRowsPerSecond" : 32.13433743393049,
  "processedRowsPerSecond" : 34.067241892293964,
  "durationMs" : {
    "addBatch" : 18352,
    "getBatch" : 0,
    "latestOffset" : 31,
    "queryPlanning" : 977,
    "triggerExecution" : 20165,
    "walCommit" : 342
  },
  "eventTime" : {
    "avg" : "2022-10-31T20:09:18.070Z",
    "max" : "2022-10-31T20:09:30.125Z",
    "min" : "2022-10-31T20:09:09.793Z",
    "watermark" : "2022-10-31T20:08:46.355Z"
  },
  "stateOperators" : [ {
    "operatorName" : "stateStoreSave",
    "numRowsTotal" : 208,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 434,
    "numRowsRemoved" : 76,
    "allRemovalsTimeMs" : 515,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 167069743,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1370,
      "SnapshotLastUploaded.partition_1_default" : 1370,
      "SnapshotLastUploaded.partition_2_default" : 1362,
      "SnapshotLastUploaded.partition_3_default" : 1370,
      "SnapshotLastUploaded.partition_4_default" : 1356,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 222,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 165,
      "rocksdbReadBlockCacheMissCount" : 41,
      "rocksdbSstFileSize" : 232729,
      "rocksdbTotalBytesRead" : 12844,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 161238,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "dedupe",
    "numRowsTotal" : 2454744,
    "numRowsUpdated" : 73,
    "allUpdatesTimeMs" : 4155,
    "numRowsRemoved" : 0,
    "allRemovalsTimeMs" : 0,
    "commitTimeMs" : 0,
    "memoryUsedBytes" : 137765341,
    "numRowsDroppedByWatermark" : 34,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 20,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_default" : 1360,
      "SnapshotLastUploaded.partition_1_default" : 1360,
      "SnapshotLastUploaded.partition_2_default" : 1352,
      "SnapshotLastUploaded.partition_3_default" : 1360,
      "SnapshotLastUploaded.partition_4_default" : 1346,
      "numDroppedDuplicateRows" : 193,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 146,
      "rocksdbGetLatency" : 0,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3,
      "rocksdbReadBlockCacheMissCount" : 3,
      "rocksdbSstFileSize" : 78959140,
      "rocksdbTotalBytesRead" : 0,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  }, {
    "operatorName" : "symmetricHashJoin",
    "numRowsTotal" : 2583,
    "numRowsUpdated" : 682,
    "allUpdatesTimeMs" : 9645,
    "numRowsRemoved" : 508,
    "allRemovalsTimeMs" : 46,
    "commitTimeMs" : 21,
    "memoryUsedBytes" : 668544484,
    "numRowsDroppedByWatermark" : 0,
    "numShufflePartitions" : 20,
    "numStateStoreInstances" : 80,
    "customMetrics" : {
      "SnapshotLastUploaded.partition_0_left-keyToNumValues" : 1310,
      "SnapshotLastUploaded.partition_1_left-keyWithIndexToValue" : 1318,
      "SnapshotLastUploaded.partition_2_left-keyToNumValues" : 1305,
      "SnapshotLastUploaded.partition_2_right-keyWithIndexToValue" : 1306,
      "SnapshotLastUploaded.partition_4_left-keyWithIndexToValue" : 1310,
      "rocksdbBytesCopied" : 0,
      "rocksdbCommitCheckpointLatency" : 0,
      "rocksdbCommitCompactLatency" : 0,
      "rocksdbCommitFileSyncLatencyMs" : 0,
      "rocksdbCommitFlushLatency" : 0,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 0,
      "rocksdbFilesCopied" : 0,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 4218,
      "rocksdbGetLatency" : 3,
      "rocksdbPutCount" : 0,
      "rocksdbPutLatency" : 0,
      "rocksdbReadBlockCacheHitCount" : 3425,
      "rocksdbReadBlockCacheMissCount" : 149,
      "rocksdbSstFileSize" : 742827,
      "rocksdbTotalBytesRead" : 866864,
      "rocksdbTotalBytesReadByCompaction" : 0,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWritten" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 0,
      "rocksdbTotalCompactionLatencyMs" : 0,
      "rocksdbTotalFlushLatencyMs" : 0,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 0
    }
  } ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
    "startOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706380
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_A" : {
        "0" : 349706672
      }
    },
    "numInputRows" : 292,
    "inputRowsPerSecond" : 13.65826278123392,
    "processedRowsPerSecond" : 14.479817514628582,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "estimatedTotalBytesBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  }, {
    "description" : "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_B]]",
    "startOffset" : {
      KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "endOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "latestOffset" : {
      "KAFKA_TOPIC_NAME_INPUT_B" : {
        "2" : 143147812,
        "1" : 129288266,
        "0" : 138102966
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "avgOffsetsBehindLatest" : "0.0",
      "maxOffsetsBehindLatest" : "0",
      "minOffsetsBehindLatest" : "0"
    }
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@e04b100",
    "numOutputRows" : 76
  }
}

Ejemplo de evento StreamingQueryListener de Delta Lake a Delta Lake

{
  "id" : "aeb6bc0f-3f7d-4928-a078-ba2b304e2eaf",
  "runId" : "35d751d9-2d7c-4338-b3de-6c6ae9ebcfc2",
  "name" : "silverTransformFromBronze",
  "timestamp" : "2022-11-01T18:21:29.500Z",
  "batchId" : 4,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "latestOffset" : 62,
    "triggerExecution" : 62
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "DeltaSource[dbfs:/FileStore/max.fisher@databricks.com/ctc/stateful-trade-analysis-demo/table]",
    "startOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "endOffset" : {
      "sourceVersion" : 1,
      "reservoirId" : "84590dac-da51-4e0f-8eda-6620198651a9",
      "reservoirVersion" : 3216,
      "index" : 3214,
      "isStartingVersion" : true
    },
    "latestOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_silver_delta_demo2]",
    "numOutputRows" : -1
  }
}

Ejemplo de evento StreamingQueryListener de Kinesis a Delta Lake

{
  "id" : "3ce9bd93-da16-4cb3-a3b6-e97a592783b5",
  "runId" : "fe4a6bda-dda2-4067-805d-51260d93260b",
  "name" : null,
  "timestamp" : "2024-05-14T02:09:20.846Z",
  "batchId" : 0,
  "batchDuration" : 59322,
  "numInputRows" : 20,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.33714304979602844,
  "durationMs" : {
    "addBatch" : 5397,
    "commitBatch" : 4429,
    "commitOffsets" : 211,
    "getBatch" : 5,
    "latestOffset" : 21998,
    "queryPlanning" : 12128,
    "triggerExecution" : 59313,
    "walCommit" : 220
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KinesisV2[KinesisTestUtils-7199466178786508570-at-1715652545256]",
    "startOffset" : null,
    "endOffset" : [ {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000000"
      },
      "firstSeqNum" : "49652022592149344892294981243280420130985816456924495874",
      "lastSeqNum" : "49652022592149344892294981243290091537542733559041622018",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592149344892294981243290091537542733559041622018"
    }, {
      "shard" : {
        "stream" : "KinesisTestUtils-7199466178786508570-at-1715652545256",
        "shardId" : "shardId-000000000001"
      },
      "firstSeqNum" : "49652022592171645637493511866421955849258464818430476306",
      "lastSeqNum" : "49652022592171645637493511866434045107454611178897014802",
      "closed" : false,
      "msBehindLatest" : "0",
      "lastRecordSeqNum" : "49652022592171645637493511866434045107454611178897014802"
    } ],
    "latestOffset" : null,
    "numInputRows" : 20,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.33714304979602844,
    "metrics" : {
      "avgMsBehindLatest" : "0.0",
      "maxMsBehindLatest" : "0",
      "minMsBehindLatest" : "0",
      "mode" : "efo",
      "numClosedShards" : "0",
      "numProcessedBytes" : "30",
      "numProcessedRecords" : "18",
      "numRegisteredConsumers" : "1",
      "numStreams" : "1",
      "numTotalShards" : "2",
      "totalPrefetchedBytes" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/streaming/test/KinesisToDeltaServerlessLiteSuite/2024-05-14-01-58-14-76eb7e51/56b9426c-3492-4ac5-8fe8-3d00efe20be5/deltaTable]",
    "numOutputRows" : -1
  }
}

Ejemplo de evento StreamingQueryListener de Kafka a Delta Lake

{
 "id" : "210f4746-7caa-4a51-bd08-87cabb45bdbe",
 "runId" : "42a2f990-c463-4a9c-9aae-95d6990e63f4",
 "name" : null,
 "timestamp" : "2024-05-15T21:57:50.782Z",
 "batchId" : 0,
 "batchDuration" : 3601,
 "numInputRows" : 20,
 "inputRowsPerSecond" : 0.0,
 "processedRowsPerSecond" : 5.55401277422938,
 "durationMs" : {
  "addBatch" : 1544,
  "commitBatch" : 686,
  "commitOffsets" : 27,
  "getBatch" : 12,
  "latestOffset" : 577,
  "queryPlanning" : 105,
  "triggerExecution" : 3600,
  "walCommit" : 34
 },
 "stateOperators" : [ {
  "operatorName" : "symmetricHashJoin",
  "numRowsTotal" : 20,
  "numRowsUpdated" : 20,
  "allUpdatesTimeMs" : 473,
  "numRowsRemoved" : 0,
  "allRemovalsTimeMs" : 0,
  "commitTimeMs" : 277,
  "memoryUsedBytes" : 13120,
  "numRowsDroppedByWatermark" : 0,
  "numShufflePartitions" : 5,
  "numStateStoreInstances" : 20,
  "customMetrics" : {
   "loadedMapCacheHitCount" : 0,
   "loadedMapCacheMissCount" : 0,
   "stateOnCurrentVersionSizeBytes" : 5280
  }
 } ],
 "sources" : [ {
  "description" : "KafkaV2[Subscribe[topic-1]]",
  "startOffset" : null,
  "endOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "latestOffset" : {
   "topic-1" : {
    "1" : 5,
    "0" : 5
   }
  },
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "avgOffsetsBehindLatest" : "0.0",
   "estimatedTotalBytesBehindLatest" : "0.0",
   "maxOffsetsBehindLatest" : "0",
   "minOffsetsBehindLatest" : "0"
  }
 }, {
  "description" : "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
  "startOffset" : null,
  "endOffset" : {
   "sourceVersion" : 1,
   "reservoirId" : "b207a1cd-0fbe-4652-9c8f-e5cc467ae84f",
   "reservoirVersion" : 1,
   "index" : -1,
   "isStartingVersion" : false
  },
  "latestOffset" : null,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 2.77700638711469,
  "metrics" : {
   "numBytesOutstanding" : "0",
   "numFilesOutstanding" : "0"
  }
 } ],
 "sink" : {
  "description" : "DeltaSink[/tmp/spark-d445c92a-4640-4827-a9bd-47246a30bb04]",
  "numOutputRows" : -1
 }
}

Ejemplo de origen de tasa para el evento StreamingQueryListener de Delta Lake

{
  "id" : "912ebdc1-edf2-48ec-b9fb-1a9b67dd2d9e",
  "runId" : "85de73a5-92cc-4b7f-9350-f8635b0cf66e",
  "name" : "dataGen",
  "timestamp" : "2022-11-01T18:28:20.332Z",
  "batchId" : 279,
  "numInputRows" : 300,
  "inputRowsPerSecond" : 114.15525114155251,
  "processedRowsPerSecond" : 158.9825119236884,
  "durationMs" : {
    "addBatch" : 1771,
    "commitOffsets" : 54,
    "getBatch" : 0,
    "latestOffset" : 0,
    "queryPlanning" : 4,
    "triggerExecution" : 1887,
    "walCommit" : 58
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "RateStreamV2[rowsPerSecond=100, rampUpTimeSeconds=0, numPartitions=default",
    "startOffset" : 560,
    "endOffset" : 563,
    "latestOffset" : 563,
    "numInputRows" : 300,
    "inputRowsPerSecond" : 114.15525114155251,
    "processedRowsPerSecond" : 158.9825119236884
  } ],
  "sink" : {
    "description" : "DeltaSink[dbfs:/user/hive/warehouse/maxfisher.db/trade_history_bronze_delta_demo]",
    "numOutputRows" : -1
  }
}