Airflow源码分析(1)-executor部分
Executor简介
Executor是在scheduler和worker之间的一个组件,主要作用是接收scheduler发过来的可执行task,然后根据自身类型决定task的运行环境。
目前有四种类型:
- SequentialExecutor:Dag在单进程中顺序执行,用于测试跟开发
- LocalExecutor:Dag在本地多进程执行,也是用于测试跟开发
- CeleryExecutor:通过Celery下发任务到分布式集群。
- DaskExecutor:下发任务到Dask集群上执行。Dask不支持队列。
流程分析
scheduler经过各种验证以后,终于将task标记为queued状态。
接下来,scheduler调用executor.queue_command将task_instance交给实际的executor。
1 | self.executor.queue_command(simple_task_instance, command, priority=priority, queue=queue) |
BaseExecutor
我们打开base_executor.py
,这个文件中BaseExecutor
类作为具体executor的基类,可以看出executor大概的一个流程。
1 | # 两个任务队列和一个事件buffer都是dict类型的,key是task_id |
总结一下executor的功能:
- 接收来自scheduler的task,加入到自身维护的queued_tasks中
- 在接收到scheduler的心跳后,打印自身的一些状态,在trigger_tasks(…)中将task从queued_tasks转移到running中,并最终调用execute_async(…)异步执行命令并调用sync(…)收集状态
- scheduler可以调用get_event_buffer(…)获取executor的事件。executor改变自身维护的queued_tasks和running队列中task的状态时,都会上报事件到event_buffer中,从而可以被scheduler获取到
SequentialExecutor
接下来以SequentialExecutor为例,看下execute_async和sync具体是怎么实现的
1 | def execute_async(self, key, command, queue=None, executor_config=None): |
那么,task的调用到这里就结束了吗?
一个subprocess.check_call(command, close_fds=True)
就完了?task本身的状态是在哪改变的?对于HttpOperator,这个command又是如何执行的?
眉头一皱,发现事情并没有那么简单。
这里的command具体是什么呢?通过日志我们可以看到,其实是调用airflow CLI的run命令。
1 | [2019-07-04 15:42:25,046] {base_executor.py:59} INFO - Adding to queue: ['airflow', 'run', 'example_json', 'echo_env', '2019-07-04T07:42:24.824155+00:00', '--local'] |
cli.run
继续来看cli.run()
1 | # run()中加载了配置文件,获取dag并实例化了TaskInstance,最终调用了_run()方法 |
接下来是base_job.py
,
1 | def run(self): |
代码看到这里发现又是subprocess.Popen(cmd),那么这个时候的cmd内容是什么呢?可以从日志中看到。
1 | [2019-07-04 13:15:38,406] {base_task_runner.py:133} INFO - Running: ['airflow', 'run', 'example_json', 'echo_env', '2019-07-04T05:15:36.239140+00:00', '--job_id', '120', '--raw', '--cfg_path', '/tmp/tmpg2123epz'] |
会发现又是airflow run,但是这个时候的cmd参数更多了,而且有一个--raw
的参数。
TaskInstance._run_raw_task
回到_run(...)
,这个时候再执行命令会去另一个分支。于是会执行:
1 | # cli.py |
到这里,从scheduler将某个task分发给executor开始,一直到task被真正地执行的流程就完成了。
这里只分析了SequentialExecutor,对于CeleryExecutor,只是通过CeleryExecutor将cmd分发到远程worker上面执行了,接下来的流程是一样。
最后总结一下task是如何在worker上运行起来的:
- 在worker上执行
airflow run <dag_id> <task_id> <execution_date> --local
命令 - 进入到_run函数,选择local分支执行
- 在local分支中绑定一个
LocalTaskJob
,并选择一个BaseTaskRunner
作为task的执行环境。目前实现有StandardTaskRunner
和CgroupTaskRunner
两种 - 在TaskRunner中调用run_command方法在子进程中继续执行命令,这时候命令为
airflow run <dag_id> <task_id> <execution_date> --raw
- 再次进入_run函数,选择raw分支执行
- 在raw分支中调用_run_raw_task(),最后真正执行task.execute()方法