个人技术分享

chunjun涉及到的操作有注册metric、同步metric、metric判断。

一. 管理(注册、同步)metric

1. BaseRichInputFormat对metric的管理

注册metric的操作主要是在open方法中,主要逻辑如下:


this.context = (StreamingRuntimeContext) getRuntimeContext();  

//获取flink全局参数,用于通过脏数据管理器的参数配置
ExecutionConfig.GlobalJobParameters params =  
        context.getExecutionConfig().getGlobalJobParameters();  
DirtyConfig dc = DirtyConfUtil.parseFromMap(params.toMap());  
//注册脏数据管理器
this.dirtyManager = new DirtyManager(dc, this.context);  
  
。。。

if (!initialized) {  
    //初始化累加器,
    initAccumulatorCollector();  
    //初始化行大小
    initRowSizeCalculator();  
    //初始统计metric
    initStatisticsAccumulator();  
    //初始化消费速率
    initByteRateLimiter();  
    //初始化cp相关文件
    initRestoreInfo();  
    initialized = true;  
}

nbsp;
开启metric同步线程,每taskmanager心跳+1s同步一次数据。

private void initAccumulatorCollector() {  
    String lastWriteLocation =  
            String.format("%s_%s", Metrics.LAST_WRITE_LOCATION_PREFIX, indexOfSubTask);  
    String lastWriteNum =  
            String.format("%s_%s", Metrics.LAST_WRITE_NUM__PREFIX, indexOfSubTask);  
  
    accumulatorCollector =  
            new AccumulatorCollector(  
                    context,  
                    Arrays.asList(  
                            Metrics.NUM_READS,  
                            Metrics.READ_BYTES,  
                            Metrics.READ_DURATION,  
                            Metrics.WRITE_BYTES,  
                            Metrics.NUM_WRITES,  
                            lastWriteLocation,  
                            lastWriteNum)); 
    accumulatorCollector.start();  
}

注册如下指标

  • 先从RuntimeContext获取指标,如果没有则注册meitric到jobmanager中。
  • 或者通过MetricGroup()进行注册,在BaseMetric类中有详细逻辑,见下节分析。
private void initStatisticsAccumulator() {  
    numReadCounter = getRuntimeContext().getLongCounter(Metrics.NUM_READS);  
    bytesReadCounter = getRuntimeContext().getLongCounter(Metrics.READ_BYTES);  
    durationCounter = getRuntimeContext().getLongCounter(Metrics.READ_DURATION);  
  
    inputMetric = new BaseMetric(getRuntimeContext());  
    inputMetric.addMetric(Metrics.NUM_READS, numReadCounter, true);  
    inputMetric.addMetric(Metrics.READ_BYTES, bytesReadCounter, true);  
    inputMetric.addMetric(Metrics.READ_DURATION, durationCounter);  
  
    inputMetric.addDirtyMetric(Metrics.DIRTY_DATA_COUNT, this.dirtyManager.getConsumedMetric());  
    inputMetric.addDirtyMetric(  
            Metrics.DIRTY_DATA_COLLECT_FAILED_COUNT,  
            this.dirtyManager.getFailedConsumedMetric());  
}

 

openInputFormat中可以定义:用户的merticReporter


...
if (useCustomReporter()) {  
    customReporter =  
            DataSyncFactoryUtil.discoverMetric(  
                    config, getRuntimeContext(), makeTaskFailedWhenReportFailed());  
    customReporter.open();  
}

 

2. 通过BaseMetric管理metric

BaseMetric提供注册metric group、注册metric 以及等待metric同步(到jobmanager)等能力。

BaseMetric主要有addMetric、addDirtyMetric、waitForReportMetrics、getChunjunMetricGroup等方法,其中构造方法中,通过runtimeContext注册了chunjunMetricGroup、chunjunDirtyMetricGroup。具体逻辑如下:

注册metric group:


public BaseMetric(RuntimeContext runtimeContext) {  

    //获取全局变量:DELAY_PERIOD_MILL
    ExecutionConfig.GlobalJobParameters params =  
            runtimeContext.getExecutionConfig().getGlobalJobParameters();  
    Map<String, String> confMap = params.toMap();  
    this.DELAY_PERIOD_MILL =  
            Long.parseLong(  
                    String.valueOf(confMap.getOrDefault(DELAY_PERIOD_MILL_KEY, "20000")));
    //注册metric group:chunjun、output
    chunjunMetricGroup =  
            runtimeContext  
                    .getMetricGroup()  
                    .addGroup(  
                            Metrics.METRIC_GROUP_KEY_CHUNJUN,  
                            Metrics.METRIC_GROUP_VALUE_OUTPUT);  
    //注册metric group:DirtyData、output
    chunjunDirtyMetricGroup =  
            chunjunMetricGroup.addGroup(  
                    Metrics.METRIC_GROUP_KEY_DIRTY, Metrics.METRIC_GROUP_VALUE_OUTPUT);  
}

 
注册metric:有gauge、meter等metric类型。

//注册指标
public void addMetric(String metricName, LongCounter counter) {  
    addMetric(metricName, counter, false);  
}

