个人技术分享

简述

  1. 从 MongoDB 数据库中读取千万级数据。
  2. 根据某个字段对数据进行汇总统计。
  3. 将统计结果保存到 Redis 中,得到最终的统计数据。

maven

 		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mongodb</artifactId>
            <version>1.1.0-1.18</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <!-- Keep in step with the other Flink modules: a 1.8.x build here would put a second,
                 incompatible Flink generation on the classpath next to the 1.18-line connector. -->
            <version>${flink.version}</version>
        </dependency>
		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>   
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>     

代码

RedisExampleMapper

package com.wfg.flink.connector.redis;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Describes how a {@code (name, count)} tuple is handed to the Flink Redis sink.
 *
 * <p>The sink is told to issue {@code HSET} with {@code "HASH_NAME"} as the additional key;
 * the tuple's first field supplies the key and its second field, rendered as a decimal string,
 * supplies the value.
 *
 * @author wfg
 */
public class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {

    /** Additional key passed along with the HSET command description. */
    private static final String HASH_KEY = "HASH_NAME";

    /**
     * @return the HSET command description bound to {@code "HASH_NAME"}
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        final RedisCommandDescription hsetDescription =
                new RedisCommandDescription(RedisCommand.HSET, HASH_KEY);
        return hsetDescription;
    }

    /**
     * @param data tuple whose first field is used as the key
     * @return {@code data.f0}, unchanged
     */
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        final String name = data.f0;
        return name;
    }

    /**
     * @param data tuple whose second field is used as the value
     * @return {@code data.f1} as a decimal string
     */
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return Integer.toString(data.f1);
    }
}

根据名字统计访问次数

package com.wfg.flink.connector.mongodb;


import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import com.wfg.flink.connector.redis.RedisExampleMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.bson.BsonDocument;

import static com.wfg.flink.connector.constants.Constants.MONGO_TEST_PV_COLLECTION;

/**
 * @author wfg
 * 根据名字统计访问次数
 */
public class MongoAllNameRedisCounts {

    public static void main(String[] args) throws Exception {
        String startTime = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss");
        System.out.println("StartTime:" + startTime);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 开启Checkpointing,设置Checkpoint间隔
        env.enableCheckpointing(30000);
        // 设置Checkpoint模式
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 设置最小Checkpoint间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 设置最大并发Checkpoint数目
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 使用RocksDB作为状态后端
        env.setStateBackend(new HashMapStateBackend());
        env.setParallelism(10);

        // 配置MongoDB源
        MongoSource<String> mongoSource = MongoSource.<String>builder()
                .setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin")
                .setDatabase("sjzz")
                .setCollection(MONGO_TEST_PV_COLLECTION)
                .setFetchSize(2048)
//                .setLimit(1000)
                .setNoCursorTimeout(true)
                .setPartitionStrategy(PartitionStrategy.SINGLE)
                .setPartitionSize(MemorySize.ofMebiBytes(64))
//                .setSamplesPerPartition(10)
                .setDeserializationSchema(new MongoDeserializationSchema<>() {
                    @Override
                    public String deserialize(BsonDocument document) {
                        return document.toJson();
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                .build();
        // 创建MongoDB数据流
        DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
// 转换数据,提取人名作为Key
        DataStream<Tuple2<String, Integer>> nameCountStream = sourceStream
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);
                        return Tuple2.of(data.getUserName(), 1);
                    }
                })
                .keyBy(value -> value.f0)
                .reduce(new ReduceFunction<>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        RedisSink<Tuple2<String, Integer>> sink = new RedisSink<>(conf, new RedisExampleMapper());
        nameCountStream.addSink(sink);
        // 输出结果
        env.execute("Flink MongoDB Name Count ");
        System.out.println("-----------------------------------");
        System.out.println("startTime: " + startTime);
        System.out.println("EndTime: " + DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss"));
    }
}

注意:
大数据生成参考:批量数据生成
中间件部署参考: