java生产者消费者问题代码分析

作者要的是一个生产者生成,接着必须有一个消费者消费,那这不是需要单线程吗?或者使用1个大小的阻塞队列。所以只谈论问题本身,不谈论好不好。

具体代码:

Java代码
  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. //生产/消费者模式
  5. public class Basket {
  6.     Lock lock = new ReentrantLock();
  7.     // 产生Condition对象
  8.     Condition produced = lock.newCondition();
  9.     Condition consumed = lock.newCondition();
  10.     boolean available = false;
  11.     public void produce() throws InterruptedException {
  12.         lock.lock();
  13.         try {
  14.             if (available) {
  15.                 produced.await(); // 放弃lock进入睡眠
  16.             }
  17.             System.out.println(“Apple produced.”);
  18.             available = true;
  19.             consumed.signal(); // 发信号唤醒等待这个Condition的线程
  20.         } finally {
  21.             lock.unlock();
  22.         }
  23.     }
  24.     public void consume() throws InterruptedException {
  25.         lock.lock();
  26.         try {
  27.             if (!available) {
  28.                 consumed.await(); // 放弃lock进入睡眠
  29.             }
  30.             /* 吃苹果 */
  31.             System.out.println(“Apple consumed.”);
  32.             available = false;
  33.             produced.signal(); // 发信号唤醒等待这个Condition的线程
  34.         } finally {
  35.             lock.unlock();
  36.         }
  37.     }
  38. }
[java][/java]

view plaincopy

  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.Lock;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. //生产/消费者模式
  5. public class Basket {
  6.     Lock lock = new ReentrantLock();
  7.     // 产生Condition对象
  8.     Condition produced = lock.newCondition();
  9.     Condition consumed = lock.newCondition();
  10.     boolean available = false;
  11.     public void produce() throws InterruptedException {
  12.         lock.lock();
  13.         try {
  14.             if (available) {
  15.                 produced.await(); // 放弃lock进入睡眠
  16.             }
  17.             System.out.println(“Apple produced.”);
  18.             available = true;
  19.             consumed.signal(); // 发信号唤醒等待这个Condition的线程
  20.         } finally {
  21.             lock.unlock();
  22.         }
  23.     }
  24.     public void consume() throws InterruptedException {
  25.         lock.lock();
  26.         try {
  27.             if (!available) {
  28.                 consumed.await(); // 放弃lock进入睡眠
  29.             }
  30.             /* 吃苹果 */
  31.             System.out.println(“Apple consumed.”);
  32.             available = false;
  33.             produced.signal(); // 发信号唤醒等待这个Condition的线程
  34.         } finally {
  35.             lock.unlock();
  36.         }
  37.     }
  38. }
Java代码 复制代码 收藏代码
  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. //测试用类
  4. public class ConditionTester {
  5.     public static void main(String[] args) throws InterruptedException {
  6.         final Basket basket = new Basket();
  7.         // 定义一个producer
  8.         Runnable producer = new Runnable() {
  9.             public void run() {
  10.                 try {
  11.                     basket.produce();
  12.                 } catch (InterruptedException ex) {
  13.                     ex.printStackTrace();
  14.                 }
  15.             }
  16.         };
  17.         // 定义一个consumer
  18.         Runnable consumer = new Runnable() {
  19.             public void run() {
  20.                 try {
  21.                     basket.consume();
  22.                 } catch (InterruptedException ex) {
  23.                     ex.printStackTrace();
  24.                 }
  25.             }
  26.         };
  27.         // 各产生10个consumer和producer
  28.         ExecutorService service = Executors.newCachedThreadPool();
  29.         for (int i = 0; i < 4; i++)
  30.             service.submit(consumer);
  31.         Thread.sleep(2000 * 2);
  32.         for (int i = 0; i < 4; i++)
  33.             service.submit(producer);
  34.         service.shutdown();
  35.     }
  36. }
[java][/java]

view plaincopy

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. //测试用类
  4. public class ConditionTester {
  5.     public static void main(String[] args) throws InterruptedException {
  6.         final Basket basket = new Basket();
  7.         // 定义一个producer
  8.         Runnable producer = new Runnable() {
  9.             public void run() {
  10.                 try {
  11.                     basket.produce();
  12.                 } catch (InterruptedException ex) {
  13.                     ex.printStackTrace();
  14.                 }
  15.             }
  16.         };
  17.         // 定义一个consumer
  18.         Runnable consumer = new Runnable() {
  19.             public void run() {
  20.                 try {
  21.                     basket.consume();
  22.                 } catch (InterruptedException ex) {
  23.                     ex.printStackTrace();
  24.                 }
  25.             }
  26.         };
  27.         // 各产生10个consumer和producer
  28.         ExecutorService service = Executors.newCachedThreadPool();
  29.         for (int i = 0; i < 4; i++)
  30.             service.submit(consumer);
  31.         Thread.sleep(2000 * 2);
  32.         for (int i = 0; i < 4; i++)
  33.             service.submit(producer);
  34.         service.shutdown();
  35.     }
  36. }