public void addMetric(String metricName, LongCounter counter, boolean meterView) {  
    metricCounters.put(metricName, counter);  
    chunjunMetricGroup.gauge(metricName, new SimpleAccumulatorGauge<>(counter)); 
    if (meterView) {  
        chunjunMetricGroup.meter(  
                metricName + Metrics.SUFFIX_RATE, new SimpleLongCounterMeterView(counter, 20));  
    }  
}  

//metricName: metric名字
//counter: 具体计数的counter
public void addDirtyMetric(String metricName, LongCounter counter) {  
    metricCounters.put(metricName, counter);  
    chunjunDirtyMetricGroup.gauge(metricName, new SimpleAccumulatorGauge<>(counter));  
}

 

等待metric指标:当taskslot结束之后,需要等待一段时间把未同步的指标同步给jobmanager。

public void waitForReportMetrics() {  
    try {  
        Thread.sleep(DELAY_PERIOD_MILL);  
    } catch (InterruptedException e) {  
        ThreadUtil.sleepMilliseconds(DELAY_PERIOD_MILL);  
        log.warn("Task thread is interrupted");  
    }  
}

 

二. AccumulatorCollector:metric同步

AccumulatorCollector实现了周期性合并并获取全局metric,主流程是:每taskmanager心跳后+1s,同步一次全局的metric给taskslot。

AccumulatorCollector有start、close、collectAccumulator、getAccumulatorValue、getLocalAccumulatorValue等方法。

1. 启动线程池,周期性更新metric信息

/** 启动线程池,周期性更新累加器信息 */  
public void start() {  
    scheduledExecutorService.scheduleAtFixedRate(  
            this::collectAccumulator, 0, period, TimeUnit.MILLISECONDS);  
}

收集累加器信息,具体逻辑是

调用requestJob(向jobmanager进行rpc请求,合并并获取全局metric),同步全局metric。具体步骤是:

  • 每个slot将自己的metric同步到全局(jobmanager)中
  • 脏数据同步给每个slot:拿到全局的脏数据metric传给taskslot,可在每个taskslot中判断处理的脏数据是否超过全局设置的。
  • 如果没有注册metric,则获取maxValue指标,这里主要是jdbc用到了此指标。
public void collectAccumulator() {  
    CompletableFuture<ExecutionGraphInfo> executionGraphInfoCompletableFuture =  
            gateway.requestJob(Time.seconds(10));  
    ExecutionGraphInfo executionGraphInfo;  
    try {  
        executionGraphInfo = executionGraphInfoCompletableFuture.get();  
    } catch (Exception e) {  
        // 限制最大出错次数,超过最大次数则使任务失败,如果不失败,统计数据没有及时更新,会影响速率限制,错误控制等功能  
        collectErrorTimes++;  
        if (collectErrorTimes > MAX_COLLECT_ERROR_TIMES) {  
            // 主动关闭线程和资源,防止异常情况下没有关闭  
            close();  
            throw new RuntimeException(  
                    "The number of errors in updating statistics data exceeds the maximum limit of 100 times. To ensure the correctness of the data, the task automatically fails");  
        }  
        return;  
    }  
    StringifiedAccumulatorResult[] accumulatorResult =  
            executionGraphInfo.getArchivedExecutionGraph().getAccumulatorResultsStringified();  
    for (StringifiedAccumulatorResult result : accumulatorResult) {  
        ValueAccumulator valueAccumulator = valueAccumulatorMap.get(result.getName());  
        if (valueAccumulator != null) {  
            valueAccumulator.setGlobal(Long.parseLong(result.getValue()));  
        } else if (result.getName().equals(Metrics.MAX_VALUE)) {  
            rdbMaxFuncValue = result.getValue();  
        }  
    }  
}

 

2. 获取全局指标、本地指标

获取指定累加器信息

public long getAccumulatorValue(String name, boolean needWaited) {  
    if (needWaited) {  
        waited();  
    }  
    ValueAccumulator valueAccumulator = valueAccumulatorMap.get(name);  
    if (valueAccumulator == null) {  
        return 0;  
    }  
    return valueAccumulator.getGlobal();  
}

获取每个subtask的本地指标

/**  
 * 根据名称获取指定累加器的本地value  
 * * @param name 累加器指标名称  
 * @return  
 */
public long getLocalAccumulatorValue(String name) {  
    ValueAccumulator valueAccumulator = valueAccumulatorMap.get(name);  
    if (valueAccumulator == null) {  
        return 0;  
    }  
    return valueAccumulator.getLocal().getLocalValue();  
}

 

3. 资源回收

/** 关闭线程池 */  
public void close() {  
    if (scheduledExecutorService != null  
            && !scheduledExecutorService.isShutdown()  
            && !scheduledExecutorService.isTerminated()) {  
        scheduledExecutorService.shutdown();  
    }  
}

 

三. 小结

我们大致了解了chunjun

  • 在什么时机注册metric指标:在BaseRichInputFormat中的open方法中,在连接器消费数据前,进行相关metric的注册;
  • chunjun提供了管理(注册、指标更新、等待metric同步等)metric的基类:BaseMetric;
  • 周期获取全局metric:以便每个subtask进行metric的指标判断; 等metric管理能力。