welcome to my blog
问题描述 Java BlockingQueue 阻塞队列的take()和put()方法是线程安全的吗? 多线程下调用take()或者put()方法会出问题吗?
看了BlockingQueue的三个实现类, 发现对应的方法中都使用了锁, 所以不会出现线程安全问题
- //ArrayBlockingQueue的put()方法 public void put(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock; //获取锁 lock.lockInterruptibly(); try { while (count == items.length) //队列满了, 进入阻塞状态 notFull.await(); enqueue(e); } finally { //释放锁 lock.unlock(); } }//ArrayBlockingQueue的take()方法 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; //获取锁 lock.lockInterruptibly(); try { while (count == 0) //队列为空,进入阻塞状态 notEmpty.await(); //弹出元素 return dequeue(); } finally { //释放锁 lock.unlock(); } }
复制代码 LinkedBlockingQueue
- //LinkedBlockingQueue的put()方法 public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获取锁 putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } //元素入队 enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { //释放锁 putLock.unlock(); } if (c == 0) signalNotEmpty(); } //LinkedBlockingQueue的take()方法 public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //获取锁 takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { //释放锁 takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
复制代码 PriorityBlockingQueue
[code]//PriorityBlockingQueue的put()方法public void put(E e) { //锁的操作在offer()方法中 offer(e); // never need to block }//PriorityBlockingQueue的offer()方法public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; //获取锁 lock.lock(); int n, cap; Object[] es; while ((n = size) >= (cap = (es = queue).length)) tryGrow(es, cap); try { final Comparator |
1. 本论坛所提供的信息均来自网络,本网站只提供平台服务,所有账号发表的言论与本网站无关。
2. 其他单位或个人在使用、转载或引用本文时,必须事先获得该帖子作者和本人的同意。
3. 本帖部分内容转载自其他媒体,但并不代表本人赞同其观点和对其真实性负责。
4. 如有侵权,请立即联系,本网站将及时删除相关内容。