深入理解 Java 并发锁

本文先阐述 Java 中各种锁的概念。

然后,介绍锁的核心实现 AQS。

然后,重点介绍 Lock 和 Condition 两个接口及其实现。并发编程有两个核心问题:同步和互斥。

互斥,即同一时刻只允许一个线程访问共享资源;

同步,即线程之间如何通信、协作。

这两大问题,管程(sychronized)都是能够解决的。J.U.C 包还提供了 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

并发锁简介

确保线程安全最常见的做法是利用锁机制(Locksychronized)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的,线程安全的。

可重入锁

可重入锁,顾名思义,指的是线程可以重复获取同一把锁。即同一个线程在外层方法获取了锁,在进入内层方法会自动获取锁。

如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。

可重入锁可以在一定程度上避免死锁

  • ReentrantLockReentrantReadWriteLock 是可重入锁。这点,从其命名也不难看出。
  • synchronized 也是一个可重入锁

【示例】synchronized 的可重入示例

1
2
3
4
5
6
7
8
synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}

synchronized void setB() throws Exception{
Thread.sleep(1000);
}

上面的代码就是一个典型场景:如果使用的锁不是可重入锁的话,setB 可能不会被当前线程执行,从而造成死锁。

【示例】ReentrantLock 的可重入示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class Task {

private int value;
private final Lock lock = new ReentrantLock();

public Task() {
this.value = 0;
}

public int get() {
// 获取锁
lock.lock();
try {
return value;
} finally {
// 保证锁能释放
lock.unlock();
}
}

public void addOne() {
// 获取锁
lock.lock();
try {
// 注意:此处已经成功获取锁,进入 get 方法后,又尝试获取锁,
// 如果锁不是可重入的,会导致死锁
value = 1 + get();
} finally {
// 保证锁能释放
lock.unlock();
}
}

}

公平锁与非公平锁

  • 公平锁 - 公平锁是指 多线程按照申请锁的顺序来获取锁
  • 非公平锁 - 非公平锁是指 多线程不按照申请锁的顺序来获取锁 。这就可能会出现优先级反转(后来者居上)或者饥饿现象(某线程总是抢不过别的线程,导致始终无法执行)。

公平锁为了保证线程申请顺序,势必要付出一定的性能代价,因此其吞吐量一般低于非公平锁。

公平锁与非公平锁 在 Java 中的典型实现:

  • synchronized 只支持非公平锁
  • ReentrantLockReentrantReadWriteLock,默认是非公平锁,但支持公平锁

独享锁与共享锁

独享锁与共享锁是一种广义上的说法,从实际用途上来看,也常被称为互斥锁与读写锁。

  • 独享锁 - 独享锁是指 锁一次只能被一个线程所持有
  • 共享锁 - 共享锁是指 锁可被多个线程所持有

独享锁与共享锁在 Java 中的典型实现:

  • synchronizedReentrantLock 只支持独享锁
  • ReentrantReadWriteLock 其写锁是独享锁,其读锁是共享锁。读锁是共享锁使得并发读是非常高效的,读写,写读 ,写写的过程是互斥的。

悲观锁与乐观锁

乐观锁与悲观锁不是指具体的什么类型的锁,而是处理并发同步的策略

  • 悲观锁 - 悲观锁对于并发采取悲观的态度,认为:不加锁的并发操作一定会出问题悲观锁适合写操作频繁的场景
  • 乐观锁 - 乐观锁对于并发采取乐观的态度,认为:不加锁的并发操作也没什么问题。对于同一个数据的并发操作,是不会发生修改的。在更新数据的时候,会采用不断尝试更新的方式更新数据。乐观锁适合读多写少的场景

悲观锁与乐观锁在 Java 中的典型实现:

  • 悲观锁在 Java 中的应用就是通过使用 synchronizedLock 显示加锁来进行互斥同步,这是一种阻塞同步。

  • 乐观锁在 Java 中的应用就是采用 CAS 机制(CAS 操作通过 Unsafe 类提供,但这个类不直接暴露为 API,所以都是间接使用,如各种原子类)。

偏向锁、轻量级锁、重量级锁

所谓轻量级锁与重量级锁,指的是锁控制粒度的粗细。显然,控制粒度越细,阻塞开销越小,并发性也就越高。

Java 1.6 以前,重量级锁一般指的是 synchronized ,而轻量级锁指的是 volatile

Java 1.6 以后,针对 synchronized 做了大量优化,引入 4 种锁状态: 无锁状态、偏向锁、轻量级锁和重量级锁。锁可以单向的从偏向锁升级到轻量级锁,再从轻量级锁升级到重量级锁 。

  • 偏向锁 - 偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁。降低获取锁的代价。

  • 轻量级锁 - 是指当锁是偏向锁的时候,被另一个线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,提高性能。

  • 重量级锁 - 是指当锁为轻量级锁的时候,另一个线程虽然是自旋,但自旋不会一直持续下去,当自旋一定次数的时候,还没有获取到锁,就会进入阻塞,该锁膨胀为重量级锁。重量级锁会让其他申请的线程进入阻塞,性能降低。

分段锁

分段锁其实是一种锁的设计,并不是具体的一种锁。所谓分段锁,就是把锁的对象分成多段,每段独立控制,使得锁粒度更细,减少阻塞开销,从而提高并发性。这其实很好理解,就像高速公路上的收费站,如果只有一个收费口,那所有的车只能排成一条队缴费;如果有多个收费口,就可以分流了。

Hashtable 使用 synchronized 修饰方法来保证线程安全性,那么面对线程的访问,Hashtable 就会锁住整个对象,所有的其它线程只能等待,这种阻塞方式的吞吐量显然很低。

Java 1.7 以前的 ConcurrentHashMap 就是分段锁的典型案例。ConcurrentHashMap 维护了一个 Segment 数组,一般称为分段桶。

1
final Segment<K,V>[] segments;

当有线程访问 ConcurrentHashMap 的数据时,ConcurrentHashMap 会先根据 hashCode 计算出数据在哪个桶(即哪个 Segment),然后锁住这个 Segment

显示锁和内置锁

