Mutex | 8lovelife's life
0%

Mutex

记录下多线程情况下,如何避免资源竞争带来的问题

资源竞争

多线程在运行期间,若存在数据的共享,不同线程对资源的争抢会造成彼方线程的处理错乱。下面是两个线程将数值从零累加10次的例子(预计结果为10)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
class Count implements Runnable {

private int countResult = 0;

@Override
public void run() {

for (int i = 0; i < 5; i++) {
count();
}

}

private void count() {

try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
int temp = countResult;
countResult++;
System.out.println(Thread.currentThread().getName() + " Count Before " + temp + ", Count After "
+ countResult);

}

public void countResult() {
System.out.println("The result of count is " + countResult);
}

}

Count count = new Count();
Thread t1 = new Thread(count, "T1");
Thread t2 = new Thread(count, "T2");

t1.start();
t2.start();
t1.join();
t2.join();
count.countResult();

// 输出
T1 Count Before 0, Count After 1
T2 Count Before 1, Count After 2
T1 Count Before 2, Count After 3
T2 Count Before 3, Count After 4
T2 Count Before 5, Count After 6
T1 Count Before 4, Count After 6
T2 Count Before 6, Count After 7
T1 Count Before 7, Count After 8
T1 Count Before 8, Count After 9
T2 Count Before 8, Count After 9
The result of count is 9 // 并非是预期的结果 10

image

当线程或进程依赖某一资源(代码中的countResult),会发生资源竞争。使针对资源的操作互斥,能够解决资源竞争带来的结果不可预测

关键字synchronized

Java中提供了Synchronize关键字来解决资源竞争问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private synchronized void count() {

try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
int temp = countResult;
countResult++;
System.out.println(Thread.currentThread().getName() + " Count Before " + temp + ", Count After "
+ countResult);

}

// 将资源依赖的代码方法上加上 synchronized 关键字,问题得到解决

T1 Count Before 0, Count After 1
T1 Count Before 1, Count After 2
T1 Count Before 2, Count After 3
T1 Count Before 3, Count After 4
T1 Count Before 4, Count After 5
T2 Count Before 5, Count After 6
T2 Count Before 6, Count After 7
T2 Count Before 7, Count After 8
T2 Count Before 8, Count After 9
T2 Count Before 9, Count After 10
The result of count is 10

image

synchronized方法与方法块

