# 消息队列

# 功能介绍

为了支持产品在多云环境的部署,我们对消息队列中间件进行了云中立适配,适配了 RocketMQ,RabbitMQ和TDMQ 。通过配置文件可以很方便的进行切换。

消息队列的使用方式非常简单,分为两种模式:

1. 单实例消息队列
2. 多实例消息队列
单实例消息队列:生产者和消费者各自只能有一个实例,并且生产者和消费者共享同一份配置信息;
多实例消息队列:生产者和消费者各自允许有2个及以上的实例,并且生产者和消费者各自维护配置信息.
1
2
3
4

大部分情况下,一个模块只会需要一个生产者或者消费者实例,因此第一种方式更常见。但是当业务需要同时向多个消息队列实例生产、消费消息时,就必须使用第二种方式。

# 使用流程

使用前,请保证有可用的rocketmq(ONS),rabbitmq或者TDMQ实例,创建实例等操作请参考各大公有云厂商或者自建。

# 1. maven settings.xml 添加仓库地址

<!-- scg maven仓库 -->
<repository>
    <id>scg-private</id>
    <name>maven-scg-private</name>
    <url>http://packages.glodon.com/artifactory/maven-scg-private/</url>
</repository>
1
2
3
4
5
6

# 2. 项目中添加 pom 依赖

<dependency>
  <groupId>com.glodon.cloud</groupId>
  <artifactId>message-queue-spring-boot-starter</artifactId>
  <!-- version在spring-cloud-glodon已经管理,此处可以省略. -->
  <version>1.0.2</version>
</dependency>
1
2
3
4
5
6

# 3. 配置文件

# 3.1 ons(rocketmq)消息队列

需要创建好ONS实例,在实例中创建好Topic、group-id,便于后续使用。

1. 支持普通消息、顺序消息、延时消息、事务消息;
2. 集群模式支持消费失败重试,广播模式消费失败不重试(阿里云ONS自带特性),推荐使用集群消费模式;
3. 支持消费进程多线程,默认20个. 
1
2
3
# 3.1.1 单实例配置

生产者和消费者共用一份配置.

application.yml

paas:
  mq:
    ons:
      access-key: {access-key}
      secret-key: {secret-key}
      scope: DEV_GLD
      group-id: GID-CONSUMER-2
      message-model: CLUSTERING
      ordered: false
      namesrv-addr: http://MQ_INST_1973889025569992_BaiMybAw.mq-internet-access.mq-internet.aliyuncs.com:80
1
2
3
4
5
6
7
8
9
10
参数名 含义 是否必填
access-key 阿里云接口访问秘钥
secret-key 阿里云接口访问秘钥
scope 域空间,区分开发、测试、生产环境,务必同一环境保持一致 否(保持生产者和消费者一致即可)
group-id 分组 ID 否(生产者不必填写,消费者必须填写)
message-model 消息模式:
CLUSTERING-集群消费模式,BROADCASTING-广播消费模式
是(建议集群消费模式)
ordered 是否有序:
true-有序消息,false-无序消息
否(默认无序消息)
namesrv-addr ONS 接入点 URL 是(阿里云消息队列 RocketMQ/实例详情/TCP 协议接入点)
1. message-model: CLUSTERING-集群消费,消息会被集群中的多个节点分摊消费;                      					 BROADCASTING-广播消费,消息会被集群中的每个节点全部消费.
2. group-id: 消费组或者生产组,兼容老版本的consumer-id和producer-id.
3. 倘若ordered: true, 那么一定要在ProduceMessage中加上shardingKey.
4. namesrv-addr获取方式:进入阿里云控制台,RocketMQ/实例详情,如下图所示:
1
2
3
4

# 3.1.2 多实例配置

生产者和消费者需要分开配置,多实例支持多生产者、消费者实例配置,通过id进行区分。 id在生产者、消费者配置中,必须唯一,且需要和代码中调用的地方保持一致。

application.yml

生产者:

paas:
     mq:
       ons:
         producers:
           producer-list:
           - id: ons-1
             access-key: {access-key}
             secret-key: {secret-key}
             scope: DEV_GLD
             group-id: GID-CONSUMER-2
             message-model: CLUSTERING
             ordered: false
             namesrv-addr: http://MQ_INST_1973889025569992_BaiMybAw.mq-internet-access.mq-internet.aliyuncs.com:80
           - id: ons-2
           	 ......
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