Java 1.5 之前,协调对共享对象的访问时可以使用的机制只有 synchronizedvolatile。这两个都属于内置锁,即锁的申请和释放都是由 JVM 所控制。

Java 1.5 之后,增加了新的机制:ReentrantLockReentrantReadWriteLock ,这类锁的申请和释放都可以由程序所控制,所以常被称为显示锁。

以下对比一下显示锁和内置锁的差异:

  • 主动获取锁和释放锁
    • synchronized 不能主动获取锁和释放锁。获取锁和释放锁都是 JVM 控制的。
    • ReentrantLock 可以主动获取锁和释放锁。(如果忘记释放锁,就可能产生死锁)。
  • 响应中断
    • synchronized 不能响应中断。
    • ReentrantLock 可以响应中断。
  • 超时机制
    • synchronized 没有超时机制。
    • ReentrantLock 有超时机制。ReentrantLock 可以设置超时时间,超时后自动释放锁,避免一直等待。
  • 支持公平锁
    • synchronized 只支持非公平锁。
    • ReentrantLock 支持非公平锁和公平锁。
  • 是否支持共享
    • synchronized 修饰的方法或代码块,只能被一个线程访问(独享)。如果这个线程被阻塞,其他线程也只能等待
    • ReentrantLock 可以基于 Condition 灵活的控制同步条件。
  • 是否支持读写分离
    • synchronized 不支持读写锁分离;
    • ReentrantReadWriteLock 支持读写锁,从而使阻塞读写的操作分开,有效提高并发性。

Lock 和 Condition

为何引入 Lock 和 Condition

并发编程领域,有两大核心问题:一个是互斥,即同一时刻只允许一个线程访问共享资源;另一个是同步,即线程之间如何通信、协作。这两大问题,管程都是能够解决的。Java SDK 并发包通过 Lock 和 Condition 两个接口来实现管程,其中 Lock 用于解决互斥问题,Condition 用于解决同步问题

synchronized 是管程的一种实现,既然如此,何必再提供 Lock 和 Condition。

JDK 1.6 以前,synchronized 还没有做优化,性能远低于 Lock。但是,性能不是引入 Lock 的最重要因素。真正关键在于:synchronized 使用不当,可能会出现死锁。

synchronized 无法通过破坏不可抢占条件来避免死锁。原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。

与内置锁 synchronized 不同的是,Lock 提供了一组无条件的、可轮询的、定时的以及可中断的锁操作,所有获取锁、释放锁的操作都是显式的操作。

  • 能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
  • 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  • 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

Lock 接口

Lock 的接口定义如下:

1
2
3
4
5
6
7
8
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
  • lock() - 获取锁。
  • unlock() - 释放锁。
  • tryLock() - 尝试获取锁,仅在调用时锁未被另一个线程持有的情况下,才获取该锁。
  • tryLock(long time, TimeUnit unit) - 和 tryLock() 类似,区别仅在于限定时间,如果限定时间内未获取到锁,视为失败。
  • lockInterruptibly() - 锁未被另一个线程持有,且线程没有被中断的情况下,才能获取锁。
  • newCondition() - 返回一个绑定到 Lock 对象上的 Condition 实例。

Condition

Condition 实现了管程模型里面的条件变量

在单线程中,一段代码的执行可能依赖于某个状态,如果不满足状态条件,代码就不会被执行(典型的场景,如:if ... else ...)。在并发环境中,当一个线程判断某个状态条件时,其状态可能是由于其他线程的操作而改变,这时就需要有一定的协调机制来确保在同一时刻,数据只能被一个线程锁修改,且修改的数据状态被所有线程所感知。

Java 1.5 之前,主要是利用 Object 类中的 waitnotifynotifyAll 配合 synchronized 来进行线程间通信。

waitnotifynotifyAll 需要配合 synchronized 使用,不适用于 Lock。而使用 Lock 的线程,彼此间通信应该使用 Condition 。这可以理解为,什么样的锁配什么样的钥匙。内置锁(synchronized)配合内置条件队列(waitnotifynotifyAll ),显式锁(Lock)配合显式条件队列(Condition

Condition 的特性

Condition 接口定义如下:

1
2
3
4
5
6
7
8
9
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}

其中,awaitsignalsignalAllwaitnotifynotifyAll 相对应,功能也相似。除此以外,Condition 相比内置条件队列( waitnotifynotifyAll ),提供了更为丰富的功能:

  • 每个锁(Lock)上可以存在多个 Condition,这意味着锁的状态条件可以有多个。
  • 支持公平的或非公平的队列操作。
  • 支持可中断的条件等待,相关方法:awaitUninterruptibly()
  • 支持可定时的等待,相关方法:awaitNanos(long)await(long, TimeUnit)awaitUntil(Date)

Condition 的用法

这里以 Condition 来实现一个消费者、生产者模式。

