个人技术分享

反压

Flink反压是一个在实时计算应用中常见的问题,特别是在流式计算场景中。以下是对Flink反压的详细解释:

一、反压释义

反压(backpressure)意味着数据管道中某个节点成为瓶颈,其处理速率跟不上上游发送数据的速率,从而需要对上游进行限速。在Flink等实时计算框架中,反压通常是从某个节点传导至数据源,并降低数据源(如Kafka consumer)的摄入速率。

二、反压原因

  1. 数据倾斜:数据分布不均,导致个别task处理数据过多。
  2. 算子性能问题:可能某个节点逻辑很复杂,如sink节点很慢或lookup join热查询慢等。
  3. 流量陡增:如大促时流量激增,或者使用了数据炸开的函数。

三、反压影响

  1. 任务处理性能出现瓶颈:例如,在消费Kafka时,可能会出现消费Kafka Lag。
  2. Checkpoint时间长或失败:因为某些反压会导致barrier需要花很长时间才能对齐,从而影响任务的稳定性。
  3. State状态变大:由于数据处理速度不匹配,可能导致系统内部状态堆积。
  4. Kafka数据积压:当Flink作业无法及时处理Kafka中的数据时,会导致数据在Kafka中积压。
  5. OOM(内存溢出):严重的反压可能导致系统资源耗尽,进而引发内存溢出等问题。

四、反压机制实现方式

在Flink中,反压机制可以通过以下两种方式实现:

  1. 阻塞式反压:当下游任务无法及时处理上游任务生成的数据时,上游任务会被阻塞,直到下游任务处理完毕。这种方式可以保证数据不丢失,但会造成延迟增加。
  2. 异步非阻塞式反压:此方式的具体实现和细节可能因Flink版本和配置而异,但通常旨在通过异步处理和非阻塞操作来减轻反压的影响。

五、解决思路

  1. 优化数据分布:通过调整数据分区策略或重新设计数据模型来减少数据倾斜。
  2. 提升算子性能:针对性能瓶颈的算子进行优化,如简化逻辑、增加资源等。
  3. 限流与缓冲:在数据源端或关键节点前设置限流策略和缓冲区,以平滑处理流量陡增的情况。
  4. 监控与告警:建立完善的监控和告警机制,及时发现并处理反压问题。

综上所述,Flink反压是一个需要关注的问题,它可能影响到实时计算应用的性能和稳定性。通过理解反压的原理和影响,并采取相应的解决措施,可以有效地提升Flink作业的处理能力和稳定性。

监控

Flink Web

Flink Web 界面提供了一个选项卡来监控正在运行 jobs 的反压行为。

Task 性能指标

task(SubTask)的每个并行实例都可以用三个一组的指标评价:

  • backPressuredTimeMsPerSecond,subtask 被反压的时间
  • dleTimeMsPerSecond,subtask 等待某类处理的时间
  • busyTimeMsPerSecond,subtask 实际工作时间 在任何时间点,这三个指标相加都约等于1000ms。

这些指标每两秒更新一次,上报的值表示 subtask 在最近两秒被反压(或闲或忙)的平均时长。 当工作负荷是变化的时需要尤其引起注意。如,一个以恒定50%负载工作的 subtask 和另一个每秒钟在满负载和闲置切换的 subtask 的busyTimeMsPerSecond值相同,都是500ms。
在内部,反压根据输出 buffers 的可用性来进行判断的。 如果一个 task 没有可用的输出 buffers,那么这个 task 就被认定是在被反压。 相反,如果有可用的输入,则可认定为闲置,

WebUI

WebUI 集合了所有 subTasks 的反压和繁忙指标的最大值,并在 JobGraph 中将集合的值进行显示。除了显示原始的数值,tasks 也用颜色进行了标记,使检查更加容易。
**加粗样式**

闲置的 tasks 为蓝色,完全被反压的 tasks 为黑色,完全繁忙的 tasks 被标记为红色。 中间的所有值都表示为这三种颜色之间的过渡色。

反压状态

在 Job Overview 旁的 Back Pressure 选项卡中,可以找到更多细节指标。
在这里插入图片描述

如果看到 subtasks 的状态为 OK 表示没有反压。HIGH 表示这个 subtask 被反压。状态用如下定义:

  • OK: 0% <= 反压比例 <= 10%
  • LOW: 10% < 反压比例 <= 50%
  • HIGH: 50% < 反压比例 <= 100%

Prometheus监控

在Flink中使用Prometheus进行反压监测通常涉及配置Flink的metrics系统以及Prometheus的配置。以下是配置Flink以使用Prometheus进行反压的基本步骤:

