个人技术分享

在使用 Apache Hudi 时,尤其是开启了流式读取(read.streaming.enabledtrue),配置查询类型非常重要。查询类型决定了如何读取数据,尤其是在处理更新和删除操作时。

查询类型选项

在 Hudi 中,常见的查询类型包括:

  1. Snapshot 查询
  2. Incremental 查询
  3. Read Optimized 查询

Snapshot 查询

Snapshot 查询类型会读取所有历史数据,并且会应用所有的增量更新和删除操作,确保你获取的是最新的快照数据。这种查询类型在需要完整视图并确保数据一致性时非常有用。

当开启流式读取时,通常需要设置查询类型为 snapshot 以确保能够实时获取最新的数据变更。

配置示例

假设你正在使用 Flink SQL 或 DataStream API 来读取 Hudi 数据表,可以参考以下配置:

Flink SQL

在 Flink SQL 中,可以使用如下 SQL 语句来设置:

CREATE TABLE hudi_table (
  ...
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///path/to/hudi/table',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4',
  'hoodie.datasource.query.type' = 'snapshot'
);
DataStream API

在 Flink DataStream API 中,可以通过配置 FlinkOptions 来设置:

Configuration conf = new Configuration();
conf.setString(FlinkOptions.PATH, "hdfs:///path/to/hudi/table");
conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
conf.setBoolean(FlinkOptions.READ_STREAMING_ENABLED, true);
conf.setString(FlinkOptions.QUERY_TYPE, "snapshot");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Table hudiTable = tableEnv.fromDataStream(dataStream, schema);

tableEnv.createTemporaryView("hudi_table", hudiTable);

TableResult result = tableEnv.executeSql("SELECT * FROM hudi_table");

总结

即使在开启流式读取时,明确设置查询类型(如 snapshot)仍然是一个好习惯。这样可以确保你获取到最新的数据,并且与 Hudi 的流式处理特性相配合,提供一致且准确的实时数据视图。

因此,除了设置 read.streaming.enabledtrue 之外,还应设置 hoodie.datasource.query.typesnapshot,以确保流式读取能够正确应用所有更新和删除操作。