产品类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class Message {

private final Lock lock = new ReentrantLock();

private final Condition producedMsg = lock.newCondition();

private final Condition consumedMsg = lock.newCondition();

private String message;

private boolean state;

private boolean end;

public void consume() {
//lock
lock.lock();
try {
// no new message wait for new message
while (!state) { producedMsg.await(); }

System.out.println("consume message : " + message);
state = false;
// message consumed, notify waiting thread
consumedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - viewMessage");
} finally {
lock.unlock();
}
}

public void produce(String message) {
lock.lock();
try {
// last message not consumed, wait for it be consumed
while (state) { consumedMsg.await(); }

System.out.println("produce msg: " + message);
this.message = message;
state = true;
// new message added, notify waiting thread
producedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - publishMessage");
} finally {
lock.unlock();
}
}

public boolean isEnd() {
return end;
}

public void setEnd(boolean end) {
this.end = end;
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class MessageConsumer implements Runnable {

private Message message;

public MessageConsumer(Message msg) {
message = msg;
}

@Override
public void run() {
while (!message.isEnd()) { message.consume(); }
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class MessageProducer implements Runnable {

private Message message;

public MessageProducer(Message msg) {
message = msg;
}

@Override
public void run() {
produce();
}

public void produce() {
List<String> msgs = new ArrayList<>();
msgs.add("Begin");
msgs.add("Msg1");
msgs.add("Msg2");

for (String msg : msgs) {
message.produce(msg);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

message.produce("End");
message.setEnd(true);
}

}

测试

1
2
3
4
5
6
7
8
9
10
public class LockConditionDemo {

public static void main(String[] args) {
Message msg = new Message();
Thread producer = new Thread(new MessageProducer(msg));
Thread consumer = new Thread(new MessageConsumer(msg));
producer.start();
consumer.start();
}
}

ReentrantLock

ReentrantLock 类是 Lock 接口的具体实现,与内置锁 synchronized 相同的是,它是一个可重入锁

ReentrantLock 的特性

ReentrantLock 的特性如下:

  • ReentrantLock 提供了与 synchronized 相同的互斥性、内存可见性和可重入性
  • ReentrantLock 支持公平锁和非公平锁(默认)两种模式。
  • ReentrantLock 实现了 Lock 接口,支持了 synchronized 所不具备的灵活性
    • synchronized 无法中断一个正在等待获取锁的线程
    • synchronized 无法在请求获取一个锁时无休止地等待

ReentrantLock 的用法

前文了解了 ReentrantLock 的特性,接下来,我们要讲述其具体用法。

ReentrantLock 的构造方法

ReentrantLock 有两个构造方法:

1
2
public ReentrantLock() {}
public ReentrantLock(boolean fair) {}
  • ReentrantLock() - 默认构造方法会初始化一个非公平锁(NonfairSync)
  • ReentrantLock(boolean) - new ReentrantLock(true) 会初始化一个公平锁(FairSync)

lock 和 unlock 方法

  • lock() - 无条件获取锁。如果当前线程无法获取锁,则当前线程进入休眠状态不可用,直至当前线程获取到锁。如果该锁没有被另一个线程持有,则获取该锁并立即返回,将锁的持有计数设置为 1。
  • unlock() - 用于释放锁

🔔 注意:请务必牢记,获取锁操作 lock() 必须在 try catch 块中进行,并且将释放锁操作 unlock() 放在 finally 块中进行,以保证锁一定被被释放,防止死锁的发生

示例:ReentrantLock 的基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class ReentrantLockDemo {

public static void main(String[] args) {
Task task = new Task();
MyThread tA = new MyThread("Thread-A", task);
MyThread tB = new MyThread("Thread-B", task);
MyThread tC = new MyThread("Thread-C", task);
tA.start();
tB.start();
tC.start();
}

static class MyThread extends Thread {

private Task task;

public MyThread(String name, Task task) {
super(name);
this.task = task;
}

@Override
public void run() {
task.execute();
}

}

static class Task {

private ReentrantLock lock = new ReentrantLock();

public void execute() {
lock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println(lock.toString());

// 查询当前线程 hold 住此锁的次数
System.out.println("\t holdCount: " + lock.getHoldCount());

// 查询正等待获取此锁的线程数
System.out.println("\t queuedLength: " + lock.getQueueLength());

// 是否为公平锁
System.out.println("\t isFair: " + lock.isFair());

// 是否被锁住
System.out.println("\t isLocked: " + lock.isLocked());

// 是否被当前线程持有锁
System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread());

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}

}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A]
holdCount: 1
queuedLength: 2
isFair: false
isLocked: true
isHeldByCurrentThread: true
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C]
holdCount: 1
queuedLength: 1
isFair: false
isLocked: true
isHeldByCurrentThread: true
// ...

tryLock 方法

与无条件获取锁相比,tryLock 有更完善的容错机制。

  • tryLock() - 可轮询获取锁。如果成功,则返回 true;如果失败,则返回 false。也就是说,这个方法无论成败都会立即返回,获取不到锁(锁已被其他线程获取)时不会一直等待。
  • tryLock(long, TimeUnit) - 可定时获取锁。和 tryLock() 类似,区别仅在于这个方法在获取不到锁时会等待一定的时间,在时间期限之内如果还获取不到锁,就返回 false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回 true。

示例:ReentrantLocktryLock() 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public void execute() {
if (lock.tryLock()) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
}

示例:ReentrantLocktryLock(long, TimeUnit) 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute() {
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 获取锁失败");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 获取锁超时");
e.printStackTrace();
}
}

lockInterruptibly 方法

  • lockInterruptibly() - 可中断获取锁。可中断获取锁可以在获得锁的同时保持对中断的响应。可中断获取锁比其它获取锁的方式稍微复杂一些,需要两个 try-catch 块(如果在获取锁的操作中抛出了 InterruptedException ,那么可以使用标准的 try-finally 加锁模式)。
    • 举例来说:假设有两个线程同时通过 lock.lockInterruptibly() 获取某个锁时,若线程 A 获取到了锁,则线程 B 只能等待。若此时对线程 B 调用 threadB.interrupt() 方法能够中断线程 B 的等待过程。由于 lockInterruptibly() 的声明中抛出了异常,所以 lock.lockInterruptibly() 必须放在 try 块中或者在调用 lockInterruptibly() 的方法外声明抛出 InterruptedException

注意:当一个线程获取了锁之后,是不会被 interrupt() 方法中断的。单独调用 interrupt() 方法不能中断正在运行状态中的线程,只能中断阻塞状态中的线程。因此当通过 lockInterruptibly() 方法获取某个锁时,如果未获取到锁,只有在等待的状态下,才可以响应中断。

示例:ReentrantLocklockInterruptibly() 操作

修改上个示例中的 execute() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void execute() {
try {
lock.lockInterruptibly();

for (int i = 0; i < 3; i++) {
// 略...
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中断");
e.printStackTrace();
} finally {
lock.unlock();
}
}

newCondition 方法

newCondition() - 返回一个绑定到 Lock 对象上的 Condition 实例。Condition 的特性和具体方法请阅读🚀Condition

ReentrantLock 的原理

ReentrantLock 通过内存屏障和 happens-before 关系来保证可见性。

happens-before 是 Java 内存模型(JMM)中的核心概念,它定义了操作之间的内存可见性保证,是理解 Java 并发编程的基础。

什么是 Happens-Before?
Happens-Before 关系表示:如果操作 A happens-before 操作 B,那么:

  • A 操作的结果对 B 操作可见
  • A 操作按顺序排在 B 操作之前

🔔注意:这并不一定意味着在时间上 A 真的在 B 之前执行,而是指从内存可见性的角度来看,A 的效果对 B 是可见的。

ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用 读写锁读-读 可以并发,提高性能。 类似于数据库中的 select ...from ... lock in share mode

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的write() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();

public Object read() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("读取");
sleep(1);
return data;
} finally {
log.debug("释放读锁...");
r.unlock();
}
}

