Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
função de streaming com valor de tabela
Aplica-se a:
SQL do Databricks
Databricks Runtime 13.3 LTS e versões posteriores
Retorna uma tabela com registros lidos do Kinesis de um ou mais fluxos.
Sintaxe
read_kinesis ( { parameter => value } [, ...] )
Argumentos
read_kinesis requer invocação de parâmetro nomeado.
O único argumento necessário é streamName. Todos os outros argumentos são opcionais.
As descrições dos argumentos são breves aqui. Para obter mais detalhes, confira a documentação do Amazon Kinesis.
Há várias maneiras de se conectar e autenticar com o AWS.
A abordagem recomendada é criar uma credencial de serviço do Databricks e especificá-la usando a opção serviceCredential .
Como alternativa, você pode autenticar usando awsAccessKey e awsSecretKey.
Essas opções podem ser especificadas nos argumentos de função usando a secret função, definidas manualmente nos argumentos ou configuradas como variáveis de ambiente, conforme indicado abaixo.
roleArn, roleExternalID e roleSessionName também podem ser usados para autenticar com a AWS usando perfis de instância.
Se nenhum deles for especificado, será usada a cadeia de provedores padrão da AWS.
| Parâmetro | Tipo | Descrição |
|---|---|---|
streamName |
STRING |
Obrigatório, lista separada por vírgulas de um ou mais fluxos do Kinesis. |
serviceCredential |
STRING |
O nome da sua credencial de serviço do Databricks. |
awsAccessKey |
STRING |
A chave de acesso da AWS, se houver. Também pode ser especificada por meio das várias opções compatíveis com a cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_ACCESS_KEY_ID) e um arquivo de perfis de credenciais. |
awsSecretKey |
STRING |
A chave secreta que corresponde à chave de acesso. Também pode ser especificada por meio das várias opções compatíveis com a cadeia de provedores de credenciais padrão da AWS, incluindo variáveis de ambiente (AWS_SECRET_KEY ou AWS_SECRET_ACCESS_KEY) e um arquivo de perfis de credenciais. |
roleArn |
STRING |
O nome de recurso da Amazon da função a ser assumida ao acessar o Kinesis. |
roleExternalId |
STRING |
Usado ao delegar acesso à conta da AWS. |
roleSessionName |
STRING |
Nome da sessão de função da AWS. |
stsEndpoint |
STRING |
Um ponto de extremidade para solicitar credenciais de acesso temporário. |
region |
STRING |
Região dos fluxos a serem especificados. O padrão é a região resolvida localmente. |
endpoint |
STRING |
ponto de extremidade regional para fluxos de dados do Kinesis. O padrão é a região resolvida localmente. |
initialPosition |
STRING |
Posição inicial para leitura no fluxo. Um dos seguintes: 'latest' (padrão), 'trim_horizon', 'earliest' ou 'at_timestamp'. |
consumerMode |
STRING |
Um deles: 'sondagem' (padrão) ou 'EFO' (enhanced-fan-out, avançado). |
consumerName |
STRING |
O nome do consumidor. Todos os consumidores têm o prefixo 'databricks_'. O padrão é uma cadeia de caracteres vazia. |
registerConsumerTimeoutInterval |
STRING |
o tempo limite máximo para aguardar que o consumidor de EFO do Kinesis seja registrado no fluxo do Kinesis antes de gerar um erro. O padrão é '300s'. |
requireConsumerDeregistration |
BOOLEAN |
true para cancelar o registro do consumidor EFO no término da consulta. O padrão é false. |
deregisterConsumerTimeoutInterval |
STRING |
O tempo limite máximo a ser esperado para que o consumidor de EFO do Kinesis tenha o registro cancelado com o fluxo do Kinesis antes de gerar um erro. O padrão é '300s'. |
consumerRefreshInterval |
STRING |
O intervalo no qual o consumidor é verificado e atualizado. O padrão é '300s'. |
Os seguintes argumentos são usados para controlar a taxa de transferência de leitura e a latência do Kinesis:
| Parâmetro | Tipo | Descrição |
|---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Opcional, com um padrão de 10.000, registros a serem lidos por solicitação de API para o Kinesis. |
maxFetchRate |
STRING |
A velocidade da pré-busca de dados por fragmento. Um valor entre '1.0' e '2.0' que é medido em MB/s. O padrão é '1.0'. |
minFetchPeriod |
STRING |
O tempo de espera máximo entre tentativas de pré-busca consecutivas. O padrão é '400ms'. |
maxFetchDuration |
STRING |
A duração máxima para armazenar os novos dados pré-buscados. O padrão é '10s'. |
fetchBufferSize |
STRING |
A quantidade de dados para o próximo gatilho. O padrão é '20gb'. |
shardsPerTask |
INTEGER (>0) |
O número de fragmentos do Kinesis a serem pré-buscados em paralelo por tarefa do Spark. O padrão é 5. |
shardFetchinterval |
STRING |
Com que frequência pesquisar para recriar fragmentos. O padrão é '1 segundos'. |
coalesceThresholdBlockSize |
INTEGER (>0) |
O limite no qual a união automática ocorre. O padrão é 10.000.000. |
coalesce |
BOOLEAN |
true para unir solicitações pré-buscadas. O padrão é true. |
coalesceBinSize |
INTEGER (>0) |
O tamanho do bloco aproximado após a união. O padrão é 128.000.000. |
reuseKinesisClient |
BOOLEAN |
true para reutilizar o cliente Kinesis armazenado no cache. O padrão é true exceto em um cluster PE. |
clientRetries |
INTEGER (>0) |
O número de repetições no cenário de repetição. O padrão é 5. |
Retornos
Uma tabela de registros Kinesis com o seguinte esquema:
| Nome | Tipo de dados | Anulável | Standard | Descrição |
|---|---|---|---|---|
partitionKey |
STRING |
Não | Uma chave usada para distribuir dados entre os fragmentos de um fluxo. Todos os registros de dados com a mesma chave de partição serão lidos do mesmo fragmento. | |
data |
BINARY |
Não | O conteúdo de dados do Kinesis, codificado em base 64. | |
stream |
STRING |
Não | O nome do fluxo do qual os dados foram lidos. | |
shardId |
STRING |
Não | Um identificador exclusivo para o fragmento do qual os dados foram lidos. | |
sequenceNumber |
BIGINT |
Não | O identificador exclusivo do registro dentro de seu fragmento. | |
approximateArrivalTimestamp |
TIMESTAMP |
Não | A hora aproximada em que o registro foi inserido no fluxo. |
As colunas (stream, shardId, sequenceNumber) constituem uma chave primária.
Exemplos
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret('test-databricks', 'awsAccessKey'),
awsSecretKey => secret('test-databricks', 'awsSecretKey'),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');