深入理解 Java 并发锁
本文先阐述 Java 中各种锁的概念。
然后,介绍锁的核心实现 AQS。
然后,重点介绍 Lock 和 Condition 两个接口及其实现。并发编程有两个核心问题:同步和互斥。
互斥 ,即同一时刻只允许一个线程访问共享资源;
同步 ,即线程之间如何通信、协作。
这两大问题,管程(sychronized
)都是能够解决的。J.U.C 包还提供了 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题 。
并发锁简介 确保线程安全最常见的做法是利用锁机制(Lock
、sychronized
)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。
可重入锁 可重入锁,顾名思义,指的是线程可以重复获取同一把锁 。即同一个线程在外层方法获取了锁,在进入内层方法会自动获取锁。
如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
可重入锁可以在一定程度上避免死锁 。
ReentrantLock
、ReentrantReadWriteLock
是可重入锁 。这点,从其命名也不难看出。
synchronized
也是一个可重入锁 。
【示例】synchronized
的可重入示例
1 2 3 4 5 6 7 8 synchronized void setA () throws Exception{ Thread.sleep(1000 ); setB(); } synchronized void setB () throws Exception{ Thread.sleep(1000 ); }
上面的代码就是一个典型场景:如果使用的锁不是可重入锁的话,setB
可能不会被当前线程执行,从而造成死锁。
【示例】ReentrantLock
的可重入示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class Task { private int value; private final Lock lock = new ReentrantLock (); public Task () { this .value = 0 ; } public int get () { lock.lock(); try { return value; } finally { lock.unlock(); } } public void addOne () { lock.lock(); try { value = 1 + get(); } finally { lock.unlock(); } } }
公平锁与非公平锁
公平锁 - 公平锁是指 多线程按照申请锁的顺序来获取锁 。
非公平锁 - 非公平锁是指 多线程不按照申请锁的顺序来获取锁 。这就可能会出现优先级反转(后来者居上)或者饥饿现象(某线程总是抢不过别的线程,导致始终无法执行)。
公平锁为了保证线程申请顺序,势必要付出一定的性能代价,因此其吞吐量一般低于非公平锁。
公平锁与非公平锁 在 Java 中的典型实现:
synchronized
只支持非公平锁 。
ReentrantLock
、ReentrantReadWriteLock
,默认是非公平锁,但支持公平锁 。
独享锁与共享锁 独享锁与共享锁是一种广义上的说法,从实际用途上来看,也常被称为互斥锁与读写锁。
独享锁 - 独享锁是指 锁一次只能被一个线程所持有 。
共享锁 - 共享锁是指 锁可被多个线程所持有 。
独享锁与共享锁在 Java 中的典型实现:
synchronized
、ReentrantLock
只支持独享锁 。
ReentrantReadWriteLock
其写锁是独享锁,其读锁是共享锁 。读锁是共享锁使得并发读是非常高效的,读写,写读 ,写写的过程是互斥的。
悲观锁与乐观锁 乐观锁与悲观锁不是指具体的什么类型的锁,而是处理并发同步的策略 。
悲观锁 - 悲观锁对于并发采取悲观的态度,认为:不加锁的并发操作一定会出问题 。悲观锁适合写操作频繁的场景 。
乐观锁 - 乐观锁对于并发采取乐观的态度,认为:不加锁的并发操作也没什么问题。对于同一个数据的并发操作,是不会发生修改的 。在更新数据的时候,会采用不断尝试更新的方式更新数据。乐观锁适合读多写少的场景 。
悲观锁与乐观锁在 Java 中的典型实现:
偏向锁、轻量级锁、重量级锁 所谓轻量级锁与重量级锁,指的是锁控制粒度的粗细。显然,控制粒度越细,阻塞开销越小,并发性也就越高。
Java 1.6 以前,重量级锁一般指的是 synchronized
,而轻量级锁指的是 volatile
。
Java 1.6 以后,针对 synchronized
做了大量优化,引入 4 种锁状态: 无锁状态、偏向锁、轻量级锁和重量级锁。锁可以单向的从偏向锁升级到轻量级锁,再从轻量级锁升级到重量级锁 。
偏向锁 - 偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。
轻量级锁 - 是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。
重量级锁 - 是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。
分段锁 分段锁其实是一种锁的设计,并不是具体的一种锁。所谓分段锁,就是把锁的对象分成多段,每段独立控制,使得锁粒度更细,减少阻塞开销,从而提高并发性。这其实很好理解,就像高速公路上的收费站,如果只有一个收费口,那所有的车只能排成一条队缴费;如果有多个收费口,就可以分流了。
Hashtable
使用 synchronized
修饰方法来保证线程安全性,那么面对线程的访问,Hashtable 就会锁住整个对象,所有的其它线程只能等待,这种阻塞方式的吞吐量显然很低。
Java 1.7 以前的 ConcurrentHashMap
就是分段锁的典型案例。ConcurrentHashMap
维护了一个 Segment
数组,一般称为分段桶。
1 final Segment<K,V>[] segments;
当有线程访问 ConcurrentHashMap
的数据时,ConcurrentHashMap
会先根据 hashCode 计算出数据在哪个桶(即哪个 Segment),然后锁住这个 Segment
。
显示锁和内置锁 Java 1.5 之前,协调对共享对象的访问时可以使用的机制只有 synchronized
和 volatile
。这两个都属于内置锁,即锁的申请和释放都是由 JVM 所控制。
Java 1.5 之后,增加了新的机制:ReentrantLock
、ReentrantReadWriteLock
,这类锁的申请和释放都可以由程序所控制,所以常被称为显示锁。
以下对比一下显示锁和内置锁的差异:
主动获取锁和释放锁
synchronized
不能主动获取锁和释放锁。获取锁和释放锁都是 JVM 控制的。
ReentrantLock
可以主动获取锁和释放锁。(如果忘记释放锁,就可能产生死锁)。
响应中断
synchronized
不能响应中断。
ReentrantLock
可以响应中断。
超时机制
synchronized
没有超时机制。
ReentrantLock
有超时机制。ReentrantLock
可以设置超时时间,超时后自动释放锁,避免一直等待。
支持公平锁
synchronized
只支持非公平锁。
ReentrantLock
支持非公平锁和公平锁。
是否支持共享
被 synchronized
修饰的方法或代码块,只能被一个线程访问(独享)。如果这个线程被阻塞,其他线程也只能等待
ReentrantLock
可以基于 Condition
灵活的控制同步条件。
是否支持读写分离
synchronized
不支持读写锁分离;
ReentrantReadWriteLock
支持读写锁,从而使阻塞读写的操作分开,有效提高并发性。
Lock 和 Condition 为何引入 Lock 和 Condition 并发编程领域,有两大核心问题:一个是互斥 ,即同一时刻只允许一个线程访问共享资源;另一个是同步 ,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题 。
synchronized 是管程的一种实现,既然如此,何必再提供 Lock 和 Condition。
JDK 1.6 以前,synchronized 还没有做优化,性能远低于 Lock。但是,性能不是引入 Lock 的最重要因素。真正关键在于:synchronized 使用不当,可能会出现死锁。
synchronized 无法通过破坏不可抢占条件 来避免死锁。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。
与内置锁 synchronized
不同的是,Lock
提供了一组无条件的、可轮询的、定时的以及可中断的锁操作 ,所有获取锁、释放锁的操作都是显式的操作。
能够响应中断 。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
支持超时 。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
非阻塞地获取锁 。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
Lock 接口 Lock
的接口定义如下:
1 2 3 4 5 6 7 8 public interface Lock { void lock () ; void lockInterruptibly () throws InterruptedException; boolean tryLock () ; boolean tryLock (long time, TimeUnit unit) throws InterruptedException; void unlock () ; Condition newCondition () ; }
lock()
- 获取锁。
unlock()
- 释放锁。
tryLock()
- 尝试获取锁,仅在调用时锁未被另一个线程持有的情况下,才获取该锁。
tryLock(long time, TimeUnit unit)
- 和 tryLock()
类似,区别仅在于限定时间,如果限定时间内未获取到锁,视为失败。
lockInterruptibly()
- 锁未被另一个线程持有,且线程没有被中断的情况下,才能获取锁。
newCondition()
- 返回一个绑定到 Lock
对象上的 Condition
实例。
Condition Condition 实现了管程模型里面的条件变量 。
在单线程中,一段代码的执行可能依赖于某个状态,如果不满足状态条件,代码就不会被执行(典型的场景,如:if ... else ...
)。在并发环境中,当一个线程判断某个状态条件时,其状态可能是由于其他线程的操作而改变,这时就需要有一定的协调机制来确保在同一时刻,数据只能被一个线程锁修改,且修改的数据状态被所有线程所感知。
Java 1.5 之前,主要是利用 Object
类中的 wait
、notify
、notifyAll
配合 synchronized
来进行线程间通信。
wait
、notify
、notifyAll
需要配合 synchronized
使用,不适用于 Lock
。而使用 Lock
的线程,彼此间通信应该使用 Condition
。这可以理解为,什么样的锁配什么样的钥匙。内置锁(synchronized
)配合内置条件队列(wait
、notify
、notifyAll
),显式锁(Lock
)配合显式条件队列(Condition
) 。
Condition 的特性 Condition
接口定义如下:
1 2 3 4 5 6 7 8 9 public interface Condition { void await () throws InterruptedException; void awaitUninterruptibly () ; long awaitNanos (long nanosTimeout) throws InterruptedException; boolean await (long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil (Date deadline) throws InterruptedException; void signal () ; void signalAll () ; }
其中,await
、signal
、signalAll
与 wait
、notify
、notifyAll
相对应,功能也相似。除此以外,Condition
相比内置条件队列( wait
、notify
、notifyAll
),提供了更为丰富的功能:
每个锁(Lock
)上可以存在多个 Condition
,这意味着锁的状态条件可以有多个。
支持公平的或非公平的队列操作。
支持可中断的条件等待,相关方法:awaitUninterruptibly()
。
支持可定时的等待,相关方法:awaitNanos(long)
、await(long, TimeUnit)
、awaitUntil(Date)
。
Condition 的用法 这里以 Condition
来实现一个消费者、生产者模式。
产品类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 class Message { private final Lock lock = new ReentrantLock (); private final Condition producedMsg = lock.newCondition(); private final Condition consumedMsg = lock.newCondition(); private String message; private boolean state; private boolean end; public void consume () { lock.lock(); try { while (!state) { producedMsg.await(); } System.out.println("consume message : " + message); state = false ; consumedMsg.signal(); } catch (InterruptedException ie) { System.out.println("Thread interrupted - viewMessage" ); } finally { lock.unlock(); } } public void produce (String message) { lock.lock(); try { while (state) { consumedMsg.await(); } System.out.println("produce msg: " + message); this .message = message; state = true ; producedMsg.signal(); } catch (InterruptedException ie) { System.out.println("Thread interrupted - publishMessage" ); } finally { lock.unlock(); } } public boolean isEnd () { return end; } public void setEnd (boolean end) { this .end = end; } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class MessageConsumer implements Runnable { private Message message; public MessageConsumer (Message msg) { message = msg; } @Override public void run () { while (!message.isEnd()) { message.consume(); } } }
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class MessageProducer implements Runnable { private Message message; public MessageProducer (Message msg) { message = msg; } @Override public void run () { produce(); } public void produce () { List<String> msgs = new ArrayList <>(); msgs.add("Begin" ); msgs.add("Msg1" ); msgs.add("Msg2" ); for (String msg : msgs) { message.produce(msg); try { Thread.sleep(100 ); } catch (InterruptedException e) { e.printStackTrace(); } } message.produce("End" ); message.setEnd(true ); } }
测试
1 2 3 4 5 6 7 8 9 10 public class LockConditionDemo { public static void main (String[] args) { Message msg = new Message (); Thread producer = new Thread (new MessageProducer (msg)); Thread consumer = new Thread (new MessageConsumer (msg)); producer.start(); consumer.start(); } }
ReentrantLock ReentrantLock
类是 Lock
接口的具体实现,与内置锁 synchronized
相同的是,它是一个可重入锁 。
ReentrantLock 的特性 ReentrantLock
的特性如下:
ReentrantLock
提供了与 synchronized
相同的互斥性、内存可见性和可重入性 。
ReentrantLock
支持公平锁和非公平锁 (默认)两种模式。
ReentrantLock
实现了 Lock
接口,支持了 synchronized
所不具备的灵活性 。
synchronized
无法中断一个正在等待获取锁的线程
synchronized
无法在请求获取一个锁时无休止地等待
ReentrantLock 的用法 前文了解了 ReentrantLock
的特性,接下来,我们要讲述其具体用法。
ReentrantLock 的构造方法 ReentrantLock
有两个构造方法:
1 2 public ReentrantLock () {}public ReentrantLock (boolean fair) {}
ReentrantLock()
- 默认构造方法会初始化一个非公平锁(NonfairSync) ;
ReentrantLock(boolean)
- new ReentrantLock(true)
会初始化一个公平锁(FairSync) 。
lock 和 unlock 方法
lock()
- 无条件获取锁 。如果当前线程无法获取锁,则当前线程进入休眠状态不可用,直至当前线程获取到锁。如果该锁没有被另一个线程持有,则获取该锁并立即返回,将锁的持有计数设置为 1。
unlock()
- 用于释放锁 。
🔔 注意:请务必牢记,获取锁操作 lock()
必须在 try catch
块中进行,并且将释放锁操作 unlock()
放在 finally
块中进行,以保证锁一定被被释放,防止死锁的发生 。
示例:ReentrantLock
的基本操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 public class ReentrantLockDemo { public static void main (String[] args) { Task task = new Task (); MyThread tA = new MyThread ("Thread-A" , task); MyThread tB = new MyThread ("Thread-B" , task); MyThread tC = new MyThread ("Thread-C" , task); tA.start(); tB.start(); tC.start(); } static class MyThread extends Thread { private Task task; public MyThread (String name, Task task) { super (name); this .task = task; } @Override public void run () { task.execute(); } } static class Task { private ReentrantLock lock = new ReentrantLock (); public void execute () { lock.lock(); try { for (int i = 0 ; i < 3 ; i++) { System.out.println(lock.toString()); System.out.println("\t holdCount: " + lock.getHoldCount()); System.out.println("\t queuedLength: " + lock.getQueueLength()); System.out.println("\t isFair: " + lock.isFair()); System.out.println("\t isLocked: " + lock.isLocked()); System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread()); try { Thread.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { lock.unlock(); } } } }
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A] holdCount: 1 queuedLength: 2 isFair: false isLocked: true isHeldByCurrentThread: true java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C] holdCount: 1 queuedLength: 1 isFair: false isLocked: true isHeldByCurrentThread: true
tryLock 方法 与无条件获取锁相比,tryLock 有更完善的容错机制。
tryLock()
- 可轮询获取锁 。如果成功,则返回 true;如果失败,则返回 false。也就是说,这个方法无论成败都会立即返回 ,获取不到锁(锁已被其他线程获取)时不会一直等待。
tryLock(long, TimeUnit)
- 可定时获取锁 。和 tryLock()
类似,区别仅在于这个方法在获取不到锁时会等待一定的时间 ,在时间期限之内如果还获取不到锁,就返回 false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true。
示例:ReentrantLock
的 tryLock()
操作
修改上个示例中的 execute()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 public void execute () { if (lock.tryLock()) { try { for (int i = 0 ; i < 3 ; i++) { } } finally { lock.unlock(); } } else { System.out.println(Thread.currentThread().getName() + " 获取锁失败" ); } }
示例:ReentrantLock
的 tryLock(long, TimeUnit)
操作
修改上个示例中的 execute()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void execute () { try { if (lock.tryLock(2 , TimeUnit.SECONDS)) { try { for (int i = 0 ; i < 3 ; i++) { } } finally { lock.unlock(); } } else { System.out.println(Thread.currentThread().getName() + " 获取锁失败" ); } } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " 获取锁超时" ); e.printStackTrace(); } }
lockInterruptibly 方法
lockInterruptibly()
- 可中断获取锁 。可中断获取锁可以在获得锁的同时保持对中断的响应。可中断获取锁比其它获取锁的方式稍微复杂一些,需要两个 try-catch
块(如果在获取锁的操作中抛出了 InterruptedException
,那么可以使用标准的 try-finally
加锁模式)。
举例来说:假设有两个线程同时通过 lock.lockInterruptibly()
获取某个锁时,若线程 A 获取到了锁,则线程 B 只能等待。若此时对线程 B 调用 threadB.interrupt()
方法能够中断线程 B 的等待过程。由于 lockInterruptibly()
的声明中抛出了异常,所以 lock.lockInterruptibly()
必须放在 try
块中或者在调用 lockInterruptibly()
的方法外声明抛出 InterruptedException
。
注意:当一个线程获取了锁之后,是不会被 interrupt()
方法中断的。单独调用 interrupt()
方法不能中断正在运行状态中的线程,只能中断阻塞状态中的线程。因此当通过 lockInterruptibly()
方法获取某个锁时,如果未获取到锁,只有在等待的状态下,才可以响应中断。
示例:ReentrantLock
的 lockInterruptibly()
操作
修改上个示例中的 execute()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void execute () { try { lock.lockInterruptibly(); for (int i = 0 ; i < 3 ; i++) { } } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + "被中断" ); e.printStackTrace(); } finally { lock.unlock(); } }
newCondition 方法 newCondition()
- 返回一个绑定到 Lock
对象上的 Condition
实例。Condition
的特性和具体方法请阅读🚀Condition 。
ReentrantLock 的原理 ReentrantLock 通过内存屏障和 happens-before 关系来保证可见性。
happens-before
是 Java 内存模型(JMM)中的核心概念,它定义了操作之间的内存可见性保证,是理解 Java 并发编程的基础。
什么是 Happens-Before? Happens-Before 关系表示:如果操作 A happens-before 操作 B,那么:
A 操作的结果对 B 操作可见
A 操作按顺序排在 B 操作之前
🔔注意:这并不一定意味着在时间上 A 真的在 B 之前执行,而是指从内存可见性的角度来看,A 的效果对 B 是可见的。
ReentrantReadWriteLock 当读操作远远高于写操作时,这时候使用 读写锁
让 读-读
可以并发,提高性能。 类似于数据库中的 select ...from ... lock in share mode
提供一个 数据容器类
内部分别使用读锁保护数据的 read()
方法,写锁保护数据的write()
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class DataContainer { private Object data; private ReentrantReadWriteLock rw = new ReentrantReadWriteLock (); private ReentrantReadWriteLock.ReadLock r = rw.readLock(); private ReentrantReadWriteLock.WriteLock w = rw.writeLock(); public Object read () { log.debug("获取读锁..." ); r.lock(); try { log.debug("读取" ); sleep(1 ); return data; } finally { log.debug("释放读锁..." ); r.unlock(); } } public void write () { log.debug("获取写锁..." ); w.lock(); try { log.debug("写入" ); sleep(1 ); } finally { log.debug("释放写锁..." ); w.unlock(); } } }
测试 读锁-读锁 可以并发
1 2 3 4 5 6 7 8 DataContainer dataContainer = new DataContainer ();new Thread (() -> { dataContainer.read(); }, "t1" ).start(); new Thread (() -> { dataContainer.read(); }, "t2" ).start();
输出结果,从这里可以看到 t2 锁定期间,t1 的读操作不受影响
1 2 3 4 5 6 14:05:14.341 c.DataContainer [t2] - 获取读锁... 14:05:14.341 c.DataContainer [t1] - 获取读锁... 14:05:14.345 c.DataContainer [t1] - 读取 14:05:14.345 c.DataContainer [t2] - 读取 14:05:15.365 c.DataContainer [t2] - 释放读锁... 14:05:15.386 c.DataContainer [t1] - 释放读锁..
测试 读锁-写锁 相互阻塞
1 2 3 4 5 6 7 8 9 DataContainer dataContainer = new DataContainer ();new Thread (() -> { dataContainer.read(); }, "t1" ).start(); Thread.sleep(100 ); new Thread (() -> { dataContainer.write(); }, "t2" ).start();
输出结果
1 2 3 4 5 6 14:04:21.838 c.DataContainer [t1] - 获取读锁... 14:04:21.838 c.DataContainer [t2] - 获取写锁... 14:04:21.841 c.DataContainer [t2] - 写入 14:04:22.843 c.DataContainer [t2] - 释放写锁... 14:04:22.843 c.DataContainer [t1] - 读取 14:04:23.843 c.DataContainer [t1] - 释放读锁...
写锁-写锁 也是相互阻塞的
🔔注意:
读锁不支持条件变量
重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
重入时降级支持:即持有写锁的情况下去获取读锁
StampedLock 该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用 加解读锁
1 2 long stamp = lock.readLock();lock.unlockRead(stamp);
加解写锁
1 2 long stamp = lock.writeLock();lock.unlockWrite(stamp);
乐观读,StampedLock 支持 tryOptimisticRead()
方法(乐观读),读取完毕后需要做一次 戳校验
如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
1 2 3 4 5 long stamp = lock.tryOptimisticRead();if (!lock.validate(stamp)){ }
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class DataContainerStamped { private int data; private final StampedLock lock = new StampedLock (); public DataContainerStamped (int data) { this .data = data; } public int read (int readTime) { long stamp = lock.tryOptimisticRead(); log.debug("optimistic read locking...{}" , stamp); sleep(readTime); if (lock.validate(stamp)) { log.debug("read finish...{}, data:{}" , stamp, data); return data; } log.debug("updating to read lock... {}" , stamp); try { stamp = lock.readLock(); log.debug("read lock {}" , stamp); sleep(readTime); log.debug("read finish...{}, data:{}" , stamp, data); return data; } finally { log.debug("read unlock {}" , stamp); lock.unlockRead(stamp); } } public void write (int newData) { long stamp = lock.writeLock(); log.debug("write lock {}" , stamp); try { sleep(2 ); this .data = newData; } finally { log.debug("write unlock {}" , stamp); lock.unlockWrite(stamp); } } }
测试 读-读 可以优化
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { DataContainerStamped dataContainer = new DataContainerStamped (1 ); new Thread (() -> { dataContainer.read(1 ); }, "t1" ).start(); sleep(0.5 ); new Thread (() -> { dataContainer.read(0 ); }, "t2" ).start(); }
输出结果,可以看到实际没有加读锁
1 2 3 4 15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1
测试 读-写 时优化读补加读锁
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { DataContainerStamped dataContainer = new DataContainerStamped (1 ); new Thread (() -> { dataContainer.read(1 ); }, "t1" ).start(); sleep(0.5 ); new Thread (() -> { dataContainer.write(100 ); }, "t2" ).start(); }
输出结果
1 2 3 4 5 6 7 15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 15:57:00.717 c.DataContainerStamped [t2] - write lock 384 15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 15:57:02.719 c.DataContainerStamped [t1] - read lock 513 15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 15:57:03.719 c.DataContainerStamped [t1] - read unlock 513
🔔注意:
StampedLock 不支持条件变量
StampedLock 不支持可重入
AQS
AbstractQueuedSynchronizer
(简称 AQS )是队列同步器 ,顾名思义,其主要作用是处理同步。它是并发锁和很多同步工具类的实现基石(如 ReentrantLock
、ReentrantReadWriteLock
、CountDownLatch
、Semaphore
、FutureTask
等)。
AQS 的要点 AQS 提供了对独享锁与共享锁的支持 。
在 java.util.concurrent.locks
包中的相关锁(常用的有 ReentrantLock
、 ReadWriteLock
)都是基于 AQS 来实现。这些锁都没有直接继承 AQS,而是定义了一个 Sync
类去继承 AQS。为什么要这样呢?因为锁面向的是使用用户,而同步器面向的则是线程控制,那么在锁的实现中聚合同步器而不是直接继承 AQS 就可以很好的隔离二者所关注的事情。
AQS 的应用 AQS 提供了对独享锁与共享锁的支持 。
独享锁 API 获取、释放独享锁的主要 API 如下:
1 2 3 4 public final void acquire (int arg) public final void acquireInterruptibly (int arg) public final boolean tryAcquireNanos (int arg, long nanosTimeout) public final boolean release (int arg)
acquire
- 获取独占锁。
acquireInterruptibly
- 获取可中断的独占锁。
tryAcquireNanos
- 尝试在指定时间内获取可中断的独占锁。在以下三种情况下回返回:
在超时时间内,当前线程成功获取了锁;
当前线程在超时时间内被中断;
超时时间结束,仍未获得锁返回 false。
release
- 释放独占锁。
共享锁 API 获取、释放共享锁的主要 API 如下:
1 2 3 4 public final void acquireShared (int arg) public final void acquireSharedInterruptibly (int arg) public final boolean tryAcquireSharedNanos (int arg, long nanosTimeout) public final boolean releaseShared (int arg)
acquireShared
- 获取共享锁。
acquireSharedInterruptibly
- 获取可中断的共享锁。
tryAcquireSharedNanos
- 尝试在指定时间内获取可中断的共享锁。
release
- 释放共享锁。
AQS 的原理
ASQ 原理要点:
AQS 使用一个整型的 volatile
变量来 维护同步状态 。状态的意义由子类赋予。
AQS 维护了一个 FIFO 的双链表,用来存储获取锁失败的线程。
AQS 围绕同步状态提供两种基本操作“获取”和“释放”,并提供一系列判断和处理方法,简单说几点:
state 是独占的,还是共享的;
state 被获取后,其他线程需要等待;
state 被释放后,唤醒等待线程;
线程等不及时,如何退出等待。
至于线程是否可以获得 state,如何释放 state,就不是 AQS 关心的了,要由子类具体实现。
死锁 有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁t1 线程
获得 A对象
锁,接下来想获取 B对象
的锁 t2 线程
获得 B对象
锁,接下来想获取 A对象
的锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 Object A = new Object ();Object B = new Object ();Thread t1 = new Thread (() -> { synchronized (A) { log.debug("lock A" ); sleep(1 ); synchronized (B) { log.debug("lock B" ); log.debug("操作..." ); } } }, "t1" ); Thread t2 = new Thread (() -> { synchronized (B) { log.debug("lock B" ); sleep(0.5 ); synchronized (A) { log.debug("lock A" ); log.debug("操作..." ); } } }, "t2" ); t1.start(); t2.start();
输出:
1 2 12:22:06.962 [t2] c.TestDeadLock - lock B 12:22:06.962 [t1] c.TestDeadLock - lock A
活锁 活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class TestLiveLock { static volatile int count = 10 ; static final Object lock = new Object (); public static void main (String[] args) { new Thread (() -> { while (count > 0 ) { sleep(0.2 ); count--; log.debug("count: {}" , count); } }, "t1" ).start(); new Thread (() -> { while (count < 20 ) { sleep(0.2 ); count++; log.debug("count: {}" , count); } }, "t2" ).start(); } }
未完待续 synchronized解决方案 可以使用 synchronized 来解决上述共享带来的问题,俗称对象锁 ,它采用互斥的方式让同一时刻至多只有一个线程能持有对象锁,其它线程再想获取这个【对象锁】时就会阻塞住。这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换。
🔔注意: 虽然 java 中互斥和同步都可以采用 synchronized 关键字来完成,但它们还是有区别的:
互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码
同步是由于线程执行的先后、顺序不同、需要一个线程等待其它线程运行到某个点
语法:
解决共享带来的问题 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 static int i = 0 ;static final Object room = new Object ();public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread (() -> { for (int j = 0 ; j < 5000 ; j++) { synchronized (room) { i++; } } }, "t1" ); Thread t2 = new Thread (() -> { for (int j = 0 ; j < 5000 ; j++) { synchronized (room) { i--; } } }, "t2" ); t1.start(); t2.start(); t1.join(); t2.join(); log.debug("{}" , i); }
方法上的synchronized 变量的线程安全分析 成员变量和静态变量是否线程安全?
如果它们没有共享,则线程安全
如果它们被共享了,根据它们的状态是否能够改变,又分两种情况
如果只有读操作,则线程安全
如果有读写操作,则这段代码是临界区,需要考虑线程安全
局部变量是否线程安全?
局部变量是线程安全的
但局部变量引用的对象则未必
如果该对象没有逃离方法的作用访问,它是线程安全的
如果该对象逃离方法的作用范围,需要考虑线程安全
Park&Unpark 基本使用 它们是 LockSupport 类中的方法
1 2 3 4 5 LockSupport.park(); LockSupport.unpark(暂停线程对象)
eg:
1 2 3 4 5 6 7 8 9 10 11 12 Thread t1 = new Thread (() -> { log.debug("start..." ); sleep(1 ); log.debug("park..." ); LockSupport.park(); log.debug("resume..." ); }, "t1" ); t1.start(); sleep(2 ); log.debug("unpark..." ); LockSupport.unpark(t1)
输出
1 2 3 4 18:42:52.585 c.TestParkUnpark [t1] - start... 18:42:53.589 c.TestParkUnpark [t1] - park... 18:42:54.583 c.TestParkUnpark [main] - unpark... 18:42:54.583 c.TestParkUnpark [t1] - resume...
特点 与 Object 的 wait & notify 相比
wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
park & unpark 可以先 unpark,而 wait & notify 不能先 notify
死锁 有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁t1 线程
获得 A对象
锁,接下来想获取 B对象
的锁 t2 线程
获得 B对象
锁,接下来想获取 A对象
的锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 Object A = new Object ();Object B = new Object ();Thread t1 = new Thread (() -> { synchronized (A) { log.debug("lock A" ); sleep(1 ); synchronized (B) { log.debug("lock B" ); log.debug("操作..." ); } } }, "t1" ); Thread t2 = new Thread (() -> { synchronized (B) { log.debug("lock B" ); sleep(0.5 ); synchronized (A) { log.debug("lock A" ); log.debug("操作..." ); } } }, "t2" ); t1.start(); t2.start();
输出:
1 2 12:22:06.962 [t2] c.TestDeadLock - lock B 12:22:06.962 [t1] c.TestDeadLock - lock A
活锁 活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class TestLiveLock { static volatile int count = 10 ; static final Object lock = new Object (); public static void main (String[] args) { new Thread (() -> { while (count > 0 ) { sleep(0.2 ); count--; log.debug("count: {}" , count); } }, "t1" ).start(); new Thread (() -> { while (count < 20 ) { sleep(0.2 ); count++; log.debug("count: {}" , count); } }, "t2" ).start(); } }
ReentrantLock 相对于 synchronized 它具备如下特点
可中断
可以设置超时时间
可以设置为公平锁
支持多个条件变量
与 synchronized 一样,都支持可重入
基本语法:
1 2 3 4 5 6 7 8 reentrantLock.lock(); try { } finally { reentrantLock.unlock(); }
可重入 可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁。如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 static ReentrantLock lock = new ReentrantLock ();public static void main (String[] args) { method1(); } public static void method1 () { lock.lock(); try { log.debug("execute method1" ); method2(); } finally { lock.unlock(); } } public static void method2 () { lock.lock(); try { log.debug("execute method2" ); method3(); } finally { lock.unlock(); } } public static void method3 () { lock.lock(); try { log.debug("execute method3" ); } finally { lock.unlock(); } }
输出:
1 2 3 execute mothod1 execute mothod2 execute mothod3
可打断 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 ReentrantLock lock = new ReentrantLock ();Thread t1 = new Thread (() -> { log.debug("启动..." ); try { lock.lockInterruptibly(); } catch (InterruptedException e) { e.printStackTrace(); log.debug("等锁的过程中被打断" ); return ; } try { log.debug("获得了锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); log.debug("获得了锁" ); t1.start(); try { sleep(1 ); t1.interrupt(); log.debug("执行打断" ); } finally { lock.unlock(); }
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 18:02:40.520 [main] c.TestInterrupt - 获得了锁 18:02:40.524 [t1] c.TestInterrupt - 启动... 18:02:41.530 [main] c.TestInterrupt - 执行打断 java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchr onizer.java:898) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchron izer.java:1222) at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) at cn.itcast.n4.reentrant.TestInterrupt.lambda$main$0(TestInterrupt.java:17) at java.lang.Thread.run(Thread.java:748) 18:02:41.532 [t1] c.TestInterrupt - 等锁的过程中被打断
锁超时 立刻失败
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 ReentrantLock lock = new ReentrantLock ();Thread t1 = new Thread (() -> { log.debug("启动..." ); if (!lock.tryLock()) { log.debug("获取立刻失败,返回" ); return ; } try { log.debug("获得了锁" ); } finally { lock.unlock(); } }, "t1" ); lock.lock(); log.debug("获得了锁" ); t1.start(); try { sleep(2 ); } finally { lock.unlock(); }
输出:
1 2 3 18:15:02.918 [main] c.TestTimeout - 获得了锁 18:15:02.921 [t1] c.TestTimeout - 启动... 18:15:02.921 [t1] c.TestTimeout - 获取立刻失败,返回
公平锁 ReentrantLock 默认是不公平的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 ReentrantLock lock = new ReentrantLock (false );lock.lock(); for (int i = 0 ; i < 500 ; i++) { new Thread (() -> { lock.lock(); try { System.out.println(Thread.currentThread().getName() + " running..." ); } finally { lock.unlock(); } }, "t" + i).start(); } Thread.sleep(1000 ); new Thread (() -> { System.out.println(Thread.currentThread().getName() + " start..." ); lock.lock(); try { System.out.println(Thread.currentThread().getName() + " running..." ); } finally { lock.unlock(); } }, "强行插入" ).start(); lock.unlock();
强行插入,有机会在中间输出
🔔注意:该实验不一定总能复现
1 2 3 4 5 6 7 8 9 10 11 12 t39 running... t40 running... t41 running... t42 running... t43 running... 强行插入 start... 强行插入 running... t44 running... t45 running... t46 running... t47 running... t49 running...
改为公平锁后:ReentrantLock lock = new ReentrantLock(true);
强行插入,总是在最后输出
1 2 3 4 5 6 7 8 9 10 t465 running... t464 running... t477 running... t442 running... t468 running... t493 running... t482 running... t485 running... t481 running... 强行插入 running...
公平锁一般没有必要,会降低并发度
条件变量 synchronized 中也有条件变量,通过 wait¬ify¬ifyAll
实现的。
ReentrantLock 的条件变量是通过 Condition(await、signal、signalAll)
接口实现的,它提供了比 synchronized 的 wait/notify 更强大和灵活的功能。
Condition 使用要点:
await 前需要获得锁
await 执行后,会释放锁,进入 conditionObject 等待
await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
竞争 lock 锁成功后,从 await 后继续执行
eg:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 ReentrantLock lock = new ReentrantLock (true );static ReentrantLock lock = new ReentrantLock ();static Condition waitCigaretteQueue = lock.newCondition();static Condition waitbreakfastQueue = lock.newCondition();static volatile boolean hasCigrette = false ;static volatile boolean hasBreakfast = false ;public static void main (String[] args) { new Thread (() -> { try { lock.lock(); while (!hasCigrette) { try { waitCigaretteQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("等到了它的烟" ); } finally { lock.unlock(); } }).start(); new Thread (() -> { try { lock.lock(); while (!hasBreakfast) { try { waitbreakfastQueue.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("等到了它的早餐" ); } finally { lock.unlock(); } }).start(); sleep(1 ); sendBreakfast(); sleep(1 ); sendCigarette(); } private static void sendCigarette () { lock.lock(); try { log.debug("送烟来了" ); hasCigrette = true ; waitCigaretteQueue.signal(); } finally { lock.unlock(); } } private static void sendBreakfast () { lock.lock(); try { log.debug("送早餐来了" ); hasBreakfast = true ; waitbreakfastQueue.signal(); } finally { lock.unlock(); } }
输出
1 2 3 4 18:52:27.680 [main] c.TestCondition - 送早餐来了 18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐 18:52:28.683 [main] c.TestCondition - 送烟来了 18:52:28.683 [Thread-0] c.TestCondition - 等到了它的烟
内存
Memory 要解决共享变量在多线程间的【可见性】问题与多条指令执行时的【有序性】问题
内存模型 JMM 即 Java Memory Model,它定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。 JMM 体现在以下几个方面
原子性 - 保证指令不会受到线程上下文切换的影响
可见性 - 保证指令不会受 cpu 缓存的影响
有序性 - 保证指令不会受 cpu 指令并行优化的影响
可见性 退不出的循环
先来看一个现象,main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止:
1 2 3 4 5 6 7 8 9 10 11 12 13 static boolean run = true ;public static void main (String[] args) throws InterruptedException { Thread t = new Thread (() -> { while (run) { } }); t.start(); sleep(1 ); run = false ; }
为什么呢?分析一下:
初始状态, t 线程刚开始从主内存读取了 run 的值到工作内存。
因为 t 线程要频繁从主内存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存中的高速缓存 中,减少对主存中 run 的访问,提高效率。
1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 线程是从自己工作内存中的高速缓存中读取这个变量的值,结果永远是旧值
解决方法
volatile (易变关键字)
它可以用来修饰成员变量 和静态成员变量 ,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取 它的值,线程操作 volatile 变量都是直接操作主存。
可见性 vs 原子性 前面例子体现的实际就是可见性,它保证的是在多个线程之间,一个线程对 volatile 变量的修改对另一个线程可见 , 不能保证原子性,仅用在一个写线程,多个读线程的情况。
比较一下之前我们将线程安全时举的例子:两个线程一个 i++
一个 i--
,只能保证看到最新值,不能解决指令交错。
🔔注意:synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是 synchronized 是属于重量级操作,性能相对更低 如果在前面示例的死循环中加入 System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到对 run 变量的修改了,想一想为什么?
这是因为 System.out.println() 的内部实现实际上起到了内存屏障(Memory Barrier) 的作用。运行到System.out.println()
语句会刷新高速缓存 。
有序性 JVM 会在不影响正确性的前提下,可以调整语句的执行顺序
而volatile 修饰的变量,可以禁用指令重排
happens-before happens-before
是 Java 内存模型(JMM)中的核心概念,它定义了操作之间的内存可见性保证,是理解 Java 并发编程的基础。
什么是 Happens-Before? Happens-Before 关系表示:如果操作 A happens-before 操作 B,那么:
A 操作的结果对 B 操作可见
A 操作按顺序排在 B 操作之前
🔔注意:这并不一定意味着在时间上 A 真的在 B 之前执行,而是指从内存可见性的角度来看,A 的效果对 B 是可见的。
不可变 final
属性用 final 修饰保证了该属性是只读的,不能修改
类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性
工具 线程池 ThreadPoolExecutor 自定义线程池ThreadPoolExecutor
Fork&Join 概念 Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想 ,适用于能够进行任务拆分的 cpu 密集型 运算。 所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计 算,如归并排序、斐波那契数列、都可以用分治思想进行求解。
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运 算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
使用 提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下 面定义了一个对 1~n 之间的整数求和的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf4j(topic = "c.AddTask") class AddTask1 extends RecursiveTask < Integer > { int n; public AddTask1 (int n) { this .n = n; } @Override public String toString () { return "{" + n + '}' ; } @Override protected Integer compute () { if (n == 1 ) { log.debug("join() {}" , n); return n; } AddTask1 t1 = new AddTask1 (n - 1 ); t1.fork(); log.debug("fork() {} + {}" , n, t1); int result = n + t1.join(); log.debug("join() {} + {} = {}" , n, t1, result); return result; } }
然后提交给 ForkJoinPool 来执行
1 2 3 4 public static void main (String[] args) { ForkJoinPool pool = new ForkJoinPool (4 ); System.out.println(pool.invoke(new AddTask1 (5 ))); }
结果:
1 2 3 4 5 6 7 8 9 10 [ForkJoinPool-1-worker-0] - fork() 2 + {1} [ForkJoinPool-1-worker-1] - fork() 5 + {4} [ForkJoinPool-1-worker-0] - join() 1 [ForkJoinPool-1-worker-0] - join() 2 + {1} = 3 [ForkJoinPool-1-worker-2] - fork() 4 + {3} [ForkJoinPool-1-worker-3] - fork() 3 + {2} [ForkJoinPool-1-worker-3] - join() 3 + {2} = 6 [ForkJoinPool-1-worker-2] - join() 4 + {3} = 10 [ForkJoinPool-1-worker-1] - join() 5 + {4} = 15 15
J.U.C AQS AQS (AbstractQueuedSynchronizer) 是 Java 并发包 (java.util.concurrent.locks
) 的核心基础框架,它提供了一个队列同步器 ,用于构建锁和其他同步组件。
核心原理用一个数字表示资源数量,用队列管理等待的线程
1 2 3 4 state = 1 head -> 线程A tail -> 线程C
AQS工作流程
获取资源(排队)
1 2 3 线程A:我要锁! → state=1? → 拿到!(state=1) 线程B:我要锁! → state=1? → 拿不到 → 去排队 线程C:我要锁! → state=1? → 拿不到 → 去排队
释放资源(叫号)
1 2 线程A:用完啦! → state=0 → 叫下一个! 线程B:轮到我了! → state=1 → 拿到!
实现一个简单的锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class SimpleLock { private final Sync sync = new Sync (); private static class Sync extends AbstractQueuedSynchronizer { protected boolean tryAcquire (int arg) { return compareAndSetState(0 , 1 ); } protected boolean tryRelease (int arg) { setState(0 ); return true ; } } public void lock () { sync.acquire(1 ); } public void unlock () { sync.release(1 ); } }
使用这个锁
1 2 3 4 5 6 7 8 9 10 11 SimpleLock lock = new SimpleLock ();private int count = 0 ;public void safeIncrement () { lock.lock(); try { count++; } finally { lock.unlock(); } }
AQS使用时机 什么时候用 AQS?
需要自己造轮子时:
日常开发 :直接用现成的!
1 2 3 4 ReentrantLock lock = new ReentrantLock ();Semaphore semaphore = new Semaphore (5 );CountDownLatch latch = new CountDownLatch (3 );
ReentrantLock原理 ReentrantLock的简化版
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class MyReentrantLock { private final Sync sync = new Sync (); private static class Sync extends AbstractQueuedSynchronizer { protected boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int state = getState(); if (state == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextState = state + acquires; setState(nextState); return true ; } return false ; } protected boolean tryRelease (int releases) { int state = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (state == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(state); return free; } } public void lock () { sync.acquire(1 ); } public void unlock () { sync.release(1 ); } }
读写锁 ReentrantReadWriteLock 当读操作远远高于写操作时,这时候使用 读写锁
让 读-读
可以并发,提高性能。 类似于数据库中的 select ...from ... lock in share mode
提供一个 数据容器类
内部分别使用读锁保护数据的 read()
方法,写锁保护数据的write()
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class DataContainer { private Object data; private ReentrantReadWriteLock rw = new ReentrantReadWriteLock (); private ReentrantReadWriteLock.ReadLock r = rw.readLock(); private ReentrantReadWriteLock.WriteLock w = rw.writeLock(); public Object read () { log.debug("获取读锁..." ); r.lock(); try { log.debug("读取" ); sleep(1 ); return data; } finally { log.debug("释放读锁..." ); r.unlock(); } } public void write () { log.debug("获取写锁..." ); w.lock(); try { log.debug("写入" ); sleep(1 ); } finally { log.debug("释放写锁..." ); w.unlock(); } } }
测试 读锁-读锁 可以并发
1 2 3 4 5 6 7 8 DataContainer dataContainer = new DataContainer ();new Thread (() -> { dataContainer.read(); }, "t1" ).start(); new Thread (() -> { dataContainer.read(); }, "t2" ).start();
输出结果,从这里可以看到 t2 锁定期间,t1 的读操作不受影响
1 2 3 4 5 6 14:05:14.341 c.DataContainer [t2] - 获取读锁... 14:05:14.341 c.DataContainer [t1] - 获取读锁... 14:05:14.345 c.DataContainer [t1] - 读取 14:05:14.345 c.DataContainer [t2] - 读取 14:05:15.365 c.DataContainer [t2] - 释放读锁... 14:05:15.386 c.DataContainer [t1] - 释放读锁..
测试 读锁-写锁 相互阻塞
1 2 3 4 5 6 7 8 9 DataContainer dataContainer = new DataContainer ();new Thread (() -> { dataContainer.read(); }, "t1" ).start(); Thread.sleep(100 ); new Thread (() -> { dataContainer.write(); }, "t2" ).start();
输出结果
1 2 3 4 5 6 14:04:21.838 c.DataContainer [t1] - 获取读锁... 14:04:21.838 c.DataContainer [t2] - 获取写锁... 14:04:21.841 c.DataContainer [t2] - 写入 14:04:22.843 c.DataContainer [t2] - 释放写锁... 14:04:22.843 c.DataContainer [t1] - 读取 14:04:23.843 c.DataContainer [t1] - 释放读锁...
写锁-写锁 也是相互阻塞的
🔔注意:
读锁不支持条件变量
重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
重入时降级支持:即持有写锁的情况下去获取读锁
StampedLock 该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用 加解读锁
1 2 long stamp = lock.readLock();lock.unlockRead(stamp);
加解写锁
1 2 long stamp = lock.writeLock();lock.unlockWrite(stamp);
乐观读,StampedLock 支持 tryOptimisticRead()
方法(乐观读),读取完毕后需要做一次 戳校验
如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
1 2 3 4 5 long stamp = lock.tryOptimisticRead();if (!lock.validate(stamp)){ }
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class DataContainerStamped { private int data; private final StampedLock lock = new StampedLock (); public DataContainerStamped (int data) { this .data = data; } public int read (int readTime) { long stamp = lock.tryOptimisticRead(); log.debug("optimistic read locking...{}" , stamp); sleep(readTime); if (lock.validate(stamp)) { log.debug("read finish...{}, data:{}" , stamp, data); return data; } log.debug("updating to read lock... {}" , stamp); try { stamp = lock.readLock(); log.debug("read lock {}" , stamp); sleep(readTime); log.debug("read finish...{}, data:{}" , stamp, data); return data; } finally { log.debug("read unlock {}" , stamp); lock.unlockRead(stamp); } } public void write (int newData) { long stamp = lock.writeLock(); log.debug("write lock {}" , stamp); try { sleep(2 ); this .data = newData; } finally { log.debug("write unlock {}" , stamp); lock.unlockWrite(stamp); } } }
测试 读-读 可以优化
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { DataContainerStamped dataContainer = new DataContainerStamped (1 ); new Thread (() -> { dataContainer.read(1 ); }, "t1" ).start(); sleep(0.5 ); new Thread (() -> { dataContainer.read(0 ); }, "t2" ).start(); }
输出结果,可以看到实际没有加读锁
1 2 3 4 15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1
测试 读-写 时优化读补加读锁
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { DataContainerStamped dataContainer = new DataContainerStamped (1 ); new Thread (() -> { dataContainer.read(1 ); }, "t1" ).start(); sleep(0.5 ); new Thread (() -> { dataContainer.write(100 ); }, "t2" ).start(); }
输出结果
1 2 3 4 5 6 7 15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 15:57:00.717 c.DataContainerStamped [t2] - write lock 384 15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 15:57:02.719 c.DataContainerStamped [t1] - read lock 513 15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 15:57:03.719 c.DataContainerStamped [t1] - read unlock 513
🔔注意:
StampedLock 不支持条件变量
StampedLock 不支持可重入
Semaphore 信号量,用来限制能同时访问共享资源的线程上限
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static void main (String[] args) { Semaphore semaphore = new Semaphore (3 ); for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running..." ); sleep(1 ); log.debug("end..." ); } finally { semaphore.release(); } }).start(); } }
输出 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 07:35:15.485 c.TestSemaphore [Thread-2] - running... 07:35:15.485 c.TestSemaphore [Thread-1] - running... 07:35:15.485 c.TestSemaphore [Thread-0] - running... 07:35:16.490 c.TestSemaphore [Thread-2] - end... 07:35:16.490 c.TestSemaphore [Thread-0] - end... 07:35:16.490 c.TestSemaphore [Thread-1] - end... 07:35:16.490 c.TestSemaphore [Thread-3] - running... 07:35:16.490 c.TestSemaphore [Thread-5] - running... 07:35:16.490 c.TestSemaphore [Thread-4] - running... 07:35:17.490 c.TestSemaphore [Thread-5] - end... 07:35:17.490 c.TestSemaphore [Thread-4] - end... 07:35:17.490 c.TestSemaphore [Thread-3] - end... 07:35:17.490 c.TestSemaphore [Thread-6] - running... 07:35:17.490 c.TestSemaphore [Thread-7] - running... 07:35:17.490 c.TestSemaphore [Thread-9] - running... 07:35:18.491 c.TestSemaphore [Thread-6] - end... 07:35:18.491 c.TestSemaphore [Thread-7] - end... 07:35:18.491 c.TestSemaphore [Thread-9] - end... 07:35:18.491 c.TestSemaphore [Thread-8] - running... 07:35:19.492 c.TestSemaphore [Thread-8] - end...
CountdownLatch 用来进行线程同步协作,等待所有线程完成倒计时。 其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一
1 2 3 4 5 6 7 CountDownLatch latch = new CountDownLatch (3 ); latch.countDown(); latch.await();
eg:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch (3 ); new Thread (() -> { log.debug("begin..." ); sleep(1 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }).start(); new Thread (() -> { log.debug("begin..." ); sleep(2 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }).start(); new Thread (() -> { log.debug("begin..." ); sleep(1.5 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }).start(); log.debug("waiting..." ); latch.await(); log.debug("wait end..." ); }
输出
1 2 3 4 5 6 7 8 18:44:00.778 c.TestCountDownLatch [main] - waiting... 18:44:00.778 c.TestCountDownLatch [Thread-2] - begin... 18:44:00.778 c.TestCountDownLatch [Thread-0] - begin... 18:44:00.778 c.TestCountDownLatch [Thread-1] - begin... 18:44:01.782 c.TestCountDownLatch [Thread-0] - end...2 18:44:02.283 c.TestCountDownLatch [Thread-2] - end...1 18:44:02.782 c.TestCountDownLatch [Thread-1] - end...0 18:44:02.782 c.TestCountDownLatch [main] - wait end...
CyclicBarrier 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 CyclicBarrier cb = new CyclicBarrier (2 ); new Thread (() -> { System.out.println("线程1开始.." + new Date ()); try { cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程1继续向下运行..." + new Date ()); }).start(); new Thread (() -> { System.out.println("线程2开始.." + new Date ()); try { Thread.sleep(2000 ); } catch (InterruptedException e) {} try { cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程2继续向下运行..." + new Date ()); }).start();
🔔注意: CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的, CyclicBarrier 可以被比喻为『人满发车』
线程安全集合类概述 线程安全集合类可以分为三大类:
遗留的线程安全集合
使用 Collections 装饰的线程安全集合
Collections.synchronizedCollection
Collections.synchronizedList
Collections.synchronizedMap
Collections.synchronizedSet
Collections.synchronizedNavigableMap
Collections.synchronizedNavigableSet
Collections.synchronizedSortedMap
Collections.synchronizedSortedSet
java.util.concurrent.*
重点介绍 java.util.concurrent.*
下的线程安全集合类,可以发现它们有规律,里面包含三类关键词:Blocking 、CopyOnWrite 、Concurrent
遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出ConcurrentModificationException,不再继续遍历
BlockingQueue ConcurrentLinkedQueue CopyOnWriteArrayList