Flink作业执行之 3.StreamGraph
1. StreamGraphGenerator
在前文了解Transformation和StreamOperator后。接下来Transformation将转换成StreamGraph,即作业的逻辑拓扑结构。
在env.execute()
方法中调用getStreamGraph
方法生成StreamGraph
实例。StreamGraph
由StreamGraphGenerator
负责生成。
StreamGraphGenerator
实例中封装了前面生成的Transformation集合。
private StreamGraph getStreamGraph(List<Transformation<?>> transformations) {
synchronizeClusterDatasetStatus();
// 根据Transformation生成StreamGraphGenerator,然后再生成StreamGraph
return getStreamGraphGenerator(transformations).generate();
}
// 创建StreamGraphGenerator实例
private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
// ...
return new StreamGraphGenerator(
// 传入transformations集合
new ArrayList<>(transformations), config, checkpointCfg, configuration)
.setStateBackend(defaultStateBackend)
.setChangelogStateBackendEnabled(changelogStateBackendEnabled)
.setSavepointDir(defaultSavepointDirectory)
.setChaining(isChainingEnabled)
.setUserArtifacts(cacheFile)
.setTimeCharacteristic(timeCharacteristic)
.setDefaultBufferTimeout(bufferTimeout)
.setSlotSharingGroupResource(slotSharingGroupResources);
}
generate
方法核心逻辑如下,首先创建一个空的StreamGraph实例。然后通过遍历transformations集合,依次调用transform
方法完成StreamGraph中节点和边实例的创建,并将节点和边加入到StreamGraph中。
public StreamGraph generate() {
// 先实例化一个空的StreamGraph
streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
// ...
for (Transformation<?> transformation : transformations) {
// 依次处理transformation
transform(transformation);
}
final StreamGraph builtStreamGraph = streamGraph;
// ...
return builtStreamGraph;
}
一个作业中生成的StreamGraph和Transformation实例数量而言,一个任务会生成多个Transformation实例,单个Transformation实例中仅包含直接上游实例。但一个任务只会生成一个StreamGraph实例,StreamGraph是一个完整的图的表示,其中包含了图中全部的节点和边。
2. TransformationTranslator
TransformationTranslator
负责根据执行模式将给定的 Transformation
转换为其运行时实现,即StreamGraph。其接口中定义了批和流处理模式下的方法。
public interface TransformationTranslator<OUT, T extends Transformation<OUT>> {
// 批模式
Collection<Integer> translateForBatch(final T transformation, final Context context);
// 流模式
Collection<Integer> translateForStreaming(final T transformation, final Context context);
}
在StreamGraphGenerator实例的创建过程中会通过静态代码块生成如下Transformation
和TransformationTranslator
的映射关系。包含了Transformation
子类中除FeedbackTransformation
和CoFeedbackTransformation
之外的其他剩余子类,共计16个值。
FeedbackTransformation
和CoFeedbackTransformation
未提供TransformationTranslator的实现,需要单独处理。
static {
Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
tmp = new HashMap<>();
tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
tmp.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator<>());
tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
tmp.put(KeyedBroadcastStateTransformation.class, new KeyedBroadcastStateTransformationTranslator<>());
tmp.put(CacheTransformation.class, new CacheTransformationTranslator<>());
// 将映射关系保存在成员属性中
translatorMap = Collections.unmodifiableMap(tmp);
}
3. StreamGraph
StreamGraph表示Flink执行图,描述了作业的逻辑拓扑结构,并以DAG的形式描述作业中算子之间的上下游连接关系。
StreamGraph
实现了Pipeline
接口,接口中没有任何内容,仅为了表示DataStream
中的StreamGraph
和DataSet
中的Plan
都属于Pipeline
类型。
StreamGraph
表示DAG,DAG中节点和边分别使用StreamNode
和StreamEdge
类表示。
三者的UML关系如下
StreamGraph
中将全部的StreamNode
节点保存在其集合属性中,同时单独指定了Source节点和sink节点,相关属性如下
// 全部节点数据,key=节点id,即transformation的id
private Map<Integer, StreamNode> streamNodes;
// 表示Source的节点id
private Set<Integer> sources;
// 表示sink的节点id
private Set<Integer> sinks;
private Set<Integer> expandedSinks;
// 旁路输出的节点信息
private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
// 虚拟节点信息,key = 新生成的虚拟节点id,tuple3为虚拟节点信息.f0=此虚拟节点的上游节点id
private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> virtualPartitionNodes;
一个节点最基础的信息有:节点id/名称、入/出边信息、工作内容。
上述基础信息维护在以下属性中。其中operatorFactory和jobVertexClass属性表示节点工作内容。
// 节点id
private final int id;
// 并行度
private int parallelism;
private int maxParallelism;
// 节点名称
private final String operatorName;
// 工作内容:算子信息
private StreamOperatorFactory<?> operatorFactory;
// 节点入边
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
// 节点出边
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
// 工作内容:StreamTask实例,表示该节点所属的StreamTask子类型。
private final Class<? extends TaskInvokable> jobVertexClass;
StreamEdge中表示边基本信息的属性字段如下。
// 边id
private final String edgeId;
// 边连接的上游节点id,即StreamNode.id
private final int sourceId;
// 边连接的下游节点id
private final int targetId;
// 上游节点名称
private final String sourceOperatorName;
// 下游节点名称
private final String targetOperatorName;
4. 生成StreamGraph
对实现了TransformationTranslator接口的16种Transformation而言(上述静态代码内容),Transformation转换过程大致如下。
首先从Transformation中获取id、name、输入类型(即上游Transformation中的输出类型,Source没有)、输出类型、StreamOperatorFactory实例等内容作为节点和边实例中基础信息。
Class<? extends TaskInvokable> vertexClass
信息在具体的TransformationTranslator子类中进行指定。
然后通过StreamGraph中addNode
方法,生成StreamNode实例并将该实例加入到Map<Integer, StreamNode> streamNodes
,如果是Source则将节点id加入到Set<Integer> sources
,如果是sink则将节点id加入到Set<Integer> sinks
。
4.1. 生成节点
addNode
方法如下
protected StreamNode addNode(
Integer vertexID, // transformation id
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
Class<? extends TaskInvokable> vertexClass, // StreanTask实例
StreamOperatorFactory<?> operatorFactory, // transformation中的工厂实例
String operatorName) { // transformation name,如果是Source或sink,则分别拼接"Source: "或"Sink: "前缀
// ...
// 生成节点实例
StreamNode vertex =
new StreamNode(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
vertexClass);
// 将节点添加到map
streamNodes.put(vertexID, vertex);
return vertex;
}
节点id和名称直接取自Transformation的id和名称。如果是Source或sink,则分别拼接"Source: "或"Sink: "前缀。
节点工作内容来自Transformation中的StreamOperatorFactory实例。
生成节点实例后,根据Transformation中的并行度,设置节点的并行度。如果Transformation中未设置并行度时,获取配置中默认的并行度。
注意,此时的节点并不包含边属性。
4.2. 设置节点的边
节点可能存在入边和出边,根据节点是否存在上游决定是否需要设置入边信息,完成当前节点的入边设置同时,将该边设置为相应上游节点的出边。每个节点的出边由下游节点触发设置。
Source作为头节点,不存在上游,因此source节点不存在设置边的操作。
当节点存在上游节点时,通过StreamGraph中addEdge
方法完成节点边的设置。如果存在多个上游,则循环调用addEdge方法。
public void addEdge(
Integer upStreamVertexID, // 上游节点id
Integer downStreamVertexID, // 当前节点id
int typeNumber, // 只有co-task任务才会涉及到,多条入边的序号
IntermediateDataSetID intermediateDataSetId) {
// 注意在这里调用时, partitioner、outputTag、exchangeMode传null值
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
null, // 注意
new ArrayList<String>(),
null, // 注意
null, // 注意
intermediateDataSetId);
}
private void addEdgeInternal(
Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
StreamExchangeMode exchangeMode,
IntermediateDataSetID intermediateDataSetId) {
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
// 上游节点是旁路输出节点时
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
// 递归调用
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
null,
outputTag,
exchangeMode,
intermediateDataSetId);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
// 上游节点是虚拟节点时
int virtualId = upStreamVertexID;
// 上游(虚拟)节点的父节点id
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
// 获取了虚拟节点的partitioner
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
// 获取了虚拟节点的数据exchangeMode
exchangeMode = virtualPartitionNodes.get(virtualId).f2;
// 递归调用
addEdgeInternal(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
outputNames,
outputTag,
exchangeMode,
intermediateDataSetId);
} else {
// 创建边实例
createActualEdge(
upStreamVertexID,
downStreamVertexID,
typeNumber,
partitioner,
outputTag,
exchangeMode,
intermediateDataSetId);
}
}
createActualEdge
方法完成边的创建并将边添加到上下游节点中。
private void createActualEdge(
Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
OutputTag outputTag,
StreamExchangeMode exchangeMode,
IntermediateDataSetID intermediateDataSetId) {
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// 设置数据分区
partitioner = ...
// 算子之间的数据交换模式
if (exchangeMode == null) {
exchangeMode = StreamExchangeMode.UNDEFINED;
}
int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();
// 生成边实例
StreamEdge edge =
new StreamEdge(
upstreamNode,
downstreamNode,
typeNumber,
partitioner,
outputTag,
exchangeMode,
uniqueId,
intermediateDataSetId);
// 最后将生成的边分别添加到上游节点的List<StreamEdge> outEdges和当前节点的List<StreamEdge>
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
5. WordCount实例的StreamGraph
WordCount示例中,按照DataStream的转换流程将得到如下关系的Transformation信息。因此StreamGraph将由如下Transformation得到。
前文提到Transformation分为物理和虚拟两大类,物理类别将会生成节点,而虚拟类别将生成边。上述生成的5个Transformation中PartitionTransformation属于虚拟类别,而其余4个均数据物理类别。既然虚拟类别将生成边,那么其处理方式定然与其他4个节点有所不同。
5.1. 虚拟节点
在PartitionTransformationTranslator中translateInternal
方法中,将调用StreamGraph中的addVirtualPartitionNode
方法,将PartitionTransformation加入到表示虚拟节点集合中。并没有生成节点的操作。
private Collection<Integer> translateInternal(
final PartitionTransformation<OUT> transformation,
final Context context,
boolean supportsBatchExchange) {
checkNotNull(transformation);
checkNotNull(context);
final StreamGraph streamGraph = context.getStreamGraph();
// 上游Transformation,在本示例中为OneInputTransformation,tId=2
final Transformation<?> input = ...
List<Integer> resultIds = new ArrayList<>();
StreamExchangeMode exchangeMode = ...;
for (Integer inputId : context.getStreamNodeIds(input)) {
// 当前作业中已生成5个Transformation实例,因此下一个自增id为6
final int virtualId = Transformation.getNewNodeId();
// 加入虚拟节点集合中
streamGraph.addVirtualPartitionNode(
// inputId即上游id=2,virtualId=6
inputId, virtualId, transformation.getPartitioner(), exchangeMode);
resultIds.add(virtualId);
}
// 最后将新生成的ids返回
return resultIds;
}
// StreamGraph中的addVirtualPartitionNode方法
public void addVirtualPartitionNode(
Integer originalId,
Integer virtualId,
StreamPartitioner<?> partitioner,
StreamExchangeMode exchangeMode) {
virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, partitioner, exchangeMode));
}
处理完成PartitionTransformation之后,StreamGraph实例中的虚拟节点集合中Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> virtualPartitionNodes
中便存在了元素。
接下来处理ReduceTransformation,其上游节点是虚拟节点,因此在生成边时,在addEdgeInternal
方法中将会执行上游节点是虚拟节点时得逻辑分支。
还记得前面提到的addEdgeInternal
方法中存在3个逻辑判断吗?
private void addEdgeInternal(
Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag,
StreamExchangeMode exchangeMode,
IntermediateDataSetID intermediateDataSetId) {
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
// 上游节点是旁路输出节点时
// ...
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
// 上游节点是虚拟节点时
// 本实例中ReduceTransformation的上游节点为虚拟节点,因此将会执行这段逻辑。
int virtualId = upStreamVertexID; // 6,为什么是6在介绍PartitionTransformationTranslator处理逻辑时有解释
// 上游(虚拟)节点的父节点
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0; // 2,即OneInputTransformation的id
if (partitioner == null) {
// 获取了虚拟节点的partitioner
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
// 获取了虚拟节点的exchangeMode
exchangeMode = virtualPartitionNodes.get(virtualId).f2;
// 递归调用时,PartitionTransformation从上下游中消失了,仅仅从PartitionTransformation中获取了partitioner和exchangeMode信息。
addEdgeInternal(
upStreamVertexID, // 2
downStreamVertexID, // 4
typeNumber,
partitioner,
outputNames,
outputTag,
exchangeMode,
intermediateDataSetId);
} else {
// 生成边信息
// ...
}
}
原始的Transformation关系中,ReduceTransformation的上游是PartitionTransformationT(tId=3),从前面PartitionTransformationTranslator处理逻辑中已知,PartitionTransformation并未真正生成节点,而是加入到了表示虚拟节点集合中,因此获PartitionTransformation的上游节点即OneInputTransformation(tId=2),作为ReduceTransformation在StreamGraph的父节点。
最终得到的StreamGraph示意图如下图所示(省略并行度信息)。
当作业中存在旁路输出时,处理方式与虚拟节点类似,不在赘述。
6. 一点理解
试着理解下为什么要将Transformation转成StreamGraph?
最初设计者的设计和初衷不得而知,以下纯粹个人理解。
Transformation到StreamGraph转换可以看作是链表结构到图结构的转换。
Transformation是类似于单向链表的结构,并且还是指向上游的逆向链表,从其中任何一个Transformation开始只能获取其上游数据。必须遍历全部的Transformation实例后,才能得到完成的作业信息。
Transformation结构中和上游是嵌套关系,这样多个实例中都最终指向同一个上游,处理关系时存在冗余。
但是Transformation的好处是生成方便。每次DataStream转换时,十分清楚的知道上游是谁,直接将上游实例传递到当前实例中即可。
StreamGraph是图的结构。可以使用图的处理方式快速处理节点关系。同时也更接近最终的作业执行拓扑结构。