配置

  1. 在Flink配置文件中启用Prometheus metrics(通常是flink-conf.yaml):
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: <prometheus-pushgateway-host>
metrics.reporter.promgateway.port: <prometheus-pushgateway-port>
metrics.reporter.promgateway.jobName: <job-name>
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
  1. 确保Prometheus配置了PushGateway,并且Prometheus能够从Flink TaskManagers推送指标。
  • 下载并解压 Prometheus Pushgateway:
wget https://github.com/prometheus/pushgateway/releases/download/v1.4.1/pushgateway-1.4.1.linux-amd64.tar.gz
tar xvzf pushgateway-1.4.1.linux-amd64.tar.gz
cd pushgateway-1.4.1.linux-amd64
  • 创建一个系统服务文件 /etc/systemd/system/pushgateway.service:
[Unit]
Description=Pushgateway
After=network.target
 
[Service]
User=nobody
Group=nobody
Type=simple
ExecStart=/path/to/pushgateway
 
[Install]
WantedBy=multi-user.target
  • 启动并使 Pushgateway 服务随系统启动:
sudo systemctl daemon-reload
sudo systemctl start pushgateway
sudo systemctl enable pushgateway
  • 配置 Prometheus 来从 Pushgateway 拉取数据。在 Prometheus 配置文件 (prometheus.yml) 中添加以下内容:
scrape_configs:
  - job_name: 'pushgateway'
    static_configs:
      - targets: ['localhost:9091']
  1. 配置Prometheus PushGateway,通常在Prometheus配置文件中(prometheus.yml):
scrape_configs:
  - job_name: 'flink-metrics'
    honor_labels: true
    static_configs:
      - targets: ['<pushgateway-host>:<pushgateway-port>']
  1. 重启Flink集群和Prometheus以应用配置更改。

配置告警规则,推送alertmanager进行告警通知推送
通过Grafana显示Flink运行状态

方案

增加资源

  • 增加CPU资源: 调整TaskManager的CPU配置和并行化任务
  • 增加内存资源:调整TaskManager的内存配置和优化数据结构
  • 增加其他资源:例如:使用SSD或RDMA等网络加速设备、增加GPU资源

注意:

  • 避免过度资源分配:虽然增加资源可以缓解反压问题,但过度分配资源可能导致资源浪费和成本增加。因此,在增加资源之前,需要仔细评估任务的实际需求和资源使用情况。
  • 结合其他优化措施:除了增加资源外,还可以结合其他优化措施来进一步缓解反压问题。例如,优化处理逻辑、减少计算复杂度、使用更高效的数据结构等。

数据倾斜

  1. 数据倾斜定义
    数据倾斜是指数据的分布严重不均,造成一部分数据很多,一部分数据很少的局面。在Flink中,这通常表现为部分节点处理的数据量远大于其他节点。
  2. 数据倾斜的原因
  • 业务原因:如订单数据中某些城市的订单量远大于其他城市。
  • 技术原因:大量使用KeyBy、GroupBy等操作,错误地使用了分组Key,人为产生数据热点。
  1. 解决方案
  • 业务层面
    • 尽量避免热点key的设计,例如将热点城市分成不同的区域,并进行单独处理。
    • 在数据预处理阶段对数据进行均衡处理,如使用随机前缀打散key。
  • 技术层面
    • 调整方案打散原来的key,避免直接聚合。
    • 使用Flink提供的二次聚合等策略,先对打散后的数据进行聚合,再还原为真正的key进行二次聚合。
    • 优化join操作,将条目少的表/子查询放在Join的左边,减少内存溢出的几率。
    • 使用MapJoin处理小表关联大表的情况,避免数据倾斜。
  • 配置层面
    • 设置合理的mapreduce的task数,能有效提升性能。
    • 在数据量较大的情况下,慎用count(distinct)等操作。
    • 对小文件进行合并,减少文件数据源带来的倾斜问题。## 算子性能

算子性能

原因:

  • 下游算子性能差:下游算子sub-task的处理性能低下,无法及时消费上游算子产生的数据。
  • 外部接口访问:算子需要频繁访问外部接口,如数据库或API,这些操作耗时长,导致数据处理速率下降。
  • 代码问题:用户代码执行效率低下,例如存在频繁的阻塞操作或性能瓶颈。

判断:

  • 通过Flink Web UI的BackPressure模块,观察算子的颜色和数值来判断是否出现反压。红色- 表示当前算子繁忙,有反压;绿色表示当前算子不繁忙,没有反压。
  • 通过对比不同SubTask处理的数据量,判断是否存在数据倾斜导致的个别SubTask性能下降。

