Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Observação
Este artigo aborda o Databricks Connect for Databricks Runtime 14.1 e superior.
O Databricks Connect for Scala suporta a execução de funções definidas pelo usuário (UDFs) em clusters Databricks a partir do seu ambiente de desenvolvimento local.
Esta página descreve como executar funções definidas pelo usuário com o Databricks Connect for Scala.
Para a versão Python deste artigo, consulte funções definidas pelo usuário no Databricks Connect for Python.
Carregar classe compilada e JARs
Para que as UDFs funcionem, as classes compiladas e os JARs devem ser carregados no cluster usando a addCompiledArtifacts() API.
Observação
O Scala usado pelo cliente deve corresponder à versão do Scala no cluster do Azure Databricks. Para verificar a versão do Scala do cluster, consulte a seção "Ambiente do sistema" para obter a versão do Databricks Runtime do cluster em Versões e compatibilidade das notas de versão do Databricks Runtime.
O programa Scala a seguir configura um UDF simples que quadra valores em uma coluna.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val spark = getSession()
val squared = udf((x: Long) => x * x)
spark.range(3)
.withColumn("squared", squared(col("id")))
.select("squared")
.show()
}
}
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
// On a Databricks cluster — reuse the active session
SparkSession.active
} else {
// Locally with Databricks Connect — upload local JARs and classes
DatabricksSession
.builder()
.addCompiledArtifacts(
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
)
.getOrCreate()
}
}
}
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI aponta para o mesmo local que a saída compilada do projeto (por exemplo, destino/classes ou o JAR construído). Todas as classes compiladas são enviadas para o Databricks, não apenas Main.
target/scala-2.13/classes/
├── com/
│ ├── examples/
│ │ ├── Main.class
│ │ └── MyUdfs.class
│ └── utils/
│ └── Helper.class
Quando a sessão do Spark já estiver inicializada, outras classes compiladas e JARs poderão ser carregados usando a API spark.addArtifact().
Observação
Ao carregar JARs, é necessário incluir todos os JARs de dependência transitiva para o carregamento. As APIs não executam nenhuma deteção automática de dependências transitivas.
UDFs com dependências de terceiros
Se adicionaste uma dependência de Maven em build.sbt que é usada num UDF, mas não está disponível no cluster Databricks, por exemplo:
// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils
// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
StringEscapeUtils.escapeHtml4(text)
})
Use spark.addArtifact() com ivy:// para baixar dependências a partir do Maven:
Adicionar a
orobiblioteca ao ficheirobuild.sbtlibraryDependencies ++= Seq( "org.apache.commons" % "commons-text" % "1.10.0" % Provided, "oro" % "oro" % "2.0.8" // Required for ivy:// to work )Adicione o artefato depois de criar a sessão com a
addArtifact()API:def getSession(): SparkSession = { if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) { SparkSession.active } else { val spark = DatabricksSession.builder() .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI) .getOrCreate() // Convert Maven coordinates to ivy:// format // From: "org.apache.commons" % "commons-text" % "1.10.0" // To: ivy://org.apache.commons:commons-text:1.10.0 spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0") spark } }
APIs de conjunto de dados tipadas
As APIs de conjunto de dados tipadas permitem executar transformações como map(), filter(), mapPartitions()e agregações nos conjuntos de dados resultantes. O carregamento da classe compilada e dos JARs para o cluster usando a addCompiledArtifacts() API também se aplica a eles, portanto, seu código deve se comportar de forma diferente dependendo de onde ele é executado:
- Desenvolvimento local com Databricks Connect: Carregue artefatos para o cluster remoto.
- Implantado em Databricks em execução no cluster: Não há necessidade de carregar nada porque as classes já estão lá.
O aplicativo Scala a seguir usa a API map() para modificar um número em uma coluna de resultados para uma cadeia de caracteres prefixada.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Dependências JAR externas
Se você estiver usando uma biblioteca privada ou de terceiros que não esteja no cluster:
import com.mycompany.privatelib.DataProcessor
// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
DataProcessor.process(data)
})
Carregue JARs externos da sua lib/ pasta ao criar a sessão:
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
val builder = DatabricksSession.builder()
.addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
// Add all JARs from lib/ folder
val libFolder = new java.io.File("lib")
builder.addCompiledArtifacts(libFolder.toURI)
builder.getOrCreate()
}
}
Isso carrega automaticamente todos os JARs em seu diretório lib/ para o Databricks quando executado localmente.
Projetos com múltiplos módulos
Em um projeto SBT multimódulo, getClass.getProtectionDomain.getCodeSource.getLocation.toURI retorna apenas a localização do módulo atual. Se o UDF usar classes de outros módulos, obterás ClassNotFoundException.
my-project/
├── module-a/ (main application)
├── module-b/ (utilities - module-a depends on this)
Use getClass numa classe em cada módulo para obter todas as suas localizações e carregá-las separadamente.
// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor // From module-b
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
// Get location using a class FROM module-a
val moduleALocation = Main.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
// Get location using a class FROM module-b
val moduleBLocation = DataProcessor.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
DatabricksSession.builder()
.addCompiledArtifacts(moduleALocation) // Upload module-a
.addCompiledArtifacts(moduleBLocation) // Upload module-b
.getOrCreate()
}
}