多线程(一)--- 锁 & 关键字

线程的基本操作及状态转换

创建线程的三种方式

继承 Thread 类

方法一

  • 继承 Thread 类并重写 run 方法
  • 创建线程对象
  • 调用该线程对象的 start() 方法来启动线程

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class ThreadDemo{
public static void main(String[] args) {
new ThreadTest().start();
}

}

class ThreadTest extends Thread{
private int i;
@Override
public void run() {
for (; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + " is running: " + i);
}
}
}

方法二

  • 当然也可以直接实现内部类,然后重写 run() 方法,然后适当的可以用 lamda 表达式

demo

1
2
3
4
5
6
7
8
9
10
11
12
    public static void main(String[] args) {
// new ThreadTest().start();
// 2. 直接继承 Thread 类
Thread threadC = new Thread(){
@Override
public void run(){
System.out.println("3.1 get resource c");
super.run();
}
};
threadC.start();
}

实现Runnable接口

方法一

  • 定义一个类实现 Runnable 接口,并重写该接口的 run() 方法
  • 创建 Runnable 实现类的对象,作为创建 Thread 对象的 target 参数,此 Thread 对象才是真正的线程对象,这个跟继承 Thread 不一样,第一种方式是直接 Thread 的实现类就是真正的线程对象,而实现 Runnable 接口得对象在这里并不是真正的线程对象。
  • 调用线程对象的 start() 方法来启动线程

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class RunnableDemo {
public static void main(String[] args) {
RunnableTest runnableTest = new RunnableTest();
new Thread(runnableTest, "线程1").start();
new Thread(runnableTest, "线程2").start();
}
}
class RunnableTest implements Runnable{
private int i = 0;
@Override
public void run() {
for (; i < 100; i++) {
System.out.println(Thread.currentThread().getName() + " is running: " + i);
}
}
}

方法二

  • 直接实现匿名内部类就行了,跟方式一中的方法二一样

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class RunnableDemo {
public static void main(String[] args) {
// RunnableTest runnableTest = new RunnableTest();
// new Thread(runnableTest, "线程1").start();
// new Thread(runnableTest, "线程2").start();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("这里是实现 Runnable 接口的线程");
}
});
thread.start();
}
}

实现 Callable 接口

上述两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。于是java后面又增加了 Callable 和 Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Runnable 和 Callable

  • Runnable 只是一个接口,里面只是声明了一个run方法,可以看到方法的返回值是void,所以线程执行完了没有任何的返回值。
  • Callable 也是一个接口,它里面声明了一个call方法,可以看到它是一个泛型接口,call()函数返回的类型就是传递进来的V类型。线程的执行是异步的,一个线程和另外一个线程的执行是互不干扰的,所以你不可能从别的线程中获得返回值,所以要想获得Callable的返回值就需要用到Future这个接口,Futrue可以监视目标线程调用call的情况,当你调用Future的get()方法以获得结果时,当前线程就开始阻塞,直到call方法结束返回结果。
  • 总而言之,就是三点:
    • Runnable 调用 run方法,Callable 调用 call 方法;
    • Runnable不能有返回值,但是 Callable可以返回一个Future对象;
    • Runnable不可以抛出异常,但是 Callable 可以抛出异常。

Future

Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

下面我们讲解下这五个方法的作用:

cancel方法:用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。

isCancelled方法:表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true

isDone方法:表示任务是否已经完成,若任务完成,则返回true

get()方法:用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回。这里的阻塞需要解释一下,阻塞的是当前调用get方法的线程,直到get方法返回结果才能继续向下执行,如果get方法一直没有返回值,那么当前线程会一直阻塞下去

get(long timeout, TimeUnit unit)方法:获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null,这个就避免了一直获取不到结果使得当前线程一直阻塞的情况发生

也就是说Future提供了三种功能:

  1)判断任务是否完成;

  2)能够中断任务;

  3)能够获取任务执行结果。

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的 FutureTask。

FutureTask

我们先来看一下 FutureTask 的类图:

image-20200206183630930

可以看出 RunnableFuture 继承了 Runnable 接口和 Future 接口,而 FutureTask 实现了 RunnableFuture 接口。所以它既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

实现

  • ExecutorService & Future & Callable