解决

  • 限制数据源消费速度:在数据源处设置限流措施,确保数据匀速消费,避免速度不均导致的反压。
  • 关闭Checkpoint:在数据回溯期间关闭Checkpoint,以减少barrier对齐对性能的影响。完成数据回溯后再重新开启Checkpoint。
  • 优化代码:检查并优化用户代码,减少阻塞操作和性能瓶颈,提高算子处理效率。
  • 增加计算资源:根据实际需要增加计算资源,如增加计算节点、CPU和内存等,提高系统的整体处理能力。
  • 动态调整并行度:根据系统负载情况动态调整任务的并行度,将任务分配到更多的计算节点上,以提高处理能力。
  • 重分区:通过重分区将数据均匀地分布到不同的分区中,减少数据倾斜并提高并行度。
  • 使用缓冲区:设置缓冲区来暂存数据,避免在下游算子处理速度不足时导致数据丢失或延迟增加。

调大并行度

**并行度:**并行度(Parallelism)是指Flink任务中每个算子的并行实例数。增加并行度意味着更多的任务实例将同时处理数据,从而提高了系统的整体处理能力。

调大并行度

  1. 分析原因:
  • 在调大并行度之前,首先需要分析反压的具体原因。常见原因包括资源不足、数据倾斜、算子性能问题等。
  • 使用Flink的监控工具(如Web UI、Metrics等)来观察任务的资源使用情况和性能瓶颈。
  1. 确定合理的并行度:
  • 可以通过压测来确定合理的并行度。例如,先获取高峰期的QPS(每秒处理的数据量),然后测试不同并行度下系统的处理能力,找到能够处理该QPS而不发生反压的并行度。
  • 也可以考虑使用经验法则,如根据数据源(如Kafka)的分区数来设置并行度。
  1. 设置并行度:
  • 在Flink程序中,可以通过多种方式来设置并行度。
    • 在代码中:通过StreamExecutionEnvironment的setParallelism()方法来设置全局并行度。
    • 在算子层次:对于单个算子,可以调用其setParallelism()方法来设置该算子的并行度。
    • 在配置文件或提交任务时:通过配置文件或提交任务时的参数来设置并行度。
  • 需要注意的是,当使用savepoints时,应该考虑设置最大并行度。这可以确保在从savepoint恢复任务时,能够改变特定算子或整个程序的并行度,而不会超过设定的上限。
  1. 监控与调整:
  • 在调整并行度后,需要持续监控任务的运行情况,观察是否解决了反压问题,以及是否出现了新的问题(如资源利用率不足、资源浪费等)。
  • 根据监控结果,可以进一步调整并行度或其他相关配置,以达到最佳的性能和稳定性。

注意:

  1. 避免过度并行化:虽然增加并行度可以提高系统的处理能力,但过度并行化可能导致资源利用率下降、管理复杂性增加等问题。因此,在调整并行度时需要权衡利弊。
  2. 考虑数据倾斜:数据倾斜可能导致部分节点处理的数据量远大于其他节点,从而引发反压。在调整并行度时,需要考虑数据倾斜的情况,并采取相应的措施来平衡数据分布。
  3. 优化其他配置:除了调整并行度外,还可以考虑优化其他相关配置,如内存大小、缓存策略等,以进一步提升系统的性能和稳定性。

限流与缓冲

限流机制

Flink通过水位线(Watermark)机制来实现限流。水位线是一个时间戳,表示当前处理的数据已经到达的位置。通过控制水位线的传播速度,Flink可以限制数据的流量,避免数据的堆积和延迟。当下游节点处理速度较慢时,水位线的传播速度会相应减慢,从而限制上游节点的生产速度。

缓冲机制

Flink在网络传输和TaskManager内部都使用了缓冲机制来处理反压。

  • 网络传输缓冲:在网络传输过程中,Flink使用NetworkBufferPool来管理内存块(MemorySegment)。每个Task都有一个输入区域(InputGate)和输出区域(ResultPartition),它们使用Buffer来存储和传输数据。当数据从上游节点传输到下游节点时,首先会存储在Buffer中,等待下游节点消费。如果下游节点消费速度较慢,Buffer中的数据会逐渐累积,形成反压。此时,Flink会根据Buffer的使用情况来限制上游节点的生产速度。
  • TaskManager内部缓冲:在TaskManager内部,Flink为每个Task创建了输入和输出的LocalBufferPool。这些缓冲池用于存储和传输数据。当Task的消费速度跟不上生产速度时,LocalBufferPool中的数据会逐渐累积,形成反压。Flink会根据LocalBufferPool的使用情况来限制Task的生产速度。

关闭Checkpoint

关闭 Checkpoint。关闭 Checkpoint 可以将 barrier 对齐这一步省略掉,促使任务能够快速回溯数据。然后等数据回溯完成之后,再将 Checkpoint 打开