原因分析:

1、假设前面有2个producer(此时available=true)

1.1、一个在等待lock

1.2、一个await

2、consumer生成内容后,available=false,produced.signal(); 最后lock.unlock();

3.1、因为lock.unlock所以会触发一个lock获取到锁(虽然signal也会触发等待这个条件的其他线程,但是多线程大家都知道什么时候触发这是不确定的),如果此时正好是[1.1]那么因为available=false,执行完释放锁

3.2、produced.signal()所以会触发一个await的producer;

解决方案:

只要保证[3.1]还是需要await即可解决问题

所以加一个 AtomicInteger producedAwaitCounter = new AtomicInteger(0); 统计当前等待的生产者,如果当前available=false,但已经有生产者生成了内容,那么先等待消费者消费了再说

if (available || producedAwaitCounter.get() > 0) {

producedAwaitCounter.incrementAndGet();

produced.await(); // 放弃lock进入睡眠

producedAwaitCounter.decrementAndGet();

}

当然最简单的是使用:自旋,原理可以自己分析下:

while (available) {

produced.await(); // 放弃lock进入睡眠

}

Java代码
  1. package com.sishuok.es.test;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. import java.util.concurrent.locks.Condition;
  4. import java.util.concurrent.locks.Lock;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. //生产/消费者模式
  7. public class Basket {
  8.     Lock lock = new ReentrantLock(true);
  9. // 产生Condition对象
  10.     Condition produced = lock.newCondition();
  11.     Condition consumed = lock.newCondition();
  12.     boolean available = false;
  13.     AtomicInteger producedAwaitCounter = new AtomicInteger(0);
  14.     public void produce() throws InterruptedException {
  15.         lock.lock();
  16.         try {
  17.             if (available || producedAwaitCounter.get() > 0) {
  18.                 producedAwaitCounter.incrementAndGet();
  19.                 produced.await(); // 放弃lock进入睡眠
  20.                 producedAwaitCounter.decrementAndGet();
  21.             }
  22.             System.out.println(“Apple produced.”);
  23.             available = true;
  24.             consumed.signal(); // 发信号唤醒等待这个Condition的线程
  25.         } finally {
  26.             lock.unlock();
  27.         }
  28.     }
  29.     public void consume() throws InterruptedException {
  30.         lock.lock();
  31.         try {
  32.             if (!available) {
  33.                 consumed.await(); // 放弃lock进入睡眠
  34.             }
  35.             /* 吃苹果 */
  36.             System.out.println(“Apple consumed.”);
  37.             available = false;
  38.             produced.signal(); // 发信号唤醒等待这个Condition的线程
  39.         } finally {
  40.             lock.unlock();
  41.         }
  42.     }
  43. }
[java][/java]

view plaincopy

  1. package com.sishuok.es.test;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. import java.util.concurrent.locks.Condition;
  4. import java.util.concurrent.locks.Lock;
  5. import java.util.concurrent.locks.ReentrantLock;
  6. //生产/消费者模式
  7. public class Basket {
  8.     Lock lock = new ReentrantLock(true);
  9. // 产生Condition对象
  10.     Condition produced = lock.newCondition();
  11.     Condition consumed = lock.newCondition();
  12.     boolean available = false;
  13.     AtomicInteger producedAwaitCounter = new AtomicInteger(0);
  14.     public void produce() throws InterruptedException {
  15.         lock.lock();
  16.         try {
  17.             if (available || producedAwaitCounter.get() > 0) {
  18.                 producedAwaitCounter.incrementAndGet();
  19.                 produced.await(); // 放弃lock进入睡眠
  20.                 producedAwaitCounter.decrementAndGet();
  21.             }
  22.             System.out.println(“Apple produced.”);
  23.             available = true;
  24.             consumed.signal(); // 发信号唤醒等待这个Condition的线程
  25.         } finally {
  26.             lock.unlock();
  27.         }
  28.     }
  29.     public void consume() throws InterruptedException {
  30.         lock.lock();
  31.         try {
  32.             if (!available) {
  33.                 consumed.await(); // 放弃lock进入睡眠
  34.             }
  35.             /* 吃苹果 */
  36.             System.out.println(“Apple consumed.”);
  37.             available = false;
  38.             produced.signal(); // 发信号唤醒等待这个Condition的线程
  39.         } finally {
  40.             lock.unlock();
  41.         }
  42.     }
  43. }

标签