Airflow源码分析(2)-xcom部分
xcom简介
XComs(cross-communication)使得任务之间可以交换信息,允许更细粒度的控制和状态共享。XComs包含key, value, timestamp, 同时也包含创建xcom的任务实例的task_id, dag_id, execution_date等。
Task可以在运行时通过xcom_push(key, value)
发送任意可序列化成JSON的对象。(其实支持pickle,但是已经被废弃)另外,task.execute()的返回值会默认发送到xcom,key为return_value。
Task中也可以通过xcom_pull(task_id(s), key)
获取到一个或多个task的xcom值。不局限于下游。
详细介绍参考:xcom
详细分析
调用方式
在任务执行前,会先生成运行时的上下文context,然后调用task.execute(context)。因此在Operator的execute(context)方法中可以通过context[‘ti’]得到当前的TaskInstance,然后调用xcom_push
和xcom_pull
这里的context包含很多字段,比如ti, task, dag对象,还有Jinja渲染需要的字段。
Context的字段定义
1 | { |
xcom.py
Xcom是Model定义类,在里面实现了set(), get_one(), get_many(), delete()方法,对应数据库的增删查。
taskinstance.py
上面说的xcom_pull()和xcom_push在BaseOperator和TaskInstance中均有定义。BaseOperator中只是简单调用了TaskInstance中的方法。
TaskInstance部分代码:
1 | def xcom_push( |