public void write() {
log.debug("获取写锁...");
w.lock();
try {
log.debug("写入");
sleep(1);
} finally {
log.debug("释放写锁...");
w.unlock();
}
}
}

测试 读锁-读锁 可以并发

1
2
3
4
5
6
7
8
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();

new Thread(() -> {
dataContainer.read();
}, "t2").start();

输出结果,从这里可以看到 t2 锁定期间,t1 的读操作不受影响

1
2
3
4
5
6
14:05:14.341 c.DataContainer [t2] - 获取读锁... 
14:05:14.341 c.DataContainer [t1] - 获取读锁...
14:05:14.345 c.DataContainer [t1] - 读取
14:05:14.345 c.DataContainer [t2] - 读取
14:05:15.365 c.DataContainer [t2] - 释放读锁...
14:05:15.386 c.DataContainer [t1] - 释放读锁..

测试 读锁-写锁 相互阻塞

1
2
3
4
5
6
7
8
9
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();

Thread.sleep(100);
new Thread(() -> {
dataContainer.write();
}, "t2").start();

输出结果

1
2
3
4
5
6
14:04:21.838 c.DataContainer [t1] - 获取读锁... 
14:04:21.838 c.DataContainer [t2] - 获取写锁...
14:04:21.841 c.DataContainer [t2] - 写入
14:04:22.843 c.DataContainer [t2] - 释放写锁...
14:04:22.843 c.DataContainer [t1] - 读取
14:04:23.843 c.DataContainer [t1] - 释放读锁...

写锁-写锁 也是相互阻塞的

🔔注意:

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
  • 重入时降级支持:即持有写锁的情况下去获取读锁

StampedLock

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加解读锁

1
2
long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁

1
2
long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通
过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

1
2
3
4
5
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
// 锁升级
}

 
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();

public DataContainerStamped(int data) {
this.data = data;
}

public int read(int readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("optimistic read locking...{}", stamp);
sleep(readTime);
if (lock.validate(stamp)) {
log.debug("read finish...{}, data:{}", stamp, data);
return data;
}
// 锁升级 - 读锁
log.debug("updating to read lock... {}", stamp);
try {
stamp = lock.readLock();
log.debug("read lock {}", stamp);
sleep(readTime);
log.debug("read finish...{}, data:{}", stamp, data);
return data;
} finally {
log.debug("read unlock {}", stamp);
lock.unlockRead(stamp);
}
}

public void write(int newData) {
long stamp = lock.writeLock();
log.debug("write lock {}", stamp);
try {
sleep(2);
this.data = newData;
} finally {
log.debug("write unlock {}", stamp);
lock.unlockWrite(stamp);
}
}
}

测试 读-读 可以优化

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1);
}, "t1").start();
sleep(0.5);
new Thread(() -> {
dataContainer.read(0);
}, "t2").start();
}

输出结果,可以看到实际没有加读锁

1
2
3
4
15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256
15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1
15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1

测试 读-写 时优化读补加读锁

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1);
}, "t1").start();
sleep(0.5);
new Thread(() -> {
dataContainer.write(100);
}, "t2").start();
}

输出结果

1
2
3
4
5
6
7
15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 
15:57:00.717 c.DataContainerStamped [t2] - write lock 384
15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256
15:57:02.719 c.DataContainerStamped [t2] - write unlock 384
15:57:02.719 c.DataContainerStamped [t1] - read lock 513
15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000
15:57:03.719 c.DataContainerStamped [t1] - read unlock 513

🔔注意:

  • StampedLock 不支持条件变量
  • StampedLock 不支持可重入

AQS

AbstractQueuedSynchronizer(简称 AQS)是队列同步器,顾名思义,其主要作用是处理同步。它是并发锁和很多同步工具类的实现基石(如 ReentrantLockReentrantReadWriteLockCountDownLatchSemaphoreFutureTask 等)。

AQS 的要点

AQS 提供了对独享锁与共享锁的支持

java.util.concurrent.locks 包中的相关锁(常用的有 ReentrantLockReadWriteLock)都是基于 AQS 来实现。这些锁都没有直接继承 AQS,而是定义了一个 Sync 类去继承 AQS。为什么要这样呢?因为锁面向的是使用用户,而同步器面向的则是线程控制,那么在锁的实现中聚合同步器而不是直接继承 AQS 就可以很好的隔离二者所关注的事情。

AQS 的应用

AQS 提供了对独享锁与共享锁的支持

独享锁 API

获取、释放独享锁的主要 API 如下:

1
2
3
4
public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)
  • acquire - 获取独占锁。
  • acquireInterruptibly - 获取可中断的独占锁。
  • tryAcquireNanos - 尝试在指定时间内获取可中断的独占锁。在以下三种情况下回返回:
    • 在超时时间内,当前线程成功获取了锁;
    • 当前线程在超时时间内被中断;
    • 超时时间结束,仍未获得锁返回 false。
  • release - 释放独占锁。

共享锁 API

获取、释放共享锁的主要 API 如下:

1
2
3
4
public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)
  • acquireShared - 获取共享锁。
  • acquireSharedInterruptibly - 获取可中断的共享锁。
  • tryAcquireSharedNanos - 尝试在指定时间内获取可中断的共享锁。
  • release - 释放共享锁。

AQS 的原理

ASQ 原理要点:

  • AQS 使用一个整型的 volatile 变量来 维护同步状态。状态的意义由子类赋予。
  • AQS 维护了一个 FIFO 的双链表,用来存储获取锁失败的线程。

