线程同步

在开发多线程程序的过程中,你是否发现使用同步关键字后,怎么不好使呢,还是出现不能同步的情况。

见下面的例子,这时一个连接MQ的程序,通过使用多线程,模拟多个客户端同时登录、连接MQ。首先创建多个线程,然后每个线程自己连接MQ服务器。

这里实现的主要功能是测试创建线程的时间(T1),测试所有线程都连接上MQ服务器的时间(T2)。

1. T1实现

直接在创建线程循环语句前后计时,然后算出所花时间。

2. T2实现

使用一个计数器,每个线程连接完成后,计数器加1,该计数器为全局计数器,所有线程均能访问。当计数达到500个时,获取当时时刻,然后减去开始时刻,求得所有线程连接完成的时间。

开始时刻在线程类的构造函数中传入。

3. 客户端编号的正确获取

由于采用多线程,获取编号需要同步,否则获取到的编号会出现紊乱情况(如编号重复),同时也会导致创建连接成功计数不准确。

这里给出的代码是能正常工作的,如果对代码进行一些修改,代码还能正常执行吗?修改如下:

原代码:

 

[java]  
  1. /**
  2.  * 获得线程号
  3.  * @return
  4.  */
  5. synchronized public static int getThreadNum(){
  6.     return threadNum++;
  7. }

修改后代码:

 

 

[java] 
  1. /**
  2.  * 获得线程号
  3.  * @return
  4.  */
  5. synchronized public int getThreadNum(){
  6.     return threadNum++;
  7. }

运行修改后的代码,发现出现了编号重复的现象,实际运行错误信息为:

 

javax.jms.InvalidClientIDException: Broker: Message_Bus – Client: client_255 already connected from tcp://127.0.0.1:40454

意即创建的客户编号有重复。

可以看到,代码的修改部分仅仅去掉了一个修饰词:static,结果就不正确了,这是为什么呢?

如果是静态方法(使用static)关键字,所有创建的线程均共享一个方法getThreadNum;如果使用非静态方法,则在创建新线程时,每个线程均拥有自己的getThreadNum()方法,即使加了同步方法,也只能在类实例的内部起作用,而不能对其他线程对象起作用。

所以,涉及到多线程的共享方法或变量同步,一般使用同步关键字(synchronized)和静态方法(使用static修饰)。

 

示例代码MsgSubscribetest.java

 

[java]  
  1. import javax.jms.Connection;
  2. import javax.jms.ConnectionFactory;
  3. import javax.jms.JMSException;
  4. import javax.jms.Session;
  5. import javax.jms.Topic;
  6. import javax.jms.TopicSubscriber;
  7. import org.apache.activemq.ActiveMQConnectionFactory;
  8. import org.apache.activemq.ActiveMQSession;
  9. public class MsgSubscribeTest extends Thread{
  10.     private final static String JMS_HOST_IP = “127.0.0.1”;
  11.     private final static String JMS_HOST_PORT = “61616”;
  12.     private final static String USERNAME = “system”;
  13.     private final static String PASSWORD = “manager2”;
  14.     private final static String TEST_TOPIC = “my-topic”;
  15.     public static int threadNum = 0;
  16.     //线程完成计数
  17.      public static int iFinishedThread = 0;
  18.      //测试开始时间
  19.     long startTestTime = 0;
  20.      //JMS连接
  21.     Connection connection = null;
  22.     /**
  23.      * 构造函数,设置运行开始时间
  24.      * @param t1
  25.       */
  26.     public UapMsgSubscribeTest(long t1){
  27.         startTestTime = t1;
  28.      }
  29.     /**
  30.      * 临时订阅消息
  31.      * @param msg
  32.      */
  33.     void tempSubscribTopic(String clientid) {
  34.         //System.out.println(“Connection Count:”+ clientid + “,” + iFinishedThread);
  35.         try {
  36.             ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
  37.                     USERNAME,
  38.                      PASSWORD,
  39.                     “tcp://” + JMS_HOST_IP + “:” + JMS_HOST_PORT +”?wireFormat.maxInactivityDurationInitalDelay=30000″);
  40.         //JMS 客户端到JMS Provider 的连接
  41.         Connection connection = connectionFactory.createConnection();
  42.         connection.setClientID(clientid);
  43.             connection.start();
  44.             // Session: 一个发送或接收消息的线程
  45.             //Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
  46.         ActiveMQSession session = (ActiveMQSession) connection
  47.             .createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
  48.             Topic topic = session.createTopic(TEST_TOPIC);
  49.             TopicSubscriber consumer = session.createSubscriber(topic);
  50.             consumer.setMessageListener(new UapTextListener(connection, session));
  51.             //Thread.sleep(3*1000);
  52.             //session.close();
  53.             //connection.stop();
  54.             //alConnMgr.add(connection);
  55.             finishedCount();
  56.             System.out.println(“订阅消息成功!”);
  57.     } catch (Exception e) {
  58.         e.printStackTrace();
  59.     }
  60.     }
  61.     /**
  62.      * 检查连接是否创建完成
  63.      * @return
  64.      */
  65.     boolean checkCreateConnectionFinished(){
  66.      //     System.out.println(“iFinishedThread=” + iFinishedThread + “,” + startTestTime);
  67.         if(iFinishedThread ==499){
  68.         long endTime = System.currentTimeMillis();
  69.             System.out.println(“Finished Created All Connections Cost:” + (endTime-startTestTime));
  70.         return true;
  71.         }
  72.         return false;
  73.     }
  74.     /**
  75.      * 创建完成线程计数
  76.      * @return
  77.      */
  78.     synchronized public static int finishedCount(){
  79.         return iFinishedThread++;
  80.     }
  81.     /**
  82.      * 获得线程号
  83.      * @return
  84.      */
  85.     synchronized public static int getThreadNum(){
  86.         return threadNum++;
  87.     }
  88.     @Override
  89.     public void run() {
  90.         String clientId = “client_” + getThreadNum();
  91.     tempSubscribTopic(clientId);
  92.     //检查是否完成
  93.     checkCreateConnectionFinished();
  94.     }
  95.     /**
  96.      * 主入口
  97.      * @param args
  98.      */
  99.     public static void main(String[] args) {
  100.         long t1 = System.currentTimeMillis();
  101.         long startTime = System.currentTimeMillis();
  102.         for (int i = 0; i < 500; i++) {
  103.             new MsgSubscribeTest(t1).start();
  104.         }
  105.         long endTime = System.currentTimeMillis();
  106.         System.out.println(“Create all threads cost:” + (endTime-startTime));
  107.     }

标签