Airflow Source Code Analysis (2): XCom
Introduction to XCom
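XComs (cross-communication) let tasks exchange information, which allows finer-grained control and shared state. An XCom contains a key, a value, and a timestamp, as well as the task_id, dag_id, execution_date, etc. of the task instance that created it.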
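At run time, a task can call xcom_push(key, value) to send any object that can be serialized to JSON. (Pickle is in fact also supported, but it has been deprecated.) In addition, the return value of task.execute() is pushed to XCom by default, under the key return_value.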
A task can also call xcom_pull(task_id(s), key) to fetch the XCom values of one or more tasks. This is not limited to downstream tasks.
For a detailed introduction, see: xcom
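To make the description above concrete, here is a minimal sketch of the push/pull semantics that uses an in-memory list instead of Airflow's database. `ToyXComStore` and its `push`/`pull` methods are hypothetical names that only mirror the fields listed above; this is an illustration under those assumptions, not Airflow's implementation.

```python
import json
from datetime import datetime, timezone

XCOM_RETURN_KEY = "return_value"  # key used for execute() return values


class ToyXComStore:
    """Hypothetical in-memory stand-in for the XCom table (not Airflow code)."""

    def __init__(self):
        self.rows = []

    def push(self, key, value, task_id, dag_id, execution_date):
        # json.dumps raises TypeError for values that are not JSON-serializable.
        payload = json.dumps(value)
        self.rows.append({
            "key": key,
            "value": payload,
            "timestamp": datetime.now(timezone.utc),
            "task_id": task_id,
            "dag_id": dag_id,
            "execution_date": execution_date,
        })

    def pull(self, task_id, dag_id, execution_date, key=XCOM_RETURN_KEY):
        # Return the most recently pushed matching value, or None.
        wanted = (key, task_id, dag_id, execution_date)
        for row in reversed(self.rows):
            found = (row["key"], row["task_id"], row["dag_id"], row["execution_date"])
            if found == wanted:
                return json.loads(row["value"])
        return None


store = ToyXComStore()
run_date = datetime(2019, 1, 1)
store.push("row_count", 42, "extract", "etl", run_date)
store.push(XCOM_RETURN_KEY, {"path": "/tmp/out.csv"}, "extract", "etl", run_date)

row_count = store.pull("extract", "etl", run_date, key="row_count")
returned = store.pull("extract", "etl", run_date)  # default key: return_value
```

Serializing on push, rather than on pull, means a value that cannot be stored is rejected at the point where the task produces it.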
Detailed Analysis
How It Is Called
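Before a task runs, the runtime context is generated first, and then task.execute(context) is called. Inside an Operator's execute(context) method you can therefore get the current TaskInstance via context['ti'] and then call xcom_push and xcom_pull on it.
The context contains many fields, such as the ti, task, and dag objects, plus the fields needed for Jinja rendering.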
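The call sequence described above can be sketched roughly as follows. `ToyTaskInstance` and `ToyOperator` are hypothetical stand-ins, the context is reduced to three of its fields, and XComs are kept in a plain dict; the real classes do considerably more.

```python
class ToyTaskInstance:
    """Hypothetical stand-in for TaskInstance; stores XComs in a dict."""

    def __init__(self, task):
        self.task = task
        self.xcoms = {}

    def xcom_push(self, key, value):
        self.xcoms[key] = value

    def run(self):
        # Build the runtime context first, then hand it to the operator.
        context = {"ti": self, "task": self.task, "dag": None}
        result = self.task.execute(context)
        # In this sketch, a non-None return value is pushed under the default key.
        if result is not None:
            self.xcom_push("return_value", result)
        return result


class ToyOperator:
    """Hypothetical operator whose execute() reaches the TaskInstance via context."""

    def execute(self, context):
        ti = context["ti"]
        ti.xcom_push("status", "started")
        return "done"


ti = ToyTaskInstance(ToyOperator())
ti.run()
```

After `ti.run()`, the dict holds both the explicitly pushed `status` entry and the implicit `return_value` entry.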
Context field definitions
{
xcom.py
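XCom is the model class. It implements the set(), get_one(), get_many(), and delete() methods, which correspond to the insert, query, and delete operations on the database.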
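As a rough illustration of how these four methods map to database operations, the sketch below uses the standard-library `sqlite3` module with an in-memory table. The free functions `xcom_set`, `xcom_get_one`, `xcom_get_many`, and `xcom_delete`, the table layout, and the choice to overwrite an existing row on set are all assumptions made for this example; the real model is an ORM class with different signatures.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE xcom "
    "(key TEXT, value TEXT, task_id TEXT, dag_id TEXT, execution_date TEXT)"
)

IDENTITY = "key = ? AND task_id = ? AND dag_id = ? AND execution_date = ?"


def xcom_delete(key, task_id, dag_id, execution_date):
    conn.execute("DELETE FROM xcom WHERE " + IDENTITY,
                 (key, task_id, dag_id, execution_date))


def xcom_set(key, value, task_id, dag_id, execution_date):
    # Assumption: drop any earlier row with the same identity, then insert.
    xcom_delete(key, task_id, dag_id, execution_date)
    conn.execute("INSERT INTO xcom VALUES (?, ?, ?, ?, ?)",
                 (key, json.dumps(value), task_id, dag_id, execution_date))


def xcom_get_one(key, task_id, dag_id, execution_date):
    row = conn.execute("SELECT value FROM xcom WHERE " + IDENTITY,
                       (key, task_id, dag_id, execution_date)).fetchone()
    return json.loads(row[0]) if row else None


def xcom_get_many(dag_id, execution_date, key=None):
    # Query every matching row; key is an optional extra filter.
    sql = "SELECT task_id, key, value FROM xcom WHERE dag_id = ? AND execution_date = ?"
    params = [dag_id, execution_date]
    if key is not None:
        sql += " AND key = ?"
        params.append(key)
    sql += " ORDER BY task_id, key"
    return [(t, k, json.loads(v)) for t, k, v in conn.execute(sql, params)]


xcom_set("n", 1, "t1", "d", "2019-01-01")
xcom_set("n", 2, "t1", "d", "2019-01-01")  # replaces the previous value
xcom_set("n", 3, "t2", "d", "2019-01-01")
```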
taskinstance.py
The xcom_pull() and xcom_push() methods mentioned above are defined in both BaseOperator and TaskInstance. The versions in BaseOperator simply call the corresponding methods on TaskInstance.
Partial TaskInstance code:

    # Excerpt: both functions are methods of TaskInstance. functools, XCom,
    # XCOM_RETURN_KEY and is_container come from the surrounding module.

    def xcom_push(
            self,
            key,
            value,
            execution_date=None):
        # If execution_date is set to a future time, the XCom only becomes
        # visible to other tasks once that time is reached.
        """
        :param execution_date: if provided, the XCom will not be visible until
            this date. This can be used, for example, to send a message to a
            task on a future date without it being immediately visible.
        :type execution_date: datetime
        """
        if execution_date and execution_date < self.execution_date:
            raise ValueError(
                'execution_date can not be in the past (current '
                'execution_date is {}; received {})'.format(
                    self.execution_date, execution_date))

        XCom.set(
            key=key,
            value=value,
            task_id=self.task_id,
            dag_id=self.dag_id,
            execution_date=execution_date or self.execution_date)

    def xcom_pull(
            self,
            task_ids=None,
            dag_id=None,
            key=XCOM_RETURN_KEY,
            include_prior_dates=False):
        # include_prior_dates=True also matches XComs with an earlier
        # execution_date, e.g. ones pushed ahead of time to become visible later.
        """
        :param include_prior_dates: If False, only XComs from the current
            execution_date are returned. If True, XComs from previous dates
            are returned as well.
        :type include_prior_dates: bool
        """
        if dag_id is None:
            dag_id = self.dag_id

        # Bind everything except task_id once, then reuse it per task.
        pull_fn = functools.partial(
            XCom.get_one,
            execution_date=self.execution_date,
            key=key,
            dag_id=dag_id,
            include_prior_dates=include_prior_dates)

        if is_container(task_ids):
            return tuple(pull_fn(task_id=t) for t in task_ids)
        else:
            return pull_fn(task_id=task_ids)
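The behaviour of the two methods can be tried out without a database using the sketch below. It keeps the same control flow (the past-date check, `functools.partial`, and the container dispatch) but replaces XCom with a module-level dict. `STORE`, the simplified `is_container`, the free-function signatures, and the rule that `get_one` prefers the latest execution_date are assumptions made for this example.

```python
import functools
from datetime import datetime

STORE = {}  # hypothetical storage: (dag_id, task_id, key, execution_date) -> value


def is_container(obj):
    # Simplified assumption: anything iterable that is not a string.
    return hasattr(obj, "__iter__") and not isinstance(obj, str)


def get_one(execution_date, key, task_id, dag_id, include_prior_dates=False):
    # Collect candidates for this task and key, optionally from earlier dates too.
    matches = [
        (date, value)
        for (d, t, k, date), value in STORE.items()
        if d == dag_id and t == task_id and k == key
        and (date <= execution_date if include_prior_dates else date == execution_date)
    ]
    # Assumption: prefer the match with the most recent execution_date.
    return max(matches, key=lambda m: m[0])[1] if matches else None


def xcom_push(current_date, dag_id, task_id, key, value, execution_date=None):
    if execution_date and execution_date < current_date:
        raise ValueError("execution_date can not be in the past")
    STORE[(dag_id, task_id, key, execution_date or current_date)] = value


def xcom_pull(current_date, dag_id, task_ids, key="return_value",
              include_prior_dates=False):
    pull_fn = functools.partial(
        get_one, execution_date=current_date, key=key, dag_id=dag_id,
        include_prior_dates=include_prior_dates)
    if is_container(task_ids):
        return tuple(pull_fn(task_id=t) for t in task_ids)
    return pull_fn(task_id=task_ids)


day1, day2 = datetime(2019, 1, 1), datetime(2019, 1, 2)
xcom_push(day1, "etl", "a", "return_value", 1)
xcom_push(day1, "etl", "b", "return_value", 2, execution_date=day2)  # dated day2

single = xcom_pull(day1, "etl", "a")            # one task id: a single value
several = xcom_pull(day1, "etl", ["a", "b"])    # a list of ids: a tuple
later = xcom_pull(day2, "etl", ["a", "b"], include_prior_dates=True)
```

On day1 the value pushed for task `b` is not found because it carries the day2 date; on day2, with include_prior_dates=True, both values are returned.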