Partilhar via


O que são funções definidas pelo usuário (UDFs)?

As funções definidas pelo utilizador (UDFs) permitem-lhe reutilizar e partilhar código que estende a funcionalidade incorporada no Azure Databricks. Use UDFs para executar tarefas específicas, como cálculos complexos, transformações ou manipulações de dados personalizadas.

Quando usar uma função UDF vs. Apache Spark?

Use UDFs para lógica difícil de expressar com funções integradas do Apache Spark. As funções integradas do Apache Spark são otimizadas para processamento distribuído e oferecem melhor desempenho em escala. Para obter mais informações, consulte Functions.

O Databricks recomenda UDFs para consultas ad hoc, limpeza manual de dados, análise exploratória de dados e operações em conjuntos de dados de pequeno a médio porte. Os casos de uso comuns para UDFs incluem criptografia de dados, descriptografia, hashing, análise JSON e validação.

Use os métodos Apache Spark para operações em conjuntos de dados muito grandes e quaisquer cargas de trabalho executadas regularmente ou continuamente, incluindo trabalhos ETL e operações de streaming.

Compreender os tipos de UDF

Selecione um tipo UDF nas guias a seguir para ver uma descrição, um exemplo e um link para saber mais.

UDF escalar

UDFs escalares operam em uma única linha e retornam um único valor de resultado para cada linha. Eles podem ser governados pelo Unity Catalog ou com escopo de sessão.

O exemplo a seguir usa um UDF escalar para calcular o comprimento de cada nome em uma name coluna e adicionar o valor em uma nova coluna name_length.

+-------+-------+
| name  | score |
+-------+-------+
| alice |  10.0 |
| bob   |  20.0 |
| carol |  30.0 |
| dave  |  40.0 |
| eve   |  50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);

-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name  | score | name_length |
+-------+-------+-------------+
| alice |  10.0 |      5      |
|  bob  |  20.0 |      3      |
| carol |  30.0 |      5      |
| dave  |  40.0 |      4      |
|  eve  |  50.0 |      3      |
+-------+-------+-------------+

Para implementar isso em um notebook Databricks usando o PySpark:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_name_length(name):
  return len(name)

df = df.withColumn("name_length", get_name_length(df.name))

# Show the result
display(df)

Consulte Funções definidas pelo usuário (UDFs) no Unity Catalog e Funções escalares definidas pelo usuário - Python.

UDFs escalares em lote

Processe dados em lotes, mantendo a paridade de linha de entrada/saída 1:1. Isso reduz a sobrecarga de operações linha a linha para processamento de dados em grande escala. As UDFs de lote também mantêm o estado entre lotes para serem executadas com mais eficiência, reutilizar recursos e lidar com cálculos complexos que precisam de contexto em blocos de dados.

Eles podem ser governados pelo Unity Catalog ou com escopo de sessão.

O seguinte Batch Unity Catalog Python UDF calcula o IMC durante o processamento de lotes de linhas:

+-------------+-------------+
| weight_kg   | height_m    |
+-------------+-------------+
|      90     |     1.8     |
|      77     |     1.6     |
|      50     |     1.5     |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
|  BMI   |
+--------+
|  27.8  |
|  30.1  |
|  22.2  |
+--------+

Consulte Funções definidas pelo usuário (UDFs) no Unity Catalog e Batch Python User-defined functions (UDFs) no Unity Catalog.

UDFs não escalares

UDFs não escalares operam em conjuntos de dados/colunas inteiros com relações de entrada/saída flexíveis (1:N ou muitos:muitos).

Os UDFs de pandas em lote com escopo de sessão podem ser dos seguintes tipos:

  • De Série Para Série
  • Iterador de Série para Iterador de Série
  • Iterador de múltiplas séries para iterador de uma série
  • Conversão de série para escalar

Segue-se um exemplo de um pandas UDF de Série para Série.

from pyspark.sql.functions import pandas_udf
import pandas as pd

df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])

@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
  return weight / (height ** 2)

df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()

Consulte as funções definidas pelo utilizador do pandas.

UDAF

As UDAFs operam em várias linhas e retornam um único resultado agregado. As UDAFs estão limitadas ao âmbito da sessão.

O exemplo UDAF a seguir agrega pontuações por comprimento de nome.

from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd

# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
  return scores.sum()

# Group by name length and aggregate
result_df = (df.groupBy("name_length")
  .agg(total_score_udf(df["score"]).alias("total_score")))

display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
|      3      |     70.0    |
|      4      |     40.0    |
|      5      |     40.0    |
+-------------+-------------+

Consulte funções pandas definidas pelo usuário para Python e funções agregadas definidas pelo usuário - Scala.

Funções de Tabela Definidas pelo Utilizador (UDTFs)

Um UDTF usa um ou mais argumentos de entrada e retorna várias linhas (e possivelmente várias colunas) para cada linha de entrada. Eles podem ser governados pelo Unity Catalog ou com escopo de sessão.

O UDTF a seguir cria uma tabela usando uma lista fixa de dois argumentos inteiros:

CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y
$$;

SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13  | 7    |
+-----+------+

Para implementar isso em um notebook Databricks usando o PySpark:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Consulte UDTFs do Unity Catalog e UDTFs com escopo específico de sessão.

