侧边栏壁纸
博主头像
清如许博主等级

努力成长为一颗大树,一半洒落阴凉,一半沐浴阳光,非常沉默非常骄傲,从不依靠从不寻找

  • 累计撰写 80 篇文章
  • 累计创建 44 个标签
  • 累计收到 5 条评论

目 录CONTENT

文章目录

A001-消息队列客户端V1.0.0-使用手册.md

清如许
2022-08-05 / 0 评论 / 0 点赞 / 3 阅读 / 2,399 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-08-05,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

| --- | --- | --- |
| 更新日期 | 2022年04月19日 | 作 者 | 张留杨 |
| 保密等级 | C | 开放范围 | 研发测试部 |

版本历史

版 本更新日期作 者备注
1.0.02022年04月08日张留杨新建文档
1.1.02022年04月19日张留杨增加消息存储文档说明

本客户端最大的特点就是无需关注队列、交换机等配置,只需通过指定topic即可完成消息的发送与消费。

1. 支持功能

  •  支持自动配置
  •  支持消息自动解析
  •  支持消息分组消费
  •  支持消息共享
  •  支持延时队列
  •  支持深度自定义配置
  •  支持消息发布确认
  •  支持消息消费确认
  •  支持消息异步回调
  •  支持多数据源
  •  消息存储
  •  消费重新次数超限
  •  简单指标监控

2. 版本记录

  • 1.0.0

    • 支持基本的消息队列功能
  • 1.1.0

    • 支持消息存储
  • 2.0.0

    • Message.getId() 修改为Message.getMessageId()
    • 增加简单指标监控

3. 使用示例

mqclient-example

4. 快速开始

SpringBoot官方提供的starter是以队列为关注点进行消息分发,而此客户端的使用方式是以topic为关注点进行消息分发,无关乎队列。

4.1 引入依赖

<dependency>
   <groupId>com.dindo</groupId>
   <artifactId>mq-client</artifactId>
   <version>2.0.0</version>
</dependency>

4.2 配置

spring:
  mq:
    rabbit:
      client:
        default:
          host: 192.168.88.200
          port: 5672
          username: admin
          password: admin
          virtualHost: /

更多配置如下

spring:
  mq:
    rabbit:
      client:
        default:
          host: 192.168.88.200
          port: 5672
          username: admin
          password: admin
          virtualHost: /
          supportDelayed: true
          codecId: JSON
          connection:
            connectionTimeout: 3000
            maxConnection: 10
            minIdle: 10
            minIdleTime: 30000
            readBufferSize: 1048576
            writeBufferSize: 1048576
          ioThread:
            threadCount: 10
            queueSize: -1
            threadName: rabbitmq-io-thread
          producerThread:
            threadCount: 10
            queueSize: -1
            threadName: rabbitmq-producer-thread
          consumerThread:
            threadCount: 10
            queueSize: -1
            threadName: rabbitmq-consumer-thread
          publisher:
            exchange:
              name: dindo-cloud
          consumer:
            exchange:
              name: dindo-cloud
            channel:
              basicQos: 100
            queue:
              durable: true
              maxLength: 1000000
              maxLengthBytes: 1073741824

4.3 开启自动配置

在启动类上增加@EnableRabbitMqAutoConfiguration注解

import com.dindo.mqclient.anno.rabbit.EnableRabbitMqAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableRabbitMqAutoConfiguration
public class TestMqclinetApplication {

    public static void main(String[] args) {
        SpringApplication.run(TestMqclinetApplication.class, args);
    }
    
}

4.4 发布消息

在生产者的业务程序中,注入MqClient

import com.dindo.mqclient.MqClient;

@Resource
private MqClient defaultMqClient;

消息实体实现Message接口

package com.example.testmqclinet.test;

import com.dindo.mqclient.message.Message;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.UUID;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserChangeMessage implements Message {

    private String name;

    private int age;

    private boolean man;

    /**
     * getId
     *
     * @return {@link String}
     */
    @Override
    public String getMessageId() {
        return UUID.randomUUID().toString();
    }

}

然后直接调用该类的publish方法发送即可

defaultMqClient.publish("dindo-userChange", new UserChangeMessage("dindo", 10, true));

其中存在多个重载的方法。

public interface MqClient {

    /**
     * 发布消息
     *
     * @param topic   Topic
     * @param message 消息
     * @return {@link CallbackFuture}<{@link MessageContext}>
     */
    default <T extends Message> CallbackFuture<MessageContext> publish(String topic, T message);