AQS 围绕同步状态提供两种基本操作“获取”和“释放”,并提供一系列判断和处理方法,简单说几点:

  • state 是独占的,还是共享的;
  • state 被获取后,其他线程需要等待;
  • state 被释放后,唤醒等待线程;
  • 线程等不及时,如何退出等待。

至于线程是否可以获得 state,如何释放 state,就不是 AQS 关心的了,要由子类具体实现。

死锁

有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁
t1 线程 获得 A对象 锁,接下来想获取 B对象 的锁 t2 线程 获得 B对象 锁,接下来想获取 A对象 的锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
synchronized(A) {
log.debug("lock A");
sleep(1);
synchronized(B) {
log.debug("lock B");
log.debug("操作...");
}
}
}, "t1");

Thread t2 = new Thread(() -> {
synchronized(B) {
log.debug("lock B");
sleep(0.5);
synchronized(A) {
log.debug("lock A");
log.debug("操作...");
}
}
}, "t2");
t1.start();
t2.start();

输出:

1
2
12:22:06.962 [t2] c.TestDeadLock - lock B 
12:22:06.962 [t1] c.TestDeadLock - lock A

活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TestLiveLock {
static volatile int count = 10;
static final Object lock = new Object();

public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
sleep(0.2);
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}

未完待续

synchronized解决方案

可以使用 synchronized 来解决上述共享带来的问题,俗称对象锁,它采用互斥的方式让同一时刻至多只有一个线程能持有对象锁,其它线程再想获取这个【对象锁】时就会阻塞住。这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换。

🔔注意:
虽然 java 中互斥和同步都可以采用 synchronized 关键字来完成,但它们还是有区别的:

  • 互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码
  • 同步是由于线程执行的先后、顺序不同、需要一个线程等待其它线程运行到某个点

语法:

解决共享带来的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int i = 0;
static final Object room = new Object();

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int j = 0; j < 5000; j++) {
synchronized(room) {
i++;
}
}
}, "t1");

Thread t2 = new Thread(() -> {
for (int j = 0; j < 5000; j++) {
synchronized(room) {
i--;
}
}
}, "t2");

t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}", i);
}

方法上的synchronized

变量的线程安全分析

成员变量和静态变量是否线程安全?

  • 如果它们没有共享,则线程安全
  • 如果它们被共享了,根据它们的状态是否能够改变,又分两种情况
    • 如果只有读操作,则线程安全
    • 如果有读写操作,则这段代码是临界区,需要考虑线程安全

局部变量是否线程安全?

  • 局部变量是线程安全的
  • 但局部变量引用的对象则未必
    • 如果该对象没有逃离方法的作用访问,它是线程安全的
    • 如果该对象逃离方法的作用范围,需要考虑线程安全

Park&Unpark

基本使用
它们是 LockSupport 类中的方法

1
2
3
4
5
// 暂停当前线程
LockSupport.park();

// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

eg:

1
2
3
4
5
6
7
8
9
10
11
12
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();

sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1)

输出

1
2
3
4
18:42:52.585 c.TestParkUnpark [t1] - start... 
18:42:53.589 c.TestParkUnpark [t1] - park...
18:42:54.583 c.TestParkUnpark [main] - unpark...
18:42:54.583 c.TestParkUnpark [t1] - resume...

特点
与 Object 的 wait & notify 相比

  • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

死锁

有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁
t1 线程 获得 A对象 锁,接下来想获取 B对象 的锁 t2 线程 获得 B对象 锁,接下来想获取 A对象 的锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Object A = new Object();
Object B = new Object();
Thread t1 = new Thread(() -> {
synchronized(A) {
log.debug("lock A");
sleep(1);
synchronized(B) {
log.debug("lock B");
log.debug("操作...");
}
}
}, "t1");

Thread t2 = new Thread(() -> {
synchronized(B) {
log.debug("lock B");
sleep(0.5);
synchronized(A) {
log.debug("lock A");
log.debug("操作...");
}
}
}, "t2");
t1.start();
t2.start();

输出:

1
2
12:22:06.962 [t2] c.TestDeadLock - lock B 
12:22:06.962 [t1] c.TestDeadLock - lock A

活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class TestLiveLock {
static volatile int count = 10;
static final Object lock = new Object();

public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
sleep(0.2);
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}

ReentrantLock

相对于 synchronized 它具备如下特点

  • 可中断
  • 可以设置超时时间
  • 可以设置为公平锁
  • 支持多个条件变量

与 synchronized 一样,都支持可重入

基本语法:

1
2
3
4
5
6
7
8
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
可重入

可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁。如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static ReentrantLock lock = new ReentrantLock();

public static void main(String[] args) {
method1();
}

public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}

public static void method2() {
lock.lock();
try {
log.debug("execute method2");
method3();
} finally {
lock.unlock();
}
}

public static void method3() {
lock.lock();
try {
log.debug("execute method3");
} finally {
lock.unlock();
}
}

输出:

1
2
3
execute mothod1
execute mothod2
execute mothod3
可打断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
//注意如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");

lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
} finally {
lock.unlock();
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
18:02:40.520 [main] c.TestInterrupt - 获得了锁 
18:02:40.524 [t1] c.TestInterrupt - 启动...
18:02:41.530 [main] c.TestInterrupt - 执行打断
java.lang.InterruptedException
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchr
onizer.java:898)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchron
izer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at cn.itcast.n4.reentrant.TestInterrupt.lambda$main$0(TestInterrupt.java:17)
at java.lang.Thread.run(Thread.java:748)
18:02:41.532 [t1] c.TestInterrupt - 等锁的过程中被打断
锁超时

立刻失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
//tryLock(1, TimeUnit.SECONDS) 可以设置定时
if (!lock.tryLock()) {
log.debug("获取立刻失败,返回");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");

lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(2);
} finally {
lock.unlock();
}

输出:

1
2
3
18:15:02.918 [main] c.TestTimeout - 获得了锁 
18:15:02.921 [t1] c.TestTimeout - 启动...
18:15:02.921 [t1] c.TestTimeout - 获取立刻失败,返回
公平锁

