消息队列

功能介绍

消息队列云中立组件是一个提供了消息队列能力的框架,兼容了rocketmq、rabbitmq、kafka等消息队列服务。它基于Spring Boot实现,继承了自动化配置能力。对外提供统一的发送消息、消费消息、新建消息接口。通过切换配置文件,自动载入rocketmq、rabbitmq、kafka消息队列服务中的一种,有效简化开发人员对消息中间件的使用复杂度,真正实现了一套代码、多云部署,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。
目前支持的消息队列如下:

  • rocketmq

  • rabbitmq

  • kafka

  • tdmq

  • sqs

    当多云部署,切换云环境,或者私有化部署过程中,消息队列服务多而杂。云中立消息队列组件可以有效帮助用户平滑迁移,无需改动一行代码,仅仅改变配置文件,即可切换到任何一个云环境,极大地提升了开发人员的效率。

    消息队列的使用方式非常简单,分为两种模式:

1. 单实例消息队列
2. 多实例消息队列
单实例消息队列:生产者和消费者各自只能有一个实例,并且生产者和消费者共享同一份配置信息;
多实例消息队列:生产者和消费者各自允许有2个及以上的实例,并且生产者和消费者各自维护配置信息.

大部分情况下,一个模块只会需要一个生产者或者消费者实例,因此第一种方式更常见。但是当业务需要同时向多个消息队列实例生产、消费消息时,就必须使用第二种方式。

使用流程

使用前,请保证有可用的rocketmq(ONS)或者rabbitmq实例,创建实例等操作请参考各大公有云厂商或者自建。

1. maven settings.xml 添加仓库地址

<!-- scg maven仓库 -->
<repository>
    <id>scg-private</id>
    <name>maven-scg-private</name>
    <url>http://packages.glodon.com/artifactory/maven-scg-private/</url>
</repository>

2. 项目中添加 pom 依赖

仓库地址:

<dependency>
    <groupId>com.glodon.paas.foundation</groupId>
    <artifactId>message-queue-starter</artifactId>
    <version>1.0.12-SNAPSHOT</version>
</dependency>

目前最新的版本是1.0.12-SNAPSHOT。

3. 配置文件

3.1 ons(rocketmq)消息队列

需要创建好ONS实例,在实例中创建好Topic、group-id,便于后续使用。

1. 支持普通消息、顺序消息、延时消息、事务消息;
2. 集群模式支持消费失败重试,集群模式无序消息一共重试16次:10s、30s、1min、2min …… 10min、20min、30min、1h、2h,最后放入该groupID对应的死信队列,等待人工手动消费补偿;
3. 支持消费进程多线程,默认20个. 
4. 广播模式消费失败不重试.(阿里云ONS自带特性)
3.1.1 单实例配置

生产者和消费者共用一份配置.

application.yml

paas:
  mq:
    ons:
      access-key: {your access-key}
      secret-key: {your secret-key}
      scope: DEV_GLD
      group-id: GID-CONSUMER-2
      message-model: CLUSTERING
      ordered: false
      namesrv-addr: {your namesrv-addr}
参数名 含义 是否必填
access-key 阿里云接口访问秘钥
secret-key 阿里云接口访问秘钥
scope 域空间,区分开发、测试、生产环境,务必同一环境保持一致 否(保持生产者和消费者一致即可)
group-id 分组 ID 否(生产者不必填写,消费者必须填写)
message-model 消息模式:
CLUSTERING-集群消费模式,BROADCASTING-广播消费模式
是(建议集群消费模式)
ordered 是否有序:
true-有序消息,false-无序消息
否(默认无序消息)
namesrv-addr ONS 接入点 URL 是(阿里云消息队列 RocketMQ/实例详情/TCP 协议接入点)
1. message-model: CLUSTERING-集群消费,消息会被集群中的多个节点分摊消费;                      					 BROADCASTING-广播消费,消息会被集群中的每个节点全部消费.
2. group-id: 消费组或者生产组,兼容老版本的consumer-id和producer-id.
3. 倘若ordered: true, 那么一定要在ProduceMessage中加上shardingKey.
4. namesrv-addr获取方式:进入阿里云控制台,RocketMQ/实例详情,如下图所示:

3.1.2 多实例配置

生产者和消费者需要分开配置,多实例支持多生产者、消费者实例配置,通过id进行区分。
id在生产者、消费者配置中,必须唯一,且需要和代码中调用的地方保持一致。

application.yml

生产者:

paas:
     mq:
       ons:
         producers:
           producer-list:
           - id: ons-1
             access-key: {your access-key}
             secret-key: {your secret-key}
             scope: DEV_GLD
             group-id: GID-CONSUMER-2
             message-model: CLUSTERING
             ordered: false
             namesrv-addr: {your namesrv-addr}
           - id: ons-2
           	 ......

消费者:

 paas:
     mq:
       ons:
         consumers:
           consumer-list:
           - id: ons-1
             access-key: {your access-key}
             secret-key: {your secret-key}
             scope: DEV_GLD
             group-id: GID-CONSUMER-2
             message-model: CLUSTERING
             ordered: false
             namesrv-addr: {your namesrv-addr}
           - id: ons-2
             ......
参数名 含义 是否必填
id 生产者、消费者实例ID,保证全局唯一
access-key 阿里云接口访问秘钥
secret-key 阿里云接口访问秘钥
scope 域空间,区分开发、测试、生产环境,务必同一环境保持一致 否(保持生产者和消费者一致即可)
group-id 分组ID 否(生产者不必填写,消费者必须填写)
message-model 消息模式:
CLUSTERING-集群消费模式,BROADCASTING-广播消费模式
ordered 是否有序:true-有序消息,false-无序消息 否(默认无序消息)
namesrv-addr ONS接入点URL 是(参考单实例)
id: 消费者配置中需要和MessageListener实现类的getId()保持一致,在生产者配置中需要和producerAssist.getProducerById("ons-1")中的参数一致。

3.2 rabbitmq消息队列

支持普通消息,单实例消费者有序消息(暂时不支持全局顺序消息、事务消息).

3.2.1 单实例配置

生产者和消费者共用一份配置.

application.yml

paas:
   mq:
     rabbitmq:
       host: 119.3.245.184
       port: 5672
       username: rabbitmq
       password: 1qaz2wsx3edc!@#
       virtual-host: /
       group-id: CID_VPC_PRODUCT_QUALITY
       scope: DEV_GLD
       exchange-name: exchange_cloudt
       dlx-exchange-name: dlx_exchange_cloudt
       delay-exchange-name: delay_exchange_cloudt
       prefetch-count: 50
       concurrency: 100
       channel-retry-times: 100000
       queue-delete-by-expires-milliseconds: 20000
参数名 含义 是否必须
host rabbtmq实例IP
port 端口号,默认5672
username 用户名
password 密码
virtual-host 虚拟主机地址,默认是/
group-id 分组ID,和阿里云group-id一致 否(生产者不必填写,消费者必须填写)
scope 域空间,和阿里云scope一致,区分开发、测试、生产环境,务必同一环境保持一致 否(保持收发消息的队列一致即可)
exchange-name 普通消息交换机名称,默认是exchange_cloudt
dlx-exchange-name 重试延时和死信交换机名称,默认是dlx_exchange_cloudt
delay-exchange-name 延时消息交换机名称,默认是delay_exchange_cloudt 否(延时消息的生产者和消费者需要填写)
prefetch-count 消费预取消息数上限,允许consumer最大的NACK数量,min=10, max=250,默认是250
concurrency 消费者多线程数量. min=1, max=20,默认是1
channel-retry-times 消费者重建channel次数. min=300, max=10000,默认是300
queue-delete-by-expires-milliseconds 为队列设置超时时间,超过指定时间该队列如果未被使用被自动删除(单位毫秒),该值默认为0,即不启用队列超时
3.2.2 多实例配置

生产者和消费者需要分开配置,支持多生产者、消费者实例配置,通过id进行区分。
id在生产者、消费者配置中,必须唯一,且需要和代码中调用的地方保持一致。

application.yml

生产者:

paas:
    mq:
       rabbitmq:
         producers:
           producer-list:
            - id: rabbit-1
              host: 119.3.245.184
              port: 5672
              username: rabbitmq
              password: 1qaz2wsx3edc!@#
              virtual-host: /
              scope: DEV_GLD
              exchange-name: exchange_cloudt
              dlx-exchange-name: dlx_exchange_cloudt
              delay-exchange-name: delay_exchange_cloudt
              channel-retry-times: 100000
              queue-delete-by-expires-milliseconds: 20000
            - id: rabbit-2
              host: 119.3.245.184
              port: 5672
              username: rabbitmq
              password: 1qaz2wsx3edc!@#
              virtual-host: /
              scope: DEV_GLD
              exchange-name: exchange_cloudt
              dlx-exchange-name: dlx_exchange_cloudt
              delay-exchange-name: delay_exchange_cloudt
              channel-retry-times: 100000
              queue-delete-by-expires-milliseconds: 20000

消费者:

paas:
     mq:
        rabbitmq:
           consumers:
             consumer-list:
             - id: rabbit-1
               host: 119.3.245.184
               port: 5672
               username: rabbitmq
               password: 1qaz2wsx3edc!@#
               virtual-host: /
               group-id: CID_VPC_PRODUCT_QUALITY
               scope: DEV_GLD
               exchange-name: exchange_cloudt
               dlx-exchange-name: dlx_exchange_cloudt
               delay-exchange-name: delay_exchange_cloudt
               prefetch-count: 50
               concurrency: 100
               channel-retry-times: 100000
               queue-delete-by-expires-milliseconds: 20000
             - id: rabbit-2
               ......
参数名 含义 是否必须
id 生产者、消费者实例ID,保证全局唯一
host rabbtmq实例IP
port 端口号,默认5672
username 用户名
password 密码
virtual-host 虚拟主机地址,默认是/
group-id 分组ID,和阿里云group-id一致 否(生产者不必填写,消费者必须填写)
scope 域空间,和阿里云scope一致,区分开发、测试、生产环境,务必同一环境保持一致 否(保持收发消息的队列一致即可)
exchange-name 普通消息交换机名称,默认是exchange_cloudt
dlx-exchange-name 延时和死信交换机名称,默认是dlx_exchange_cloudt
delay-exchange-name 延时消息交换机名称,默认是delay_exchange_cloudt 否(延时消息的生产者和消费者需要填写)
prefetch-count 消费预取消息数上限,允许consumer最大的NACK数量,min=10, max=250,默认是250
concurrency 消费者多线程数量. min=1, max=20,默认是1
channel-retry-times 消费者重建channel次数. min=300, max=10000,默认是300
queue-delete-by-expires-milliseconds 为队列设置超时时间,超过指定时间该队列如果未被使用被自动删除(单位毫秒),该值默认为0,即不启用队列超时
id: 在消费者配置中需要和MessageListener实现类的getId()保持一致,在生产者配置中需要和producerAssist.getProducerById("rabbit-1")中的参数一致。

注意:

  • 1.从1.0.11-SNAPSHOT版本开始,如果要使用rabbitmq的延时消息功能,rabbitmq必须安装延时插件,否则启动报错;
  • 2.从1.0.11-SNAPSHOT版本开始,死信队列默认按照groupId生成,即:一个groupId的消费者共用一个死信队列,死信队列名称为 {scope}{retry_dead_letter_queue}{groupId};死信队列消息过期时间为3天,如果消息重要,需要及时人工处理。

3.3 tdmq消息队列

