首页 > 站长在线 > Apache ActiveMQ 集群配置方法

Apache ActiveMQ 集群配置方法

构建高可用的AMQ系统在生产环境中是非常重要的,对于这个apache的消息中间件实现高可用非常简单,只要在Apache ActiveMQ单点基本配置基础上做一次配置变更(如果在一台设备上部署多个AMQ,需要修改对应端口号),即可实现。

AMQ实现高可用部署有三种方案:

1、Master-Slave

2、SharedFile System Master Slave

3、JDBCMaster Slave

第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛;

而第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库;

而第二种方案同样支持N个AMQ实例组网,但由于他是基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。

集群配置方法参考:

http://activemq.apache.org/masterslave.html

“Apache ActiveMQ单点基本配置” 原配置内容:

[html]

  1. <persistenceAdapter>    
  2.             <kahaDB directory="${activemq.data}/kahadb"    
  3.                     enableIndexWriteAsync="true"    
  4.                     enableJournalDiskSyncs="false"/>    
  5.             <!--<amqPersistenceAdapter directory="${activemq.data}/activemq-data" /> -->    
  6.         </persistenceAdapter>   


修改为:

[html]

  1. <persistenceAdapter>  
  2.              <kahaDB directory="X:\\shareBrokerData"  
  3.                         enableIndexWriteAsync="true"  
  4.                         enableJournalDiskSyncs="false"/>  
  5.         </persistenceAdapter>  


注意:

1、前面提到部署一台设备上的AMQ系统,需要修改对应的端口号,如AMQ对外的监听端口61616和jetty的监听端口8161等。

2、如果多套AMQ部署在不同的设备上,这里的directory应该指向一个远程的系统目录(分布式文件系统)

Producer for java:

[java]

  1. ActiveMQConnectionFactory connectionFactory =     
  2.         new ActiveMQConnectionFactory("failover:(  
  3. tcp://192.168.0.87:61616?wireFormat.maxInactivityDuration=0,  
  4. tcp://192.168.0.87:61617?wireFormat.maxInactivityDuration=0)");    
  5.     Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);    
  6.         
  7.     Queue queue = session.createQueue(qName);    
  8.     ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue); // Default DeliveryMode.PERSISTENT    
  9.     //producer.setPriority(3); // Default priority 4.    
  10.     //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);    
  11.         
  12.     session.createObjectMessage();    
  13.     jmsMessage.setObject(message);    
  14.     while(true){          
  15.   
  16.          producer.send(jmsMessage);    
  17.          TimeUnit.MILLISECONDS.sleep(10);  
  18.   
  19.  }  


Consumer for java:

[java]

  1. final String qName = "Test.foo?consumer.prefetchSize=100";  
  2. ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("  
  3. failover:(  
  4. tcp://192.168.0.87:61616,  
  5. tcp://192.168.0.87:61617  
  6. )");   
  7. Connection connection = connectionFactory.createConnection();  
  8. connection.start();  
  9.   
  10. final Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
  11. Destination destination = session.createQueue(qName);  
  12. ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);  
  13. System.out.println(consumer.getPrefetchNumber());  
  14.   
  15. //listener 方式   
  16. consumer.setMessageListener(new MessageListener() {  
  17.      public void onMessage(Message msg) {  
  18.      //TODO something....   
  19.         try {  
  20.             System.out.println("收到消息:"+counter.incrementAndGet()+","+msg);  
  21.         } catch (JMSException e1) {  
  22.         // TODO Auto-generated  
  23.          catch blocke1.printStackTrace();  
  24.         }  
  25.      }  
  26.  });  


通过failover方式进行连接,多个AMQ实例地址使用英文逗号隔开,当某个实例断开时会自动重连,但如果所有实例都失效,failover默认情况下会无限期的等待下去,不会有任何提示。

failover参数配置参考:

http://activemq.apache.org/failover-transport-reference.html


本文固定链接: http://www.devba.com/index.php/archives/6082.html | 开发吧

报歉!评论已关闭.