Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Aplica-se a:
Databricks SQL
Databricks Runtime 14.1 ou superior
Importante
Esta funcionalidade está em Pré-visualização Pública.
Retorna uma tabela com registros lidos do Pulsar.
Esta função com valor de tabela suporta apenas streaming e não consulta em lote.
Sintaxe
read_pulsar ( { option_key => option_value } [, ...] )
Argumentos
Esta função requer a invocação de parâmetros nomeados para as chaves de opções.
As opções serviceUrl e topic são obrigatórias.
As descrições dos argumentos são breves aqui. Consulte a documentação estruturada do Pulsar de streaming para obter descrições estendidas.
| Opção | Tipo | Predefinido | Descrição |
|---|---|---|---|
| URL do serviço | STRING | Obrigatório | O URI do serviço Pulsar. |
| tópico | STRING | Obrigatório | O tópico sobre o qual ler. |
| subscrição predefinida | STRING | Nenhuma | O nome de assinatura predefinido usado pelo conector para acompanhar o progresso da aplicação spark. |
| subscriptionPrefix | STRING | Nenhuma | Um prefixo usado pelo conector para gerar uma assinatura aleatória para acompanhar o progresso da aplicação Spark. |
| pollTimeoutMs | LONGO | 120000 | O tempo limite para ler mensagens do Pulsar em milissegundos. |
| falhaEmPerdaDeDados | BOOLEANO | verdadeiro | Controla se uma consulta deve ser falhada quando os dados são perdidos (por exemplo, tópicos são excluídos ou mensagens são excluídas devido à política de retenção). |
| iniciandoOffsets | STRING | mais recente | O ponto de partida quando uma consulta é iniciada, seja mais antigo, mais recente ou uma cadeia de caracteres JSON que especifica um offset específico. Se for o mais recente, o leitor lê os registos mais novos depois de iniciar. Se mais cedo, o leitor lê desde o primeiro offset. O usuário também pode especificar uma cadeia de caracteres JSON que especifica um deslocamento específico. |
| hora de início | STRING | Nenhuma | Quando especificado, a fonte Pulsar lerá mensagens a partir da posição da hora de início especificada. |
Os seguintes argumentos são usados para autenticação do cliente pulsar:
| Opção | Tipo | Predefinido | Descrição |
|---|---|---|---|
| pulsarClientAuthPluginClassName | STRING | Nenhuma | Nome do plug-in de autenticação. |
| pulsarClientAuthParams | STRING | Nenhuma | Parâmetros para o plug-in de autenticação. |
| pulsarClientUseKeyStoreTls | STRING | Nenhuma | Se deve usar KeyStore para autenticação tls. |
| pulsarClientTlsTrustStoreType | STRING | Nenhuma | Tipo de ficheiro TrustStore para autenticação TLS. |
| pulsarClientTlsTrustStorePath | STRING | Nenhuma | Caminho do ficheiro TrustStore para autenticação TLS. |
| pulsarClientTlsTrustStorePassword | STRING | Nenhuma | Senha para a TrustStore para autenticação TLS. |
Esses argumentos são usados para a configuração e autenticação do controle de admissão do pulsar. A configuração do admin do pulsar é necessária apenas quando o controle de admissão está ativado (quando maxBytesPerTrigger é definido).
| Opção | Tipo | Predefinido | Descrição |
|---|---|---|---|
| maxBytesPerTrigger | BIGINT | Nenhuma | Um limite suave do número máximo de bytes que queremos processar por microlote. Se isso for especificado, admin.url também precisa ser especificado. |
| adminUrl | STRING | Nenhuma | A configuração do URL do serviço Pulsar. Só é necessário quando maxBytesPerTrigger é especificado. |
| pulsarAdminAuthPlugin | STRING | Nenhuma | Nome do plug-in de autenticação. |
| pulsarAdminAuthParams | STRING | Nenhuma | Parâmetros para o plug-in de autenticação. |
| pulsarClientUseKeyStoreTls | STRING | Nenhuma | Se deve usar KeyStore para autenticação tls. |
| pulsarAdminTlsTrustStoreType | STRING | Nenhuma | Tipo de ficheiro TrustStore para autenticação TLS. |
| pulsarAdminTlsTrustStorePath | STRING | Nenhuma | Caminho do ficheiro TrustStore para autenticação TLS. |
| pulsarAdminTlsTrustStorePassword | STRING | Nenhuma | Senha para a TrustStore para autenticação TLS. |
Devoluções
Uma tabela de registos de pulsar com o esquema a seguir.
__key STRING NOT NULL: Chave de mensagem do Pulsar.value BINARY NOT NULL: Valor da mensagem do Pulsar.Nota: Para tópicos com esquema Avro ou JSON, em vez de carregar conteúdo em um campo de valor binário, o conteúdo será expandido para preservar os nomes de campo e os tipos de campo do tópico Pulsar.
__topic STRING NOT NULL: Nome do tópico pulsar.__messageId BINARY NOT NULL: ID da mensagem pulsar.__publishTime TIMESTAMP NOT NULL: Tempo de publicação da mensagem Pulsar.__eventTime TIMESTAMP NOT NULL: Hora do evento de mensagem Pulsar.__messageProperties MAP<STRING, STRING>: Propriedades da mensagem Pulsar.
Exemplos
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.