消费者:

 paas:
     mq:
       ons:
         consumers:
           consumer-list:
           - id: ons-1
             access-key: {access-key}
             secret-key: {secret-key}
             scope: DEV_GLD
             group-id: GID-CONSUMER-2
             message-model: CLUSTERING
             ordered: false
             namesrv-addr: http://MQ_INST_1973889025569992_BaiMybAw.mq-internet-access.mq-internet.aliyuncs.com:80
           - id: ons-2
             ......
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
参数名 含义 是否必填
id 生产者、消费者实例ID,保证全局唯一
access-key 阿里云接口访问秘钥
secret-key 阿里云接口访问秘钥
scope 域空间,区分开发、测试、生产环境,务必同一环境保持一致 否(保持生产者和消费者一致即可)
group-id 分组ID 否(生产者不必填写,消费者必须填写)
message-model 消息模式:
CLUSTERING-集群消费模式,BROADCASTING-广播消费模式
ordered 是否有序:true-有序消息,false-无序消息 否(默认无序消息)
namesrv-addr ONS接入点URL 是(参考单实例)
id: 消费者配置中需要和MessageListener实现类的getId()保持一致,在生产者配置中需要和producerAssist.getProducerById("ons-1")中的参数一致。
1

# 3.2 rabbitmq消息队列

支持普通消息,单实例消费者有序消息(暂时不支持全局顺序消息、延时消息、事务消息).

# 3.2.1 单实例配置

生产者和消费者共用一份配置.

application.yml

paas:
  mq:
    rabbitmq:
      host: 119.3.245.184
      port: 5672
      username: {username}
      password: {password}
      virtual-host: /
      group-id: CID_VPC_PRODUCT_QUALITY
      scope: DEV_GLD
      exchange-name: exchange_cloudt
      dlx-exchange-name: dlx_exchange_cloudt
1
2
3
4
5
6
7
8
9
10
11
12
参数名 含义 是否必须
host rabbtmq 实例 IP
port 端口号,默认 5672
username 用户名
password 密码
virtual-host 虚拟主机地址,默认是/
group-id 分组 ID,和阿里云 group-id 一致 否(生产者不必填写,消费者必须填写)
scope 域空间,和阿里云 scope 一致,区分开发、测试、生产环境,务必同一环境保持一致 否(保持收发消息的队列一致即可)
exchange-name 普通消息交换机名称,默认是 exchange_cloudt
dlx-exchange-name 延时和死信交换机名称,默认是 dlx_exchange_cloudt
# 3.2.2 多实例配置

生产者和消费者需要分开配置,支持多生产者、消费者实例配置,通过id进行区分。 id在生产者、消费者配置中,必须唯一,且需要和代码中调用的地方保持一致。

application.yml

生产者:

 paas:
       mq:
         rabbitmq:
           producers:
             producer-list:
             - id: rabbit-1
               host: 119.3.245.184
               port: 5672
               username: {username}
               password: {password}
               virtual-host: /
               group-id: CID_VPC_PRODUCT_QUALITY
               scope: DEV_GLD
               exchange-name: exchange_cloudt
               dlx-exchange-name: dlx_exchange_cloudt           
             - id: rabbit-2
               host: 119.3.245.184
               port: 5672
               username: {username}
               password: {password}
               virtual-host: /
               group-id: CID_VPC_PRODUCT_QUALITY
               scope: DEV_GLD
               exchange-name: exchange_cloudt
               dlx-exchange-name: dlx_exchange_cloudt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

消费者:

paas:
     mq:
        rabbitmq:
           consumers:
             consumer-list:
             - id: rabbit-1
               host: 119.3.245.184
               port: 5672
               username: {username}
               password: {password}
               virtual-host: /
               group-id: CID_VPC_PRODUCT_QUALITY
               scope: DEV_GLD
               exchange-name: exchange_cloudt
               dlx-exchange-name: dlx_exchange_cloudt
             - id: rabbit-2
               ......
   
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
参数名 含义 是否必须
id 生产者、消费者实例ID,保证全局唯一
host rabbtmq实例IP
port 端口号,默认5672
username 用户名
password 密码
virtual-host 虚拟主机地址,默认是/
group-id 分组ID,和阿里云group-id一致 否(生产者不必填写,消费者必须填写)
scope 域空间,和阿里云scope一致,区分开发、测试、生产环境,务必同一环境保持一致 否(保持收发消息的队列一致即可)
exchange-name 普通消息交换机名称,默认是exchange_cloudt
dlx-exchange-name 延时和死信交换机名称,默认是dlx_exchange_cloudt
id: 在消费者配置中需要和MessageListener实现类的getId()保持一致,在生产者配置中需要和producerAssist.getProducerById("rabbit-1")中的参数一致。
1

