Partilhar via


Consultar bases de dados utilizando JDBC

O Azure Databricks dá suporte à conexão com bancos de dados externos usando JDBC. Este artigo fornece a sintaxe básica para configurar e usar essas conexões com exemplos em Python, SQL e Scala.

Importante

A documentação de federação de consultas herdada foi desativada e pode não ser atualizada no futuro. As configurações mencionadas neste conteúdo não são oficialmente endossadas ou testadas pela Databricks. Se a Lakehouse Federation oferecer suporte ao seu banco de dados de origem, o Databricks recomenda usá-lo.

O Partner Connect fornece integrações otimizadas para sincronizar dados com muitas fontes de dados externas externas. Consulte O que é o Databricks Partner Connect?.

Importante

Os exemplos neste artigo não incluem nomes de usuário e senhas em URLs JDBC. O Databricks recomenda o uso de segredos para armazenar suas credenciais de banco de dados. Por exemplo:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Escala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Para fazer referência a segredos do Databricks com SQL, você deve configurar uma propriedade de configuração do Spark durante a iniciação do cluster.

Para obter um exemplo completo de gerenciamento de segredos, consulte Tutorial: Criar e usar um segredo do Databricks.

Ler dados com JDBC

Você deve definir várias configurações para ler dados usando JDBC. Observe que cada banco de dados usa um formato diferente para o <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Escala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

O Spark lê automaticamente o esquema da tabela do banco de dados e mapeia seus tipos de volta para os tipos SQL do Spark.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Escala

employees_table.printSchema

Você pode executar consultas nesta tabela JDBC:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Escala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Gravar dados com JDBC

Salvar dados em tabelas com JDBC usa configurações semelhantes à leitura. Veja o seguinte exemplo:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Escala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

O comportamento padrão tenta criar uma nova tabela e lança um erro se uma tabela com esse nome já existir.

Você pode acrescentar dados a uma tabela existente usando a seguinte sintaxe:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Escala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Você pode substituir uma tabela existente usando a seguinte sintaxe:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Escala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Controlar paralelismo para consultas JDBC

Por padrão, o driver JDBC consulta o banco de dados de origem com apenas um único thread. Para melhorar o desempenho de leituras, você precisa especificar várias opções para controlar quantas consultas simultâneas o Azure Databricks faz ao seu banco de dados. Para clusters pequenos, definir a numPartitions opção igual ao número de núcleos executores no cluster garante que todos os nós consultem dados em paralelo.

Advertência

A configuração numPartitions de um valor alto em um cluster grande pode resultar em desempenho negativo para o banco de dados remoto, pois muitas consultas simultâneas podem sobrecarregar o serviço. Isso é especialmente problemático para bancos de dados de aplicativos. Tenha cuidado ao definir este valor acima de 50.

Observação

Acelere as consultas selecionando uma coluna com um índice calculado no banco de dados de origem para o partitionColumn.

O exemplo de código a seguir demonstra a configuração do paralelismo para um cluster com oito núcleos:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Escala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Observação

O Azure Databricks suporta todas as opções do Apache Spark para configurar o JDBC.

Ao gravar em bancos de dados usando JDBC, o Apache Spark usa o número de partições na memória para controlar o paralelismo. Você pode reparticionar dados antes de gravar para controlar o paralelismo. Evite um grande número de partições em clusters grandes para evitar sobrecarregar seu banco de dados remoto. O exemplo a seguir demonstra o reparticionamento para oito partições antes de escrever:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Escala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Empurrar uma consulta para o mecanismo de banco de dados

Você pode enviar uma consulta inteira para o banco de dados e retornar apenas o resultado. O table parâmetro identifica a tabela JDBC a ser lida. Você pode usar qualquer coisa que seja válida em uma cláusula de consulta FROM SQL.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Escala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Controlar o número de linhas obtidas por consulta

Os drivers JDBC têm um fetchSize parâmetro que controla o número de linhas buscadas por vez no banco de dados remoto.

Configurações Resultado
Muito baixo Alta latência devido a muitas viagens de ida e volta (poucas linhas retornadas por consulta)
Muito alto Erro de falta de memória (muitos dados retornados em uma consulta)

O valor ideal depende da carga de trabalho. As considerações incluem:

  • Quantas colunas são retornadas pela consulta?
  • Que tipos de dados são retornados?
  • Qual é o comprimento das cadeias de caracteres em cada coluna retornada?

Os sistemas podem ter uma configuração padrão muito limitada e beneficiar-se da otimização. Por exemplo: o padrão fetchSize da Oracle é 10. O aumento para 100 reduz o número total de consultas que precisam ser executadas por um fator de 10. Os resultados JDBC são tráfego de rede, portanto, evite números muito grandes, mas os valores ideais podem estar na casa dos milhares para muitos conjuntos de dados.

Use a opção fetchSize, como no exemplo a seguir:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Escala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()