tdmq中有订阅(subscription)的概念,相当于ONS中的group-id。在我们的SDK中,配置文件中配置好group-id以后,会将其自动创建为subscription。

  1. 支持普通消息、顺序消息、延时消息

  2. 支持消费失败重试,默认重试次数16次,可以通过配置进行修改。

  3. tdmq在集群中有namespace的概念,默认命名空间是"default",也可以通过配置项进行配置

  4. 腾讯tdmq-client下载需要在maven配置TDMQ 私服地址,具体配置方式参见 腾讯云TDMQ

    <profiles>
        <profile>
          <id>nexus</id>
          <repositories>
              <repository>
                  <id>central</id>
                  <url>http://repo1.maven.org/maven2</url>
                  <releases>
                      <enabled>true</enabled>
                  </releases>
                  <snapshots>
                      <enabled>true</enabled>
                  </snapshots>
              </repository>
          </repositories>
          <pluginRepositories>
              <pluginRepository>
                  <id>central</id>
                  <url>http://repo1.maven.org/maven2</url>
                  <releases>
                      <enabled>true</enabled>
                  </releases>
                  <snapshots>
                      <enabled>true</enabled>
                  </snapshots>
              </pluginRepository>
          </pluginRepositories>
      </profile>
      <profile>
          <id>qcloud-repo</id>
          <repositories>
              <repository>
                  <id>qcloud-central</id>
                  <name>qcloud mirror central</name>
                  <url>http://mirrors.cloud.tencent.com/nexus/repository/maven-public/</url>
                  <snapshots>
                      <enabled>true</enabled>
                  </snapshots>
                  <releases>
                      <enabled>true</enabled>
                  </releases>
              </repository>
              </repositories>
          <pluginRepositories>
              <pluginRepository>
                  <id>qcloud-plugin-central</id>
                  <url>http://mirrors.cloud.tencent.com/nexus/repository/maven-public/</url>
                  <snapshots>
                      <enabled>true</enabled>
                  </snapshots>
                  <releases>
                      <enabled>true</enabled>
                  </releases>
              </pluginRepository>
          </pluginRepositories>
      </profile>
    </profiles>
    
    <activeProfiles>
      <activeProfile>nexus</activeProfile>
      <activeProfile>qcloud-repo</activeProfile>
    </activeProfiles>
    
3.3.1 单实例配置

yaml配置:

paas:
  mq:
    tdmq:
      # tdmqClient 用于自动创建 subscription 使用
      access-key: xxxxxx
      access-secret: xxxxxx
      region: ap-beijing
      # tdmq配置
      # ip:port 替换成路由ID,位于【集群管理】接入点列表
      service-url: pulsar://172.21.0.14:6000
      # custom:后面替换成路由ID,位于【集群管理】接入点列表
      net-model-key: custom:pulsar-4nbbp3w82z/vpc-k9oy42i6/subnet-4sz2sly3
      # 替换成角色密钥,位于【角色管理】页面
      authentication: xxxxxxxxxxxx
      # 命名空间
      namespace: default
      # 集群ID
      cluster: pulsar-4nbbp3w82z
      # 分组,即:对应TDMQ 订阅(subscription) 的概念
      group-id: tdmq-test-group
      scope: dev
参数名 含义 是否必须
access-key ak
access-secret sk
region 区域
service-url ip:port 替换成路由ID,位于【集群管理】接入点列表
net-model-key custom:后面替换成路由ID,位于【集群管理】接入点列表
authentication 替换成角色密钥,位于【角色管理】页面
namespace 命名空间,默认default
cluster 集群ID
group-id 分组,即:对应TDMQ 订阅(subscription) 的概念
scope 域空间,和阿里云scope一致,区分开发、测试、生产环境,务必同一环境保持一致
3.3.2 多实例配置

生产者和消费者需要分开配置,支持多生产者、消费者实例配置,通过id进行区分。
id在生产者、消费者配置中,必须唯一,且需要和代码中调用的地方保持一致。

yaml配置