线程想进入synchronized修饰的代码的前提是先获得对象的监视器锁

  1. synchronized修饰方法,则需要先获得当前实例对象的监视器锁
  2. synchronized修饰代码块,则需要先获得synchronized所指定对象的监视器锁(修饰代码块更方便控制同步的范围)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class SynchronizedTest {

public synchronized void syncMethod() {
try {
syncBlock();
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

public void noSyncMethod() {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

public void syncBlock() {

synchronized (this) {
System.out.println(Thread.currentThread().getName() + ":I am in syncBlock");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

}

SynchronizedTest synchronizedTest = new SynchronizedTest();

Thread tOne = new Thread(() -> synchronizedTest.syncMethod(), "One");
tOne.start(); // 获取对象实例(synchronizedTest)监视器锁,并可重入监视器锁(可以执行syncBlock)

Thread tTwo = new Thread(() -> synchronizedTest.syncMethod(), "Two");
tTwo.start(); // 等待tOne释放对象监视器锁,BLOCKED状态

Thread tThree = new Thread(() -> synchronizedTest.noSyncMethod(), "Three");
tThree.start(); // 不需要对象监视器锁即可执行

Thread tFour = new Thread(() -> synchronizedTest.syncBlock(), "Four");
tFour.start(); // 等待释放对象监视器锁,BLOCKED状态

System.out.println("One will be TIMED_WAITING: " + tOne.getState());
System.out.println("Two will be BLOCKED: " + tTwo.getState());
System.out.println("Three will be TIMED_WAITING: " + tThree.getState());
System.out.println("Four will be BLOCKED: " + tFour.getState());
tOne.join();
tTwo.join();
tThree.join();
tFour.join();

// 输出
One:I am in syncBlock
One will be TIMED_WAITING: TIMED_WAITING
Two will be BLOCKED: BLOCKED
Three will be TIMED_WAITING: TIMED_WAITING
Four will be BLOCKED: BLOCKED
Four:I am in syncBlock
Two:I am in syncBlock

Process finished with exit code 0

多个线程同时阻塞等待进入同步块,当锁持有者离开同步块,这些等待的线程谁能进入同步块是不确定的,因为同步是不公平的(先来的不一定先进入同步块),这种不公平可能导致Thread Starvation

监视器锁

synchronized到底是什么?为啥可以控制资源竞争。先看看编译后的字节码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
Mac:ThreadExamples mac$ javac SynchronizedMethodBlock.java
Mac:ThreadExamples mac$ javap -c SynchronizedMethodBlock.class

Compiled from "SynchronizedMethodBlock.java"
public class dmztest.ThreadExamples.SynchronizedMethodBlock {
public dmztest.ThreadExamples.SynchronizedMethodBlock();
Code:
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return

public synchronized void syncMethod(); // 关键字synchronized用在方法上,则由JVM级别实现同步
Code:
0: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
3: ldc #3 // String I am sync Method !
5: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
8: return

public void syncBlock();
Code:
0: aload_0
1: dup
2: astore_1
// 尝试获得对象的监视器 ,每个对象都拥有一个监视器
// 当monitor为0说明monitorenter能够获得监视器,线程获得监视器后将monitor+1,该线程为对象monitor所有者
// 具有monitor所有者的线程能够重新进入监视器monitor+1。如果非monitor持有者的线程尝试monitorenter将会阻塞,直到monitor=0后获取monitor拥有权
3: monitorenter
4: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream;
7: ldc #5 // String I am sync Block !
9: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
12: aload_1
13: monitorexit // 释放monitor所有权,只有拥有monitor的线程才可以释放,释放后monitor-1,monitor为0时则线程失去monitor所有权
14: goto 22
17: astore_2
18: aload_1
19: monitorexit // 当代码块中出现异常则会释放monitor拥有权
20: aload_2
21: athrow
22: return
Exception table:
from to target type
4 14 17 any
17 20 17 any
}

关键字synchronized在方法上与在代码块上的功能是一样的,方法上的synchronized成为JVM对此方法上的访问标志,方法块上的synchronized会填充一系列字节码到class中

synchronized锁优化

synchronized是互斥锁(重量级锁),互斥会带来用户态到内核态的线程切换的性能问题。在JDK1.6以后对synchronized进行了优化,synchronized的同步来自JVM的实现,由C++实现CAS,利用CPU指令执行原子操作(cmpxchg汇编指令,当多核CPU情况下会加lock)

  1. 锁粗化:将多个连续的锁扩展为范围更大的锁,减少频繁的互斥同步导致的性能开销
  2. 锁消除:JVM即时编译器在运行时,通过逃逸分析出”如果一段代码中堆数据永远不会被其他线程访问”则将锁取消
  3. 轻量级锁:在没有多线程竞争的情况下,避免重量级的互斥锁,通过CAS完成锁的获取与释放
  4. 偏向锁:消除数据在无竞争的情况下执行同步,若下一次同一线程进入,则偏向该线程,无需任何同步操作
  5. 适应性自旋:避免线程频繁切换的消耗,使用自旋的方式。适应性自旋的时间会根据上一次获取锁自旋的时间动态变化

不适合synchronized的应用场景

  1. 中断阻塞在获取监控锁的线程
  2. 公平的获得监视器锁

Lock

Lock相比synchronized更加灵活,synchronized无法解决的场景Lock可以解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class ThreadWithInterrupt {

private Lock lock = new ReentrantLock();

public void write() {

try {

lock.lock();
long startTime = System.currentTimeMillis();
System.out.println("写数据。。。");

for (; ; ) {
if (System.currentTimeMillis() - startTime > 10000) {
break;
}
}

System.out.println("数据写完了!");

} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}

}

public void read() throws InterruptedException {

System.out.println("等待数据读。。。");
lock.lockInterruptibly();

try {

System.out.println("数据读完了。");

} finally {
lock.unlock();
}

}

}


ThreadWithInterrupt threadWithInterrupt = new ThreadWithInterrupt();
Thread write = new Thread(() -> threadWithInterrupt.write());
Thread read = new Thread(() -> {

try {
threadWithInterrupt.read();
} catch (InterruptedException e) {
System.out.println("数据未读。");
}

});

write.start();
read.start();

new Thread(() -> {
long startTime = System.currentTimeMillis();
for (;;) {
if (System.currentTimeMillis() - startTime > 500) {
System.out.println("写的太慢了,不读了。");
read.interrupt();
break;
}
}
}).start();

// 输出
写数据。。。
等待数据读。。。
写的太慢了,不读了。
数据未读。
数据写完了!

Process finished with exit code 0

ReentrantLock

可重入锁,当前线程持有lock后可再次获得此锁。重入锁包括公平锁与不公平锁,公平锁保证先发起的lock请求会先获得锁,后发起的lock请求后获得锁,非公平锁则不能保证“先来先锁”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class ThreadLockFairOrNon {

private Lock lock;

private Integer concurrency;

public ThreadLockFairOrNon(Boolean isFair, Integer concurrency) {

this.lock = new ReentrantLock(isFair);
this.concurrency = concurrency;

}

public Boolean isSeqSorted(List<String> seqs) {

if (!String.valueOf(concurrency).equals(seqs.get(concurrency))) {

return false;
}

return true;
}

public Boolean lockSequence() {

List<String> seqs = new ArrayList<>();
CountDownLatch waitForFirstThreadCount = new CountDownLatch(1);
CountDownLatch waitForAllThreadCount = new CountDownLatch(concurrency + 1);
CountDownLatch waitForSeqThreadCount = new CountDownLatch(concurrency - 1);
try {
new Thread(() -> {

try {
lock.lock();
waitForFirstThreadCount.countDown();
seqs.add(Thread.currentThread().getName());
waitForSeqThreadCount.await();
lock.unlock();
waitForAllThreadCount.countDown();
} catch (Exception e) {
e.printStackTrace();
}


}, "0").start();

waitForFirstThreadCount.await();

for (int j = 1; j <= concurrency - 1; j++) {

new Thread(() -> {
waitForSeqThreadCount.countDown();
lock.lock();
seqs.add(Thread.currentThread().getName());
lock.unlock();
waitForAllThreadCount.countDown();

}, "" + j).start();

}

waitForSeqThreadCount.await();

// 准备争抢
new Thread(() -> {

try {
lock.lock();
seqs.add(Thread.currentThread().getName());
lock.unlock();
waitForAllThreadCount.countDown();

} catch (Exception e) {
e.printStackTrace();
}

}, "" + concurrency).start();

waitForAllThreadCount.await();

} catch (InterruptedException e) {
e.printStackTrace();
}

return isSeqSorted(seqs);

}

}

FairSync

image

  1. 事例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Integer wrongSeqs = 0;

for (int i = 0; i < 1000; i++) {
Boolean isSeqsSorted = new ThreadLockFairOrNon(true, 100).lockSequence();

if (!isSeqsSorted) {
wrongSeqs++;
}

}

System.out
.println("1000 samples cause wrong seqs :" + wrongSeqs + " times");

// 输出
1000 samples cause wrong seqs :0 times
  1. 源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final void lock() {
acquire(1);
}

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && ## 排队等待,很公平
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

NonfairSync

image

  1. 事例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Integer wrongSeqs = 0;

for (int i = 0; i < 1000; i++) {
Boolean isSeqsSorted = new ThreadLockFairOrNon(false, 100).lockSequence();

if (!isSeqsSorted) {
wrongSeqs++;
}

}

System.out
.println("1000 samples cause wrong seqs :" + wrongSeqs + " times");

// 输出
1000 samples cause wrong seqs :950 times
  1. 源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
final void lock() {
if (compareAndSetState(0, 1)) ## 插队!不公平
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}


public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) { ## 插队!不公平
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

I have lived my life in a dream. - Puyi

The Last Emperor