    /**
     * 发布消息
     *
     * @param topic      Topic
     * @param message    消息
     * @param persistent 是否持久化
     * @return {@link CallbackFuture}<{@link MessageContext}>
     */
    default <T extends Message> CallbackFuture<MessageContext> publish(String topic, T message, boolean persistent);

    /**
     * 发布消息
     *
     * @param topic   Topic
     * @param message 消息
     * @param delay   延迟时间(Unit:ms)
     * @return {@link CallbackFuture}<{@link MessageContext}>
     */
    default <T extends Message> CallbackFuture<MessageContext> publish(String topic, T message, long delay);

    /**
     * 发布消息
     *
     * @param topic   Topic
     * @param message 消息
     * @param prop    消息配置
     * @return {@link CallbackFuture}<{@link MessageContext}>
     */
    <T extends Message> CallbackFuture<MessageContext> publish(String topic, T message, MessageProp prop);

}

4.5 接收消息

消费者需要在消息处理类上添加@MqSubscriber(topics = {"dindo-userChange"})注解,指定要监听topics和客户端名称即可。如果没有显示的指定客户端名称,则使用defaultMqClien
当消息处理类中有多个方法时,需要@ConsumerMethod标记具体消费方法

package com.example.testmqclinet.test;

import com.dindo.mqclient.anno.ConsumerMethod;
import com.dindo.mqclient.anno.MqSubscriber;
import com.dindo.mqclient.enums.Ack;
import lombok.CustomLog;
import lombok.Data;

@CustomLog
@MqSubscriber(topics = {"dindo-userChange"})
public class UserChangeMesageHandler {

    @ConsumerMethod
    public Ack handle(UserChangeMessage userChangeMessage) {
        logger.info("user-change:{}", userChangeMessage);
      
        return Ack.ACCEPT;
    }

}

如果对于特定消息特殊需求,可在消息处理类上增加@RabbitMqConsumerConfig注解,显示指定配置,如果没有指定则使用全局配置

import com.dindo.mqclient.anno.ConsumerMethod;
import com.dindo.mqclient.anno.MqSubscriber;
import com.dindo.mqclient.anno.rabbit.RabbitMqConsumerConfig;
import lombok.Data;

@RabbitMqConsumerConfig(
        channel = @RabbitMqConsumerConfig.ChannelConfig(basicQos = 1000),
        queue = @RabbitMqConsumerConfig.QueueConfig(durable = false, maxLength = 100000, maxLengthBytes = 1024),
        exchange = @RabbitMqConsumerConfig.ExchangeConfig(name = "dindo-cloud")
)
@MqSubscriber(topics = {"dindo-userChange", "dindo-userRegister"})
public class UserChangeMessageHandler {}

4.6 发送延迟消息

要使用延时队列,必须安装插件rabbitmq_delayed_message_exchange,要发送延时消息,配置文件需要开启supportDelayed配置

spring:
  mq:
    rabbit:
      client:
        default:
        	# 当前数据源是否支持发送延时消息。开启开配置后,默认会创建两个交换机。其中一个普通交换机,一个延时交换机
          supportDelayed: true

发送延时消息的方式相比之前,仅仅增加一个延时时间。其中延时时长的单位为毫秒

//发送一个延时时长为10s的消息
defaultMqClient.publish("testmqclient", new UserChangeMessage("dindo", 10, true), 10000);

4.7 多数据源

多数据源与单数据源配置属性相同,在配置文件中声明即可

spring:
  mq:
    rabbit:
      client:
        default:
          host: 192.168.88.200
          port: 5672
          username: admin
          password: admin
          virtualHost: /
          supportDelayed: true
        bill:
          host: 192.168.88.200
          port: 5672
          username: admin
          password: admin
          virtualHost: /
          supportDelayed: true

4.7.1 发布消息

首先注入MqClient,与单数据源的唯一区别就是bean的名称。默认向Spring容器中添加的实现类名称为“${数据源名称}MqClient”
以上面的配置文件为例,默认的bean名称为 defaultMqClientbillMqClient

import com.dindo.mqclient.MqClient;

@Resource
private MqClient defaultMqClient;

@Resource
private MqClient billMqClient;

其他操作同单数据源

4.7.2 接收消息

接收消息与单数据源基本一致,唯一的区别是在@MqSubscriber中指定clientName属性,指定当前从哪个数据源进行消费。

import com.dindo.mqclient.anno.MqSubscriber;

@MqSubscriber(topics = {"dindo-userChange"}, clientName = "bill")
public class UserChangeMessageHandler {}

4.8 消息分组