paas:
  mq:
    tdmq:
      producers:
        producer-list:
          - id: tdmq-producer-1
            access-key: xxxxxx
            access-secret: xxxxxx
            region: ap-beijing
            service-url: pulsar://172.21.0.14:6000
            net-model-key: custom:pulsar-4nbbp3w82z/vpc-k9oy42i6/subnet-4sz2sly3
            authentication: xxxxxxxxxxxx
            namespace: default
            cluster: pulsar-4nbbp3w82z
            group-id: tdmq-test-group
            scope: dev
          - id: tdmq-producer-2
            access-key: xxxxxx
            access-secret: xxxxxx
            region: ap-beijing
            service-url: pulsar://172.21.0.14:6000
            net-model-key: custom:pulsar-4nbbp3w82z/vpc-k9oy42i6/subnet-4sz2sly3
            authentication: xxxxxxxxxxxx
            namespace: default
            cluster: pulsar-4nbbp3w82z
            group-id: tdmq-test-group
            scope: dev
      consumers:
        consumer-list:
          - id: tdmq-consumer-1
            access-key: xxxxxx
            access-secret: xxxxxx
            region: ap-beijing
            service-url: pulsar://172.21.0.14:6000
            net-model-key: custom:pulsar-4nbbp3w82z/vpc-k9oy42i6/subnet-4sz2sly3
            authentication: xxxxxxxxxxxx
            namespace: default
            cluster: pulsar-4nbbp3w82z
            group-id: tdmq-test-group
            scope: dev
          - id: tdmq-consumer-2
            access-key: xxxxxx
            access-secret: xxxxxx
            region: ap-beijing
            service-url: pulsar://172.21.0.14:6000
            net-model-key: custom:pulsar-4nbbp3w82z/vpc-k9oy42i6/subnet-4sz2sly3
            authentication: xxxxxxxxxxxx
            namespace: default
            cluster: pulsar-4nbbp3w82z
            group-id: tdmq-test-group
            scope: dev
参数名 含义 是否必须
id 生产者、消费者实例ID,保证全局唯一
access-key ak
access-secret sk
region 区域
service-url ip:port 替换成路由ID,位于【集群管理】接入点列表
net-model-key custom:后面替换成路由ID,位于【集群管理】接入点列表
authentication 替换成角色密钥,位于【角色管理】页面
namespace 命名空间,默认default
cluster 集群ID
group-id 分组,即:对应TDMQ 订阅(subscription) 的概念
scope 域空间,和阿里云scope一致,区分开发、测试、生产环境,务必同一环境保持一致

注意

id: 在消费者配置中需要和MessageListener实现类的getId()保持一致,在生产者配置中需要和producerAssist.getProducerById("tdmq-1")中的参数一致。

示例代码

1. 生产消息

生产消息主要分为3步:

  1. 自动装配Producer;
  2. 构造消息结构体;
  3. 发送消息返回ID.

1.1 单实例场景

发送普通消息:

@Autowired
private Producer producer;

public void sendMsg() {

    //构造一条消息.
    ProduceMessage message = ProduceMessage.fromString("Topic", "tag", "msgBody");

    //将自定义属性信息注入到userProperties中.
    Map<String, String> userProperties = new HashMap<>();
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
    message.setUserProperties(userProperties);

    //发送消息并返回消息ID.
    String messageId = producer.send(message);
}

发送有序消息:务必 setShardingKey,否则会报异常

@Autowired
private Producer producer;

public void testSendOrderMsg() {

    //构造一条消息.
    ProduceMessage message = ProduceMessage.fromString("Topic", "tag", "msgBody");

    //设置有序key.
    message.setShardingKey("sequence");

    //将自定义属性信息注入到userProperties.
    Map<String, String> userProperties = new HashMap<>();
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
    message.setUserProperties(userProperties);

    //发送消息并返回messageID.
    String messageId = producer.send(message);

}

1.2 多实例场景

发送普通消息

@Autowired
private ProducerAssist producerAssist;
public void testSendMsg(){

    //根据id获取对应生产者实例.
    Producer producer = producerAssist.getProducerById("ons-1");

    //构造消息.
    ProduceMessage message = ProduceMessage.fromString("topic", "tag", "msgBody");

    //将自定义属性信息注入到userProperties.
    Map<String, String> userProperties = new HashMap<>();
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
    message.setUserProperties(userProperties);

    //发送消息并返回messageId.
    String messageId = producer.send(message);
}