ReentrantLock 默认是不公平的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ReentrantLock lock = new ReentrantLock(false);//不公平

lock.lock();
for (int i = 0; i < 500; i++) {
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "t" + i).start();
}

// 1s 之后去争抢锁
Thread.sleep(1000);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " start...");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "强行插入").start();
lock.unlock();

强行插入,有机会在中间输出

🔔注意:该实验不一定总能复现

1
2
3
4
5
6
7
8
9
10
11
12
t39 running... 
t40 running...
t41 running...
t42 running...
t43 running...
强行插入 start...
强行插入 running...
t44 running...
t45 running...
t46 running...
t47 running...
t49 running...

改为公平锁后:ReentrantLock lock = new ReentrantLock(true);
强行插入,总是在最后输出

1
2
3
4
5
6
7
8
9
10
t465 running... 
t464 running...
t477 running...
t442 running...
t468 running...
t493 running...
t482 running...
t485 running...
t481 running...
强行插入 running...

公平锁一般没有必要,会降低并发度

条件变量

synchronized 中也有条件变量,通过 wait&notify&notifyAll 实现的。

ReentrantLock 的条件变量是通过 Condition(await、signal、signalAll) 接口实现的,它提供了比 synchronized 的 wait/notify 更强大和灵活的功能。

Condition 使用要点:

  • await 前需要获得锁
  • await 执行后,会释放锁,进入 conditionObject 等待
  • await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
  • 竞争 lock 锁成功后,从 await 后继续执行

eg:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
ReentrantLock lock = new ReentrantLock(true);
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;

public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();

new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();

sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}

private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}

private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}

输出

1
2
3
4
18:52:27.680 [main] c.TestCondition - 送早餐来了 
18:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐
18:52:28.683 [main] c.TestCondition - 送烟来了
18:52:28.683 [Thread-0] c.TestCondition - 等到了它的烟

内存

Memory 要解决共享变量在多线程间的【可见性】问题与多条指令执行时的【有序性】问题

内存模型

JMM 即 Java Memory Model,它定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。
JMM 体现在以下几个方面

  • 原子性 - 保证指令不会受到线程上下文切换的影响
  • 可见性 - 保证指令不会受 cpu 缓存的影响
  • 有序性 - 保证指令不会受 cpu 指令并行优化的影响

可见性

退不出的循环

先来看一个现象,main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止:

1
2
3
4
5
6
7
8
9
10
11
12
13
static boolean run = true;

public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
while (run) {
// ....
}
});
t.start();

sleep(1);
run = false; // 线程t不会如预想的停下来
}

为什么呢?分析一下:

  1. 初始状态, t 线程刚开始从主内存读取了 run 的值到工作内存。
  2. 因为 t 线程要频繁从主内存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存中的高速缓存中,减少对主存中 run 的访问,提高效率。
  3. 1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 线程是从自己工作内存中的高速缓存中读取这个变量的值,结果永远是旧值

解决方法

volatile(易变关键字)

它可以用来修饰成员变量静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取
它的值,线程操作 volatile 变量都是直接操作主存。

可见性 vs 原子性
前面例子体现的实际就是可见性,它保证的是在多个线程之间,一个线程对 volatile 变量的修改对另一个线程可见, 不能保证原子性,仅用在一个写线程,多个读线程的情况。

比较一下之前我们将线程安全时举的例子:两个线程一个 i++ 一个 i-- ,只能保证看到最新值,不能解决指令交错。

🔔注意:synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是 synchronized 是属于重量级操作,性能相对更低
如果在前面示例的死循环中加入 System.out.println() 会发现即使不加 volatile 修饰符,线程 t 也能正确看到对 run 变量的修改了,想一想为什么?

这是因为 System.out.println() 的内部实现实际上起到了内存屏障(Memory Barrier) 的作用。运行到System.out.println()语句会刷新高速缓存

有序性

JVM 会在不影响正确性的前提下,可以调整语句的执行顺序

volatile 修饰的变量,可以禁用指令重排

happens-before

happens-before 是 Java 内存模型(JMM)中的核心概念,它定义了操作之间的内存可见性保证,是理解 Java 并发编程的基础。

什么是 Happens-Before?
Happens-Before 关系表示:如果操作 A happens-before 操作 B,那么:

  • A 操作的结果对 B 操作可见
  • A 操作按顺序排在 B 操作之前

🔔注意:这并不一定意味着在时间上 A 真的在 B 之前执行,而是指从内存可见性的角度来看,A 的效果对 B 是可见的。

不可变

final

  • 属性用 final 修饰保证了该属性是只读的,不能修改
  • 类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性

工具

线程池

ThreadPoolExecutor
自定义线程池ThreadPoolExecutor
Fork&Join

概念
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型
运算。
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计
算,如归并排序、斐波那契数列、都可以用分治思想进行求解。

Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运
算效率

Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

使用
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下
面定义了一个对 1~n 之间的整数求和的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Slf4j(topic = "c.AddTask")
class AddTask1 extends RecursiveTask < Integer > {

int n;

public AddTask1(int n) {
this.n = n;
}

@Override
public String toString() {
return "{" + n + '}';
}

@Override
protected Integer compute() {
// 如果 n 已经为 1,可以求得结果了
if (n == 1) {
log.debug("join() {}", n);
return n;
}

// 将任务进行拆分(fork)
AddTask1 t1 = new AddTask1(n - 1);
t1.fork();
log.debug("fork() {} + {}", n, t1);

// 合并(join)结果
int result = n + t1.join();
log.debug("join() {} + {} = {}", n, t1, result);
return result;
}
}

然后提交给 ForkJoinPool 来执行

1
2
3
4
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new AddTask1(5)));
}

结果:

1
2
3
4
5
6
7
8
9
10
[ForkJoinPool-1-worker-0] - fork() 2 + {1} 
[ForkJoinPool-1-worker-1] - fork() 5 + {4}
[ForkJoinPool-1-worker-0] - join() 1
[ForkJoinPool-1-worker-0] - join() 2 + {1} = 3
[ForkJoinPool-1-worker-2] - fork() 4 + {3}
[ForkJoinPool-1-worker-3] - fork() 3 + {2}
[ForkJoinPool-1-worker-3] - join() 3 + {2} = 6
[ForkJoinPool-1-worker-2] - join() 4 + {3} = 10
[ForkJoinPool-1-worker-1] - join() 5 + {4} = 15
15

