记录下多线程情况下,如何避免资源竞争带来的问题
资源竞争 多线程在运行期间,若存在数据的共享,不同线程对资源的争抢会造成彼方线程的处理错乱。下面是两个线程将数值从零累加10次的例子(预计结果为10)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 class Count implements Runnable { private int countResult = 0; @Override public void run() { for (int i = 0; i < 5; i++) { count(); } } private void count() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } int temp = countResult; countResult++; System.out.println(Thread.currentThread().getName() + " Count Before " + temp + ", Count After " + countResult); } public void countResult() { System.out.println("The result of count is " + countResult); } } Count count = new Count(); Thread t1 = new Thread(count, "T1"); Thread t2 = new Thread(count, "T2"); t1.start(); t2.start(); t1.join(); t2.join(); count.countResult(); // 输出 T1 Count Before 0, Count After 1 T2 Count Before 1, Count After 2 T1 Count Before 2, Count After 3 T2 Count Before 3, Count After 4 T2 Count Before 5, Count After 6 T1 Count Before 4, Count After 6 T2 Count Before 6, Count After 7 T1 Count Before 7, Count After 8 T1 Count Before 8, Count After 9 T2 Count Before 8, Count After 9 The result of count is 9 // 并非是预期的结果 10
当线程或进程依赖某一资源(代码中的countResult),会发生资源竞争。使针对资源的操作互斥,能够解决资源竞争带来的结果不可预测
关键字synchronized Java中提供了Synchronize关键字来解决资源竞争问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private synchronized void count() { try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } int temp = countResult; countResult++; System.out.println(Thread.currentThread().getName() + " Count Before " + temp + ", Count After " + countResult); } // 将资源依赖的代码方法上加上 synchronized 关键字,问题得到解决 T1 Count Before 0, Count After 1 T1 Count Before 1, Count After 2 T1 Count Before 2, Count After 3 T1 Count Before 3, Count After 4 T1 Count Before 4, Count After 5 T2 Count Before 5, Count After 6 T2 Count Before 6, Count After 7 T2 Count Before 7, Count After 8 T2 Count Before 8, Count After 9 T2 Count Before 9, Count After 10 The result of count is 10
synchronized方法与方法块 线程想进入synchronized修饰的代码的前提是先获得对象的监视器锁
synchronized修饰方法,则需要先获得当前实例对象的监视器锁
synchronized修饰代码块,则需要先获得synchronized所指定对象的监视器锁(修饰代码块更方便控制同步的范围)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 class SynchronizedTest { public synchronized void syncMethod() { try { syncBlock(); TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } public void noSyncMethod() { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } public void syncBlock() { synchronized (this) { System.out.println(Thread.currentThread().getName() + ":I am in syncBlock"); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } } SynchronizedTest synchronizedTest = new SynchronizedTest(); Thread tOne = new Thread(() -> synchronizedTest.syncMethod(), "One"); tOne.start(); // 获取对象实例(synchronizedTest)监视器锁,并可重入监视器锁(可以执行syncBlock) Thread tTwo = new Thread(() -> synchronizedTest.syncMethod(), "Two"); tTwo.start(); // 等待tOne释放对象监视器锁,BLOCKED状态 Thread tThree = new Thread(() -> synchronizedTest.noSyncMethod(), "Three"); tThree.start(); // 不需要对象监视器锁即可执行 Thread tFour = new Thread(() -> synchronizedTest.syncBlock(), "Four"); tFour.start(); // 等待释放对象监视器锁,BLOCKED状态 System.out.println("One will be TIMED_WAITING: " + tOne.getState()); System.out.println("Two will be BLOCKED: " + tTwo.getState()); System.out.println("Three will be TIMED_WAITING: " + tThree.getState()); System.out.println("Four will be BLOCKED: " + tFour.getState()); tOne.join(); tTwo.join(); tThree.join(); tFour.join(); // 输出 One:I am in syncBlock One will be TIMED_WAITING: TIMED_WAITING Two will be BLOCKED: BLOCKED Three will be TIMED_WAITING: TIMED_WAITING Four will be BLOCKED: BLOCKED Four:I am in syncBlock Two:I am in syncBlock Process finished with exit code 0
多个线程同时阻塞等待进入同步块,当锁持有者离开同步块,这些等待的线程谁能进入同步块是不确定的,因为同步是不公平的(先来的不一定先进入同步块),这种不公平可能导致Thread Starvation
监视器锁 synchronized到底是什么?为啥可以控制资源竞争。先看看编译后的字节码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 Mac:ThreadExamples mac$ javac SynchronizedMethodBlock.java Mac:ThreadExamples mac$ javap -c SynchronizedMethodBlock.class Compiled from "SynchronizedMethodBlock.java" public class dmztest.ThreadExamples.SynchronizedMethodBlock { public dmztest.ThreadExamples.SynchronizedMethodBlock(); Code: 0: aload_0 1: invokespecial #1 // Method java/lang/Object."<init>":()V 4: return public synchronized void syncMethod(); // 关键字synchronized用在方法上,则由JVM级别实现同步 Code: 0: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc #3 // String I am sync Method ! 5: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 8: return public void syncBlock(); Code: 0: aload_0 1: dup 2: astore_1 // 尝试获得对象的监视器 ,每个对象都拥有一个监视器 // 当monitor为0说明monitorenter能够获得监视器,线程获得监视器后将monitor+1,该线程为对象monitor所有者 // 具有monitor所有者的线程能够重新进入监视器monitor+1。如果非monitor持有者的线程尝试monitorenter将会阻塞,直到monitor=0后获取monitor拥有权 3: monitorenter 4: getstatic #2 // Field java/lang/System.out:Ljava/io/PrintStream; 7: ldc #5 // String I am sync Block ! 9: invokevirtual #4 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 12: aload_1 13: monitorexit // 释放monitor所有权,只有拥有monitor的线程才可以释放,释放后monitor-1,monitor为0时则线程失去monitor所有权 14: goto 22 17: astore_2 18: aload_1 19: monitorexit // 当代码块中出现异常则会释放monitor拥有权 20: aload_2 21: athrow 22: return Exception table: from to target type 4 14 17 any 17 20 17 any }
关键字synchronized在方法上与在代码块上的功能是一样的,方法上的synchronized成为JVM对此方法上的访问标志,方法块上的synchronized会填充一系列字节码到class中
synchronized锁优化 synchronized是互斥锁(重量级锁),互斥会带来用户态到内核态的线程切换的性能问题。在JDK1.6以后对synchronized进行了优化,synchronized的同步来自JVM的实现,由C++实现CAS,利用CPU指令执行原子操作(cmpxchg汇编指令,当多核CPU情况下会加lock)
锁粗化:将多个连续的锁扩展为范围更大的锁,减少频繁的互斥同步导致的性能开销
锁消除:JVM即时编译器在运行时,通过逃逸分析出”如果一段代码中堆数据永远不会被其他线程访问”则将锁取消
轻量级锁:在没有多线程竞争的情况下,避免重量级的互斥锁,通过CAS完成锁的获取与释放
偏向锁:消除数据在无竞争的情况下执行同步,若下一次同一线程进入,则偏向该线程,无需任何同步操作
适应性自旋:避免线程频繁切换的消耗,使用自旋的方式。适应性自旋的时间会根据上一次获取锁自旋的时间动态变化
不适合synchronized的应用场景
中断阻塞在获取监控锁的线程
公平的获得监视器锁
Lock Lock相比synchronized更加灵活,synchronized无法解决的场景Lock可以解决
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 class ThreadWithInterrupt { private Lock lock = new ReentrantLock(); public void write() { try { lock.lock(); long startTime = System.currentTimeMillis(); System.out.println("写数据。。。"); for (; ; ) { if (System.currentTimeMillis() - startTime > 10000) { break; } } System.out.println("数据写完了!"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void read() throws InterruptedException { System.out.println("等待数据读。。。"); lock.lockInterruptibly(); try { System.out.println("数据读完了。"); } finally { lock.unlock(); } } } ThreadWithInterrupt threadWithInterrupt = new ThreadWithInterrupt(); Thread write = new Thread(() -> threadWithInterrupt.write()); Thread read = new Thread(() -> { try { threadWithInterrupt.read(); } catch (InterruptedException e) { System.out.println("数据未读。"); } }); write.start(); read.start(); new Thread(() -> { long startTime = System.currentTimeMillis(); for (;;) { if (System.currentTimeMillis() - startTime > 500) { System.out.println("写的太慢了,不读了。"); read.interrupt(); break; } } }).start(); // 输出 写数据。。。 等待数据读。。。 写的太慢了,不读了。 数据未读。 数据写完了! Process finished with exit code 0
ReentrantLock 可重入锁,当前线程持有lock后可再次获得此锁。重入锁包括公平锁与不公平锁,公平锁保证先发起的lock请求会先获得锁,后发起的lock请求后获得锁,非公平锁则不能保证“先来先锁”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 class ThreadLockFairOrNon { private Lock lock; private Integer concurrency; public ThreadLockFairOrNon(Boolean isFair, Integer concurrency) { this.lock = new ReentrantLock(isFair); this.concurrency = concurrency; } public Boolean isSeqSorted(List<String> seqs) { if (!String.valueOf(concurrency).equals(seqs.get(concurrency))) { return false; } return true; } public Boolean lockSequence() { List<String> seqs = new ArrayList<>(); CountDownLatch waitForFirstThreadCount = new CountDownLatch(1); CountDownLatch waitForAllThreadCount = new CountDownLatch(concurrency + 1); CountDownLatch waitForSeqThreadCount = new CountDownLatch(concurrency - 1); try { new Thread(() -> { try { lock.lock(); waitForFirstThreadCount.countDown(); seqs.add(Thread.currentThread().getName()); waitForSeqThreadCount.await(); lock.unlock(); waitForAllThreadCount.countDown(); } catch (Exception e) { e.printStackTrace(); } }, "0").start(); waitForFirstThreadCount.await(); for (int j = 1; j <= concurrency - 1; j++) { new Thread(() -> { waitForSeqThreadCount.countDown(); lock.lock(); seqs.add(Thread.currentThread().getName()); lock.unlock(); waitForAllThreadCount.countDown(); }, "" + j).start(); } waitForSeqThreadCount.await(); // 准备争抢 new Thread(() -> { try { lock.lock(); seqs.add(Thread.currentThread().getName()); lock.unlock(); waitForAllThreadCount.countDown(); } catch (Exception e) { e.printStackTrace(); } }, "" + concurrency).start(); waitForAllThreadCount.await(); } catch (InterruptedException e) { e.printStackTrace(); } return isSeqSorted(seqs); } }
FairSync
事例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Integer wrongSeqs = 0; for (int i = 0; i < 1000; i++) { Boolean isSeqsSorted = new ThreadLockFairOrNon(true, 100).lockSequence(); if (!isSeqsSorted) { wrongSeqs++; } } System.out .println("1000 samples cause wrong seqs :" + wrongSeqs + " times"); // 输出 1000 samples cause wrong seqs :0 times
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 final void lock() { acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && ## 排队等待,很公平 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
NonfairSync
事例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Integer wrongSeqs = 0; for (int i = 0; i < 1000; i++) { Boolean isSeqsSorted = new ThreadLockFairOrNon(false, 100).lockSequence(); if (!isSeqsSorted) { wrongSeqs++; } } System.out .println("1000 samples cause wrong seqs :" + wrongSeqs + " times"); // 输出 1000 samples cause wrong seqs :950 times
源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 final void lock() { if (compareAndSetState(0, 1)) ## 插队!不公平 setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { ## 插队!不公平 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
I have lived my life in a dream. - Puyi
The Last Emperor