发送有序消息:务必setShardingKey,否则会报异常

@Autowired
private ProducerAssist producerAssist;
public void testSendOrderMsg() {

    //根据id获取对应生产者实例.
    Producer producer = producerAssist.getProducerById("ons-1");

    //构造消息.
    ProduceMessage message = ProduceMessage.fromString("topic", "tag", "msgBody");

    //设置有序key.
    message.setShardingKey("sequence");

    //将自定义属性信息注入到userProperties.
    Map<String, String> userProperties = new HashMap<>();
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
    message.setUserProperties(userProperties);

    //发送消息并返回messageId.
    String messageId = producer.send(message);
}

注意:

  1. producerAssist.getProducerById("xxx")中的ID务必和application.yml的paas.mq.ons.producers.producer-list[i].id一致,否则无法实例化生产者.

  2. ProduceMessage.fromString("topic","tag","msgBody")是ProduceMessage消息的构造函数,第一个参数“topic”代表topic,第二个参数“tag”代表tag,第三个参数“msgBody”代表消息内容。如果tag为空,传入null即可。

  3. ProduceMessage结构如下:

public class ProduceMessage {

  /**
   * 消息所属的主题
   */
  private final String topic;
  /**
   * 消息体的二进制数据
   */
  private final byte[] payload;

  /**
   * 消息的tag,接收端可用于进行消息的二级分类,对发送端无影响。
   * 一般情况下接收消息的一端会接收topic相同的消息,当接收消息设置了相同的tag的时候仅接收同一主题下相同tag的消息
   */
  private String tag;
  /**
   * 消息需要延迟发送的时间,单位毫秒(ms).
   * 默认情况下消息立即发送,当设置了该值后消息会在间隔给定的时间后再发送。
   * 如果没有设置该值,且没有设置{@link #getAtTime()},则消息会立即送达接收端
   */
  private Integer delayTime;
  /**
   * 毫秒级的Unix时间戳:1648006200000(2022-03-23 11:30:00)
   * 设置一个时间,表示消息进入消息队列后在指定的时间才会被推送给接收端。
   * 该值的优先级低于 {@link #getDelayTime()} 属性
   */
  private Long atTime;

  /**
   * 如果此值不为NULL,则发送有序消息,同一个值下的消息会保证有序
   */
  private String shardingKey = "";

  /**
   * 消息队列附加的额外的属性。
   * 不解析消息体就能看到某些特殊的信息(例如租户Id,项目id等),可用于查询消息历史时过滤使用。
   */
  private Map<String, String> userProperties = new HashMap<>();

  private ProduceMessage(String topic, byte[] data) {
    this.topic = topic;
    this.payload = data;
  }

  private ProduceMessage(String topic, String tag, byte[] data){
    this.topic = topic;
    this.tag = tag;
    this.payload = data;
  }

  public static ProduceMessage fromBinary(String topic, byte[] data) {
    return new ProduceMessage(topic, data);
  }

  public static ProduceMessage fromString(String topic, String data) {
    byte[] binary = data.getBytes(Charsets.UTF_8);
    return new ProduceMessage(topic, binary);
  }

  public static ProduceMessage fromString(String topic, String tag, String data){
    byte[] binary = data.getBytes(Charsets.UTF_8);
    return new ProduceMessage(topic,tag,binary);
  }

  public static ProduceMessage fromJSON(String topic, JSONObject data) {
    byte[] binary = JSON.toJSONBytes(data);
    return fromBinary(topic, binary);
  }

  public static ProduceMessage fromObject(String topic, Object obj) {
    byte[] binary = JSON.toJSONBytes(obj);
    return fromBinary(topic, binary);
  }
}

2. 消费消息

@Component
public class Test1MessageListener implements MessageListener{

  @Override
  public String getId() {
    //单实例可不填,多实例需要和配置文件中的id对应.  
    return "ons-1";
  }