J.U.C

AQS

AQS (AbstractQueuedSynchronizer) 是 Java 并发包 (java.util.concurrent.locks) 的核心基础框架,它提供了一个队列同步器,用于构建锁和其他同步组件。

核心原理
用一个数字表示资源数量,用队列管理等待的线程

1
2
3
4
// 核心就这3个东西:
state = 1 // 资源数量(比如:1把锁)
head -> 线程A // 排队队列的头
tail -> 线程C // 排队队列的尾
AQS工作流程
  1. 获取资源(排队)
1
2
3
线程A:我要锁! → state=1? → 拿到!(state=1)
线程B:我要锁! → state=1? → 拿不到 → 去排队
线程C:我要锁! → state=1? → 拿不到 → 去排队
  1. 释放资源(叫号)
1
2
线程A:用完啦! → state=0 → 叫下一个!
线程B:轮到我了! → state=1 → 拿到!

实现一个简单的锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// 最简单的锁实现
public class SimpleLock {
private final Sync sync = new Sync();

// 继承AQS,只需要重写2个方法!
private static class Sync extends AbstractQueuedSynchronizer {
// 尝试获取锁
protected boolean tryAcquire(int arg) {
// CAS操作:如果state是0(没人占用),就改成1(我占用)
return compareAndSetState(0, 1);
}

// 尝试释放锁
protected boolean tryRelease(int arg) {
// 直接把state设回0(释放锁)
setState(0);
return true;
}
}

// 对外提供的锁方法
public void lock() {
sync.acquire(1); // AQS帮你处理排队逻辑
}

public void unlock() {
sync.release(1); // AQS帮你唤醒下一个线程
}
}

使用这个锁

1
2
3
4
5
6
7
8
9
10
11
SimpleLock lock = new SimpleLock();
private int count = 0;

public void safeIncrement() {
lock.lock(); // 获取锁
try {
count++; // 安全操作
} finally {
lock.unlock(); // 释放锁
}
}
AQS使用时机

什么时候用 AQS?

需要自己造轮子时:

  • 自定义锁
  • 自定义同步工具
  • 特殊资源控制

日常开发:直接用现成的!

1
2
3
4
// 99%的情况用这些就够了:
ReentrantLock lock = new ReentrantLock();
Semaphore semaphore = new Semaphore(5);
CountDownLatch latch = new CountDownLatch(3);
ReentrantLock原理

ReentrantLock的简化版

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 这就是ReentrantLock的核心思想!
public class MyReentrantLock {
private final Sync sync = new Sync();

private static class Sync extends AbstractQueuedSynchronizer {
protected boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int state = getState();

if (state == 0) {
// 没人占用,我占用
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 重入:我已经占用了,再次获取
int nextState = state + acquires;
setState(nextState);
return true;
}
return false;
}

protected boolean tryRelease(int releases) {
int state = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();

boolean free = false;
if (state == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(state);
return free;
}
}

public void lock() { sync.acquire(1); }
public void unlock() { sync.release(1); }
}

读写锁

ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用 读写锁读-读 可以并发,提高性能。 类似于数据库中的 select ...from ... lock in share mode

提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的write() 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();

public Object read() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("读取");
sleep(1);
return data;
} finally {
log.debug("释放读锁...");
r.unlock();
}
}

public void write() {
log.debug("获取写锁...");
w.lock();
try {
log.debug("写入");
sleep(1);
} finally {
log.debug("释放写锁...");
w.unlock();
}
}
}

测试 读锁-读锁 可以并发

1
2
3
4
5
6
7
8
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();

new Thread(() -> {
dataContainer.read();
}, "t2").start();

输出结果,从这里可以看到 t2 锁定期间,t1 的读操作不受影响

1
2
3
4
5
6
14:05:14.341 c.DataContainer [t2] - 获取读锁... 
14:05:14.341 c.DataContainer [t1] - 获取读锁...
14:05:14.345 c.DataContainer [t1] - 读取
14:05:14.345 c.DataContainer [t2] - 读取
14:05:15.365 c.DataContainer [t2] - 释放读锁...
14:05:15.386 c.DataContainer [t1] - 释放读锁..

测试 读锁-写锁 相互阻塞

1
2
3
4
5
6
7
8
9
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();

Thread.sleep(100);
new Thread(() -> {
dataContainer.write();
}, "t2").start();

输出结果

1
2
3
4
5
6
14:04:21.838 c.DataContainer [t1] - 获取读锁... 
14:04:21.838 c.DataContainer [t2] - 获取写锁...
14:04:21.841 c.DataContainer [t2] - 写入
14:04:22.843 c.DataContainer [t2] - 释放写锁...
14:04:22.843 c.DataContainer [t1] - 读取
14:04:23.843 c.DataContainer [t1] - 释放读锁...

写锁-写锁 也是相互阻塞的

🔔注意:

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
  • 重入时降级支持:即持有写锁的情况下去获取读锁
StampedLock

该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用
加解读锁

1
2
long stamp = lock.readLock();
lock.unlockRead(stamp);

加解写锁

1
2
long stamp = lock.writeLock();
lock.unlockWrite(stamp);

乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通
过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。

1
2
3
4
5
long stamp = lock.tryOptimisticRead();
// 验戳
if(!lock.validate(stamp)){
// 锁升级
}

 
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();

public DataContainerStamped(int data) {
this.data = data;
}

public int read(int readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("optimistic read locking...{}", stamp);
sleep(readTime);
if (lock.validate(stamp)) {
log.debug("read finish...{}, data:{}", stamp, data);
return data;
}
// 锁升级 - 读锁
log.debug("updating to read lock... {}", stamp);
try {
stamp = lock.readLock();
log.debug("read lock {}", stamp);
sleep(readTime);
log.debug("read finish...{}, data:{}", stamp, data);
return data;
} finally {
log.debug("read unlock {}", stamp);
lock.unlockRead(stamp);
}
}

