Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Aplica-se a:
SQL do Databricks
Databricks Runtime 14.1 e superior
Importante
Esse recurso está em uma versão prévia.
Retorna uma tabela com registros lidos do Pulsar.
Essa função de valor de tabela só dá suporte a consultas de streaming e não em lote.
Sintaxe
read_pulsar ( { option_key => option_value } [, ...] )
Argumentos
Essa função requer invocação de parâmetro nomeada para as chaves de opção.
As opções serviceUrl e topic são obrigatórias.
As descrições dos argumentos são breves aqui. Consulte a documentação do Pulsar de fluxo estruturado para obter descrições detalhadas.
| Opção | Tipo | Padrão | Descrição |
|---|---|---|---|
| serviceUrl | STRING | Obrigatório | O URI do serviço Pulsar. |
| tópico | STRING | Obrigatório | O tópico a ser lido. |
| predefinedSubscription | STRING | Nenhum | O nome de assinatura predefinido usado pelo conector para acompanhar o progresso do aplicativo Spark. |
| subscriptionPrefix | STRING | Nenhum | Um prefixo usado pelo conector para gerar uma assinatura aleatória para acompanhar o progresso do aplicativo Spark. |
| pollTimeoutMs | LONG | 120000 | O tempo limite para ler mensagens do Pulsar em milissegundos. |
| failOnDataLoss | BOOLEAN | true | Controla se uma consulta deve falhar quando os dados são perdidos (por exemplo, os tópicos são excluídos ou as mensagens são excluídas devido à política de retenção). |
| startingOffsets | STRING | mais recente | O ponto inicial quando uma consulta é iniciada, seja o mais antigo, o mais recente ou uma cadeia de caracteres JSON que especifica um deslocamento específico. Se for mais recente, o leitor lerá os registros mais recentes depois de iniciar a execução. Se for o mais antigo, o leitor lerá a partir do deslocamento mais antigo. O usuário também pode especificar uma cadeia de caracteres JSON que especifica um deslocamento específico. |
| startingTime | STRING | Nenhum | Quando especificado, a origem Pulsar fará a leitura das mensagens a partir da posição do startingTime especificado. |
Os argumentos a seguir são usados para autenticação do cliente do Pulsar:
| Opção | Tipo | Padrão | Descrição |
|---|---|---|---|
| pulsarClientAuthPluginClassName | STRING | Nenhum | Nome do plug-in de autenticação. |
| pulsarClientAuthParams | STRING | Nenhum | Parâmetros para o plug-in de autenticação. |
| pulsarClientUseKeyStoreTls | STRING | Nenhum | Se deve usar o KeyStore para autenticação tls. |
| pulsarClientTlsTrustStoreType | STRING | Nenhum | Tipo de arquivo TrustStore para autenticação tls. |
| pulsarClientTlsTrustStorePath | STRING | Nenhum | Caminho do arquivo TrustStore para autenticação tls. |
| pulsarClientTlsTrustStorePassword | STRING | Nenhum | Senha do TrustStore para autenticação tls. |
Esses argumentos são usados para configuração e autenticação do controle de admissão do pulsar. A configuração do administrador do pulsar só é necessária quando o controle de admissão está habilitado (quando maxBytesPerTrigger está definido)
| Opção | Tipo | Padrão | Descrição |
|---|---|---|---|
| maxBytesPerTrigger | bigint | Nenhum | Um limite flexível do número máximo de bytes que queremos processar por microlote. Se isso for especificado, a admin.url também precisará ser especificada. |
| adminUrl | STRING | Nenhum | A configuração serviceHttpUrl do Pulsar. Necessário somente quando o maxBytesPerTrigger é especificado. |
| pulsarAdminAuthPlugin | STRING | Nenhum | Nome do plug-in de autenticação. |
| pulsarAdminAuthParams | STRING | Nenhum | Parâmetros para o plug-in de autenticação. |
| pulsarClientUseKeyStoreTls | STRING | Nenhum | Se deve usar o KeyStore para autenticação tls. |
| pulsarAdminTlsTrustStoreType | STRING | Nenhum | Tipo de arquivo TrustStore para autenticação tls. |
| pulsarAdminTlsTrustStorePath | STRING | Nenhum | Caminho do arquivo TrustStore para autenticação tls. |
| pulsarAdminTlsTrustStorePassword | STRING | Nenhum | Senha do TrustStore para autenticação tls. |
Retornos
Uma tabela de registros de pulsar com o seguinte esquema.
__key STRING NOT NULL: Chave de mensagem do pulsar.value BINARY NOT NULL: Valor da mensagem do pulsar.Observação: para tópicos com esquema Avro ou JSON, em vez de carregar o conteúdo em um campo de valor binário, o conteúdo será expandido para preservar os nomes e os tipos de campo do tópico do Pulsar.
__topic STRING NOT NULL: Nome do tópico do pulsar.__messageId BINARY NOT NULL: ID da mensagem do pulsar.__publishTime TIMESTAMP NOT NULL: Hora de publicação da mensagem do pulsar.__eventTime TIMESTAMP NOT NULL: Hora do evento da mensagem do pulsar.__messageProperties MAP<STRING, STRING>: Propriedades da mensagem do pulsar.
Exemplos
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.