  @Override
  public String getTopic() {
    return "topic";
  }

  @Override
  public String getTag() {
    return "tag";
  }

  @Override
  public void process(ConsumeMessage message) {
    //message.getValueAsString()得到消息内容.
    System.out.println("Test1MessageListener正在监听:"+message.getValueAsString());
    
    //properties存储的是消息的自定义属性信息:例如上下文信息、topic、tag等.
	Properties properties = message.getUserProperties();
 	System.out.println("message监听到的上下文信息是: => "+ properties);
  }
}

注意事项

  1. 单实例消费者getId()返回为空即可,此处不做任何判断;多实例getId()务必和application.yml的paas.mq.ons.consumers.consumer-list[i].id保持一致,否则该监听器无法注册到消费者上;

  2. 同一个消费者实例中,MessageListener实现类可以有多个,订阅不同的topic和tag;

  3. MessageListener中不允许出现tag1||tag2||tag3的订阅方式,请分开成3个MessageListener,每个MessageListener只允许订阅一个tag,符合开闭原则。否则订阅的消息无法被消费;

  4. 如果一个group订阅了一个Topic下的全部tag,并且该Topic有子标签tag,那么MessageListener必须根据tag的个数,拆分成多个MessageListener,每个订阅一个tag.;

  5. 如果一个group订阅了一个Topic,并且该Topic下没有任何tag,MessageListener的tag可以用*或者null来表示;

  6. 各个产品线订阅的topic、tag命名不可太长,group-id/consumer-id也不可太长,Topic+tag+group-id的总长度不能超过255个字符;

  7. 集群消费模式下,需要确保topic+tag+group-id的订阅关系是唯一的,否则会出现消息丢失;

  8. scope要么不填,填写务必保持生产模块和消费模块统一;

  9. 使用ONS消息队列,尽量采用集群消费模式(CLUSTERING),不建议采用广播消费模式(BROADCASTING)。因为广播模式消费失败不会重试,并且广播模式不支持分布式多实例部署;

  10. ConsumeMessage结构如下:

public class ConsumeMessage implements Serializable{
  /**
   * 消息队列返回的当前消息的id值
   */
  @Getter
  private String messageId;

  /**
   * 消息体的二进制数组格式
   */
  private final byte[] payload;

  /**
   * 消息主题
   */
  @Getter
  private String topic;

  /**
   * 消息tag
   */
  @Getter
  private String tag;
  /**
   * 消息是否已设置为提交状态
   */
  @Getter
  private boolean committed;
  /**
   * 正常返回的情况下,消息队列框架是否自动提交消息
   */
  @Getter
  @Setter
  private boolean autoCommit = true;

  /**
   * 返回当前消息失败重试的次数.
   */
  @Getter
  @Setter
  private int reconsumeTimes;

  /**
   * userProperties存储了Ons和rabbitmq的Message的原始属性信息.
   * 直接通过key获取相应value.
   */
  @Getterq
  @Setter
  private Properties userProperties;

  public ConsumeMessage(String messageId, byte[] payload) {
    this.messageId = messageId;
    this.payload = payload;
  }

  public ConsumeMessage(String messageId, byte[] payload,String topic,String tag) {
    this.messageId = messageId;
    this.payload = payload;
    this.topic=topic;
    this.tag=tag;
  }
  /**
   * 提交消息,表示该消息已处理完成
   */
  public final void commit() {
    this.committed = true;
  }

  /**
   * 读取消息内容,以byte数组形式返回
   */
  public byte[] getValueAsBytes() {
    return payload;
  }

  /**
   * 读取消息内容,以Json对象形式返回
   */
  public JSONObject getValueAsJson() {
    return (JSONObject) JSON.parse(payload);
  }

  /**
   * 读取消息内容,以对象形式返回
   */
  public <T> T getValueAsObject(Class<T> cls) {
    return JSON.parseObject(payload, cls);
  }