消息分组是指对多个消费者进行分组消费,比如用户中心发送了一条消息,账单和停车都需要进行消费,这里账单是一组、停车是一组
系统默认使用spring.application.name作为分组名称,用户可在消息消费类上指定@MqSubscriber属性中group = "bill"即可

import com.dindo.mqclient.anno.MqSubscriber;

@MqSubscriber(topics = {"dindo-userChange"}, group = "bill")
public class UserChangeMessageHandler {}

4.9 消息共享

消息共享是指同一组内进行消息共享,也就是同一个消费者集群内各个节点均进行消息消费
比如账单服务发送了一条topicflush_cache的消息,需要停车服务进行消费进行刷新系统缓存,而此时停车服务部署了三个副本,如果按照正常方式进行消息消费时,这三个副本进行消息竞争,也就是只有一个副本能够消费到消息进行刷新系统缓存,而其它两个副本没有竞争到消息进行消费。消息共享就是为了解决此类问题,从而让三个副本都可以消费到消息。

实现消息共享只需要在消息消费类上指定@MqSubscriber属性中share = true即可

import com.dindo.mqclient.anno.MqSubscriber;

@MqSubscriber(topics = {"dindo-userChange"}, share = true)
public class UserChangeMessageHandler {}

4.10 异步回调

支持在消息发布时对callback的操作

CallbackFuture<MessageContext> callbackFuture = defaultMqClient.publish("dindo-userChange", new UserChangeMessage("dindo", 10, true));

FutureCallback<MessageContext> futureCallback = new FutureCallback<MessageContext>() {
    @Override
    public void complete(MessageContext result, Throwable ex, int status) {
        logger.info("消息发送完成");
    }
    @Override
    public void success(MessageContext result) {
        logger.info("消息发送成功");
    }
    @Override
    public void failed(Throwable ex) {
        logger.info("消息发送失败");
    }
    @Override
    public void cancelled() {
        logger.info("消息发送已取消");
    }
};

// 添加回调
callbackFuture.addCallback(futureCallback);

// 移除回调
callbackFuture.removeCallback(futureCallback);

4.11 消息持久化存储

since:1.1.0

提供了com.dindo.mqclient.store.MessageRepository消息存储接口,实现此接口自定义消息存储方式。

import com.dindo.mqclient.store.MessageRepository;

public class LoggerMessageRepository implements MessageRepository {

    /**
     * 保存消息
     *
     * @param message 要保存的消息
     */
    @Override
    public void save(MessageEntity message) {
        logger.info("持久化消息 [{}]", message);
    }

    /**
     * 根据条件查询指定状态的消息
     *
     * @param topic  topic
     * @param status 消息状态
     * @param offset 起始位置
     * @param limit  查询数量
     * @return {@link List}<{@link MessageEntity}>
     */
    @Override
    public List<MessageEntity> select(String topic, MessageStatus status, int offset, int limit) {
        return null;
    }

    /**
     * 更新消息状态
     *
     * @param topic  消息topic
     * @param id     消息标识
     * @param status 要更新的消息状态
     */
    @Override
    public void updateStatus(String topic, String id, MessageStatus status) {

    }
}

在配置文件中指定持久化bean。

spring:
  mq:
    rabbit:
      client:
        default:
          host: 192.168.88.200
          port: 5672
          username: admin
          password: admin
          virtualHost: /
          supportDelayed: true
          repositoryId: loggerMessageRepository

注意:开启消息持久化后,将自动确认接收消息,忽略return ack,且处理异常不会重新入队。

4.12 简单指标监控

since:2.0.0
image.png

2022-05-24 13:51:58.323 DEBUG 31123 --- [llMqClient-MQ监控] c.d.m.rabbit.SimpleMetricsCollector      : [MQ-billMqClient][MONITOR] 连接数: 2, 通道数: 2, 消息发布数: 0, 消息消费数: 0, 消费成功数: 0, 消费失败数: 0
2022-05-24 13:52:27.916 DEBUG 31123 --- [ltMqClient-MQ监控] c.d.m.rabbit.SimpleMetricsCollector      : [MQ-defaultMqClient][MONITOR] 连接数: 2, 通道数: 1, 消息发布数: 0, 消息消费数: 0, 消费成功数: 0, 消费失败数: 0
2022-05-24 13:52:28.330 DEBUG 31123 --- [llMqClient-MQ监控] c.d.m.rabbit.SimpleMetricsCollector      : [MQ-billMqClient][MONITOR] 连接数: 2, 通道数: 2, 消息发布数: 0, 消息消费数: 0, 消费成功数: 0, 消费失败数: 0
0

评论区