首页 > Java开发 > ActiveMQ的BlobMessage传输文件示例:完全嵌入式的使用jetty的fileserver

ActiveMQ的BlobMessage传输文件示例:完全嵌入式的使用jetty的fileserver

前段时间的《ActiveMQ传输文件的几种方式原理与优劣》中提到BlobMessage这种利用fileserver中转的高效处理文件的方式。

 

其实ActiveMQ自带的web console中已经有了fileserver的demo,位于ActiveMQ安装目录的webapps下。

启动ActiveMQ的时候,如果配置文件中import了jetty.xml,一般会自动加载这个fileserver,就可以使用了。

 

另外,如果在embedded的环境下使用ActiveMQ,也可以简单的完全embedded的方式使用这个fileserver。

只需要两个步骤:

1、拿到fileserver的代码。很简单,就3个类,而且不需要什么依赖包。

从http://svn.apache.org/repos/asf/activemq/trunk/activemq-fileserver/ 拿到这三个类,可以mvn eclipse:eclipse方式变成项目再引用,

也可以直接把这几个类复制到自己的项目代码里去。

2、嵌入方式启动jetty并加载fileserver

 

[java][/java] view plaincopy

  1. Server server = new Server(8162);
  2. ServletContextHandler  handler = new ServletContextHandler ();
  3. handler.setResourceBase(".");
  4. handler.setContextPath("/fileserver");
  5. System.out.println(handler.getServletContext().getRealPath("/"));
  6. handler.addFilter(org.apache.activemq.util.FilenameGuardFilter.class, "/*", DispatcherType.FORWARD.ordinal() );
  7. handler.addFilter(org.apache.activemq.util.RestFilter.class, "/*", DispatcherType.FORWARD.ordinal() );
  8. ServletHolder defaultServlet = new ServletHolder();
  9. defaultServlet.setName("DefaultServlet");
  10. defaultServlet.setClassName("org.eclipse.jetty.servlet.DefaultServlet");
  11. handler.addServlet(defaultServlet, "/*");
  12. server.setHandler( handler );
  13. server.start();

然后就可以启动fileserver来使用了。可以简单测试一下。

 

步骤如下:

1、手工启动一个ActiveMQ,嵌入或是独立的都可以。

2、运行上面的代码,启动jetty。

3、使用如下代码发送了接收文件:

 

[java][/java] view plaincopy

  1. package kk;
  2. import java.io.File;
  3. import java.io.InputStream;
  4. import java.util.List;
  5. import javax.jms.Message;
  6. import javax.jms.MessageConsumer;
  7. import javax.jms.MessageListener;
  8. import javax.jms.MessageProducer;
  9. import javax.jms.Queue;
  10. import javax.jms.Session;
  11. import org.apache.activemq.ActiveMQConnection;
  12. import org.apache.activemq.ActiveMQConnectionFactory;
  13. import org.apache.activemq.ActiveMQSession;
  14. import org.apache.activemq.BlobMessage;
  15. import org.apache.activemq.command.ActiveMQBlobMessage;
  16. import org.apache.activemq.command.ActiveMQQueue;
  17. import org.apache.commons.io.IOUtils;
  18. public class TestBlob {
  19.     public static void main(String[] args) {
  20.         try {
  21.             ActiveMQConnectionFactory factoryA = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616?jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8162/fileserver/");
  22.             Queue queue = new ActiveMQQueue("blob.kk");
  23.             ActiveMQConnection conn = (ActiveMQConnection) factoryA.createConnection();
  24.             conn.start();
  25.             ActiveMQSession session = (ActiveMQSession) conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
  26.             MessageConsumer consumer = session.createConsumer(queue);
  27.             MessageListener listener = new MessageListener() {
  28.                 public void onMessage(Message message) {
  29.                     try {
  30.                         System.out.println(" => receive from blob.kk: ");
  31.                         if (message instanceof BlobMessage) {
  32.                             System.out.println("filename:"+message.getStringProperty("FILE.NAME"));
  33.                             System.out.println("filesize:"+message.getLongProperty("FILE.SIZE"));
  34.                               BlobMessage blobMessage = (BlobMessage) message;
  35.                               InputStream in = blobMessage.getInputStream();
  36.                               List list = IOUtils.readLines(in);
  37.                               for(Object s : list) System.out.println(s);
  38.                               in.close();
  39.                               ((ActiveMQBlobMessage)blobMessage).deleteFile();//注意处理完后需要手工删除服务器端文件
  40.                         }
  41.                     } catch (Exception e) {
  42.                         e.printStackTrace();
  43.                     }
  44.                 }
  45.             };
  46.             consumer.setMessageListener(listener);
  47.             File file = new File("D://y.txt");
  48.             MessageProducer producer = session.createProducer(queue);
  49.             BlobMessage blobMessage = session.createBlobMessage(file);
  50.             blobMessage.setStringProperty("FILE.NAME",file.getName());
  51.             blobMessage.setLongProperty("FILE.SIZE",file.length());
  52.             producer.send(blobMessage);
  53.         } catch (Exception e) {
  54.             e.printStackTrace();
  55.         }
  56.     }
  57. }

特别要注意broker是不会自动删除文件的,需要手工删除:

 

((ActiveMQBlobMessage)blobMessage).deleteFile(); //注意处理完后需要手工删除服务器端文件

4、运行TestBlob,查看结果:

 

=> receive from blob.kk:
filename:y.txt
filesize:39
hello,BlobMessage and jetty FileServer

 

文件没有被成功消费之前,在fileserver运行的项目文件夹下可以看到类似如下文件:ID_kimmking-33950-1374560343447-1_1_1_1_1

文件名和消息的关系是 filename = msg.getJMSMessageID().toString().replace(":", "_")


本文固定链接: http://www.devba.com/index.php/archives/4096.html | 开发吧

报歉!评论已关闭.