  /**
   * 回读取消息内容,以字符串形式返
   */
  public String getValueAsString() {
    return new String(payload, Charset.defaultCharset());
  }
}

3. 常见问题

  1. 报错:消息发送不出去或者接受不到,也没有提示任何rabbitmq,或者ConnectionFactory相关的日志信息。

    原因1

   pom.xml中没有引用最新的message-queue-starter包

解决方案

<dependency>
  <groupId>com.glodon.cloud</groupId>
  <artifactId>message-queue-spring-boot-starter</artifactId>
  <!-- version在spring-cloud-glodon已经管理,此处可以省略. -->
  <version>2.3.1.RELEASE</version>
</dependency>

原因2

配置文件没有加载,或者application.yml文件加载错了

解决方案

对于yml文件一定要确定加载的是当前环境下的配置,启动脚本里面的profile最好是定义的变量。
  1. 报错:生产消息没问题,消费消息有问题。

    原因:

    1. topic值在生产者和消费者中不一致,大概率是一个在代码中加了scope前缀,一个没有加scope前缀;
    
    2. scope应该放到配置文件中定义,可以是yml文件,可以是apollo中,但是千万不要写死在代码中!!!也千万不要和topic合并写死在配置文件中!!!
       反面典型:DEV_GLD_SYS_MGR!!!写成这样是肯定无法消费到的!
    
    3. group-id配置有问题,导致消息无法订阅上,需要前去阿里云ONS控制台观察.
    
    

    解决方案:

    1. scope=DEV_GLD,topic=SYS_MGR,topic是MessageListener.java中定义的,scope和topic一定要分开;
    2. 去阿里云控制台观察group-id是否配置正确.
    
  2. 报错:ONS启动服务报错如下

    img

    No route info of this topic, VPC_DEV_xxx
    

    原因:
    配置文件有问题

    paas:  
    	mq:    
    		ons:      
    			producers:        
    				producer-list:          
    					- id: commonmq            
    					  access-key: LTAIIwVnIn11IH0p            
    					  secret-key: r4kyqm6GXGTl9dmWFPSRcO1RyIHRHa            
    					  scope: VPC_DEV_GLD            
    					  message-model: CLUSTERING            
    					  group-id:            
    					  ordered: false            
    					  namesrv-addr: http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
    

    namesrv-addr配置错误,access-key、secret-key有问题,或者scope有问题

    解决方案:
    参考3.1 和 3.2 的配置文件.

  3. 报错:rabbitmq启动服务报错如下

    2019-12-14 11:05:57,072 - INFO  #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.i.r.AbstractRabbitmqConsumer [AbstractRabbitmqConsumer.java:278]: 消息消费者启动:rabbitmq.
    2019-12-14 11:05:57,088 - INFO  #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.i.r.AbstractRabbitmqConsumer [AbstractRabbitmqConsumer.java:53]: 当前Rabbitmq消费者注册的消息监听器如下:
    2019-12-14 11:05:57,088 - INFO  #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.AbstractConsumer [AbstractConsumer.java:119]: 第1类消息监听器,topic:=> SYS_MGR, tag:=> LIC_CREATE
    2019-12-14 11:05:57,089 - INFO  #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.AbstractConsumer [AbstractConsumer.java:132]: 消息监听器id: , topic: SYS_MGR, tag: LIC_CREATE
    2019-12-14 11:05:57,107 - ERROR #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.i.r.AbstractRabbitmqConsumer [AbstractRabbitmqConsumer.java:263]: 消息交换机 VPC_PRODUCT_GLODON_exchange_cloudt 创建失败, null
    2019-12-14 11:05:57,107 - WARN  #[springAppName_IS_UNDEFINED,,,]# [AMQP Connection 10.129.247.165:5672] c.r.c.i.ForgivingExceptionHandler [ForgivingExceptionHandler.java:120]: An unexpected connection driver error occured (Exception message: Socket closed)
    

    原因:
    消息队列没有增加远程创建交换机的权限;


    解决方案:
    在ansible脚本中给相关用户加上远程创建交换机的权限.