Partilhar via


read_pulsar Função de streaming com valor de tabela

Aplica-se a:Verificado Sim Databricks SQL Verificado Sim Databricks Runtime 14.1 ou superior

Importante

Esta funcionalidade está em Pré-visualização Pública.

Retorna uma tabela com registros lidos do Pulsar.

Esta função com valor de tabela suporta apenas streaming e não consulta em lote.

Sintaxe

read_pulsar ( { option_key => option_value } [, ...] )

Argumentos

Esta função requer a invocação de parâmetros nomeados para as chaves de opções.

As opções serviceUrl e topic são obrigatórias.

As descrições dos argumentos são breves aqui. Consulte a documentação estruturada do Pulsar de streaming para obter descrições estendidas.

Opção Tipo Predefinido Descrição
URL do serviço STRING Obrigatório O URI do serviço Pulsar.
tópico STRING Obrigatório O tópico sobre o qual ler.
subscrição predefinida STRING Nenhuma O nome de assinatura predefinido usado pelo conector para acompanhar o progresso da aplicação spark.
subscriptionPrefix STRING Nenhuma Um prefixo usado pelo conector para gerar uma assinatura aleatória para acompanhar o progresso da aplicação Spark.
pollTimeoutMs LONGO 120000 O tempo limite para ler mensagens do Pulsar em milissegundos.
falhaEmPerdaDeDados BOOLEANO verdadeiro Controla se uma consulta deve ser falhada quando os dados são perdidos (por exemplo, tópicos são excluídos ou mensagens são excluídas devido à política de retenção).
iniciandoOffsets STRING mais recente O ponto de partida quando uma consulta é iniciada, seja mais antigo, mais recente ou uma cadeia de caracteres JSON que especifica um offset específico. Se for o mais recente, o leitor lê os registos mais novos depois de iniciar. Se mais cedo, o leitor lê desde o primeiro offset. O usuário também pode especificar uma cadeia de caracteres JSON que especifica um deslocamento específico.
hora de início STRING Nenhuma Quando especificado, a fonte Pulsar lerá mensagens a partir da posição da hora de início especificada.

Os seguintes argumentos são usados para autenticação do cliente pulsar:

Opção Tipo Predefinido Descrição
pulsarClientAuthPluginClassName STRING Nenhuma Nome do plug-in de autenticação.
pulsarClientAuthParams STRING Nenhuma Parâmetros para o plug-in de autenticação.
pulsarClientUseKeyStoreTls STRING Nenhuma Se deve usar KeyStore para autenticação tls.
pulsarClientTlsTrustStoreType STRING Nenhuma Tipo de ficheiro TrustStore para autenticação TLS.
pulsarClientTlsTrustStorePath STRING Nenhuma Caminho do ficheiro TrustStore para autenticação TLS.
pulsarClientTlsTrustStorePassword STRING Nenhuma Senha para a TrustStore para autenticação TLS.

Esses argumentos são usados para a configuração e autenticação do controle de admissão do pulsar. A configuração do admin do pulsar é necessária apenas quando o controle de admissão está ativado (quando maxBytesPerTrigger é definido).

Opção Tipo Predefinido Descrição
maxBytesPerTrigger BIGINT Nenhuma Um limite suave do número máximo de bytes que queremos processar por microlote. Se isso for especificado, admin.url também precisa ser especificado.
adminUrl STRING Nenhuma A configuração do URL do serviço Pulsar. Só é necessário quando maxBytesPerTrigger é especificado.
pulsarAdminAuthPlugin STRING Nenhuma Nome do plug-in de autenticação.
pulsarAdminAuthParams STRING Nenhuma Parâmetros para o plug-in de autenticação.
pulsarClientUseKeyStoreTls STRING Nenhuma Se deve usar KeyStore para autenticação tls.
pulsarAdminTlsTrustStoreType STRING Nenhuma Tipo de ficheiro TrustStore para autenticação TLS.
pulsarAdminTlsTrustStorePath STRING Nenhuma Caminho do ficheiro TrustStore para autenticação TLS.
pulsarAdminTlsTrustStorePassword STRING Nenhuma Senha para a TrustStore para autenticação TLS.

Devoluções

Uma tabela de registos de pulsar com o esquema a seguir.

  • __key STRING NOT NULL: Chave de mensagem do Pulsar.

  • value BINARY NOT NULL: Valor da mensagem do Pulsar.

    Nota: Para tópicos com esquema Avro ou JSON, em vez de carregar conteúdo em um campo de valor binário, o conteúdo será expandido para preservar os nomes de campo e os tipos de campo do tópico Pulsar.

  • __topic STRING NOT NULL: Nome do tópico pulsar.

  • __messageId BINARY NOT NULL: ID da mensagem pulsar.

  • __publishTime TIMESTAMP NOT NULL: Tempo de publicação da mensagem Pulsar.

  • __eventTime TIMESTAMP NOT NULL: Hora do evento de mensagem Pulsar.

  • __messageProperties MAP<STRING, STRING>: Propriedades da mensagem Pulsar.

Exemplos

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.