第一种方式是使用继承了ExecutorService的线程池ThreadPoolExecutor中的submit方法,将Callable直接提交创建Future

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class CallableDemo{
public static void main(String[] args) {
// ExecutorService & Callable
ExecutorService service = Executors.newSingleThreadExecutor();
// 或者用这个
// ExecutorService service = Executors.newCachedThreadPool();
Future<String> future = service.submit((Callable) () -> "通过实现Callable接口");
System.out.println(future.isDone());
service.shutdown();
try {
String result = future.get();
System.out.println(result);
System.out.println(future.isDone());

} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
  • ExecutorService & futureTask & Callable

第二种方式就是使用 futureTask,具体看 demo 吧

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class CallableDemo {
public static void main(String[] args) {
// FutureTask & Callable
CallableTest callableTest = new CallableTest();
FutureTask<Integer> futureTask = new FutureTask<>(callableTest);
// futureTask 可以采用两种方式运行线程
// 1. 直接new Thread
// new Thread(futureTask).start();
// 2. 将FutureTask 对象放入 executorService 中
ExecutorService service = Executors.newCachedThreadPool();
service.submit(futureTask);
service.shutdown();
try {
System.out.println("子线程的返回值: " + futureTask.get(3, TimeUnit.SECONDS));
} catch (InterruptedException | TimeoutException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

class CallableTest implements Callable {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i < 101; i++) {
sum += i;
}
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " is running: " + sum);
return sum;
}
}
  • futureTask & Callable

因为 futureTask 实现了 run() 方法,所以可以直接 new Thread 启动。

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class CallableDemo {
public static void main(String[] args) {
// FutureTask & Callable
CallableTest callableTest = new CallableTest();
FutureTask<Integer> futureTask = new FutureTask<>(callableTest);
// futureTask 可以采用两种方式运行线程
// 1. 直接new Thread
new Thread(futureTask).start();
// 2. 将FutureTask 对象放入 executorService 中
// ExecutorService service = Executors.newCachedThreadPool();
// service.submit(futureTask);
// service.shutdown();
try {
System.out.println("子线程的返回值: " + futureTask.get(3, TimeUnit.SECONDS));
} catch (InterruptedException | TimeoutException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}

class CallableTest implements Callable {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 1; i < 101; i++) {
sum += i;
}
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " is running: " + sum);
return sum;
}
}

三种方式的比较

1.实现Runnable/Callable接口相比继承Thread类的优势

(1)适合多个线程进行资源共享

(2)可以避免java中单继承的限制

(3)增加程序的健壮性,代码和数据独立

(4)线程池只能放入 Runable 或 Callable 接口实现类,不能直接放入继承 Thread 的类

2.Callable和Runnable的区别

​ (1) Callable 重写的是 call() 方法, Runnable 重写的方法是 run() 方法

​ (2) call() 方法执行后可以有返回值,run() 方法没有返回值

​ (3) call() 方法可以抛出异常,run() 方法不可以

​ (4) 运行 Callable 任务可以拿到一个 Future 对象,表示异步计算的结果 。通过 Future 对象可以了解任务执行情况,可取消任务的执行,还可获取执行结果

https://zhuanlan.zhihu.com/p/88933756

https://juejin.im/post/5ae6cf7a518825670960fcc2

Synchronized

超详细例子讲述 Synchronized

形象讲述对象锁和类锁区别

Synchronized

  • 主要是类锁和对象锁的区别
  • synchronized特点:保证内存可见性、操作原子性

  • synchronized影响性能的原因

    • 1、加锁解锁操作需要额外操作;
    • 2、互斥同步对性能最大的影响是阻塞的实现,因为阻塞涉及到的挂起线程和恢复线程的操作都需要转入内核态中完成(用户态与内核态的切换的性能代价是比较大的)
  • synchronized锁:对象头中的Mark Word根据锁标志位的不同而被复用

    • 偏向锁:在只有一个线程执行同步块时提高性能。Mark Word存储锁偏向的线程ID,以后该线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需简单比较ThreadID。特点:只有等到线程竞争出现才释放偏向锁,持有偏向锁的线程不会主动释放偏向锁。之后的线程竞争偏向锁,会先检查持有偏向锁的线程是否存活,如果不存货,则对象变为无锁状态,重新偏向;如果仍存活,则偏向锁升级为轻量级锁,此时轻量级锁由原持有偏向锁的线程持有,继续执行其同步代码,而正在竞争的线程会进入自旋等待获得该轻量级锁
    • 轻量级锁:在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,尝试拷贝锁对象目前的Mark Word到栈帧的Lock Record,若拷贝成功:虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock record里的owner指针指向对象的Mark Word。若拷贝失败:若当前只有一个等待线程,则可通过自旋稍微等待一下,可能持有轻量级锁的线程很快就会释放锁。 但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁膨胀为重量级锁
    • 重量级锁:指向互斥量(mutex),底层通过操作系统的mutex lock实现。等待锁的线程会被阻塞,由于Linux下Java线程与操作系统内核态线程一一映射,所以涉及到用户态和内核态的切换、操作系统内核态中的线程的阻塞和恢复。

Volatile

volatile可见性原理底层分析(视频)

image-20200213184715267

https://juejin.im/post/5ae9b41b518825670b33e6c4

https://www.infoq.cn/article/java-memory-model-4/

是否能重排序 第二个操作
第一个操作 普通读 / 写 volatile 读 volatile 写
普通读 / 写 NO
volatile 读 NO NO( NO
volatile 写 NO NO(

从上表我们可以看出:

  • 当第二个操作是 volatile 写时,不管第一个操作是什么,都不能重排序。这个规则确保 volatile 写之前的操作不会被编译器重排序到 volatile 写之后。
  • 当第一个操作是 volatile 读时,不管第二个操作是什么,都不能重排序。这个规则确保 volatile 读之后的操作不会被编译器重排序到 volatile 读之前。
  • 当第一个操作是 volatile 写,第二个操作是 volatile 读时,不能重排序。

为了实现 volatile 的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。对于编译器来说,发现一个最优布置来最小化插入屏障的总数几乎不可能,为此,JMM 采取保守策略。下面是基于保守策略的 JMM 内存屏障插入策略:

  • 在每个 volatile 写操作的前面插入一个 StoreStore 屏障。
  • 在每个 volatile 写操作的后面插入一个 StoreLoad 屏障。(可以允许普通)
  • 在每个 volatile 读操作的后面插入一个 LoadLoad 屏障。
  • 在每个 volatile 读操作的后面插入一个 LoadStore 屏障。

上述内存屏障插入策略非常保守,但它可以保证在任意处理器平台,任意的程序中都能得到正确的 volatile 内存语义。

下面是保守策略下,volatile 写插入内存屏障后生成的指令序列示意图:

img

上图中的 StoreStore 屏障可以保证在 volatile 写之前,其前面的所有普通写操作已经对任意处理器可见了。这是因为 StoreStore 屏障将保障上面所有的普通写在 volatile 写之前刷新到主内存。

这里比较有意思的是 volatile 写后面的 StoreLoad 屏障。这个屏障的作用是避免 volatile 写与后面可能有的 volatile 读 / 写操作重排序。因为编译器常常无法准确判断在一个 volatile 写的后面,是否需要插入一个 StoreLoad 屏障(比如,一个 volatile 写之后方法立即 return)。为了保证能正确实现 volatile 的内存语义,JMM 在这里采取了保守策略:在每个 volatile 写的后面或在每个 volatile 读的前面插入一个 StoreLoad 屏障。从整体执行效率的角度考虑,JMM 选择了在每个 volatile 写的后面插入一个 StoreLoad 屏障。因为 volatile 写 - 读内存语义的常见使用模式是:一个写线程写 volatile 变量,多个读线程读同一个 volatile 变量。当读线程的数量大大超过写线程时,选择在 volatile 写之后插入 StoreLoad 屏障将带来可观的执行效率的提升。从这里我们可以看到 JMM 在实现上的一个特点:首先确保正确性,然后再去追求执行效率。

下面是在保守策略下,volatile 读插入内存屏障后生成的指令序列示意图:

img

上图中的 LoadLoad 屏障用来禁止处理器把上面的 volatile 读与下面的普通读重排序。LoadStore 屏障用来禁止处理器把上面的 volatile 读与下面的普通写重排序。

上述 volatile 写和 volatile 读的内存屏障插入策略非常保守。在实际执行时,只要不改变 volatile 写 - 读的内存语义,编译器可以根据具体情况省略不必要的屏障。下面我们通过具体的示例代码来说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class VolatileBarrierExample {
int a;
volatile int v1 = 1;
volatile int v2 = 2;

void readAndWrite() {
int i = v1; // 第一个 volatile 读
int j = v2; // 第二个 volatile 读
a = i + j; // 普通写
v1 = i + 1; // 第一个 volatile 写
v2 = j * 2; // 第二个 volatile 写
}

// 其他方法
}

针对 readAndWrite() 方法,编译器在生成字节码时可以做如下的优化:

img

注意,最后的 StoreLoad 屏障不能省略。因为第二个 volatile 写之后,方法立即 return。此时编译器可能无法准确断定后面是否会有 volatile 读或写,为了安全起见,编译器常常会在这里插入一个 StoreLoad 屏障。

上面的优化是针对任意处理器平台,由于不同的处理器有不同“松紧度”的处理器内存模型,内存屏障的插入还可以根据具体的处理器内存模型继续优化。以 x86 处理器为例,上图中除最后的 StoreLoad 屏障外,其它的屏障都会被省略。

前面保守策略下的 volatile 读和写,在 x86 处理器平台可以优化成:

img

前文提到过,x86 处理器仅会对写 - 读操作做重排序。X86 不会对读 - 读,读 - 写和写 - 写操作做重排序,因此在 x86 处理器中会省略掉这三种操作类型对应的内存屏障。在 x86 中,JMM 仅需在 volatile 写后面插入一个 StoreLoad 屏障即可正确实现 volatile 写 - 读的内存语义。这意味着在 x86 处理器中,volatile 写的开销比 volatile 读的开销会大很多(因为执行 StoreLoad 屏障开销会比较大)。

AQS

AQS源码分析—独占模式&共享模式

AQS分析一AQS分析二

美团团队:从ReentrantLock的实现看AQS的原理及应用

全称是 AbstractQueuedSynchronizer(抽象队列同步器),是通过一个先进先出的队列(存储等待的线程)来实现同步器的一个框架,Lock、ReentrantLock、Semaphore 等等都是基于 AQS 实现的。

整体框架

img

  • 上图中有颜色的为Method,无颜色的为Attribution。
  • 总的来说,AQS框架共分为五层,自上而下由浅入深,从AQS对外暴露的API到底层基础数据。
  • 当有自定义同步器接入时,只需重写第一层所需要的部分方法即可,不需要关注底层具体的实现流程。当自定义同步器进行加锁或者解锁操作时,先经过第一层的API进入AQS内部方法,然后经过第二层进行锁的获取,接着对于获取锁失败的流程,进入第三层和第四层的等待队列处理,而这些处理方式均依赖于第五层的基础数据提供层。

下面我们会从整体到细节,从流程到方法逐一剖析AQS框架,主要分析过程如下:

AQS 原理概述

原理概述

AQS核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。

CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。

img

AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。

AQS 的结构大概可总结为以下 3 部分:

  1. 用 volatile 修饰的整数类型的 state 状态,用于表示同步状态,提供 getState 和 setState 来操作同步状态;
  2. 提供了一个 FIFO 等待队列,实现线程间的竞争和等待,这是 AQS 的核心;
  3. AQS 内部提供了各种基于 CAS 原子操作方法,如 compareAndSetState 方法,并且提供了锁操作的acquire和release方法。

数据结构 & 重要变量和类

先来看下AQS中最基本的数据结构——Node,Node即为上面CLH变体队列中的节点。

img

1
2
3
4
5
6
//AQS等待队列的头结点,AQS的等待队列是基于一个双向链表来实现的,这个头结点并不包含具体的线程是一个空结点(注意不是null)
private transient volatile Node head;
//AQS等待队列的尾部结点
private transient volatile Node tail;
//AQS同步器状态,也可以说是锁的状态,注意volatile修饰证明这个变量状态要对多线程可见
private volatile int state;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
static final class Node {
//下面两个属性都是说明这个结点是共享模式还是独占模式
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

//下面这四个属性就是说明结点的状态
static final int CANCELLED = 1;//由于超时或中断,节点已被取消
static final int SIGNAL = -1;//表示下一个节点是通过park阻塞的,需要通过unpark唤醒
static final int CONDITION = -2;//表示线程在等待条件变量(先获取锁,加入到条件等待队列,然后释放锁,等待条件变量满足条件;只有重新获取锁之 后才能返回)
static final int PROPAGATE = -3;//表示后续结点会传播唤醒的操作,共享模式下起作用
//当前节点在队列中的状态,就是上述几个数值
volatile int waitStatus;
//前驱结点(双链表)
volatile Node prev;
//后继结点(双链表)
volatile Node next;
// 结点所包装的线程
volatile Thread thread;
// 对于Condtion表示下一个等待条件变量的节点
// 其它情况下用于是区分共享模式和独占模式
Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

//取得前驱结点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
//null的时候抛出异常
throw new NullPointerException();
else
return p;
}

Node() {
}

Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}

两种锁模式

AQS支持两种锁一种是独占锁(独占模式),一种是共享锁(共享模式)

  • 独占锁:比如像ReentrantLock就是一种独占锁模式,多个线程去同时抢一个锁,只有一个线程能抢到这个锁,其他线程就只能阻塞等待锁被释放后重新竞争锁。
  • 共享锁:比如像读写锁里面的读锁,一个锁可以同时被多个线程拥有(多个线程可以同时拥有读锁),再比如Semaphore 设置一个资源数目(可以理解为一个锁能同时被多少个线程拥有)。
共享锁跟独占锁可以同时存在,比如读写锁,读锁、写锁分别对应共享锁和独占锁

队列节点状态

队列中的 Node,有一个 waitStatus,用来表示该节点对应的线程状态。

  • CANCELLED = 1;取消状态,如果当前线程的 前置节点 状态为 CANCELLED,则表明前置节点已经等待超时或者已经被中断了,这时需要将其从等待队列中删除。
  • SIGNAL = -1;等待触发状态,如果当前线程的 前置节点 状态为 SIGNAL,则表明当前线程需要阻塞。
  • CONDITION = -2;用于 condition,也就是线程间通信,类似于 Synchronized 中的 wait/notify 机制,在 Lock 中是用 condition 来完成的, waitStatus = -2 表示该线程在进队列之前就已经获取到了锁,然后再加入到条件等待队列中,然后释放锁资源,等到条件满足了,就再次获取锁,然后进行线程相应的操作。
  • PROPAGATE = -3;状态需要向后传播,表示 releaseShared 需要被传播给后续节点,仅在共享锁模式下使用。

可以这么理解:head 节点可以表示成当前持有锁的线程的节点,其余线程竞争锁失败后,会加入到队尾,tail 始终指向队列的最后一个节点。

独占模式

独占锁的原理是如果有线程获取到锁,那么其它线程只能是获取锁失败,然后进入等待队列中等待被唤醒。

1
2
3
4
5
6
7
8
// 获取锁方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 释放锁方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

可以看到,在 AQS 中并没有提供获取锁和释放锁的方法,需要实现类自己去实现这两个方法,在这里只是抛出了异常而已。

获取锁

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

源码解读:

  1. 通过 tryAcquire(arg) 方法尝试获取锁,这个方法需要实现类自己实现获取锁的逻辑,获取锁成功后则不执行后面加入等待队列的逻辑了;
  2. 如果尝试获取锁失败后,则执行 addWaiter(Node.EXCLUSIVE) 方法将当前线程封装成一个 Node 节点对象,并加入队列尾部;
  3. 把当前线程执行封装成 Node 节点后,继续执行 acquireQueued 的逻辑,该逻辑主要是判断当前节点的前置节点是否是头节点,来尝试获取锁,如果获取锁成功,则当前节点就会成为新的头节点,这也是获取锁的核心逻辑。

继续看 addWaiter(Node.EXCLUSIVE):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node addWaiter(Node mode) {
// 创建一个基于当前线程的节点,该节点是 Node.EXCLUSIVE 独占式类型
// 注意这里的构造函数,第二个参数就是 NextWaiter,用来区分是共享模式还是独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 这里先判断队尾是否为空,如果不为空则直接将节点加入队尾
if (pred != null) {
node.prev = pred;
// 这里必须采用 CAS,因为可能有多个线程加入同时加入到队尾
// 采取 CAS 操作,将当前节点设置为队尾节点,由于采用了 CAS 原子操作,无论并发怎么修改,都有且只有一条线程可以修改成功,其余都将执行后面的enq方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

简单来说 addWaiter(Node mode) 方法做了以下事情:

  1. 创建基于当前线程的独占式类型的节点;
  2. 利用 CAS 原子操作,将节点加入队尾。

我们继续看 enq(Node node) 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node enq(final Node node) {
// 自旋操作
for (;;) {
Node t = tail;
// 如果队尾节点为空,那么进行CAS操作初始化队列
if (t == null) {
// 这里很关键,即如果队列为空,那么此时必须初始化队列,初始化一个空的节点表示队列头,用于表示当前正在执行的节点,头节点即表示当前正在运行的节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 这一步也是采取CAS操作,将当前节点加入队尾,如果失败的话,自旋继续修改直到成功为止
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

enq(Node node)主要做了三件事:

  1. 采用了自旋机制,没成功就一直循环进行
  2. 若队尾节点为空,就先初始化队列,然后因为自旋,再在 head 节点后将当前节点加入队尾
  3. 若对尾结点不为空,则采取 CAS ,将当前节点加入队尾
  4. 注意返回值是当前节点的 pred 节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 线程中断标记字段
boolean interrupted = false;
for (;;) {
// 获取当前节点的 pred 节点
final Node p = node.predecessor();
// 如果 pred 节点为 head 节点,那么再次尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取锁之后,那么当前节点也就成为了 head 节点
setHead(node);
p.next = null; // help GC
failed = false;
// 不需要挂起,返回 false
return interrupted;
}
// 获取锁失败,则进入挂起逻辑
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

acquireQueued 主要做了两件事:

  1. 判断当前节点的 pred 节点是否是 head 节点,如果是,说明下一个执行的线程就是该线程,所以就可以去尝试获取锁(自旋的过程),如果获取了锁,就将当前节点置为 head;
  2. 获取锁如果失败了,就得进入挂起逻辑,即进入 shouldParkAfterFailedAcquire(p, node)。

接下来我们继续看挂起逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果 pred 节点为 SIGNAL 状态,返回true,说明当前节点需要挂起
return true;
// 如果ws > 0,说明节点状态为CANCELLED,需要从队列中删除
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果是其它状态,则操作CAS统一改成SIGNAL状态
// 由于这里waitStatus的值只能是0或者PROPAGATE,所以我们将节点设置为SIGNAL,重新循环一次判断
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

shouldParkAfterFailedAcquire(Node pred, Node node) 主要做了三件事:

  1. 判断当前节点的 pred 的 waitStatus 是否为 SIGNAL,如果是,则说明当前节点可以挂起;
  2. 如果不是 SIGNAL 状态,若是 CANCELLED 状态,则需要将该节点从队列中删除;
  3. 否则,需要将该节点的前置节点置为 SIGNAL,再从 acquireQueued 方法自旋操作循环一次判断。

通俗来说就是:根据 pred 节点状态来判断当前节点是否可以挂起,如果该方法返回 false,那么挂起条件还没准备好,就会重新进入 acquireQueued(final Node node, int arg) 的自旋体,重新进行判断。如果返回 true,那就说明当前线程可以进行挂起操作了,那么就会继续执行挂起。

继续看挂起逻辑:

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

LockSupport 是用来创建锁和其他同步类的基本线程阻塞原语。LockSupport 提供 park() 和 unpark() 方法实现阻塞线程和解除线程阻塞。release 释放锁方法逻辑会调用 LockSupport.unPark 方法来唤醒后继节点。

最后看一下 cancelAcquire():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void cancelAcquire(Node node) {
// 将无效节点过滤
if (node == null)
return;
// 设置该节点不关联任何线程,也就是虚节点
node.thread = null;
Node pred = node.prev;
// 通过前驱节点,跳过取消状态的node
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 把当前node的状态设置为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
// 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果当前节点不是head的后继节点
//1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}

具体可参考: https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

整体逻辑图如下

img

释放锁

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

思路还是很好理解的,先尝试释放锁,这个 tryRelease() 需要自己实现,如果头结点不为空,且 waitStatus != 0 (因为 addWaiter 方法默认的节点状态为 0,为 0 说明此时节点还没有进入就绪状态),就可以执行唤醒下个节点的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 将头节点的状态设置为0
// 这里会尝试清除头节点的状态,改为初始状态
compareAndSetWaitStatus(node, ws, 0);

// 后继节点
Node s = node.next;
// 如果后继节点为null,或者已经被取消了
if (s == null || s.waitStatus > 0) {
s = null;
// for循环从队列尾部一直往前找可以唤醒的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒后继节点
LockSupport.unpark(s.thread);
}

unparkSuccessor(Node node) 主要做了二件事:

  1. 将头结点的 waitStatus 置为初始状态 0 ;
  2. 找后继节点,如果后继节点是 null 或者 waitStatus = Node.Cancelled,就直接从队尾开始向前遍历,找到最靠近头结点的下一个符合条件的节点,并将其唤醒。

共享模式

获取锁

1
2
3
4
5
6
public final void acquireShared(int arg) {
// 尝试获取共享锁,小于0表示获取失败
if (tryAcquireShared(arg) < 0)
// 执行获取锁失败的逻辑
doAcquireShared(arg);
}

跟独占模式差不多, tryAcquireShared() 需要实现类自己去实现,这里只要 >=0 说明共享锁的资源还有,就说明获取锁成功,否则就执行获取锁失败的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void doAcquireShared(int arg) {
// 添加共享锁类型节点到队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再次尝试获取共享锁
int r = tryAcquireShared(arg);
// 如果在这里成功获取共享锁,会进入共享锁唤醒逻辑
if (r >= 0) {
// 共享锁唤醒逻辑
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 与独占锁相同的挂起逻辑
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

跟独占锁基本一样的逻辑,也是先将该线程包装成 Node,加入队尾,然后不断自旋,判断当前节点的前置节点是否为 head,如果是 head,就尝试获取锁,当然这里有一个不同的地方,那就是原来的 setHead() 变成了 setHeadAndPropagate(),因为这里不仅要将获取到锁的节点置为头结点,同时只要共享锁还拥有资源,就需要去唤醒后续共享锁节点。下面我们具体看看 setHeadAndPropagate(node, r)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void setHeadAndPropagate(Node node, int propagate) {
// 头节点
Node h = head;
// 设置当前节点为新的头节点
// 这里不需要加锁操作,因为获取共享锁后,会从FIFO队列中依次唤醒队列,并不会产生并发安全问题
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 后继节点
Node s = node.next;
// 如果后继节点为空或者后继节点为共享类型,则进行唤醒后继节点
// 这里后继节点为空意思是只剩下当前头节点了
if (s == null || s.isShared())
doReleaseShared();
}
}

主要做了两件事:

  1. 将当前节点置为 head,这个跟独占锁是一样的;
  2. 寻找后续的共享锁线程,然后将其唤醒。

我们注意到,这里唤醒调用的是 doReleaseShared(),也就是说释放共享锁也是调用的这个方法,那么是怎么做到唤醒多个拥有共享锁线程呢?我们注意到首先是将当前获取到锁的线程置为 head,然后只要还有资源,就唤醒该线程,

释放锁

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
// 由用户自行实现释放锁条件
if (tryReleaseShared(arg)) {
// 执行释放锁
doReleaseShared();
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void doReleaseShared() {
for (;;) {
// 从头节点开始执行唤醒操作
// 这里需要注意,如果从setHeadAndPropagate方法调用该方法,那么这里的head是新的头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
// 初始化节点状态
//这里需要CAS原子操作,因为setHeadAndPropagate和releaseShared这两个方法都会顶用doReleaseShared,避免两次unpark唤醒操作
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 如果初始化节点状态失败,继续循环执行
continue; // loop to recheck cases
// 执行唤醒操作
// 注意这里是唤醒头结点的下一个节点
// 唤醒了下一个节点之后,下一个节点就会去尝试获取锁,形成一个循环
unparkSuccessor(h);
}
// 这里个人认为是为 setHeadAndPropagate()中 s == null准备的,当只有头结点的时候
// 后继节点没有所以不需要唤醒,但是我们必须把这种可传递的状态显示表现出来
// 确保后续可以传递给后继节点
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果在唤醒的过程中头节点没有更改,退出循环
// 这里防止其它线程又设置了头节点,说明其它线程获取了共享锁,会继续循环操作
if (h == head) // loop if head changed
break;
}
}

注:上面的setHeadAndPropagate()方法表示等待队列中的线程成功获取到共享锁,这时候它需要调用doReleaseShared() 唤醒它后面的共享节点(如果有),但是当通过 releaseShared() 方法去调用doReleaseShared() 释放一个共享锁的时候,接下来等待独占锁跟共享锁的线程都可以被唤醒进行尝试获取。

总结

逻辑图

img

获取锁的过程:

  1. 当线程调用acquireShared()申请获取锁资源时,如果成功,则进入临界区。
  2. 当获取锁失败时,则创建一个共享类型的节点并进入一个FIFO等待队列,然后被挂起等待唤醒。
  3. 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。

释放锁过程:

  1. 当线程调用releaseShared()进行锁资源释放时,如果释放成功,则唤醒队列中等待的节点,如果有的话。

比较

跟独占锁相比,共享锁的主要特征在于当一个在等待队列中的共享节点成功获取到锁以后(它获取到的是共享锁),既然是共享,那它必须要依次唤醒后面所有可以跟它一起共享当前锁资源的节点,毫无疑问,这些节点必须也是在等待共享锁(这是大前提,如果等待的是独占锁,那前面已经有一个共享节点获取锁了,它肯定是获取不到的)。当共享锁被释放的时候,可以用读写锁为例进行思考,当一个读锁被释放,此时不论是读锁还是写锁都是可以竞争资源的。

https://juejin.im/post/5cd2b58f6fb9a032332b45aa

https://www.cnblogs.com/lfls/p/7599863.html

面试题目

Q:某个线程获取锁失败的后续流程是什么呢?

A:存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。

Q:既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

A:是CLH变体的FIFO双端队列。

Q:处于排队等候机制中的线程,什么时候可以有机会获取锁呢?

A:当前节点的前置节点为 head,可以尝试获取锁。

Q:如果处于排队等候机制中的线程一直无法获取锁,需要一直等待么?还是有别的策略来解决这一问题?

A:线程所在节点的状态会变成取消状态,取消状态的节点会从队列中释放。

Q:Lock函数通过Acquire方法进行加锁,但是具体是如何加锁的呢?

A:AQS的Acquire会调用tryAcquire方法,tryAcquire由各个自定义同步器实现,通过tryAcquire完成加锁过程。

Tip: 自我实现一个锁,利用 AQS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package 多线程.Concurrent包理解;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class AQS {
public static void main(String[] args) {
return;
}
}
class MutexDemo {
private static Mutex mutex = new Mutex();

public static void main(String[] args) {
for (int i = 0; i < 10 ; i++){
Thread thread = new Thread(() -> {
mutex.lock();
try {
System.out.println("Current Thread:" + Thread.currentThread().getName());
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
mutex.unlock();
}
});
thread.start();
}
}
}

class Mutex implements Lock, java.io.Serializable {

// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Reports whether in locked state
protected boolean isHeldExclusively() {
return getState() == 1;
}

public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// Provides a Condition
Condition newCondition() {
return new ConditionObject();
}

// Deserializes properly 反序列化
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();

public void lock() {
sync.acquire(1);
}

public boolean tryLock() {
return sync.tryAcquire(1);
}

public void unlock() {
sync.release(1);
}

public Condition newCondition() {
return sync.newCondition();
}

public boolean isLocked() {
return sync.isHeldExclusively();
}

public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}

ReentrantLock

可重入性、公平与非公平

ReentranLock的特点

重点还是:

  1. 可重入性的实现
  2. 公平锁和非公平锁的区别

其他的基本上就是 AQS 的独占锁实现而已

可重入性的实现

在可重入性方面,个人觉得还是针对于同一把锁而言的,不同锁去谈重入性没有意义,但是有个特例,就是 ReentrantReadWriteLock,因为其 ReadLock 和 WriteLock 是属于同一把大锁下的两把小锁,它们之间的重入性有些特殊,也就是同一把写锁肯定是可以重入的,同一把读锁肯定也是可以重入的,但是注意的是读锁下不可重入写锁,因为读-写互斥,但是有个锁降级,也就是 写锁可以重入读锁,将写锁降级为读锁(先获取写锁,然后获取读锁,然后释放写锁,然后再释放读锁)。

言归正传,不同锁的 ReentrantLock,之间不会有影响, 类似于下面的 demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package 多线程.Concurrent包理解;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockDemo {
public static final ReentrantLock lock1 = new ReentrantLock();
public static final ReentrantLock lock2 = new ReentrantLock();


public static void main(String[] args) throws InterruptedException {
for(int i = 0;i < 10;i++){
Thread thread = new Thread(() -> {
lock1.lock();
lock2.lock();
try{

System.out.println("current:" + Thread.currentThread().getName() );
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock2.unlock();
lock1.unlock();
}
});
thread.start();
}
}
}

我们以非公平锁为例,看看可重入的代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//1. 如果该锁未被任何线程占有,该锁能被当前线程获取
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//2.若被占有,检查占有线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
// 3. 再次获取,计数加一
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryRelease(int releases) {
//1. 同步状态减1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//2. 只有当同步状态为0时,锁成功被释放,返回true
free = true;
setExclusiveOwnerThread(null);
}
// 3. 锁未被完全释放,返回false
setState(c);
return free;
}

公平锁与非公平锁

ReentrantLock支持两种锁:公平锁非公平锁何谓公平性,是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求上的绝对时间顺序,满足FIFO。ReentrantLock的构造方法无参时是构造非公平锁,源码为:

1
2
3
public ReentrantLock() {
sync = new NonfairSync();
}

另外还提供了另外一种方式,可传入一个boolean值,true时为公平锁,false时为非公平锁,源码为:

1
2
3
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

在上面非公平锁获取时(nonfairTryAcquire方法)只是简单的获取了一下当前状态做了一些逻辑处理,并没有考虑到当前同步队列中线程等待的情况。我们来看看公平锁的处理逻辑是怎样的,核心方法为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

其实就是多了一个方法 hasQueuedPredecessors(),方法名就可知道该方法用来判断当前节点在同步队列中是否有前驱节点的判断,如果有前驱节点说明有线程比当前线程更早的请求资源,根据公平性,当前线程请求资源失败。如果当前节点没有前驱节点的话,再才有做后面的逻辑判断的必要性。公平锁每次都是从同步队列中的第一个节点获取到锁,而非公平性锁则不一定,有可能刚释放锁的线程能再次获取到锁

公平锁 VS 非公平锁

  1. 公平锁每次获取到锁为同步队列中的第一个节点,保证请求资源时间上的绝对顺序,而非公平锁有可能刚释放锁的线程下次继续获取该锁,则有可能导致其他线程永远无法获取到锁,造成“饥饿”现象
  2. 公平锁为了保证时间上的绝对顺序,需要频繁的上下文切换,而非公平锁会降低一定的上下文切换,降低性能开销。因此,ReentrantLock默认选择的是非公平锁,则是为了减少一部分上下文切换,保证了系统更大的吞吐量

与 Synchronized 的区别

可重入性

从名字上理解,ReenTrantLock的字面意思就是再进入的锁,其实synchronized关键字所使用的锁也是可重入的,两者关于这个的区别不大。两者都是同一个线程没进入一次,锁的计数器都自增1,所以要等到锁的计数器下降为0时才能释放锁。

锁的实现

Synchronized是依赖于JVM实现的,而ReenTrantLock是JDK实现的,有什么区别,说白了就类似于操作系统来控制实现和用户自己敲代码实现的区别。前者的实现是比较难见到的,后者有直接的源码可供阅读。

性能的区别

在Synchronized优化以前,synchronized的性能是比ReenTrantLock差很多的,但是自从Synchronized引入了偏向锁,轻量级锁(自旋锁)后,两者的性能就差不多了,在两种方法都可用的情况下,官方甚至建议使用synchronized,其实synchronized的优化我感觉就借鉴了ReenTrantLock中的CAS技术。都是试图在用户态就把加锁问题解决,避免进入内核态的线程阻塞。

功能区别

便利性:很明显Synchronized的使用比较方便简洁,并且由编译器去保证锁的加锁和释放,而ReenTrantLock需要手工声明来加锁和释放锁,为了避免忘记手工释放锁造成死锁,所以最好在finally中声明释放锁。

锁的细粒度和灵活度:很明显ReenTrantLock优于Synchronized

ReenTrantLock独有的能力

  1. ReenTrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。所谓的公平锁就是先等待的线程先获得锁。

  2. ReenTrantLock提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。

  3. ReenTrantLock提供了一种能够中断等待锁的线程的机制,通过lock.lockInterruptibly()来实现这个机制。

ReenTrantLock实现的原理

简单来说,ReenTrantLock的实现是一种自旋锁,通过循环调用CAS操作来实现加锁。它的性能比较好也是因为避免了使线程进入内核态的阻塞状态。想尽办法避免线程进入内核的阻塞状态是我们去分析和理解锁设计的关键钥匙。

什么情况下使用ReenTrantLock

答案是,如果你需要实现ReenTrantLock的三个独有功能时。

————————————————
版权声明:本文为CSDN博主「qq838642798」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq838642798/article/details/65441415

ReentrantReadWriteLock

写锁—-真的是写的超棒!

读锁—-不说了太赞了!!关注这个作者

锁降级问题

这个才是完整继承了 AQS 基本所有方法的实现类,因为它不像 ReentrantLock 只有独占锁,它可以同时拥有独占锁和共享锁,并且这两个锁还有一些额外的限制,也就是 写-读互斥(当然锁降级除外),写-写互斥,读-读共享。所以这个很有必要详细说一下。

主要还是三个特点:

  1. 公平性选择:支持非公平性(默认)和公平的锁获取方式,吞吐量还是非公平优于公平;
  2. 重入性:支持重入,读锁获取后能再次获取,写锁获取之后能够再次获取写锁,同时也能够获取读锁;
  3. 锁降级:遵循获取写锁,获取读锁再释放写锁的次序,写锁能够降级成为读锁

要弄清楚上述三个特点,就得解决以下几个问题:

  1. 写锁和读锁是如何记录读写状态的;
  2. 写锁是如何获取和释放的;
  3. 读锁是如何获取和释放的;
  4. 锁降级的规则是什么。

读写锁如何记录状态

之前都是用 state 这个变量来记录锁的获取情况,这里也是一样,但是这里比较特殊,使用了(Integer 32位)低 16 位来存储写锁的获取情况,高 16 位用来存储读锁的获取情况。

写锁的获取和释放

写锁由于是独占锁,并且不能锁升级(也就是不允许读锁内嵌套写锁),所以也就比较简单了。

tryAcquire()

写锁独占锁,所以需要实现的就是 AQS 中的 tryAcquire()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 如果有读锁,此时是获取不到写锁的。当有写锁时,判断重入次数。
// 当写锁空闲,读锁空闲,公平模式下,如果队列中有等待的,不会抢锁。非公平模式下,必抢锁。
protected final boolean tryAcquire(int acquires) {
// 写
Thread current = Thread.currentThread();
int c = getState();
// 用 state & 65535 得到低 16 位的值。
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 如果 state 不是0,且低16位是0,说明了什么?说明写锁是空闲的,读锁被霸占了。那么也不能拿锁,返回 fasle。
// 如果低 16 位不是0,说明写锁被霸占了,并且,如果持有锁的不是当前线程,那么这次拿锁是失败的。返回 fasle。
// 总之,当只有读锁时,就不能获取写锁。当有写锁,可能是写锁降级,也可能是正常的拥有写锁,此时就 // 必须是重入锁。
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 到这一步了,只会是写重入锁。如果写重入次数超过最大值 65535,就会溢出。
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 将 state + 1
setState(c + acquires);
return true;
}
// 当 state 是 0 的时候,那么就可以获取锁了。
// writerShouldBlock 判断是否需要锁。非公平情况下,返回 false。公平情况下,根据 hasQueuedPredecessors 结果判断。
// 当队列中有锁等待了,就返回 false 了。
// 当是非公平锁的时候,或者队列中没有等待节点的时候,尝试用 CAS 修改 state。
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 修改成功 state 后,修改锁的持有线程。
setExclusiveOwnerThread(current);

最难理解的就是if (w == 0 || current != getExclusiveOwnerThread()) 这一步,具体的流程分析如下图:

ReentrantReadWriteLock_tryAcquired

tryRelease()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryRelease(int releases) {
// 是否持有当前锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 计算 state 值
int nextc = getState() - releases;
// 计算写锁的状态,如果是0,说明是否成功。
boolean free = exclusiveCount(nextc) == 0;
// 释放成功,设置持有锁的线程为 null。
if (free)
setExclusiveOwnerThread(null);
// 设置 state
setState(nextc);
return free;
}

这里还是很简单的,只是有一个地方需要注意:

1
2
// 计算写锁的状态,如果是0,说明是否成功。
boolean free = exclusiveCount(nextc) == 0;

这里计算的只是 state 变量的低 16 的值,而不是整个 state 的值。虽然写的时候,必然是串行的,但这里计算的仍然是低 16 位的。

读锁的获取和释放

获取读锁的过程是获取共享锁的过程。

tryAcquireShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
protected final int11 tryAcquireShared(int unused) {

Thread current = Thread.currentThread();
int c = getState();
// exclusiveCount(c) != 0 ---》 用 state & 65535 得到低 16 位的值。如果不是0,说明写锁别持有了。
// getExclusiveOwnerThread() != current----> 不是当前线程
// 如果写锁被霸占了,且持有线程不是当前线程,返回 false,加入队列。获取写锁失败。
// 反之,如果持有写锁的是当前线程,就可以继续获取读锁了。
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current)
// 获取锁失败
return -1;
// 如果写锁没有被霸占,则将高16位移到低16位。
int r = sharedCount(c);// c >>> 16
// !readerShouldBlock() 和写锁的逻辑一样(根据公平与否策略和队列是否含有等待节点)
// 不能大于 65535,且 CAS 修改成功
if (!readerShouldBlock() && r < 65535 && compareAndSetState(c, c + 65536)) {
// 如果读锁是空闲的, 获取锁成功。
if (r == 0) {
// 将当前线程设置为第一个读锁线程
firstReader = current;
// 计数器为1
firstReaderHoldCount = 1;

}// 如果读锁不是空闲的,且第一个读线程是当前线程。获取锁成功。
else if (firstReader == current) {//
// 将计数器加一
firstReaderHoldCount++;
} else {// 如果不是第一个线程,获取锁成功。
// cachedHoldCounter 代表的是最后一个获取读锁的线程的计数器。
HoldCounter rh = cachedHoldCounter;
// 如果最后一个线程计数器是 null 或者不是当前线程,那么就新建一个 HoldCounter 对象
if (rh == null || rh.tid != getThreadId(current))
// 给当前线程新建一个 HoldCounter
cachedHoldCounter = rh = readHolds.get();
// 如果不是 null,且 count 是 0,就将上个线程的 HoldCounter 覆盖本地的。
else if (rh.count == 0)
readHolds.set(rh);
// 对 count 加一
rh.count++;
}
return 1;
}
// 死循环获取读锁。包含锁降级策略。
return fullTryAcquireShared(current);
}

总结一下上面代码的逻辑吧!

  1. 判断写锁是否空闲。
  2. 如果不是空闲,且当前线程不是持有写锁的线程,则返回 -1 ,表示抢锁失败。如果是空闲的,进入第三步。如果是当前线程,进入第三步。
  3. 判断持有读锁的数量是否超过 65535,然后使用 CAS 设置 int 高 16 位的值,也就是加一。
  4. 如果设置成功,且是第一次获取读锁,就设置 firstReader 相关的属性(为了性能提升)。
  5. 如果不是第一次,当当前线程就是第一次获取读锁的线程,对 “第一次获取读锁线程计数器” 加 1.
  6. 如果都不是,则获取最后一个读锁的线程计数器,判断这个计数器是不是当前线程的。如果是,加一,如果不是,自己创建一个新计数器,并更新 “最后读取的线程计数器”(也是为了性能考虑)。最后加一。返回成功。
  7. 如果上面的判断失败了(CAS 设置失败,或者队列有等待的线程(公平情况下))。就调用 fullTryAcquireShared 方法死循环执行上面的步骤。

步骤还是有点多哈,画个图吧,更清晰一点。

img

可以看到读锁对 AQS 的共享锁做了两个优化:

  1. 引进了 firstReaderHoldCount 和 cachedHoldCounter ,可以加快计数的性能;
  2. 引入锁降级机制,对读多写少且读更重要的场景很适用。

继续看 fullTryAcquireShared():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
final int fullTryAcquireShared(Thread current) {
/*
* 这段代码与tryAcquireShared中的代码有部分重复,但整体更简单。
*/
HoldCounter rh = null;
// 死循环
for (;;) {
int c = getState();
// 如果存在写锁
if (exclusiveCount(c) != 0) {
// 并且不是当前线程,获取锁失败,反之,如果持有写锁的是当前线程,那么就会进入下面的逻辑。
// 反之,如果存在写锁,但持有写锁的是当前线程。那么就继续尝试获取读锁。
if (getExclusiveOwnerThread() != current)
return -1;
// 如果写锁空闲,且可以获取读锁。
} else if (readerShouldBlock()) {
// 第一个读线程是当前线程
if (firstReader == current) {
// 如果不是当前线程
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
// 从 ThreadLocal 中取出计数器
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 如果读锁次数达到 65535 ,抛出异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 尝试对 state 加 65536, 也就是设置读锁,实际就是对高16位加一。
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 如果读锁是空闲的
if (sharedCount(c) == 0) {
// 设置第一个读锁
firstReader = current;
// 计数器为 1
firstReaderHoldCount = 1;
// 如果不是空闲的,查看第一个线程是否是当前线程。
} else if (firstReader == current) {
firstReaderHoldCount++;// 更新计数器
} else {// 如果不是当前线程
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))// 如果最后一个读计数器所属线程不是当前线程。
// 自己创建一个。
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 对计数器 ++
rh.count++;
// 更新缓存计数器。
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

基本上和 tryAcquireShared() 差不多,只不过这里有一个自旋的过程。

  • firstReader 是获取读锁的第一个线程。如果只有一个线程获取读锁,很明显,使用这样一个变量速度更快。

  • firstReaderHoldCountfirstReader的计数器。同上。

  • cachedHoldCounter是最后一个获取到读锁的线程计数器,每当有新的线程获取到读锁,这个变量都会更新。这个变量的目的是:当最后一个获取读锁的线程重复获取读锁,或者释放读锁,就会直接使用这个变量,速度更快,相当于缓存。

tryReleaseShared()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 如果是第一个线程
if (firstReader == current) {
// 如果是 1,将第一个线程设置成 null。结束。
if (firstReaderHoldCount == 1)
firstReader = null;
// 如果不是 1,减一操作
else
firstReaderHoldCount--;
} else {//如果不是当前线程
HoldCounter rh = cachedHoldCounter;
// 如果缓存是 null 或者缓存所属线程不是当前线程,则当前线程不是最后一个读锁。
if (rh == null || rh.tid != getThreadId(current))
// 获取当前线程的计数器
rh = readHolds.get();
int count = rh.count;
// 如果计数器小于等于一,就直接删除计数器
if (count <= 1) {
readHolds.remove();
// 如果计数器的值小于等于0,说明有问题了,抛出异常
if (count <= 0)
throw unmatchedUnlockException();
}
// 对计数器减一
--rh.count;
}
for (;;) {// 死循环使用 CAS 修改状态
int c = getState();
// c - 65536, 其实就是减去一个读锁。对高16位减一。
int nextc = c - SHARED_UNIT;
// 修改 state 状态。
if (compareAndSetState(c, nextc))
// 修改成功后,如果是 0,表示读锁和写锁都空闲,则可以唤醒后面的等待线程
return nextc == 0;
}
}

释放还是很简单的,步骤如下:

  1. 如果当前线程是第一个持有读锁的线程,则只需要操作 firstReaderHoldCount 减一。如果不是,进入第二步。
  2. 获取到缓存计数器(最后一个线程的计数器),如果匹配到当前线程,就减一。如果不匹配,进入第三步。
  3. 获取当前线程自己的计数器(由于每个线程都会多次获取到锁,所以,每个线程必须保存自己的计数器。)。
  4. 做减一操作。
  5. 死循环修改 state 变量。

锁降级

重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。

锁降级的必要性主要有两个观点: 「个人还是倾向于第一种观点」

  1. 是为了保证数据的可见性,类似于volatile,只有写锁降级才能保证我读到的数据是我这次写数据后的数据,否则就可能发生其他线程先抢写锁,导致读到的数据并不是我这个线程写后的数据,当前线程无法感知数据更新。

img

  1. 特殊的写锁重入机制,JDK 使用 先获取写入锁,然后获取读取锁,最后释放写入锁 这个步骤,是为了提高获取锁的效率。尤其是读锁的效率,不需要等到写锁释放,也无需跟写锁竞争共享资源。

img

Condition

源码部分讲的很好,分析的很透彻

简介部分写的还行,但是源码分析的非常糟糕

简介

任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

Q:为什么 wait/notify 等方法定义在 Object 方法内?

A:Java中所有的类和对象逻辑上都对应有一个锁和监视器,也就是说在Java中一切对象都可以用来线程的同步、所以这些管程(监视器)的“过程”方法定义在Object中一点也不奇怪。

一句话归纳,就是 Lock 搭配 Condition(await/signal),Synchronized(监视器) 搭配 wait/notify。

从整体上来看Object的wait和notify/notify是与对象监视器配合完成线程间的等待/通知机制,而Condition与Lock配合完成等待通知机制,前者是java底层级别的,后者是语言级别的,具有更高的可控制性和扩展性。两者除了在使用方式上不同外,在功能特性上还是有很多的不同:

  1. 前置条件不同,Objec方式的对象监视器需要获得锁,而 Condition 的前提是要拿到 Lock;

  2. Condition能够支持不响应中断,而通过使用Object方式不支持;

  3. Condition能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个;
  4. Condition能够支持超时时间的设置,而Object不支持。

参照Object的wait和notify/notifyAll方法,Condition也提供了同样的方法:

针对Object的wait方法

  1. void await() throws InterruptedException:当前线程进入等待状态,如果其他线程调用condition的signal或者signalAll方法并且当前线程获取Lock从await方法返回,如果在等待状态中被中断会抛出被中断异常;
  2. long awaitNanos(long nanosTimeout):当前线程进入等待状态直到被通知,中断或者超时
  3. boolean await(long time, TimeUnit unit)throws InterruptedException:同第二种,支持自定义时间单位
  4. boolean awaitUntil(Date deadline) throws InterruptedException:当前线程进入等待状态直到被通知,中断或者到了某个时间

针对Object的notify/notifyAll方法

  1. void signal():唤醒一个等待在condition上的线程,将该线程从等待队列中转移到同步队列中,如果在同步队列中能够竞争到Lock则可以从等待方法中返回。
  2. void signalAll():与1的区别在于能够唤醒所有等待在condition上的线程

源码分析

创建一个 Condition 对象,是通过 lock.newCondition(),new出来一个 ConditionObject 对象,而这个恰好又是 AQS 的内部类,前面我们说过,condition是要和lock配合使用的,也就是condition和lock是绑定在一起的,而lock的实现原理又依赖于AQS,自然而然 ConditionObject 作为AQS的一个内部类无可厚非。

我们在 AQS 讲过,nextWaiter 既可以用来标志 Condition 队列中的下一个,又可以标志是独占锁/共享锁,很明显从这里就可以看出等待队列是一个单向队列,不同于同步队列是一个双向队列,并且等待队列并没有头结点,不像同步队列是有一个头结点的。还有一点需要再强调一次,对象Object对象监视器上只能拥有一个同步队列和一个等待队列,而并发包中的Lock拥有一个同步队列和多个等待队列

img

注: 图中的条件队列即等待队列。条件队列(等待队列)的属性如下,用头尾指针控制整个队列。

1
2
3
4
5
6
7
8
9
10
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter; // 条件队列的头节点
/** Last node of condition queue. */
private transient Node lastWaiter; // 条件队列的尾节点

/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }

Condition的实现主要包括:条件队列、等待和通知。其中条件队列放的是AQS里的Node数据结构,使用nextWaiter来维护条件队列。等待和通知共有7个方法。

1
2
3
4
5
6
7
8

signal() //唤醒该条件队列的头节点。
signalAll() //唤醒该条件队列的所有节点。
awaitUninterruptibly() //等待,此方法无法被中断,必须通过唤醒才能解除阻塞。
await() //当前线程进入等待。
awaitNanos(long) //当前线程进入等待,有超时时间,入参的单位为纳秒。
awaitUntil(Date) //当先线程进入等待,直到当前时间超过入参的时间。
await(long, TimeUnit) //当前线程进入等待,有超时时间,入参可以自己设置时间单位。

这些方法其实大同小异,因此本文只对常用的signal()、signalAll()和await()方法展开详解。搞懂了这3个方法,搞懂其他几个方法也基本没什么阻碍。

await()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void await() throws InterruptedException { // 阻塞当前线程,直接被唤醒或被中断
if (Thread.interrupted()) // 如果当前线程被中断过,则抛出中断异常
throw new InterruptedException();
Node node = addConditionWaiter(); // 添加一个waitStatus为CONDITION的节点到条件队列尾部
int savedState = fullyRelease(node); // 释放操作。我们知道只有在拥有锁(acquire成功)的时候才能调用await()方法,因此,调用await()方法的线程的节点必然是同步队列的头节点。所以,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的条件队列中。
int interruptMode = 0; // 0为正常,被中断值为THROW_IE或REINTERRUPT
while (!isOnSyncQueue(node)) { // isOnSyncQueue:判断node是否在同步队列(注意和条件队列区分。调用signal方法会将节点从条件队列移动到同步队列,因此这边就可以跳出while循环)
LockSupport.park(this); // node如果不在同步队列则进行park(阻塞当前线程)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查线程被唤醒是否是因为被中断,如果是则跳出循环,否则会进行下一次循环,因为被唤醒前提是进入同步队列,所以下一次循环也必然会跳出循环
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // acquireQueued返回true代表被中断过,如果中断模式不是THROW_IE,则必然为REINTERRUPT(见上面的checkInterruptWhileWaiting方法)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters(); // 移除waitStatus为CANCELLED的节点
if (interruptMode != 0) // 如果跳出while循环是因为被中断
reportInterruptAfterWait(interruptMode); // 则根据interruptMode,选择抛出InterruptedException 或 重新中断当前线程
}

总结一下整体的流程:

  1. 判断线程是否被中断,是的话就抛出异常;
  2. 然后将给线程包装成节点,且 WaitStatus == Condition,这个由 addConditionWaiter() 完成,下面会讲;
  3. 然后将同步队列中的节点删除,其实就相当于是将同步队列的头结点加入到条件队列中的尾结点,这个是由fullRelease() 完成的;
  4. 然后判断线程是否在同步队列中,这个使用的是 isOnSyncQueue(),第一次肯定是已经不在了,因为调用了 fullRelease() 已经将其从同步队列中删除了,进入循环后,就会阻塞该线程,如何退出呢,那就需要 signal了;
  5. 当调用了 signal 后,程序会跳出循环,此时 signal 函数已经将该节点从条件队列加回至同步队列中了,我们只需要调用 acquireQueued() 尝试获取锁就可以了。

接下来,讲一下刚才用到的几个方法:

addConditionWaiter()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private Node addConditionWaiter() { // 添加一个waitStatus为CONDITION的节点到条件队列尾部
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); // 移除waitStatus不为CONDITION的节点(条件队列里的节点waitStatus都为CONDITION)
t = lastWaiter; // 将t赋值为移除了waitStatus不为CONDITION后的尾节点(上面进行了移除操作,因此尾节点可能会发生变化)
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); // 以当前线程新建一个waitStatus为CONDITION的节点
if (t == null) // t为空,代表条件队列为空
firstWaiter = node; // 将头节点赋值为node
else
t.nextWaiter = node; // 否则,队列不为空。将t(原尾节点)的后继节点赋值为node
lastWaiter = node; // 将node赋值给尾节点,即将node放到条件队列的尾部。这里没有用CAS来保证原子性,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的
return node;
}

这个函数的主体功能就是添加一个waitStatus为CONDITION的节点到条件队列尾部,具体实现步骤是:

  1. 如果条件队列的尾节点不为null并且waitStatus不为CONDITION,则调用unlinkCancelledWaiters方法(详解见下文unlinkCancelledWaiters方法)移除waitStatus不为CONDITION的节点(条件队列里的节点waitStatus都为CONDITION),并将t赋值为移除了waitStatus不为CONDITION后的尾节点(上面进行了移除操作,因此尾节点可能会发生变化);
  2. 新建一个节点,存储当前线程;
  3. 添加到队尾,注意这里不需要使用 CAS。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void unlinkCancelledWaiters() { // 从条件队列移除所有waitStatus不为CONDITION的节点
Node t = firstWaiter; // t赋值为条件队列的尾节点
Node trail = null;
while (t != null) {
Node next = t.nextWaiter; // 向下遍历
if (t.waitStatus != Node.CONDITION) { // 如果t的waitStatus不为CONDITION
t.nextWaiter = null; // 断开t与t后继节点的关联
if (trail == null) // 如果trail为null,则将firstWaiter赋值为next节点,此时还没有遍历到waitStatus为CONDITION的节点,因此直接移动firstWaiter的指针即可移除前面的节点
firstWaiter = next;
else
trail.nextWaiter = next; // 否则将trail的后继节点设为next节点。此时,trail节点到next节点中的所有节点被移除(包括t节点,但可能不止t节点。因为,trail始终指向遍历过的最后一个waitStatus为CONDITION,因此只需要将trail的后继节点设置为next,即可将trail之后到next之前的所有节点移除)
if (next == null)
lastWaiter = trail;
}
else
trail = t; // 如果t的waitStatus为CONDITION,则将trail赋值为t,trail始终指向遍历过的最后一个waitStatus为CONDITION
t = next; // t指向下一个节点
}
}

fullRelease()

主体也就是 AQS 的 release 方法,将其从同步队列中删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final int fullyRelease(Node node) { // 释放锁
boolean failed = true;
try {
int savedState = getState(); // 当前的同步状态
if (release(savedState)) { // 独占模式下release(一般指释放锁)
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED; // 如果release失败则将该节点的waitStatus设置为CANCELLED
}
}

isOnSyncQueue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final boolean isOnSyncQueue(Node node) {    // 判断node是否再同步队列中
if (node.waitStatus == Node.CONDITION || node.prev == null) // 如果waitStatus为CONDITION 或 node没有前驱节点,则必然不在同步队列,直接返回false
return false;
if (node.next != null) // 如果有后继节点,必然是在同步队列中,返回true
return true;
return findNodeFromTail(node); // 返回node是否为同步队列节点,如果是返回true,否则返回false
}


// 从同步队列的尾节点开始向前遍历,如果node为同步队列节点则返回true,否则返回false
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}

signal()

1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively()) // 检查当前线程是否为独占模式同步器的所有者
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 唤醒条件队列的头节点
}

因为条件队列,即等待队列也是 FIFO,头结点必然是等的最久的一个,所以每次唤醒都是唤醒头结点。

具体步骤:

  1. 检查当前线程是否为独占模式同步器的所有者,在ReentrantLock中即检查当前线程是否为拥有锁的线程。如果不是,则抛IllegalMonitorStateException。
  2. 拿到条件队列的头节点,如果不为null,则调用doSignal方法(详解见下文doSignal方法)唤醒头节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doSignal(Node first) { // 将条件队列的头节点移到同步队列
do {
if ( (firstWaiter = first.nextWaiter) == null) // 将first节点赋值为first节点的后继节点(相当于移除first节点),如果first节点的后继节点为空,则将lastWaiter赋值为null
lastWaiter = null;
first.nextWaiter = null; // 断开first节点对first节点后继节点的关联
} while (!transferForSignal(first) && // transferForSignal:将first节点从条件队列移动到同步队列
(first = firstWaiter) != null); // 如果transferForSignal失败,并且first节点不为null,则向下遍历条件队列的节点,直到节点成功移动到同步队列 或者 firstWaiter为null
}


// 将node节点从条件队列移动到同步队列,如果成功则返回true。
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
// 如果不能更改节点的waitStatus,则表示该节点已被取消,返回false
return false;
// 否则,调用enq方法将node添加到同步队列,注意:enq方法返回的节点是node的前驱节点
Node p = enq(node);
int ws = p.waitStatus; // 将ws赋值为node前驱节点的等待状态
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果node前驱节点的状态为CANCELLED(ws>0)
//或 使用CAS将waitStatus修改成SIGNAL失败,则代表node的前驱节点无法来唤醒node节点,因此直接调用LockSupport.unpark方法唤醒node节点
// 注意,unparkSuccessor(node)是唤醒该节点的下一个节点,而LockSupport.lock则是直接唤醒该节点
LockSupport.unpark(node.thread);
return true;
}

signalAll()

1
2
3
4
5
6
7
public final void signalAll() {
if (!isHeldExclusively()) // 检查当前线程是否为独占模式同步器的所有者
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first); // 唤醒条件队列的所有节点
}
1
2
3
4
5
6
7
8
9
private void doSignalAll(Node first) {  // 将条件队列的所有节点移到同步队列
lastWaiter = firstWaiter = null; // 因为要移除条件队列的所有节点到同步队列,因此这边直接将firstWaiter和lastWaiter赋值为null
do {
Node next = first.nextWaiter; // next赋值为first节点的后继节点
first.nextWaiter = null; // 断开first节点对first节点后继节点的关联
transferForSignal(first); // transferForSignal:将first节点从条件队列移动到同步队列
first = next; // first赋值为next节点
} while (first != null); // 循环遍历,将条件队列的所有节点移动到同步队列
}

至此,Condition 就分析完了,我们可以用 Condition 写一个 生产者-消费者的demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package 多线程.Producer_Consumer;


import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class await_signal {
private final static Lock lock = new ReentrantLock();
private final static Condition producer_condition = lock.newCondition();
private final static Condition consumer_condition = lock.newCondition();
private final static LinkedList<Long> linkedList = new LinkedList<>();
private final static int MAX_CAPACITY = 100;

public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 10; i++) {
while (true) {
await_signal.producer();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < 14; i++) {
while (true) {
await_signal.consumer();
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();

}

private static void producer() {
try {
lock.lock();
if (linkedList.size() > MAX_CAPACITY) {
producer_condition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
long value = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + ":-------PRODUCER-------- " + value);
linkedList.addLast(value);
consumer_condition.signalAll();
lock.unlock();
}
}

private static void consumer() {
try {
lock.lock();
if (linkedList.size() == 0) {
consumer_condition.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
long value = linkedList.pop();
System.out.println(Thread.currentThread().getName() + "::-------CONSUMER-------- " + value);
producer_condition.signalAll();
lock.unlock();
}
}
}

LockSupport & Unsafe

LockSupport.park() vs Object.wait() vs Thread.sleep()

  1. 从释放资源看,三者在阻塞线程后,LockSupport.park() 会阻塞线程并且不会释放锁资源,注意,本身 park() 是不会释放资源的,我们在同步队列中中就是用 park() 来使得线程挂起(wait状态),等待资源。Object.wait() (也是wait状态)会阻塞当前线程,并且让出时间片和锁资源,等待被 notify 才获得 monitor,拿到锁资源。Thread.sleep() 会让线程sleep(sleep状态),同样阻塞当前线程,但是不会让出锁资源;

  2. 使用的前提条件看。Object.wait() 必须在 Synchronized 里用,LockSupport.park() 可以在任何地方用,Thread.sleep() 当然也是地方都能用;

  3. 从中断异常看。LockSupport.park() 不需要捕获中断异常,注意,它不需要捕获,并且它会从中断中醒来,将线程由 interrupted —> running,所以外部调用时需要谨慎,这里提醒一下,尤其是在 StampedLock 中使用的时候,如果 writeLock 一直在使用锁资源,而readLock在等待锁资源后开始挂起,突然将readLock线程中断的话,因为 StampedLock.readLock() 不处理中断,所以直接将线程由 wait –> running,此时线程会一直自旋获取锁,但是由于锁一直被 writeLock,此时 CPU就会爆炸。Object.wait() 声明抛出了中断异常,调用者需要捕获或者再抛出,Thread.sleep()方法声明上也是抛出了InterruptedException中断异常,所以调用者需要捕获这个异常或者再抛出;

  4. 唤醒方式看。LockSupport.park() 可以通过 LockSupport.unpark() 或者中断唤醒,Object.wait() 则需要调用 另一个线程执行 notify() 进行唤醒,Thread.sleep() 只能自己醒过来;

  5. 补充:如果在wait()之前执行了notify()会怎样?抛出IllegalMonitorStateException异常

    如果在park()之前执行了unpark()会怎样?线程不会被阻塞,直接跳过park(),继续执行后续内容。

简介

LockSupport下的很多方法,例如我们之前常常提及的 LockSupport.park()、LockSupport.unpark() 都是用了 Unsafe 类下的 park() 和 unpark() 方法。

image-20200227133328691

1
2
3
4
5
6
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
1
2
3
4
private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}
1
2
3
4
5
public static Object getBlocker(Thread t) {
if (t == null)
throw new NullPointerException();
return UNSAFE.getObjectVolatile(t, parkBlockerOffset);
}

可以看到,基本上每个方法的底层都是使用了 Unsafe 类的方法,所以接下来主要分析一下 Unsafe。

Unsafe简介

https://tech.meituan.com/2019/02/14/talk-about-java-magic-class-unsafe.html

属于 sun.misc.Unsafe。

采用饿汉式的单例模式,但Unsafe类做了限制,如果是普通的调用的话,它会抛出一个SecurityException异常;只有由主类加载器加载的类才能调用这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static native void registerNatives();
static {
registerNatives();
sun.reflect.Reflection.registerMethodsToFilter(Unsafe.class, "getUnsafe");
}

private Unsafe() {}

private static final Unsafe theUnsafe = new Unsafe();

@CallerSensitive
public static Unsafe getUnsafe() {
Class<?> caller = Reflection.getCallerClass();
if (!VM.isSystemDomainLoader(caller.getClassLoader()))
throw new SecurityException("Unsafe");
return theUnsafe;
}

功能介绍

img

如上图所示,Unsafe提供的API大致可分为内存操作、CAS、Class相关、对象操作、线程调度、系统信息获取、内存屏障、数组操作等几类,下面将对其相关方法和应用场景进行详细介绍。

内存操作

主要是管控堆外的一些内存操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//分配内存, 相当于C++的malloc函数
public native long allocateMemory(long bytes);
//扩充内存
public native long reallocateMemory(long address, long bytes);
//释放内存
public native void freeMemory(long address);
//在给定的内存块中设置值
public native void setMemory(Object o, long offset, long bytes, byte value);
//内存拷贝
public native void copyMemory(Object srcBase, long srcOffset, Object destBase, long destOffset, long bytes);
//获取给定地址值,忽略修饰限定符的访问限制。与此类似操作还有: getInt,getDouble,getLong,getChar等
public native Object getObject(Object o, long offset);
//为给定地址设置值,忽略修饰限定符的访问限制,与此类似操作还有: putInt,putDouble,putLong,putChar等
public native void putObject(Object o, long offset, Object x);
//获取给定地址的byte类型的值(当且仅当该内存地址为allocateMemory分配时,此方法结果为确定的)
public native byte getByte(long address);
//为给定地址设置byte类型的值(当且仅当该内存地址为allocateMemory分配时,此方法结果才是确定的)
public native void putByte(long address, byte x);

通常,我们在Java中创建的对象都处于堆内内存(heap)中,堆内内存是由JVM所管控的Java进程内存,并且它们遵循JVM的内存管理机制,JVM会采用垃圾回收机制统一管理堆内存。与之相对的是堆外内存,存在于JVM管控之外的内存区域,Java中对堆外内存的操作,依赖于Unsafe提供的操作堆外内存的native方法。

使用堆外内存的原因

  • 对垃圾回收停顿的改善。由于堆外内存是直接受操作系统管理而不是JVM,所以当我们使用堆外内存时,即可保持较小的堆内内存规模。从而在GC时减少回收停顿对于应用的影响。
  • 提升程序I/O操作的性能。通常在I/O通信过程中,会存在堆内内存到堆外内存的数据拷贝操作,对于需要频繁进行内存间数据拷贝且生命周期较短的暂存数据,都建议存储到堆外内存。

典型应用

DirectByteBuffer是Java用于实现堆外内存的一个重要类,通常用在通信过程中做缓冲池,如在Netty、MINA等NIO框架中应用广泛。DirectByteBuffer对于堆外内存的创建、使用、销毁等逻辑均由Unsafe提供的堆外内存API来实现。

下图为DirectByteBuffer构造函数,创建DirectByteBuffer的时候,通过Unsafe.allocateMemory分配内存、Unsafe.setMemory进行内存初始化,而后构建Cleaner对象用于跟踪DirectByteBuffer对象的垃圾回收,以实现当DirectByteBuffer被垃圾回收时,分配的堆外内存一起被释放。

img

那么如何通过构建垃圾回收追踪对象Cleaner实现堆外内存释放呢?

Cleaner继承自Java四大引用类型之一的虚引用PhantomReference(众所周知,无法通过虚引用获取与之关联的对象实例,且当对象仅被虚引用引用时,在任何发生GC的时候,其均可被回收),通常PhantomReference与引用队列ReferenceQueue结合使用,可以实现虚引用关联对象被垃圾回收时能够进行系统通知、资源清理等功能。如下图所示,当某个被Cleaner引用的对象将被回收时,JVM垃圾收集器会将此对象的引用放入到对象引用中的pending链表中,等待Reference-Handler进行相关处理。其中,Reference-Handler为一个拥有最高优先级的守护线程,会循环不断的处理pending链表中的对象引用,执行Cleaner的clean方法进行相关清理工作。

Tip: 这块基本不懂啊……等复习完多线程得恶补一下 JVM 的相关知识了…

img

CAS 相关

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* CAS
* @param o 包含要修改field的对象
* @param offset 对象中某field的偏移量
* @param expected 期望值
* @param update 更新值
* @return true | false
*/
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object update);

public final native boolean compareAndSwapInt(Object o, long offset, int expected,int update);

public final native boolean compareAndSwapLong(Object o, long offset, long expected, long update);

什么是CAS? 即比较并替换,实现并发算法时常用到的一种技术。CAS操作包含三个操作数——内存位置、预期原值及新值。执行CAS操作的时候,将内存位置的值与预期原值比较,如果相匹配,那么处理器会自动将该位置值更新为新值,否则,处理器不做任何操作。我们都知道,CAS是一条CPU的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe提供的CAS方法(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg。

线程调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14

//取消阻塞线程
public native void unpark(Object thread);
//阻塞线程
public native void park(boolean isAbsolute, long time);
//获得对象锁(可重入锁)
@Deprecated
public native void monitorEnter(Object o);
//释放对象锁
@Deprecated
public native void monitorExit(Object o);
//尝试获取对象锁
@Deprecated
public native boolean tryMonitorEnter(Object o);

如上源码说明中,方法park、unpark即可实现线程的挂起与恢复,将一个线程进行挂起是通过park方法实现的,调用park方法后,线程将一直阻塞直到超时或者中断等条件出现;unpark可以终止一个挂起的线程,使其恢复正常。

Java锁和同步器框架的核心类AbstractQueuedSynchronizer,就是通过调用LockSupport.park()LockSupport.unpark()实现线程的阻塞和唤醒的,而LockSupport的park、unpark方法实际是调用Unsafe的park、unpark方式来实现。

Class相关

此部分主要提供Class和它的静态字段的操作相关方法,包含静态字段内存定位、定义类、定义匿名类、检验&确保初始化等。

1
2
3
4
5
6
7
8
9
10
11
12
//获取给定静态字段的内存地址偏移量,这个值对于给定的字段是唯一且固定不变的
public native long staticFieldOffset(Field f);
//获取一个静态类中给定字段的对象指针
public native Object staticFieldBase(Field f);
//判断是否需要初始化一个类,通常在获取一个类的静态属性的时候(因为一个类如果没初始化,它的静态属性也不会初始化)使用。 当且仅当ensureClassInitialized方法不生效时返回false。
public native boolean shouldBeInitialized(Class<?> c);
//检测给定的类是否已经初始化。通常在获取一个类的静态属性的时候(因为一个类如果没初始化,它的静态属性也不会初始化)使用。
public native void ensureClassInitialized(Class<?> c);
//定义一个类,此方法会跳过JVM的所有安全检查,默认情况下,ClassLoader(类加载器)和ProtectionDomain(保护域)实例来源于调用者
public native Class<?> defineClass(String name, byte[] b, int off, int len, ClassLoader loader, ProtectionDomain protectionDomain);
//定义一个匿名类
public native Class<?> defineAnonymousClass(Class<?> hostClass, byte[] data, Object[] cpPatches);

对象操作

此部分主要包含对象成员属性相关操作及非常规的对象实例化方式等相关方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//返回对象成员属性在内存地址相对于此对象的内存地址的偏移量
public native long objectFieldOffset(Field f);
//获得给定对象的指定地址偏移量的值,与此类似操作还有:getInt,getDouble,getLong,getChar等
public native Object getObject(Object o, long offset);
//给定对象的指定地址偏移量设值,与此类似操作还有:putInt,putDouble,putLong,putChar等
public native void putObject(Object o, long offset, Object x);
//从对象的指定偏移量处获取变量的引用,使用volatile的加载语义
public native Object getObjectVolatile(Object o, long offset);
//存储变量的引用到对象的指定的偏移量处,使用volatile的存储语义
public native void putObjectVolatile(Object o, long offset, Object x);
//有序、延迟版本的putObjectVolatile方法,不保证值的改变被其他线程立即看到。只有在field被volatile修饰符修饰时有效
public native void putOrderedObject(Object o, long offset, Object x);
//绕过构造方法、初始化代码来创建对象
public native Object allocateInstance(Class<?> cls) throws InstantiationException;
  • 常规对象实例化方式:我们通常所用到的创建对象的方式,从本质上来讲,都是通过new机制来实现对象的创建。但是,new机制有个特点就是当类只提供有参的构造函数且无显示声明无参构造函数时,则必须使用有参构造函数进行对象构造,而使用有参构造函数时,必须传递相应个数的参数才能完成对象实例化。
  • 非常规的实例化方式:而Unsafe中提供allocateInstance方法,仅通过Class对象就可以创建此类的实例对象,而且不需要调用其构造函数、初始化代码、JVM安全检查等。它抑制修饰符检测,也就是即使构造器是private修饰的也能通过此方法实例化,只需提类对象即可创建相应的对象。由于这种特性,allocateInstance在java.lang.invoke、Objenesis(提供绕过类构造器的对象生成方式)、Gson(反序列化时用到)中都有相应的应用。

数组相关

这部分主要介绍与数据操作相关的arrayBaseOffset与arrayIndexScale这两个方法,两者配合起来使用,即可定位数组中每个元素在内存中的位置。

1
2
3
4
//返回数组中第一个元素的偏移地址
public native int arrayBaseOffset(Class<?> arrayClass);
//返回数组中一个元素占用的大小
public native int arrayIndexScale(Class<?> arrayClass);

这两个与数据操作相关的方法,在java.util.concurrent.atomic 包下的AtomicIntegerArray(可以实现对Integer数组中每个元素的原子性操作)中有典型的应用,如下图AtomicIntegerArray源码所示,通过Unsafe的arrayBaseOffset、arrayIndexScale分别获取数组首元素的偏移地址base及单个元素大小因子scale。

内存屏障

在Java 8中引入,用于定义内存屏障(也称内存栅栏,内存栅障,屏障指令等,是一类同步屏障指令,是CPU或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作),避免代码重排序。

在Java 8中引入了一种锁的新机制——StampedLock,它可以看成是读写锁的一个改进版本。StampedLock提供了一种乐观读锁的实现,这种乐观读锁类似于无锁的操作,完全不会阻塞写线程获取写锁,从而缓解读多写少时写线程“饥饿”现象。由于StampedLock提供的乐观读锁不阻塞写线程获取读锁,当线程共享变量从主内存load到线程工作内存时,会存在数据不一致问题,所以当使用StampedLock的乐观读锁时,需要遵从如下图用例中使用的模式来确保数据的一致性。

img

系统相关

这部分包含两个获取系统相关信息的方法。

1
2
3
4
//返回系统指针的大小。返回值为4(32位系统)或 8(64位系统)。
public native int addressSize();
//内存页的大小,此值为2的幂次方。
public native int pageSize();

各种锁复习

各种锁的对比

偏向锁、轻量级锁、自旋锁、重量级锁

并发编程之锁优化

Thank you for your accept. mua!
-------------本文结束感谢您的阅读-------------