# 3.3 TDMQ消息队列

TDMQ消息队列官方暂不支持基础网络接入(不支持公网),需要在腾讯云服务器内进行调试

腾讯云控制台操作步骤:

  • 登录腾讯云TDMQ控制台:https://console.cloud.tencent.com/tdmq/env?rid=8
  • 在环境管理中新建接入点
  • 在角色管理中新建角色
  • 为default环境配置角色

注意事项

  • sdk暂时不支持自动创建topic,需要您在控制台“topic管理”中手动创建topic。topic格式为"${scope}${topic}${tag}",如"dev_topic_tag"

  • 使用默认“default”环境,无需在环境管理中新建环境

# 3.3.1 单实例配置

生产者和消费者共用一份配置.

application.yml

paas:
  mq:
    tdmq:
      service-url: pulsar://{接入点地址}
      net-model-key: custom:{路由ID}
      authentication: {密钥}
      group-id: group8889
      retry-count: 16
      scope: dev
1
2
3
4
5
6
7
8
9
参数名 含义 是否必须
service-url 通过腾讯云TDMQ控制台:环境管理->接入点->地址获得。
net-model-key 通过腾讯云TDMQ控制台:环境管理->接入点->路由ID获得。
authentication 通过腾讯云TDMQ控制台:角色管理->添加角色->复制秘钥获得。 且应在环境管理中配置权限
group-id 消费者组(不需在控制台配置,可随意填写)
retry-count 重试次数
scope 域空间 否(保持收发消息的队列一致即可)
# 3.3.2 多实例配置

生产者和消费者需要分开配置,支持多生产者、消费者实例配置,通过id进行区分。 id在生产者、消费者配置中,必须唯一,且需要和代码中调用的地方保持一致。

application.yml

生产者:

paas:
  mq:
    tdmq:
      producers:
        producer-list:
          - id: id-1
            service-url: pulsar://{接入点地址}
            net-model-key: custom:{路由ID}
            authentication: {密钥}
            group-id: group8889
            retry-count: 16
            scope: dev
          - id: id-2
            service-url: pulsar://{接入点地址}
            net-model-key: custom:{路由ID}
            authentication: {密钥}
            group-id: group0000
            retry-count: 16
            scope: dev
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

消费者:

paas:
     mq:
        rabbitmq:
          consumers:
            consumer-list:
              - id: id-1
                service-url: pulsar://{接入点地址}
                net-model-key: custom:{路由ID}
                authentication: {密钥}
                group-id: group8889
                retry-count: 16
                scope: dev
              - id: id-2
                service-url: pulsar://{接入点地址}
                net-model-key: custom:{路由ID}
                authentication: {密钥}
                group-id: group0000
                retry-count: 16
                scope: dev
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
参数名 含义 是否必须
id 生产者、消费者实例ID,保证全局唯一
service-url 通过腾讯云TDMQ控制台:环境管理->接入点->地址获得。
net-model-key 通过腾讯云TDMQ控制台:环境管理->接入点->路由ID获得。
authentication 通过腾讯云TDMQ控制台:角色管理->添加角色->复制秘钥获得。 且应在环境管理中配置权限
group-id 消费者组(不需在控制台配置,可随意填写)
retry-count 重试次数
scope 域空间 否(保持收发消息的队列一致即可)
id: 在消费者配置中需要和MessageListener实现类的getId()保持一致,在生产者配置中需要和producerAssist.getProducerById("rabbit-1")中的参数一致。
1

# 示例代码

# 1. 生产消息

生产消息主要分为3步:

  1. 自动装配Producer;
  2. 构造消息结构体;
  3. 发送消息返回ID.

# 1.1 单实例场景

发送普通消息:

@Autowired
private Producer producer;

