Airflow: can a subdag be run inside another subdag?

I have a subdag as one of the nodes of a main DAG. The workflow works fine. I tried to increase the levels of hierarchy by including another subdag inside the subdag, but Airflow seems to get confused. A couple of questions in this regard: 1) Does Airflow support a subdag inside a subdag? If so, is there a limit to the hierarchy? 2) Are there any best practices for using a subdag inside a subdag?

Where in the Airflow UI can I add the aws connections?

In this example, I see that the EmrCreateJobFlowOperator receives the aws/emr connections that were set up in the Airflow UI: cluster_creator = EmrCreateJobFlowOperator( task_id='create_job_flow', job_flow_overrides=JOB_FLOW_OVERRIDES, aws_conn_id='aws_default', emr_conn_id='emr_conn_id', dag=dag) In the Airflow UI, in the Connections tab, how can I add my AWS credentials so the DAG can pick them up? I don't see any connection type for AWS. Any idea?

Airflow - Using an upstream task for multiple downstream tasks

Okay I apologize if this is a dumb question, because it seems so obvious that it should work. But I can't find it documented, and as we are examining our options as we look to build a new data pipeline, I really want this to be a feature... Can multiple downstream processes be dependent on a single upstream process, where the upstream process only runs once? In other words, can I extract a table one time, and then load it to my data warehouse, and have multiple aggregations that are dependent on

Airflow backfill stops if any task fails

I am using airflow cli's backfill command to manually run some backfill jobs. airflow backfill mydag -i -s 2018-01-11T16-00-00 -e 2018-01-31T23-00-00 --reset_dagruns --rerun_failed_tasks The dag interval is hourly and it has around 40 tasks. Hence this kind of backfill job takes more than a day to finish. I need it to run without supervision. I noticed however, that even if one task fails at one of the runs in the backfill interval, the entire backfill job stops with the following exception

airflow pool used slots is greater than slots limit

There are three sensor tasks that use the same pool. The pool 'limit_sensor' is set to 1 slot, but the pool limit does not work: all three tasks run at the same time. sensor_wait = SqlSensor( task_id='sensor_wait', dag=dag, conn_id='dest_data', sql="select count(*) from test", poke_interval=10, timeout=60, pool='limit_sensor', priority_weight=100 ) same_pool1 = SqlSensor( task_id='same_pool1', dag=dag, conn_id='dest_data', sql="select count(*) from test", poke_inte

What is the difference between the AIRFLOW_HOME and AIRFLOW__CORE__AIRFLOW_HOME environment variables?

Documentation for Airflow talks about setting an environment variable named $AIRFLOW_HOME, which is where Airflow will be installed. The configuration file airflow.cfg created by this process has an attribute called airflow_home in the [core] section at the top of the file. This makes sense. But the way you override Airflow variables in airflow.cfg with environment variables is with the pattern AIRFLOW__[SECTION]__VARIABLENAME
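
As a rough illustration of the naming pattern quoted in this question, the sketch below builds the override variable name for a given section and key. The helper name `airflow_env_var` is hypothetical and not part of Airflow; it only reproduces the AIRFLOW__[SECTION]__VARIABLENAME convention and says nothing about how Airflow resolves $AIRFLOW_HOME itself.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Return the environment variable name that overrides [section] key.

    Hypothetical helper for illustration only: it upper-cases both parts and
    joins them with double underscores, following the quoted pattern.
    """
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


if __name__ == "__main__":
    # The [core] airflow_home entry from the question.
    print(airflow_env_var("core", "airflow_home"))
```

Under this convention, the [core] airflow_home entry maps to the second variable named in the question title, while the plain AIRFLOW_HOME variable does not follow the pattern.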

Airflow: DAG marked as success although one task failed

A similar question has been asked before but as far as I can tell, it can't be explained by this answer since I don't have any tasks with "all_done" trigger rule. As seen from the image above, a task has failed but that DAG is still marked as a success. The trigger rule for all the tasks but one is the default one. Any idea how to mark the whole DAG as Failed in such case?

Airflow Run DAG with tasks of different interval

I have 3 tasks, A, B and C. I want to run task A only once, and then run task B monthly until end_date, then run task C only once to clean up. This is similar to this question, but not applicable. How to handle different task intervals on a single Dag in airflow? Thanks for your help

Is there a way to extract Last Run (date) and Last Expired from Airflow for a DAG and send them by email?

I am looking to automate the manual process of checking a DAG's status (success/failure) from the Last Run and Last Expired dates of a DAG. Currently our DAGs are scheduled to run at current hour - 2 hrs. Issue: at times the scheduler stops and the jobs start to fail. To avoid this I need to set up an email which will give info about DAG success and its Last Run and Last Expired fields. I checked that these fields are available in MySQL (the database for Airflow), under Database: airflow, Table: dag

Installing (python3) airflow does not create airflow directory

I'm having a problem where installing Airflow (in Python 3) does not create the airflow directory (unlike in the Python 2 version, where the directory is created automatically after pip install). [airflow@airflowetl ~]$ pip3 --no-cache-dir install apache-airflow --user Collecting apache-airflow Downloading (5.9MB) 78% |█████████████████████████▎

Apache Airflow: how to schedule it on remote machines

I am new to Apache Airflow. Could you please help me understand where and what I should configure to run a DAG on remote machines? I am using the celery_executor to execute the code on worker nodes. I have not done any configuration on the worker nodes. I am using RabbitMQ as the queue service, and it seems like I have configured the Airflow cluster correctly. My DAG file: """ Code that goes along with the Airflow tutorial located at:

Prevent Airflow from back filling jobs

We use the most recent Airflow v1.10.7 and have set catchup_by_default=False in the airflow.cfg and set catchup=False in each one of our dags and set catchup=False in the default args. But still the airflow backfills and runs the jobs. We really want to use Airflow but this is a show stopper. Also why does the dag run automatically when it's first created even though catchup = False? Thanks for your help. Sample dag: from airflow import DAG from utils.general import Schedule, default_args, A

Airflow Google Cloud Composer DAG not appearing in UI

Was attempting to add a new DAG to our Google Cloud Composer instance - we have 32+ DAGs currently - and doing the usual things in doesn't appear to be having any effect - we can't see these DAGs in the webserver/UI and I don't see that they are necessarily being loaded. I do see them being copied to the appropriate bucket in the logs but nothing beyond that. I even tried setting a dummy environment variable to kick off a full re

Putting code from a .bat file into an Apache Airflow DAG file

I am new to BI and I need some help. I am using the Windows job scheduler to execute tasks, but it sometimes misbehaves, so I am moving to Apache Airflow. I already have a .bat file that runs, and I want to use it in Apache Airflow DAGs. This is my .bat file code: cd /d D:\EXMOOV\Scripts call RunDTS.bat EXMOOV Js_002_MOOV_AIR I want to put it in a DAG file in order to execute it, so I took an example of DAG code and tried to put it in, but the file became unreadable and apache airflow di

In Airflow, how do you get a parent task's task_id?

I have a branch task that relies on an XCOM set by its direct upstream. The upstream task ids are generated via a loop, such as task_1, task_2..task_n. So something like this: task_n >> branch[task_a, task_b] Is there a way for a branch to access an XCOM set by its direct upstream? I know I could use op_kwargs and pass the task id to the branch. I just wanted to see if there was a more Airflow-native way to do it.

SQS sensor in Airflow not triggering when there is a new message

I am new to Airflow. I am trying to run my DAG whenever there is a message in SQS. I am using SQSSensor to do this. It triggered for the first run, but after that it is not invoked when there is a new message. Please let me know if I am missing something. default_args = { 'owner': 'Airflow', 'start_date': days_ago(2), 'provide_context': True, } dag = DAG('sqs_test', default_args=default_args, schedule_interval='@daily') task = SQSSensor( task_id='sqs_test', poke_interval

Airflow param 'concurrency'

In my DAG I set the concurrency param to 2. The problem is that the defined order in which the tasks are supposed to be processed is no longer respected. With concurrency=1 the defined order works. Is there a way to process tasks simultaneously while still following the defined task order? Thanks a lot for feedback. Cheers.

Airflow Dynamic Task Creation Stream Setting Not Working

I have a complex DAG that basically repeats its stream six times for six different sources. So I've been using a for loop to dynamically create my streams like this (small example): sources = ['source_1', 'source_2', 'source_3', 'source_4', 'source_5', 'source_6'] for source in sources: source_task_1 = PythonOperator( task_id=source + '_create_emr', dag=dag, provide_context=True, retries=10, python_callable=execute_arl_source_emr_creation,

How do I trigger an Airflow DAG via the REST API?

The 1.10.0 documentation says I should be able to make a POST against /api/experimental/dags//dag_runs to trigger a DAG run, but instead when I do this, I receive an error: <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN"> <title>400 Bad Request</title> <h1>Bad Request</h1> <p>The browser (or proxy) sent a request that this server could not understand.</p>
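
A minimal sketch of how such a request could be assembled with the Python standard library, for comparison with whatever client produced the 400. It assumes (this is not confirmed by the excerpt) that the endpoint expects a JSON body and a JSON Content-Type header, so it sends an empty JSON object. The base URL `http://localhost:8080` and the DAG id `my_dag` are hypothetical placeholders, and the request is only built here, not sent.

```python
import json
import urllib.request


def build_trigger_request(base_url: str, dag_id: str, payload: dict = None) -> urllib.request.Request:
    """Build (but do not send) a POST request for the experimental dag_runs endpoint."""
    url = f"{base_url.rstrip('/')}/api/experimental/dags/{dag_id}/dag_runs"
    # Assumption: the server wants a JSON body, so default to an empty object.
    body = json.dumps(payload if payload is not None else {}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    # Hypothetical host and DAG id, for illustration only.
    req = build_trigger_request("http://localhost:8080", "my_dag")
    print(req.get_method(), req.full_url, req.data)
```

Sending it would be a matter of passing the request to `urllib.request.urlopen`, which is left out so the sketch has no network side effects.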

Why is the Airflow GoogleCloudStorageObjectSensor not searching for partial object names (e.g. file*.csv)?

The GCS sensor in Airflow is not working for a partial object name. For example, for object I have given myfile* but it is not working. Can you suggest a solution that takes partial names to search in Google Cloud Storage? file_watcher = GoogleCloudStorageObjectSensor( task_id='filesensor', bucket='poc-1', object='myfile*', google_cloud_conn_id='google_cloud_default', dag=example_dag )

Airflow How to fix WARNING - schedule_interval is used for <taskname> though it has been deprecated as a task parameter

I keep getting the warning: WARNING - schedule_interval is used for <Task(BigQueryOperator): mytask>, though it has been deprecated as a task parameter, you need to specify it as a DAG parameter instead I am on google composer 1.9.0 schedule_interval = datetime.timedelta(days=1) default_args = { 'owner': 'e', 'catchup': False, 'start_date': datetime.datetime(2019, 10, 25), } with models.DAG( dag_id=f"mydag", schedule_interval=schedule_interval, def

How to launch a Dataflow job with Apache Airflow and not block other tasks?

Airflow tasks of the type DataflowTemplateOperator take a long time to complete. This means other tasks can be blocked by it (correct?). When we run more of these tasks, that means we would need a bigger Cloud Composer cluster (in our case) to execute tasks that are essentially blocking while they shouldn't be (they should be async operations). Options Option 1: just launch the job and airflow job is successful Option 2: write a wrapper as explained here and use a reschedule mode as expla

Airflow DAGs are successful but tasks are not running

I am trying to run a sample Airflow DAG. The DAG status is successful, but tasks are not running. Can anyone help me understand why? I've tried the following: changing the start date to a past date; restarting the web server and scheduler. Here is my code: import datetime as dt from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator print('test1') def greet(): print('Writing in file') print('testing th

Problem installing dnspython Airflow docker puckel

I'm running the Dockerfile with these extra installations: && pip install pymongo[srv] \ && pip install dnspython \ The build log says the install succeeded: pip install dnspython Collecting dnspython Downloading dnspython-1.16.0-py2.py3-none-any.whl (188 kB) Installing collected packages: dnspython Successfully installed dnspython-1.16.0 But in Airflow there is no sign that dnspython was installed. But from air

Airflow Jinja Template Variable Email ID not rendering when using ON_FAILURE_CALLBACK

Need help on rendering the Jinja template email ID in the on_failure_callback. I understand that rendering templates works fine in a SQL file or with an operator that has template_fields. How do I get the code below to render the Jinja template variable? It works fine with Variable.get('email_edw_alert'), but I don't want to use the Variable method, to avoid hitting the DB. Below is the DAG file: import datetime import os from functools import partial from datetime import timedelta from airflow.models import

Airflow Filter by multiple LDAP groups in Flask App Builder using AUTH_LDAP_SEARCH_FILTER

I am configuring the Airflow FAB UI to use LDAP authentication. Currently I have this working, but I can only filter by users who are members of one group in LDAP. AUTH_LDAP_SEARCH_FILTER = '(memberOf=CN=group1)' I would like to authenticate users who are in one of two groups. I have tried multiple variations of the below in the AUTH_LDAP_SEARCH_FILTER = '|(memberOf=CN=group1)(memberOf=CN=group2)' but I am always greeted with this error in the Airflow Webserver logs when tryi
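
For reference, in standard LDAP filter syntax (RFC 4515) the OR operator `|` goes inside an enclosing pair of parentheses, in front of the clauses it combines. The sketch below only assembles such a string; `ldap_or_filter` is a hypothetical helper, and whether Flask AppBuilder wraps AUTH_LDAP_SEARCH_FILTER in further clauses is not shown in the excerpt, so the result may still need adjusting.

```python
def ldap_or_filter(*clauses: str) -> str:
    """Combine already-parenthesised LDAP clauses with a logical OR.

    Hypothetical helper: produces the RFC 4515 form "(|(a)(b)...)".
    """
    return "(|" + "".join(clauses) + ")"


if __name__ == "__main__":
    # Group names taken from the question.
    print(ldap_or_filter("(memberOf=CN=group1)", "(memberOf=CN=group2)"))
```

The difference from the attempt quoted in the question is only the outer parentheses around the `|` expression.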

Error uploading file using S3FileTransformOperator in Apache Airflow

I'm converting an XML file into Parquet. This is the relevant code: File with DAG("s3-dag", default_args=default_args, schedule_interval= '@once') as dag: t1 = BashOperator( task_id='bash_test', bash_command='echo 1', dag=dag ) transformer = S3FileTransformOperator( task_id='S3_ETL_OP', source_s3_key='s3://<my bucket>/origin/105.xml', dest_s3_key='s3://<my bucket>/s3/105.parquet', replace=False,

Fetching metrics using airflow-exporter get error

I installed airflow-exporter following but after I triggered the example dag example_bash_operator and then accessed http://<host_ip>:8080/admin/metrics. I got error: TypeError: ("float() argument must be a string or a number, not 'NoneType'", Metric(airflow_dag_run_duration, Maximum duration of currently running dag_runs for each DAG in seconds, gauge, , [Sample(name='airflow_dag_run_duration', labels={'dag_id': '

Airflow long running job killed after 1 hr but the task is still in running state

I need help with a long-running DAG that keeps failing after an hour while the task is still in the running state. I have been using Airflow for the past 6-8 months. With the help of our infrastructure team, I have set up Airflow in our company. It's running on an AWS ECS cluster. The DAGs sit in an EFS instance with throughput set to provisioned. The logs are written to an S3 bucket. For the worker AWS ECS service we have an autoscaling policy that scales up the cluster at night 1 AM and scales down a

airflow dag problem in run - run schedule in loop

I created this DAG to run a command over remote SSH on a schedule. from datetime import timedelta from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago from airflow.contrib.hooks.ssh_hook import SSHHook as sscon from airflow.contrib.operators.ssh_operator import SSHOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(0), 'email': [''], 'email_on_failure': Fal

Schedule airflow to run on weekdays

What's the correct way to schedule an Airflow Dag so that it only runs on weekdays? I've tried including the hour offset both in the start_date and in the schedule_interval expression, but it's still not starting at the desired time. (I want it to start on January 28th at 1am UTC, and run at that time Tuesday through Saturday.) default_args = { 'owner': 'me', 'depends_on_past': False, 'start_date': datetime.datetime(2017, 1, 28, 0, 0), 'email': [''], 'email_on_
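
One way to sanity-check the intended calendar, independent of Airflow, is to list the wall-clock times the question asks for. The sketch below assumes the desired schedule corresponds to the cron expression `0 1 * * 2-6` (01:00 on Tuesday through Saturday); that expression is an assumption drawn from the description, not from the truncated code. `expected_run_times` is a hypothetical stdlib-only helper. Note that Airflow 1.x triggers a run once its schedule interval has ended, so the execution_date shown in the UI lags these wall-clock times by one interval.

```python
from datetime import datetime, timedelta

# Assumed cron expression for "1am UTC, Tuesday through Saturday".
CRON_TUE_TO_SAT_1AM = "0 1 * * 2-6"

# Python's weekday() uses Monday=0, so Tuesday..Saturday is 1..5.
RUN_WEEKDAYS = {1, 2, 3, 4, 5}


def expected_run_times(start: datetime, count: int) -> list:
    """Return the first `count` 01:00 timestamps on Tue-Sat at or after `start`."""
    current = start.replace(hour=1, minute=0, second=0, microsecond=0)
    if current < start:
        current += timedelta(days=1)
    runs = []
    while len(runs) < count:
        if current.weekday() in RUN_WEEKDAYS:
            runs.append(current)
        current += timedelta(days=1)
    return runs


if __name__ == "__main__":
    # Start date taken from the question's default_args.
    for run in expected_run_times(datetime(2017, 1, 28, 0, 0), 5):
        print(run.strftime("%A %Y-%m-%d %H:%M"))
```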

airflow install fails when using pipenv

I'm trying to install python package airflow into a virtualenv that has been created using pipenv, inside a docker container. It fails with an error that I'm clueless about. Here is my Dockerfile: FROM python:3.6-stretch WORKDIR /tmp # Define build args ARG http_proxy ARG https_proxy ARG no_proxy RUN apt-get update && \ apt-get -y install default-jdk # Detect JAVA_HOME and export in bashrc. # This will result in something like this being added to /etc/bash.bashrc # export J

Airflow: How to template or pass the output of a Python Callable function as arguments to other tasks?

I'm new to Airflow and working on making my ETL pipeline more re-usable. Originally, I had a few lines of top-level code that would determine the job_start based on a few user input parameters, but I found through much searching that this would trigger at every heartbeat which was causing some unwanted behavior in truncating the table. Now I am investigating wrapping this top level code into a Python Callable so it is secure from the refresh, but I am unsure of the best way to pass the output t

Apache Airflow: Executor reports task instance finished (failed) although the task says its queued

Our airflow installation is using CeleryExecutor. The concurrency configs were # The amount of parallelism as a setting to the executor. This defines # the max number of task instances that should run simultaneously # on this airflow installation parallelism = 16 # The number of task instances allowed to run concurrently by the scheduler dag_concurrency = 16 # Are DAGs paused by default at creation dags_are_paused_at_creation = True # When not using pools, tasks are run in the "default pool

Airflow How to set equal priority_weight to task that depends on another task

I have 8 sets of tasks. Each set is a series of tasks: task1 >> task2 >> task3. task3 depends on task2, just as task2 depends on task1. My problem is that task2 never starts until all task1 instances are finished. So in order for set1.task2 to start, set8.task1 must run first. My initial research points to priority_weight, which can be included in the default_args for the DAG. I have learned that task1 would have a higher priority_weight than its downstream tasks. Is there a way in such that all priority we

Can't delete dag from airflow UI after deleting from dag_bag

I deleted dag from airflow dag_bag and corresponding .pyc file as well. When I try to delete the same dag from airflow UI it is showing this error: Dag id MY_DAG_ID is still in DagBag. Remove the DAG file first. The airflow version I am using is 1.10.4 Even after restarting airflow I'm not able to delete from UI. I was using 1.10.3 previously, but I never faced this issue. I was able to delete from UI after deleting from dags folder. When I click on that dag in UI it is showing : DAG "MY_DAG

Airflow Can I create an adhoc parameterized DAG from scheduler job DAG running once a minute

I'm researching Airflow to see if it is a viable fit for my use case and not clear from the documentation if it fits this scenario. I'd like to schedule a job workflow per customer based on some very dynamic criteria which doesn't fall into the standard "CRON" loop of running every X minutes etc. (since there is some impact of running together) Customer DB Customer_id, "CRON" desired interval (best case) 1 , 30 minutes 2 , 2 hours ... ... <<<<<<< thousands of these potenti

Airflow: Dynamic Task dependent on itself completing first

I need to create a DAG that deletes and updates a few different tables. The updates happen by region. The database I work with does a table lock when doing any deletes or updates, so I would need to structure my dag like below, so that I avoid trying to update the same table at the same time. --> equals dependent on Florida_table_1 --> Carolina_table_1 --> Texas_table_1 Florida_table_2 --> Carolina_table_2 --> Texas_table_2 Florida_table_3 --> Carolina_table_3 --> Texas_tabl
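
The layout described here can be sketched as plain dependency pairs before any operators are involved. The example below is a stdlib-only illustration: `build_chains` is a hypothetical helper, and the three regions and three tables are taken from the visible part of the excerpt (the rest is truncated). In an actual DAG, each pair could be wired up as `upstream >> downstream` between the corresponding tasks.

```python
REGIONS = ["Florida", "Carolina", "Texas"]
TABLES = ["table_1", "table_2", "table_3"]


def build_chains(regions: list, tables: list) -> list:
    """Return (upstream, downstream) task-id pairs.

    For each table, the regions are chained in order so that no two regions
    touch the same table at once; different tables stay independent.
    """
    edges = []
    for table in tables:
        task_ids = [f"{region}_{table}" for region in regions]
        # Pair each task id with the next one in the same table's chain.
        edges.extend(zip(task_ids, task_ids[1:]))
    return edges


if __name__ == "__main__":
    for upstream, downstream in build_chains(REGIONS, TABLES):
        print(f"{upstream} --> {downstream}")
```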

Airflow log file exception

I am using apache airflow for running my dags. I am getting an exception as: *** Log file does not exist: /opt/airflow/logs/download2/download2/2020-07-26T15:00:00+00:00/1.log *** Fetching from: http://fb3393f5f01e:8793/log/download2/download2/2020-07-26T15:00:00+00:00/1.log *** Failed to fetch log file from worker. HTTPConnectionPool(host='fb3393f5f01e', port=8793): Max retries exceeded with url: /log/download2/download2/2020-07-26T15:00:00+00:00/1.log (Caused by NewConnectionError('<urllib

Airflow - Only run a DAG if all tasks on another DAG were successful

I'm kinda new to DAGs, Airflow and Python syntax (I learned coding from Java), but I have a DAG with about 10 tasks that are independent from one another and I have another DAG that can only run if all 10 tasks were successful. Because the way I have it, if one task fails, the DAG still runs the other ones and the DAG is marked as successful. (Which is what I want) Is there a way to make a new task (task 11) that goes through the other tasks and checks their state? I can't find a function that re

Airflow - Not able to delete dag

I am trying the following command to delete a DAG, but I am getting the following error. Command: airflow delete_dag example_http_operator Error: File "/usr/local/lib64/python2.7/site-packages/sqlalchemy/engine/", line 588, in do_execute cursor.execute(statement, parameters) sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: dag [SQL: SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_i

Airflow Dynamically create tasks from output of another task

I can currently dynamically create tasks in airflow by loading a config file and looping through the values: s3 = boto3.resource("s3") s3Client = boto3.client("s3") result = s3Client.get_object(Bucket='mybucket', Key='file1/inputs.json') models = json.loads(result["Body"].read().decode()) for item in models: run_model = PythonOperator( task_id=item["col1"] +"_run_model", python_callable=batch_runner, dag=dag,

Maximum number of DAGs in Airflow and Cloud Composer

Is there a maximum number of DAGs that can be run in 1 Airflow or Cloud Composer environment? If this is dependent on several factors (Airflow infrastructure config, Composer cluster specs, number of active runs per DAG etc..) what are all the factors that affect this?

Apache Airflow - 1.10 - How to access SQL files during run

TL;DR How do I make it so that assets (SQL files, text files) are reachable by the tasks in my dag Hello Folks, I'm running a single node Airflow installation (1.10.12). I have a dag that uses SQL statements. For the sake of readability, I'd like to keep these statements in their own SQL file. So my project looks like so: MyProject - - airflow_ftp_assets (directory) - WEX_HH_Upsert_SQL.sql When my dag runs, it complains that it can't find the SQL file. This

SQLite error when running airflow scheduler on Mac

I keep getting the error below when running Airflow. Any ideas? This happens as soon as I kick off the Airflow scheduler. It was previously working but seems to have stopped when I updated my Mac. Traceback (most recent call last): File "/Users/user1/.pyenv/versions/3.6.10/lib/python3.6/site-packages/sqlalchemy/engine/", line 1248, in _execute_context cursor, statement, parameters, context File "/Users/user1/.pyenv/versions/3.6.10/lib/python3.6/site-packages/sqlalch
