个人技术分享

在 Flink 1.17 及之前版本中实现 JdbcLookup 谓词下推

需求背景

Flink 1.17 及之前的版本中,flink-connector-jdbc 的 Lookup Join 不支持将 ON 条件下推。例如对于 `ON device_id = '1'`,生成的查询 SQL 中不会包含 `device_id = '1'` 条件。相关 issue:https://issues.apache.org/jira/browse/FLINK-32321 ,该问题在 1.19 版本中已经解决。谓词不下推会导致每次查询返回的数据量变多,本文主要介绍如何在 1.17 中支持 Lookup 谓词下推。

技术实现

JdbcDynamicTableSource 中已经支持将谓词下推到连接器端,但连接器的 Lookup 查询并没有把下推的谓词应用到 SQL 语句上。因此我们主要修改如下两个类:

  1. JdbcDynamicTableSource
  2. JdbcRowDataLookupFunction

修改JdbcDynamicTableSource

位置:org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource

目的:在 getLookupRuntimeProvider 方法中,将谓词下推得到的查询条件以及参数传入 LookupFunction。

修改内容如下:

    /**
     * Builds the runtime provider used for JDBC lookup joins.
     *
     * <p>Each lookup key index is resolved to its physical column name; only non-nested keys are
     * accepted. The created {@link JdbcRowDataLookupFunction} additionally receives the predicates
     * and parameters pushed down to this source, so they become part of the lookup SQL. When a
     * lookup cache is configured the function is wrapped in a partial-caching provider.
     */
    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        final List<String> physicalFieldNames = DataType.getFieldNames(physicalRowDataType);
        final int[][] lookupKeys = context.getKeys();

        // JDBC can only look up by top-level columns, so every key path must have length 1.
        final String[] keyNames = new String[lookupKeys.length];
        int position = 0;
        for (int[] keyPath : lookupKeys) {
            Preconditions.checkArgument(
                    keyPath.length == 1, "JDBC only support non-nested look up keys");
            keyNames[position++] = physicalFieldNames.get(keyPath[0]);
        }

        final JdbcRowDataLookupFunction lookupFunction =
                new JdbcRowDataLookupFunction(
                        options,
                        lookupMaxRetryTimes,
                        physicalFieldNames.toArray(new String[0]),
                        DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
                        keyNames,
                        (RowType) physicalRowDataType.getLogicalType(),
                        // Hand the pushed-down predicates and their parameters to the lookup.
                        resolvedPredicates,
                        pushdownParams);

        return cache == null
                ? LookupFunctionProvider.of(lookupFunction)
                : PartialCachingLookupProvider.of(lookupFunction, cache);
    }

修改JdbcRowDataLookupFunction

位置:org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction

目的:接收下推的条件及参数,重新拼装 SQL,并在执行查询时将参数填入。

