Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
SE APLICA A:
Azure Data Factory
Azure Synapse Analytics
Sugerencia
Pruebe Data Factory en Microsoft Fabric, una solución de análisis integral para empresas. Microsoft Fabric abarca todo, desde el movimiento de datos hasta la ciencia de datos, el análisis en tiempo real, la inteligencia empresarial y los informes. Obtenga información sobre cómo iniciar una nueva evaluación gratuita.
Importante
El 1 de enero de 2026 ya no podrás crear nuevas instancias de Airflow usando el Workflow Orchestration Manager de ADF. Recomendamos que migres todas las cargas de trabajo de Workflow Orchestration Manager (Apache Airflow en Azure Data Factory) a trabajos de Apache Airflow en Microsoft Fabric antes del 31 de diciembre de 2025.
Para más información o para recibir apoyo durante tu migración a Apache Airflow en Microsoft Fabric, contacta con el soporte de Microsoft.
Las canalizaciones de Data Factory proporcionan más de 100 conectores de origen de datos que ofrecen flujos de datos o integración de datos escalables y confiables. Habrá escenarios en los que quiera ejecutar una canalización de factoría de datos existente desde el DAG de Apache Airflow. En este tutorial se muestra cómo hacerlo.
Requisitos previos
- Suscripción de Azure. Si no tiene una suscripción a Azure, cree una cuenta gratuita de Azure antes de empezar.
- Cuenta de Azure Storage. Si no tiene una cuenta de almacenamiento, consulte Crear una cuenta de almacenamiento de Azure para crear una. Asegúrese de que la cuenta de almacenamiento solo permita el acceso desde las redes seleccionadas.
- Canalización de Azure Data Factory Puedes seguir cualquiera de los tutoriales y crear una nueva canalización de factoría de datos en caso de que aún no tengas una, o crear una con una selección en Comenzar y probar tu primera canalización de factoría de datos.
- Configuración de una entidad de servicio Deberá crear una nueva entidad de servicio o usar una existente y concederle permiso para ejecutar la canalización (ejemplo: rol colaborador en la factoría de datos donde existen las canalizaciones existentes), incluso si el entorno de Administrador de orquestación de flujo de trabajo y las canalizaciones existen en la misma factoría de datos. Deberás obtener el identificador de cliente y el secreto de cliente (clave de API) de la entidad de servicio.
Pasos
Cree un nuevo archivo de Python adf.py con el siguiente contenido:
from datetime import datetime, timedelta from airflow.models import DAG, BaseOperator try: from airflow.operators.empty import EmptyOperator except ModuleNotFoundError: from airflow.operators.dummy import DummyOperator as EmptyOperator # type: ignore from airflow.providers.microsoft.azure.operators.data_factory import AzureDataFactoryRunPipelineOperator from airflow.providers.microsoft.azure.sensors.data_factory import AzureDataFactoryPipelineRunStatusSensor from airflow.utils.edgemodifier import Label with DAG( dag_id="example_adf_run_pipeline", start_date=datetime(2022, 5, 14), schedule_interval="@daily", catchup=False, default_args={ "retries": 1, "retry_delay": timedelta(minutes=3), "azure_data_factory_conn_id": "<connection_id>", #This is a connection created on Airflow UI "factory_name": "<FactoryName>", # This can also be specified in the ADF connection. "resource_group_name": "<ResourceGroupName>", # This can also be specified in the ADF connection. }, default_view="graph", ) as dag: begin = EmptyOperator(task_id="begin") end = EmptyOperator(task_id="end") # [START howto_operator_adf_run_pipeline] run_pipeline1: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline1", pipeline_name="<PipelineName>", parameters={"myParam": "value"}, ) # [END howto_operator_adf_run_pipeline] # [START howto_operator_adf_run_pipeline_async] run_pipeline2: BaseOperator = AzureDataFactoryRunPipelineOperator( task_id="run_pipeline2", pipeline_name="<PipelineName>", wait_for_termination=False, ) pipeline_run_sensor: BaseOperator = AzureDataFactoryPipelineRunStatusSensor( task_id="pipeline_run_sensor", run_id=run_pipeline2.output["run_id"], ) # [END howto_operator_adf_run_pipeline_async] begin >> Label("No async wait") >> run_pipeline1 begin >> Label("Do async wait with sensor") >> run_pipeline2 [run_pipeline1, pipeline_run_sensor] >> end # Task dependency created via `XComArgs`: # run_pipeline2 >> pipeline_run_sensorDeberá crear la conexión mediante la interfaz de usuario del Administrador de orquestación de flujo de trabajo: Administración ->Conexiones -> '+' -> Elija 'tipo de conexión' como 'Azure Data Factory', después, rellene su client_id, client_secret, tenant_id, subscription_id, resource_group_name, data_factory_name y pipeline_name.
Cargue el archivo adf.py en el almacenamiento de blobs dentro de una carpeta denominada DAGS.
Importe la carpeta DAGS en el entorno del Administrador de orquestación de flujo de trabajo. Si no tienes uno, crea uno nuevo