UDFs governadas pelo Catálogo Unity vs. UDFs com escopo de sessão

As UDFs Python do Catálogo Unity, as UDFs Python do Catálogo Unity em Lote e as UDTFs Python do Catálogo Unity são persistidas no Catálogo Unity para melhorar a governança, a reutilização e a facilidade de descoberta. Todas as outras UDFs são baseadas em sessão, o que significa que elas são definidas em um bloco de anotações ou trabalho e têm como escopo a SparkSession atual. Você pode definir e acessar UDFs com escopo de sessão usando Scala ou Python.

Folha de dicas de UDFs geridas pelo Unity Catalog

As UDFs governadas pelo Unity Catalog permitem que funções personalizadas sejam definidas, usadas, compartilhadas com segurança e controladas em ambientes de computação. Consulte Funções definidas pelo usuário (UDFs) no Unity Catalog.

Tipo UDF Computação suportada Descrição
Unity Catálogo Python UDF
  • Blocos de anotações e trabalhos sem servidor
  • Computação clássica com modo de acesso padrão (Databricks Runtime 13.3 LTS e superior)
  • SQL warehouse (sem servidor e pro)
  • Lakeflow Spark Declarative Pipelines (clássico e sem servidor)
Defina um UDF em Python e registre-o no Unity Catalog para governança.
UDFs escalares operam em uma única linha e retornam um único valor de resultado para cada linha.
Batch Unity Catálogo Python UDF
  • Blocos de anotações e trabalhos sem servidor
  • Computação clássica com modo de acesso padrão (Databricks Runtime 16.3 e superior)
  • SQL warehouse (sem servidor e pro)
Defina um UDF em Python e registre-o no Unity Catalog para governança.
Realizar operações em lote com múltiplos valores e devolver múltiplos resultados. Reduz a sobrecarga das operações realizadas linha por linha para o processamento de dados em grande escala.
Unity Catálogo Python UDTF
  • Blocos de anotações e trabalhos sem servidor
  • Computação clássica com modo de acesso padrão (Databricks Runtime 17.1 e superior)
  • SQL warehouse (sem servidor e pro)
Defina um UDTF em Python e registre-o no Unity Catalog para governança.
Um UDTF usa um ou mais argumentos de entrada e retorna várias linhas (e possivelmente várias colunas) para cada linha de entrada.

Folha de truques UDFs com escopo de sessão para computação isolada do usuário

As UDFs com escopo de sessão são definidas em um bloco de anotações ou trabalho e têm como escopo a SparkSession atual. Você pode definir e acessar UDFs com escopo de sessão usando Scala ou Python.

Tipo UDF Computação suportada Descrição
Python escalar
  • Blocos de anotações e trabalhos sem servidor
  • Computação clássica com modo de acesso padrão (Databricks Runtime 13.3 LTS e superior)
  • Lakeflow Spark Declarative Pipelines (clássico e sem servidor)
UDFs escalares operam em uma única linha e retornam um único valor de resultado para cada linha.
Python não-escalar
  • Blocos de anotações e trabalhos sem servidor
  • Computação clássica com modo de acesso padrão (Databricks Runtime 14.3 LTS e superior)
  • Lakeflow Spark Declarative Pipelines (clássico e sem servidor)
UDFs não escalares incluem pandas_udf, mapInPandas, mapInArrow, applyInPandas. Os Pandas UDFs usam a Seta Apache para transferir dados e os pandas para trabalhar com os dados. Os Pandas UDFs suportam operações vetorizadas que podem aumentar consideravelmente o desempenho em UDFs escalares linha a linha.
Python UDTFs
  • Blocos de anotações e trabalhos sem servidor
  • Computação clássica com modo de acesso padrão (Databricks Runtime 14.3 LTS e superior)
  • Lakeflow Spark Declarative Pipelines (clássico e sem servidor)
Um UDTF usa um ou mais argumentos de entrada e retorna várias linhas (e possivelmente várias colunas) para cada linha de entrada.
UDFs escalares Scala
  • Computação clássica com modo de acesso padrão (Databricks Runtime 13.3 LTS e superior)
UDFs escalares operam em uma única linha e retornam um único valor de resultado para cada linha.
Scala UDAFs
  • Computação clássica com modo de acesso dedicado (Databricks Runtime 14.2 LTS e superior)
As UDAFs operam em várias linhas e retornam um único resultado agregado.

Considerações sobre desempenho

  • Funções internas e UDFs SQL são as opções mais eficientes.

  • UDFs Scala são geralmente mais rápidos do que UDFs Python.

    • UDFs Scala não isoladas são executadas na Java Virtual Machine (JVM), evitando a sobrecarga de mover dados para dentro e para fora da JVM.
    • UDFs Scala isoladas precisam mover dados para dentro e para fora da JVM, mas ainda podem ser mais rápidas do que UDFs Python porque lidam com memória de forma mais eficiente.
  • UDFs Python e UDFs pandas tendem a ser mais lentos do que UDFs Scala porque precisam serializar dados e movê-los da JVM para o interpretador Python.

    • As UDFs Pandas são até 100x mais rápidas que as UDFs Python porque usam a Seta Apache para reduzir os custos de serialização.