修改内容:

  1. 构造方法新增两个参数,用于接收下推的条件及参数。方法内拼接条件语句,并将条件中的 `?` 参数占位符依次替换为 `:predicate_0`、`:predicate_1`……,以适配 FieldNamedPreparedStatement(它不允许 SQL 中出现 `?`)。
 public JdbcRowDataLookupFunction(
            JdbcConnectorOptions options,
            int maxRetryTimes,
            String[] fieldNames,
            DataType[] fieldTypes,
            String[] keyNames,
            RowType rowType,
            List<String> resolvedPredicates,
            Object[] pushdownParams
    ) {
        checkNotNull(options, "No JdbcOptions supplied.");
        checkNotNull(fieldNames, "No fieldNames supplied.");
        checkNotNull(fieldTypes, "No fieldTypes supplied.");
        checkNotNull(keyNames, "No keyNames supplied.");
        this.connectionProvider = new SimpleJdbcConnectionProvider(options);
        List<String> nameList = Arrays.asList(fieldNames);
        DataType[] keyTypes =
                Arrays.stream(keyNames)
                        .map(
                                s -> {
                                    checkArgument(
                                            nameList.contains(s),
                                            "keyName %s can't find in fieldNames %s.",
                                            s,
                                            nameList);
                                    return fieldTypes[nameList.indexOf(s)];
                                })
                        .toArray(DataType[]::new);
        this.maxRetryTimes = maxRetryTimes;

        // Missing pushdown information simply means "no pushed-down predicates".
        List<String> predicates =
                resolvedPredicates == null ? Collections.<String>emptyList() : resolvedPredicates;
        Object[] params = pushdownParams == null ? new Object[0] : pushdownParams;

        // FieldNamedPreparedStatement does not accept positional '?' placeholders, so every '?'
        // is rewritten into a unique named placeholder (:predicate_0, :predicate_1, ...) in
        // encounter order. NOTE(review): assumes pushdownParams lists the values in that same
        // order across resolvedPredicates — confirm against the filter-pushdown visitor.
        List<String> predicateNames = new ArrayList<>(params.length);
        List<String> fieldNamedPredicates = new ArrayList<>(predicates.size());
        for (String pred : predicates) {
            StringBuilder named = new StringBuilder(pred.length() + 16);
            for (int i = 0; i < pred.length(); i++) {
                char c = pred.charAt(i);
                if (c == '?') {
                    String predicateName = "predicate_" + predicateNames.size();
                    named.append(':').append(predicateName);
                    predicateNames.add(predicateName);
                } else {
                    named.append(c);
                }
            }
            // Parenthesize each predicate so an OR inside it cannot escape the AND chain.
            fieldNamedPredicates.add("(" + named + ")");
        }

        // paddingPredicates() binds parameters by position, so a count mismatch would silently
        // bind values to the wrong placeholders (possibly over lookup-key slots).
        checkArgument(
                predicateNames.size() == params.length,
                "Pushed-down predicates contain %s placeholders but %s parameters were supplied.",
                predicateNames.size(),
                params.length);

        this.pushdownParams = params;
        // Key names first, predicate placeholders last; paddingPredicates() relies on this order.
        this.conditionNames = ArrayUtils.concat(keyNames, predicateNames.toArray(new String[0]));

        String baseQuery =
                options.getDialect()
                        .getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
        // NOTE(review): assumes the dialect only emits a WHERE clause when keyNames is non-empty
        // (as the default JdbcDialect#getSelectFromStatement does) — confirm for custom dialects.
        String joinedConditions =
                fieldNamedPredicates.isEmpty()
                        ? ""
                        : (keyNames.length == 0 ? " WHERE " : " AND ")
                                + String.join(" AND ", fieldNamedPredicates);
        this.query = baseQuery + joinedConditions;
        LOG.debug("Query generated for JDBC lookup: {}", query);

        JdbcDialect jdbcDialect = options.getDialect();
        this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType);
        this.lookupKeyRowConverter =
                jdbcDialect.getRowConverter(
                        RowType.of(
                                Arrays.stream(keyTypes)
                                        .map(DataType::getLogicalType)
                                        .toArray(LogicalType[]::new)));
    }
  2. 修改 establishConnectionAndStatement 方法,在创建 Statement 时将新生成的 conditionNames 作为 fieldNames 传入
    /**
     * Obtains (or opens) the JDBC connection and prepares the lookup query on it.
     *
     * <p>The statement's named parameters are {@code conditionNames}, i.e. the lookup key names
     * followed by the generated predicate placeholder names.
     */
    private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
        final Connection connection = connectionProvider.getOrEstablishConnection();
        this.statement =
                FieldNamedPreparedStatement.prepareStatement(connection, query, conditionNames);
    }
  3. 新增 paddingPredicates 方法,用于向 Statement 中填充下推谓词的参数
    /**
     * Binds the pushed-down predicate parameters to the prepared lookup statement.
     *
     * <p>The predicate placeholders are the trailing entries of {@code conditionNames} (after the
     * lookup-key names), so parameter {@code j} is bound at field index
     * {@code conditionNames.length - pushdownParams.length + j}.
     *
     * @return the same {@code statement} instance, with the predicate parameters set
     * @throws SQLException if the driver rejects a parameter
     * @throws IllegalStateException if there are more parameters than named statement fields
     * @throws IllegalArgumentException if a parameter is {@code null} or of an unsupported type
     */
    private FieldNamedPreparedStatement paddingPredicates() throws SQLException {
        if (pushdownParams == null || pushdownParams.length == 0) {
            // Nothing was pushed down; only the lookup keys need binding.
            return statement;
        }
        int pushdownParamStartIndex = conditionNames.length - pushdownParams.length;
        if (pushdownParamStartIndex < 0) {
            throw new IllegalStateException(
                    "Padding predicate failed. Got "
                            + pushdownParams.length
                            + " pushed-down parameters but the lookup statement only has "
                            + conditionNames.length
                            + " named parameters.");
        }
        for (int i = pushdownParamStartIndex; i < conditionNames.length; i++) {
            Object param = pushdownParams[i - pushdownParamStartIndex];
            if (param == null) {
                // A NULL literal cannot be bound portably without its SQL type; fail with a
                // clear message instead of an NPE from param.getClass() below.
                throw new IllegalArgumentException(
                        "Padding predicate failed. Parameter "
                                + i
                                + " ("
                                + conditionNames[i]
                                + ") is null.");
            }
            if (param instanceof String) {
                statement.setString(i, (String) param);
            } else if (param instanceof Long) {
                statement.setLong(i, (Long) param);
            } else if (param instanceof Integer) {
                statement.setInt(i, (Integer) param);
            } else if (param instanceof Double) {
                statement.setDouble(i, (Double) param);
            } else if (param instanceof Boolean) {
                statement.setBoolean(i, (Boolean) param);
            } else if (param instanceof Float) {
                statement.setFloat(i, (Float) param);
            } else if (param instanceof BigDecimal) {
                statement.setBigDecimal(i, (BigDecimal) param);
            } else if (param instanceof Byte) {
                statement.setByte(i, (Byte) param);
            } else if (param instanceof Short) {
                statement.setShort(i, (Short) param);
            } else if (param instanceof Date) {
                statement.setDate(i, (Date) param);
            } else if (param instanceof Time) {
                statement.setTime(i, (Time) param);
            } else if (param instanceof Timestamp) {
                statement.setTimestamp(i, (Timestamp) param);
            } else {
                // extends with other types if needed
                throw new IllegalArgumentException(
                        "Padding predicate failed. Parameter "
                                + i
                                + " of type "
                                + param.getClass()
                                + " is not handled (yet).");
            }
        }
        return statement;
    }
  4. 修改 lookup 方法,在执行查询之前先填充谓词参数
    /**
     * This is a lookup method which is called by Flink framework in runtime.
     *
     * <p>Clears the statement parameters, re-binds the pushed-down predicate parameters and the
     * lookup keys, then executes the query. On {@link SQLException} the lookup is retried up to
     * {@code maxRetryTimes} times. Before each retry, the connection is re-established if it is no
     * longer valid, and the thread sleeps {@code retry} seconds.
     *
     * @param keyRow lookup keys
     * @return the matching rows (possibly empty)
     * @throws RuntimeException if all retries fail, the connection cannot be re-established, or
     *     the thread is interrupted while backing off
     */
    @Override
    public Collection<RowData> lookup(RowData keyRow) {
        for (int retry = 0; retry <= maxRetryTimes; retry++) {
            try {
                statement.clearParameters();
                // clearParameters() also drops the predicate bindings, so re-bind them each time.
                paddingPredicates();
                statement = lookupKeyRowConverter.toExternal(keyRow, statement);
                try (ResultSet resultSet = statement.executeQuery()) {
                    ArrayList<RowData> rows = new ArrayList<>();
                    while (resultSet.next()) {
                        RowData row = jdbcRowConverter.toInternal(resultSet);
                        rows.add(row);
                    }
                    rows.trimToSize();
                    return rows;
                }
            } catch (SQLException e) {
                LOG.error(String.format("JDBC executeBatch error, retry times = %d", retry), e);
                if (retry >= maxRetryTimes) {
                    throw new RuntimeException("Execution of JDBC statement failed.", e);
                }

                try {
                    if (!connectionProvider.isConnectionValid()) {
                        statement.close();
                        connectionProvider.closeConnection();
                        establishConnectionAndStatement();
                    }
                } catch (SQLException | ClassNotFoundException exception) {
                    LOG.error(
                            "JDBC connection is not valid, and reestablish connection failed",
                            exception);
                    throw new RuntimeException("Reestablish JDBC connection failed", exception);
                }

                try {
                    Thread.sleep(1000L * retry);
                } catch (InterruptedException e1) {
                    // Restore the interrupt flag so the task thread can still observe cancellation.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e1);
                }
            }
        }
        return Collections.emptyList();
    }