public void sendMsg() {

    //构造一条消息.
    ProduceMessage message = ProduceMessage.fromString("Topic", "tag", "msgBody");

    //将自定义属性信息注入到userProperties中.
    Map<String, String> userProperties = new HashMap<>();
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
    message.setUserProperties(userProperties);

    //发送消息并返回消息ID.
    String messageId = producer.send(message);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

发送有序消息:务必 setShardingKey,否则会报异常

@Autowired
private Producer producer;

public void testSendOrderMsg() {

    //构造一条消息.
    ProduceMessage message = ProduceMessage.fromString("Topic", "tag", "msgBody");

    //设置有序key.
    message.setShardingKey("sequence");

    //将自定义属性信息注入到userProperties.
    Map<String, String> userProperties = new HashMap<>();
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
    message.setUserProperties(userProperties);

    //发送消息并返回messageID.
    String messageId = producer.send(message);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

# 1.2 多实例场景

发送普通消息

@Autowired
private ProducerAssist producerAssist;
public void testSendMsg(){

    //根据id获取对应生产者实例.
    Producer producer = producerAssist.getProducerById("ons-1");

    //构造消息.
    ProduceMessage message = ProduceMessage.fromString("topic", "tag", "msgBody");

    //将自定义属性信息注入到userProperties.
    Map<String, String> userProperties = new HashMap<>();
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
    message.setUserProperties(userProperties);

    //发送消息并返回messageId.
    String messageId = producer.send(message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

发送有序消息:务必setShardingKey,否则会报异常

@Autowired
private ProducerAssist producerAssist;
public void testSendOrderMsg() {

    //根据id获取对应生产者实例.
    Producer producer = producerAssist.getProducerById("ons-1");

    //构造消息.
    ProduceMessage message = ProduceMessage.fromString("topic", "tag", "msgBody");

    //设置有序key.
    message.setShardingKey("sequence");

    //将自定义属性信息注入到userProperties.
    Map<String, String> userProperties = new HashMap<>();
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_SERVICECONTEXT, "1377889900000000");
    userProperties.put(RabbitConstantsUtil.USER_PROPERTY_TRACE_ID, "2233333333000000000");
    message.setUserProperties(userProperties);

    //发送消息并返回messageId.
    String messageId = producer.send(message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

注意:

  1. producerAssist.getProducerById("xxx")中的ID务必和application.yml的paas.mq.ons.producers.producer-list[i].id一致,否则无法实例化生产者.

  2. ProduceMessage.fromString("topic","tag","msgBody")是ProduceMessage消息的构造函数,第一个参数“topic”代表topic,第二个参数“tag”代表tag,第三个参数“msgBody”代表消息内容。如果tag为空,传入null即可。

  3. ProduceMessage结构如下:

    public class ProduceMessage {
    
      // 消息所属的主题topic
      private final String topic;
      
      //消息体的二进制数据
      private final byte[] payload;
    
      /**
       * 消息的tag,接收端可用于进行消息的二级分类,对发送端无影响。
       * 一般情况下接收消息的一端会接收topic相同的消息,当接收消息设置了相同的tag的时候仅接收同一主题下相同tag的消息
       */
      private String tag;
      
      //消息需要延迟发送的时间。
      private Integer delayTime;
      
      // 设置一个时间,表示消息进入消息队列后在指定的时间才会被推送给接收端。
      private Long atTime;
    
      /**
       * 如果此值不为NULL,则发送有序消息,同一个值下的消息会保证有序
       */
      private String shardingKey = "";
    
      /**
       * 消息队列附加的额外的属性。
       * 不解析消息体就能看到某些特殊的信息(例如租户Id,项目id等),可用于查询消息历史时过滤使用。
       */
      private Map<String, String> userProperties = new HashMap<>();
    
      private ProduceMessage(String topic, byte[] data) {
        this.topic = topic;
        this.payload = data;
      }
    
      private ProduceMessage(String topic, String tag, byte[] data){
        this.topic = topic;
        this.tag = tag;
        this.payload = data;
      }
    
      public static ProduceMessage fromBinary(String topic, byte[] data) {
        return new ProduceMessage(topic, data);
      }
    
      public static ProduceMessage fromString(String topic, String data) {
        byte[] binary = data.getBytes(Charsets.UTF_8);
        return new ProduceMessage(topic, binary);
      }
    
      public static ProduceMessage fromString(String topic, String tag, String data){
        byte[] binary = data.getBytes(Charsets.UTF_8);
        return new ProduceMessage(topic,tag,binary);
      }
    
      public static ProduceMessage fromJSON(String topic, JSONObject data) {
        byte[] binary = JSON.toJSONBytes(data);
        return fromBinary(topic, binary);
      }
    
      public static ProduceMessage fromObject(String topic, Object obj) {
        byte[] binary = JSON.toJSONBytes(obj);
        return fromBinary(topic, binary);
      }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66

# 2. 消费消息

@Component
public class Test1MessageListener implements MessageListener{

  @Override
  public String getId() {
    //单实例可不填,多实例需要和配置文件中的id对应.  
    return "ons-1";
  }

  @Override
  public String getTopic() {
    return "topic";
  }

  @Override
  public String getTag() {
    return "tag";
  }

  @Override
  public void process(ConsumeMessage message) {
    //message.getValueAsString()得到消息内容.
    System.out.println("Test1MessageListener正在监听:"+message.getValueAsString());
    
    //properties存储的是消息的自定义属性信息:例如上下文信息、topic、tag等.
	Properties properties = message.getUserProperties();
 	System.out.println("message监听到的上下文信息是: => "+ properties);
  }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

注意事项

  1. 单实例消费者getId()返回为空即可,此处不做任何判断;多实例getId()务必和application.yml的paas.mq.ons.consumers.consumer-list[i].id保持一致,否则该监听器无法注册到消费者上;

  2. 同一个消费者实例中,MessageListener实现类可以有多个,订阅不同的topic和tag;

  3. MessageListener中不允许出现tag1||tag2||tag3的订阅方式,请分开成3个MessageListener,每个MessageListener只允许订阅一个tag,符合开闭原则。否则订阅的消息无法被消费;

  4. 如果一个group订阅了一个Topic下的全部tag,并且该Topic有子标签tag,那么MessageListener必须根据tag的个数,拆分成多个MessageListener,每个订阅一个tag.;

  5. 如果一个group订阅了一个Topic,并且该Topic下没有任何tag,MessageListener的tag可以用*或者null来表示;

  6. 各个产品线订阅的topic、tag命名不可太长,group-id/consumer-id也不可太长,Topic+tag+group-id的总长度不能超过255个字符;

  7. 集群消费模式下,需要确保topic+tag+group-id的订阅关系是唯一的,否则会出现消息丢失;

  8. scope要么不填,填写务必保持生产模块和消费模块统一;

  9. 使用ONS消息队列,尽量采用集群消费模式(CLUSTERING),不建议采用广播消费模式(BROADCASTING)。因为广播模式消费失败不会重试,并且广播模式不支持分布式多实例部署;

  10. ConsumeMessage结构如下:

    public class ConsumeMessage implements Serializable{
      /**
       * 消息队列返回的当前消息的id值
       */
      @Getter
      private String messageId;
    
      /**
       * 消息体的二进制数组格式
       */
      private final byte[] payload;
    
      /**
       * 消息主题
       */
      @Getter
      private String topic;
    
      /**
       * 消息tag
       */
      @Getter
      private String tag;
      /**
       * 消息是否已设置为提交状态
       */
      @Getter
      private boolean committed;
      /**
       * 正常返回的情况下,消息队列框架是否自动提交消息
       */
      @Getter
      @Setter
      private boolean autoCommit = true;
    
      /**
       * 返回当前消息失败重试的次数.
       */
      @Getter
      @Setter
      private int reconsumeTimes;
    
      /**
       * userProperties存储了Ons和rabbitmq的Message的原始属性信息.
       * 直接通过key获取相应value.
       */
      @Getterq
      @Setter
      private Properties userProperties;
    
      public ConsumeMessage(String messageId, byte[] payload) {
        this.messageId = messageId;
        this.payload = payload;
      }
    
      public ConsumeMessage(String messageId, byte[] payload,String topic,String tag) {
        this.messageId = messageId;
        this.payload = payload;
        this.topic=topic;
        this.tag=tag;
      }
      /**
       * 提交消息,表示该消息已处理完成
       */
      public final void commit() {
        this.committed = true;
      }
    
      /**
       * 读取消息内容,以byte数组形式返回
       */
      public byte[] getValueAsBytes() {
        return payload;
      }
    
      /**
       * 读取消息内容,以Json对象形式返回
       */
      public JSONObject getValueAsJson() {
        return (JSONObject) JSON.parse(payload);
      }
    
      /**
       * 读取消息内容,以对象形式返回
       */
      public <T> T getValueAsObject(Class<T> cls) {
        return JSON.parseObject(payload, cls);
      }
    
      /**
       * 回读取消息内容,以字符串形式返
       */
      public String getValueAsString() {
        return new String(payload, Charset.defaultCharset());
      }
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97

# 3. 常见问题

  1. 报错:消息发送不出去或者接受不到,也没有提示任何rabbitmq,或者ConnectionFactory相关的日志信息。

    原因1

    pom.xml中没有引用最新的message-queue-starter包
    
    1

    解决方案

    <dependency>
      <groupId>com.glodon.cloud</groupId>
      <artifactId>message-queue-spring-boot-starter</artifactId>
      <!-- version在spring-cloud-glodon已经管理,此处可以省略. -->
      <version>2.3.1.RELEASE</version>
    </dependency>
    
    1
    2
    3
    4
    5
    6

    原因2

    配置文件没有加载,或者application.yml文件加载错了
    
    1

    解决方案

    对于yml文件一定要确定加载的是当前环境下的配置,启动脚本里面的profile最好是定义的变量。
    
    1
  2. 报错:生产消息没问题,消费消息有问题。

    原因:

    1. topic值在生产者和消费者中不一致,大概率是一个在代码中加了scope前缀,一个没有加scope前缀;
       
    2. scope应该放到配置文件中定义,可以是yml文件,可以是apollo中,但是千万不要写死在代码中!!!也千万不要和topic合并写死在配置文件中!!!
       反面典型:DEV_GLD_SYS_MGR!!!写成这样是肯定无法消费到的!
       
    3. group-id配置有问题,导致消息无法订阅上,需要前去阿里云ONS控制台观察.
    
    
    1
    2
    3
    4
    5
    6
    7

    解决方案:

    1. scope=DEV_GLD,topic=SYS_MGR,topic是MessageListener.java中定义的,scope和topic一定要分开;
    2. 去阿里云控制台观察group-id是否配置正确.
    
    1
    2
  3. 报错:ONS启动服务报错如下

    img

    No route info of this topic, VPC_DEV_xxx
    
    1

    原因: 配置文件有问题

    paas:  
    	mq:    
    		ons:      
    			producers:        
    				producer-list:          
    					- id: commonmq            
    					  access-key: LTAIIwVnIn11IH0p            
    					  secret-key: r4kyqm6GXGTl9dmWFPSRcO1RyIHRHa            
    					  scope: VPC_DEV_GLD            
    					  message-model: CLUSTERING            
    					  group-id:            
    					  ordered: false            
    					  namesrv-addr: http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    namesrv-addr配置错误,access-key、secret-key有问题,或者scope有问题

    解决方案: 参考3.1 和 3.2 的配置文件.

  4. 报错:rabbitmq启动服务报错如下

    2019-12-14 11:05:57,072 - INFO  #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.i.r.AbstractRabbitmqConsumer [AbstractRabbitmqConsumer.java:278]: 消息消费者启动:rabbitmq.
    2019-12-14 11:05:57,088 - INFO  #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.i.r.AbstractRabbitmqConsumer [AbstractRabbitmqConsumer.java:53]: 当前Rabbitmq消费者注册的消息监听器如下:
    2019-12-14 11:05:57,088 - INFO  #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.AbstractConsumer [AbstractConsumer.java:119]: 第1类消息监听器,topic:=> SYS_MGR, tag:=> LIC_CREATE
    2019-12-14 11:05:57,089 - INFO  #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.AbstractConsumer [AbstractConsumer.java:132]: 消息监听器id: , topic: SYS_MGR, tag: LIC_CREATE
    2019-12-14 11:05:57,107 - ERROR #[springAppName_IS_UNDEFINED,,,]# [main] c.g.p.f.m.c.i.r.AbstractRabbitmqConsumer [AbstractRabbitmqConsumer.java:263]: 消息交换机 VPC_PRODUCT_GLODON_exchange_cloudt 创建失败, null
    2019-12-14 11:05:57,107 - WARN  #[springAppName_IS_UNDEFINED,,,]# [AMQP Connection 10.129.247.165:5672] c.r.c.i.ForgivingExceptionHandler [ForgivingExceptionHandler.java:120]: An unexpected connection driver error occured (Exception message: Socket closed)
    
    1
    2
    3
    4
    5
    6

    原因: 消息队列没有增加远程创建交换机的权限;
    解决方案: 在ansible脚本中给相关用户加上远程创建交换机的权限.

  • 在线客服

  • 意见反馈