一步一个脚印,一天一道面试题
首先,为什么要读过的 Kafka 数据重写读一次呢?什么场景下要怎么做呢?
答:当任务失败,从检查点Checkpoint 开始重启时,检查点的数据是之前的了,就需要跟Kafka申请回退,从Checkpoint版的 offset 的数据读,会有一点重复消费

那具体怎么重复读 Kafka 呢?
跟 Kafka 提交之前的 offset 即可。这一步骤往往是 Flink 自动找 Kafka 读的,比如 Checkpoint 设置 EXCATLY_ONCE 的时候,如果从Checkpoint 重启,读取 Kafka时会回到检查点对应的 offset 数据开始读。
比如下面的代码:
public class KafkaEOSDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 代码中用到hdfs,需要导入hadoop依赖、指定访问hdfs的用户名
System.setProperty("HADOOP_USER_NAME", "atguigu");
// TODO 1、启用检查点,设置为精准一次
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// TODO 2.读取kafka
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
.setGroupId("atguigu")
.setTopics("topic_1")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.latest())
.build();
DataStreamSource<String> kafkasource = env
.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");
/**
* TODO 3.写出到Kafka
* 精准一次 写入Kafka,需要满足以下条件,缺一不可
* 1、开启checkpoint
* 2、sink设置保证级别为 精准一次
* 3、sink设置事务前缀
* 4、sink设置事务超时时间: checkpoint间隔 < 事务超时时间 < max的15分钟
*/
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
// 指定 kafka 的地址和端口
.setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
// 指定序列化器:指定Topic名称、具体的序列化
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("ws")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
// TODO 3.1 精准一次,开启 2pc
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// TODO 3.2 精准一次,必须设置 事务的前缀
.setTransactionalIdPrefix("atguigu-")
// TODO 3.3 精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000+"")
.build();
kafkasource.sinkTo(kafkaSink);
env.execute();
}
}
我是近未来,祝你变得更强!