ActiveMQ高级特性:虚拟Destinations实现消费者分组与简单路由

ActiveMQ支持的虚拟Destinations分为有两种,分别是

Ø  虚拟主题(Virtual Topics)

Ø  组合 Destinations(CompositeDestinations)

这两种虚拟Destinations可以看做对简单的topic和queue用法的补充,基于它们可以实现一些简单有用的EIP功能,虚拟主题类似于1对多的分支功能+消费端的cluster+failover,组合Destinations类似于简单的destinations直接的路由功能。

 

1.   虚拟主题(Virtual Topics)

ActiveMQ中,topic只有在持久订阅(durablesubscription)下是持久化的。存在持久订阅时,每个持久订阅者,都相当于一个持久化的queue的客户端,它会收取所有消息。这种情况下存在两个问题:

1.        同一应用内consumer端负载均衡的问题:同一个应用上的一个持久订阅不能使用多个consumer来共同承担消息处理功能。因为每个都会获取所有消息。queue模式可以解决这个问题,broker端又不能将消息发送到多个应用端。所以,既要发布订阅,又要让消费者分组,这个功能jms规范本身是没有的。

2.        同一应用内consumer端failover的问题:由于只能使用单个的持久订阅者,如果这个订阅者出错,则应用就无法处理消息了,系统的健壮性不高。

为了解决这两个问题,ActiveMQ中实现了虚拟Topic的功能。使用起来非常简单。

对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。

对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。

 

Virtual Topics使用示例如下

 

[java][/java] view plaincopy

  1. package kk;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. import javax.jms.Message;
  4. import javax.jms.MessageConsumer;
  5. import javax.jms.MessageListener;
  6. import javax.jms.MessageProducer;
  7. import javax.jms.Queue;
  8. import javax.jms.Session;
  9. import javax.jms.TextMessage;
  10. import org.apache.activemq.ActiveMQConnection;
  11. import org.apache.activemq.ActiveMQConnectionFactory;
  12. import org.apache.activemq.command.ActiveMQQueue;
  13. import org.apache.activemq.command.ActiveMQTopic;
  14. public class TestVirtualTopic {
  15.     public static void main(String[] args) {
  16.         try {
  17.             ActiveMQConnectionFactory factoryA = new ActiveMQConnectionFactory(
  18.                     “tcp://127.0.0.1:61616”);
  19.             Queue queue = new ActiveMQQueue(getVirtualTopicConsumerNameA());
  20.             ActiveMQConnection conn = (ActiveMQConnection) factoryA
  21.                     .createConnection();
  22.             conn.start();
  23.             Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
  24.             MessageConsumer consumer1 = session.createConsumer( queue );
  25.             MessageConsumer consumer2 = session.createConsumer( queue );
  26.             MessageConsumer consumer3 = session.createConsumer( new ActiveMQQueue(getVirtualTopicConsumerNameB()) );
  27.             final AtomicInteger aint1 = new AtomicInteger(0);
  28.             MessageListener listenerA = new MessageListener() {
  29.                 public void onMessage(Message message) {
  30.                     try {
  31.                         System.out.println(aint1.incrementAndGet()
  32.                                 + ” => receive from “+ getVirtualTopicConsumerNameA() +”: ” + message);
  33.                     } catch (Exception e) {
  34.                         e.printStackTrace();
  35.                     }
  36.                 }
  37.             };
  38.             consumer1.setMessageListener(listenerA);
  39.             consumer2.setMessageListener(listenerA);
  40.             final AtomicInteger aint2 = new AtomicInteger(0);
  41.             MessageListener listenerB = new MessageListener() {
  42.                 public void onMessage(Message message) {
  43.                     try {
  44.                         System.out.println(aint2.incrementAndGet()
  45.                                 + ” => receive from “+ getVirtualTopicConsumerNameB() +”: ” + message);
  46.                     } catch (Exception e) {
  47.                         e.printStackTrace();
  48.                     }
  49.                 }
  50.             };
  51.             consumer3.setMessageListener(listenerB);
  52.             MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName()));
  53.             int index = 0;
  54.             while (index++ < 100) {
  55.                 TextMessage message = session.createTextMessage(index
  56.                         + ” message.”);
  57.                 producer.send(message);
  58.             }
  59.         } catch (Exception e) {
  60.             e.printStackTrace();
  61.         }
  62.     }
  63.       protected static String getVirtualTopicName() {
  64.             return “VirtualTopic.TEST”;
  65.         }
  66.         protected static String getVirtualTopicConsumerNameA() {
  67.             return “Consumer.A.VirtualTopic.TEST”;
  68.         }
  69.         protected static String getVirtualTopicConsumerNameB() {
  70.             return “Consumer.B.VirtualTopic.TEST”;
  71.         }
  72. }

 

使用同样queue名称的消费者会平分所有消息。

从queue接收到的消息,message.getJMSDestination().toString()为topic://VirtualTopic.TEST,即原始的destination。消息的persistent属性为true,即每个相当于一个持久订阅。

Virtual Topic这个功能特性在broker上有个总开关,useVirtualTopics属性,默认为true,设置为false即可关闭此功能。

当此功能开启,并且使用了持久化的存储时,broker启动的时候会从持久化存储里拿到所有的destinations的名称,如果名称模式与Virtual Topics匹配,则把它们添加到系统的Virtual Topics列表中去。当然,没有显式定义的Virtual Topics,也可以直接使用的,系统会自动创建对应的实际topic。

当有consumer访问此VirtualTopics时,系统会自动创建持久化的queue,并在每次Topic收到消息时,分发到具体的queue。

消费端使用的queue名称前缀的Consumer是可以修改的。示例如下:

[java][/java] view plaincopy

  1. <beans
  2.   xmlns=”http://www.springframework.org/schema/beans”
  3.   xmlns:amq=”http://activemq.apache.org/schema/core”
  4.   xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”
  5.   xsi:schemaLocation=”http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  6.   http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd”>
  7. <bean class=”org.springframework.beans.factory.config.PropertyPlaceholderConfigurer” />
  8.   <broker xmlns=”http://activemq.apache.org/schema/core”>
  9.     <destinationInterceptors>
  10.       <virtualDestinationInterceptor>
  11.         <virtualDestinations>
  12.           <virtualTopic name=”>”prefix=”VirtualTopicConsumers.*.”selectorAware=”false”/>
  13.         </virtualDestinations>
  14.       </virtualDestinationInterceptor>
  15.     </destinationInterceptors>
  16.   </broker>
  17. </beans>

 

 

前缀修改成了VirtualTopicConsumers。

其实也可以使用postfix属性设置后缀(貌似一般没有必要)。

selectorAware属性则表明如果consumer端有selector,则只有匹配selector的消息才会分派到对应的queue中去。

 

2.   组合Destinations (Composite Destinations)

标签