public void write(int newData) {
long stamp = lock.writeLock();
log.debug("write lock {}", stamp);
try {
sleep(2);
this.data = newData;
} finally {
log.debug("write unlock {}", stamp);
lock.unlockWrite(stamp);
}
}
}

测试 读-读 可以优化

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1);
}, "t1").start();
sleep(0.5);
new Thread(() -> {
dataContainer.read(0);
}, "t2").start();
}

输出结果,可以看到实际没有加读锁

1
2
3
4
15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 
15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256
15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1
15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1

测试 读-写 时优化读补加读锁

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
DataContainerStamped dataContainer = new DataContainerStamped(1);
new Thread(() -> {
dataContainer.read(1);
}, "t1").start();
sleep(0.5);
new Thread(() -> {
dataContainer.write(100);
}, "t2").start();
}

输出结果

1
2
3
4
5
6
7
15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 
15:57:00.717 c.DataContainerStamped [t2] - write lock 384
15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256
15:57:02.719 c.DataContainerStamped [t2] - write unlock 384
15:57:02.719 c.DataContainerStamped [t1] - read lock 513
15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000
15:57:03.719 c.DataContainerStamped [t1] - read unlock 513

🔔注意:

  • StampedLock 不支持条件变量
  • StampedLock 不支持可重入
Semaphore

信号量,用来限制能同时访问共享资源的线程上限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
// 1. 创建 semaphore 对象
Semaphore semaphore = new Semaphore(3);

// 2. 10个线程同时运行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
// 3. 获取许可
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("running...");
sleep(1);
log.debug("end...");
} finally {
// 4. 释放许可
semaphore.release();
}
}).start();
}
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 07:35:15.485 c.TestSemaphore [Thread-2] - running... 
07:35:15.485 c.TestSemaphore [Thread-1] - running...
07:35:15.485 c.TestSemaphore [Thread-0] - running...
07:35:16.490 c.TestSemaphore [Thread-2] - end...
07:35:16.490 c.TestSemaphore [Thread-0] - end...
07:35:16.490 c.TestSemaphore [Thread-1] - end...
07:35:16.490 c.TestSemaphore [Thread-3] - running...
07:35:16.490 c.TestSemaphore [Thread-5] - running...
07:35:16.490 c.TestSemaphore [Thread-4] - running...
07:35:17.490 c.TestSemaphore [Thread-5] - end...
07:35:17.490 c.TestSemaphore [Thread-4] - end...
07:35:17.490 c.TestSemaphore [Thread-3] - end...
07:35:17.490 c.TestSemaphore [Thread-6] - running...
07:35:17.490 c.TestSemaphore [Thread-7] - running...
07:35:17.490 c.TestSemaphore [Thread-9] - running...
07:35:18.491 c.TestSemaphore [Thread-6] - end...
07:35:18.491 c.TestSemaphore [Thread-7] - end...
07:35:18.491 c.TestSemaphore [Thread-9] - end...
07:35:18.491 c.TestSemaphore [Thread-8] - running...
07:35:19.492 c.TestSemaphore [Thread-8] - end...

CountdownLatch

用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一

1
2
3
4
5
6
7
CountDownLatch latch = new CountDownLatch(3);  // 初始计数=3

// 线程完成任务时:
latch.countDown(); // 计数减1

// 等待的线程:
latch.await(); // 阻塞,直到计数=0

eg:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);

new Thread(() -> {
log.debug("begin...");
sleep(1);
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();

new Thread(() -> {
log.debug("begin...");
sleep(2);
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();

new Thread(() -> {
log.debug("begin...");
sleep(1.5);
latch.countDown();
log.debug("end...{}", latch.getCount());
}).start();

log.debug("waiting...");
latch.await();
log.debug("wait end...");
}

输出

1
2
3
4
5
6
7
8
18:44:00.778 c.TestCountDownLatch [main] - waiting... 
18:44:00.778 c.TestCountDownLatch [Thread-2] - begin...
18:44:00.778 c.TestCountDownLatch [Thread-0] - begin...
18:44:00.778 c.TestCountDownLatch [Thread-1] - begin...
18:44:01.782 c.TestCountDownLatch [Thread-0] - end...2
18:44:02.283 c.TestCountDownLatch [Thread-2] - end...1
18:44:02.782 c.TestCountDownLatch [Thread-1] - end...0
18:44:02.782 c.TestCountDownLatch [main] - wait end...
CyclicBarrier

循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行

new Thread(() -> {
System.out.println("线程1开始.." + new Date());
try {
cb.await(); // 当个数不足时,等待
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程1继续向下运行..." + new Date());
}).start();

new Thread(() -> {
System.out.println("线程2开始.." + new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {}
try {
cb.await(); // 2 秒后,线程个数够2,继续运行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程2继续向下运行..." + new Date());
}).start();

🔔注意: CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的, CyclicBarrier 可以被比喻为『人满发车』

线程安全集合类概述

线程安全集合类可以分为三大类:

  1. 遗留的线程安全集合
  • Hashtable
  • Vector
  1. 使用 Collections 装饰的线程安全集合
  • Collections.synchronizedCollection
  • Collections.synchronizedList
  • Collections.synchronizedMap
  • Collections.synchronizedSet
  • Collections.synchronizedNavigableMap
  • Collections.synchronizedNavigableSet
  • Collections.synchronizedSortedMap
  • Collections.synchronizedSortedSet
  1. java.util.concurrent.*

重点介绍 java.util.concurrent.* 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词:BlockingCopyOnWriteConcurrent

  • Blocking 大部分实现基于锁,并提供用来阻塞的方法

  • CopyOnWrite 修改开销相对较重

  • Concurrent 类型的容器

  • 内部很多操作使用 CAS 优化,一般可以提供较高吞吐量

  • 弱一致性

    • 遍历时弱一致性:当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
    • 求大小弱一致性:size 操作未必是 100% 准确
    • 读取弱一致性

遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出ConcurrentModificationException,不再继续遍历

BlockingQueue
ConcurrentLinkedQueue
CopyOnWriteArrayList