个人技术分享

一,场景

二,生产者

(1)声明交换机和两个队列,队列绑定交换机,分别设置routing key。

QUEUE_INFORM_EMAIL 的routing key为ROUTINGKEY_EMAIL,QUEUE_INFORM_SMS的routing key为ROUTINGKEY_SMS。

@Configuration
public class RabbitmqConfig {

    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    //声明交换机
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM() {
        //durable(true) 持久化,mq重启之后交换机还在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    //声明QUEUE_INFORM_EMAIL队列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }
    //声明QUEUE_INFORM_SMS队列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        return new Queue(QUEUE_INFORM_SMS);
    }

    //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                          @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }

}

(2)采用多线程的方式向MQ中发送10w条消息。

DEFAULT_CORE_POOL_SIZE 的值在cpu密集型中是处理器的数量-1,保证处理器能够得到充分的利用并且给其他的线程留出一部分处理器资源;在非cpu密集型当中为处理器的数量*2,或者*3,都有可能,保证处理器是在一直工作中。

指定的routing key是“inform.sms”,所以会匹配到QUEUE_INFORM_SMS队列,那么在消费方监听

QUEUE_INFORM_SMS队列的可以拿到这些消息。

@SpringBootTest
class MqsendApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    private static final int DEFAULT_TASK_NUM =10_0000;
    private static  final int DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();
    private static  final long DEFAULT_KEEP_ALIVE_TIME = 10L;

    private static final int  BLOCKINGQUEUE_SIZE =10_0000;

    @Test
    public void Producer_topics_springbootTest() throws InterruptedException {

        //使用rabbitTemplate发送消息
        String message = "send email message to user";

        long start = System.currentTimeMillis();
        CountDownLatch latch = new CountDownLatch(DEFAULT_TASK_NUM);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE,DEFAULT_CORE_POOL_SIZE*2,DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS,new LinkedBlockingQueue<>(BLOCKINGQUEUE_SIZE));
        for (int i = 0; i < DEFAULT_TASK_NUM; i++) {
            threadPoolExecutor.execute(() -> {
                rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.sms", message);
//                System.out.println("msg send!");
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                latch.countDown();
            });
        }
        latch.await();
        System.out.println(System.currentTimeMillis()-start);
    }


}

(3)打开网页,查看消息队列中存放的消息数量。

(正在添加中,状态是running,速率是2k条每秒)

(插入成功,状态从running->idle,ready中已经有了10w条数据)

三,消费者

接着我们在接收方去消费这些消息,也就是消费者去监听队列,将队列中的消息取出来。

(1)配置(这和二中的配置是一样的,两个服务那个先启动,就由那个来创建交换机,队列)

@Configuration
public class RabbitmqConfig {

    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    //声明交换机
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM() {
        //durable(true) 持久化,mq重启之后交换机还在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    //声明QUEUE_INFORM_EMAIL队列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }
    //声明QUEUE_INFORM_SMS队列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        return new Queue(QUEUE_INFORM_SMS);
    }

    //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                          @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }

}

(2)监听者来监听两个队列。

@Component
public class ReceiveHandler {//消费者监听队列 config和队列绑定,然后监听
    private AtomicInteger count=new AtomicInteger();
    //监听email队列
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
    public void receive_email(Object msg, Message message, Channel channel){
        System.out.println("QUEUE_INFORM_EMAIL msg"+msg);
    }
    //监听sms队列
    @RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
    public void receive_sms(Object msg, Message message, Channel channel){
        System.out.println("QUEUE_INFORM_SMS msg"+msg);
        byte[] body = message.getBody();
        String s = new String(body);
        System.out.println(s);
//        channel.
        int i = count.incrementAndGet();
        System.out.println(i);

    }
}

(3)启动消费者服务,查看控制台输出和rabbitmq管理器。

可以看到,在最佳的情况下,发送消息的速度差不多可以达到3k/s。