Java线程基础 | 8lovelife's life
0%

Java线程基础

记录下Java中多线程的使用

线程的生命周期

  1. NEW:线程被创建未启动
  2. RUNNABLE:线程为启动状态,被虚拟机执行或者是等待系统资源中
  3. WAITING:线程调用 Object.wait、Thread.join或LockSupport.park方法,线程进入WAITING状态
  4. TIMED_WAITING:线程调用 Thread.sleep 、Object.wait(10) 、Thread.join(10)、LockSupport.parkNanos 或 LockSupport.parkUntil方法 ,线程进入TIMED_WAITING状态
  5. BLOCKED:线程等待获取锁
  6. TERMINATED:线程死亡

image

线程的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class ThreadStates extends Thread {

@Override
public void run() {

try {
threadCurrentState("RUNNABLE", this);
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

public void entryMonitor() {

try {
synchronized (ThreadStates.class) {
TimeUnit.SECONDS.sleep(3);
}

} catch (Exception e) {
e.printStackTrace();
}

}


public void threadCurrentState(String name, Thread thread) {
System.out.println(name + "'s state is " + thread.getState());
}

public void newRunnableTimedWaitingTerminated() throws InterruptedException {

threadCurrentState("NEW", this);

this.start();

TimeUnit.SECONDS.sleep(1);

threadCurrentState("TIMED_WAITING", this);

Runtime.getRuntime().addShutdownHook(new Thread(() -> threadCurrentState("TERMINATED", this)));

Thread joinT = new Thread(() -> entryMonitor());

joinT.start();

Thread thread = Thread.currentThread();

new Thread(() -> {
try {

Thread blockedT = new Thread(() -> entryMonitor());
blockedT.start();
TimeUnit.SECONDS.sleep(1);
threadCurrentState("BLOCKED", blockedT);

TimeUnit.SECONDS.sleep(1);
threadCurrentState("WAITING", thread);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

joinT.join();

}

}


try {
new ThreadStates().newRunnableTimedWaitingTerminated();
} catch (InterruptedException e) {
e.printStackTrace();
}

// 输出
NEW's state is NEW
RUNNABLE's state is RUNNABLE
TIMED_WAITING's state is TIMED_WAITING
BLOCKED's state is BLOCKED
WAITING's state is WAITING
TERMINATED's state is TERMINATED

Process finished with exit code 0


线程的创建与启动

下面是几种在Java中运行线程的方式

  1. 继承Thread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class ThreadExtend extends Thread {

public ThreadExtend() {
super();
}

public ThreadExtend(String name) {
super(name);
}

@Override
public void run() {
System.out.println("I'm "+getName()+", coming from extend");
}

}

new ThreadExtend("ThreadExtend").start();
  1. 实现Runnable接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ThreadRunnable implements Runnable {

private String name;

public ThreadRunnable(String name) {
this.name = name;
}

@Override
public void run() {
System.out.println("I'm "+name+", coming from runnable");

}

}

new Thread(new ThreadRunnable("ThreadRunnable")).start();
  1. 实现Callable接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class ThreadWithCallable implements Callable<String> {

@Override
public String call() {
return "Thread Callable";
}

}

ThreadWithCallable threadWithCallable = new ThreadWithCallable();
FutureTask<String> futureTask = new FutureTask(threadWithCallable);
new Thread(futureTask).start();
try {
System.out.println(futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
  1. 基于线程池
1
2
3
4
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.submit(new ThreadExtend());
System.out.println(executorService.submit(new ThreadWithCallable()).get());
executorService.shutdown();

线程常用的相关方法

sleep

sleep()是线程的方法,此方法会使线程睡眠,sleep()不会释放当前线程已经拥有的监视器。若其他线程线程调用了睡眠线程的interrupt()方法或sleep的时间已到,则线程进入Runnable状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
long waitingTime = 10;

Thread sleepThread = new Thread(() -> {

try {
System.out.println("I'm gonna go to bed !");
TimeUnit.SECONDS.sleep(waitingTime);
System.out.println("I was waken up !");
} catch (InterruptedException e) {
System.out.println("I was interrupted !");
}

});

sleepThread.start();
sleepThread.interrupt();

Object#wait & notify/notifyAll

  • wait()是Object类中定义的方法,当线程持有了Object的监视器才可以调用Object的wait()方法,wiat()会释放持有的监视器。假设当前线程获得了对象锁,则当其他线程试图获得此对象锁时,会导致线程阻塞(Blocked)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class WaitNotifyTest implements Serializable {

private List<Integer> queue = new ArrayList<>();
private int CAPACITY = 5;

public static void main(String[] args) {
WaitNotifyTest waitNotify = new WaitNotifyTest();
Thread produceT = new Thread(waitNotify.new Produce());
Thread consumerT = new Thread(waitNotify.new Consumer());
produceT.start();
consumerT.start();
}

class Produce implements Runnable {
int count;

@Override
public void run() {
while (true) {
try {
produce(count++);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

private void produce(int count) throws InterruptedException {

synchronized (queue) {
if (queue.size() == CAPACITY) {
System.out.println("Queue is Full...");
queue.wait();
System.out.println("Produce wait over !");
return;
}
Thread.sleep(1000);
queue.add(count);
System.out.println("Produced :" + count);
queue.notify();
}
}
}

class Consumer implements Runnable {

@Override
public void run() {
while (true) {
try {
consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private void consume() throws InterruptedException {
synchronized (queue) {
if (queue.isEmpty()) {
System.out.println("Queue is Empty...");
queue.wait();
System.out.println("Consume wait over !");
return;
}

Thread.sleep(1000);
int i = queue.remove(0);
System.out.println("Consumed :" + i);
queue.notify();
}
}
}
}
  • wait()方法
1
2
3
4
5
6
public final void wait() throws InterruptedException {
wait(0); // 是个本地方法
}

public final native void wait(long timeout) throws InterruptedException;
// 其他线程调用notify()/notifyAll()可以唤醒由wait()方法导致waiting的线程(或者wait等待的时间过期)

LockSupport#park & unpark

  • park()功能同wait(),只是wait在调用前必须先获得监视器。对于线程的唤醒来说,unpark可以唤醒指定的线程,而notify/notifyAll无法唤醒指定的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ForWakeUpThread implements Runnable {

private Thread waitForWakeUp;

public ForWakeUpThread(Thread thread) {
this.waitForWakeUp = thread;
}

@Override
public void run() {

try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Wake Up Main Thread !");
LockSupport.unpark(waitForWakeUp);

}

}

ForWakeUpThread wakeUpT = new ForWakeUpThread(Thread.currentThread());
new Thread(wakeUpT).start();
LockSupport.park();
System.out.println("Main Thread Over !");

Condition#await & signal

  • await()支持多个等待队列,监视器锁只有一个等待队列,当notify/notifyAll被调用的时候系统无法知道该唤醒消费者还是生产者。await()与signal()利用了LockSupport#park/unpark方法将线程阻塞与唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public class LockConditionTest implements Serializable {

private List<Integer> queue = new ArrayList<>();
private Lock lock = new ReentrantLock();
private Condition produce = lock.newCondition();
private Condition consume = lock.newCondition();
private int CAPACITY = 5;

public static void main(String[] args) {
LockConditionTest waitNotify = new LockConditionTest();
Thread produceT = new Thread(waitNotify.new Produce());
Thread consumerT = new Thread(waitNotify.new Consumer());
produceT.start();
consumerT.start();
}

class Produce implements Runnable {

int count;

@Override
public void run() {
while (true) {
try {
produce(count++);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}

private void produce(int count) throws InterruptedException {

try {
lock.lock();
if (queue.size() == CAPACITY) {
System.out.println("Queue is Full...");
produce.await();
System.out.println("Produce wait over !");
return;
}
Thread.sleep(1000);
queue.add(count);
System.out.println("Produced :" + count);
consume.signal();
} finally {
lock.unlock();
}
}
}

class Consumer implements Runnable {

@Override
public void run() {
while (true) {
try {
consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

private void consume() throws InterruptedException {

try {
lock.lock();
if (queue.isEmpty()) {
System.out.println("Queue is Empty...");
consume.await();
System.out.println("Consume wait over !");
return;
}

Thread.sleep(1000);
int i = queue.remove(0);
System.out.println("Consumed :" + i);
produce.signal();
} finally {
lock.unlock();
}
}
}

join

  • join()方法会阻塞调用此方法的线程,当线程执行结束(或者join的时间过期)则等待的线程会继续运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class JoinThreadPartOne implements Runnable {

@Override
public void run() {

try {
TimeUnit.SECONDS.sleep(10);
System.out.println("Thread Part One Is Over !");
} catch (InterruptedException e) {
e.printStackTrace();
}

}

}

Thread partOneT = new Thread(new JoinThreadPartOne());
partOneT.start();
partOneT.join(); // 阻塞主线程,当partOneT执行结束,主线程将继续执行
System.out.println("Main Thread Is Over !");
System.in.read();

  • join方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final void join() throws InterruptedException {
join(0);
}

public final synchronized void join(long millis) // 调用此方法需要获取监视器
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0); // 调用的是wait()方法
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}

priority

线程可以设置优先级,优先级较高的线程优先于具有较低优先级的线程执行(并非绝对的优先,由系统尽量保证,所以线程优先级不能作为编码的逻辑控制)。线程优先级因不同的操作系统而有所不同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class ThreadPriority extends Thread {

private int priority;
private boolean shouldYield;
private String name;

public ThreadPriority(int priority, boolean shouldYield, String name) {
this.priority = priority;
setPriority(priority);
this.shouldYield = shouldYield;
this.name = name;
}

@Override
public void run() {

System.out.println("Thread " + name + " With " + priority + " Priority Start");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}

if (shouldYield) {
System.out.println("Thread " + name + " With " + priority + " Priority Yield");
Thread.yield();
}

System.out.println("Thread " + name + " With " + priority + " Priority End");

}
}

ThreadPriority minPriorityT = new ThreadPriority(Thread.MIN_PRIORITY, false,"A");
ThreadPriority maxPriorityT = new ThreadPriority(Thread.MAX_PRIORITY, false,"B");

minPriorityT.start();
maxPriorityT.start();


yield

yield()方法是让出当前线程使用的CPU时间片,也就是当前线程让出的CPU时间片让其他同等优先级的线程使用。让出的意图是否被系统接受(如线程优先级一样)完全看系统心情

1
2
3
4
5
6
7

ThreadPriority minPriorityT = new ThreadPriority(Thread.MIN_PRIORITY, true,"A");
ThreadPriority maxPriorityT = new ThreadPriority(Thread.MIN_PRIORITY, false,"B");

minPriorityT.start();
maxPriorityT.start();

interrupt

中断线程的执行,由操作系统向线程发送事件信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class ThreadInterrupt implements Runnable {

@Override
public void run() {

for(;;) {

boolean isInterrupt = Thread.currentThread().isInterrupted();
if (isInterrupt) {
System.out.println("Current Thread's isInterrupt " + isInterrupt);
return;
}

}

}

}

Thread interruptT = new Thread(new ThreadInterrupt());
interruptT.start();
TimeUnit.SECONDS.sleep(10);
System.out.println("Interrupt ThreadInterrupt !");
interruptT.interrupt();
interruptT.join();

daemon

若线程设置为守护线程,则当虚拟机中的所有用户线程结束或虚拟机退出,相应的守护线程随之结束(当用户线程都结束了,守护线程的使命即终止)。在守护线程中创建的线程也是守护线程,在用户线程中创建的是线程也是用户线程,线程池框架会将守护线程转为用户线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class DaemonThread implements Runnable {

@Override
public void run() {

MemoryMXBean memorymbean = ManagementFactory.getMemoryMXBean();
MemoryUsage usage = memorymbean.getHeapMemoryUsage();
for (; ; ) {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("INIT HEAP: " + usage.getInit() / 1024 / 1024 + "MB");
System.out.println("MAX HEAP: " + usage.getMax() / 1024 / 1024 + "MB");
System.out.println("USE HEAP: " + usage.getUsed() / 1024 / 1024 + "MB");

System.out.println();

} catch (InterruptedException e) {
e.printStackTrace();
}

}

}
}

class AliveThread implements Runnable {

@Override
public void run() {
for(;;) {
try {
TimeUnit.SECONDS.sleep(2);
System.out.println("I'm alive !");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

// 1.System.exit(0) 会退出虚拟机,无论当前是否有守护、用户线程运行
Thread daemonT = new Thread(new DaemonThread());
daemonT.setDaemon(true);
Thread aliveT = new Thread(new AliveThread());
daemonT.start();
aliveT.start();
System.out.println("Main Thread Over !");
TimeUnit.SECONDS.sleep(5);
System.exit(0);

// 1.输出
Main Thread Over !
I'm alive !
INIT HEAP: 64MB
MAX HEAP: 910MB
USE HEAP: 5MB

I'm alive !
INIT HEAP: 64MB
MAX HEAP: 910MB
USE HEAP: 5MB

Process finished with exit code 0


// 2.JVM等待所有用户线程执行完成后才会退出
Thread aliveT = new Thread(new AliveThread());
aliveT.start();
System.out.println("Main Thread Over !");

// 2.输出
Main Thread Over !
I'm alive !
I'm alive !
I'm alive !
I'm alive !
I'm alive !
I'm alive !
I'm alive !(无限执行下去)

// 3.用户线程都执行完成后,JVM退出
Thread daemonT = new Thread(new DaemonThread());
daemonT.setDaemon(true);
System.out.println("Main Thread Over !");

// 3.输出
INIT HEAP: 64MB
MAX HEAP: 910MB
USE HEAP: 5MB

INIT HEAP: 64MB
MAX HEAP: 910MB
USE HEAP: 5MB

Main Thread Over !

Process finished with exit code 0

// 4.守护线程一直守护用户线程
Thread daemonT = new Thread(new DaemonThread());
daemonT.setDaemon(true);
Thread aliveT = new Thread(new AliveThread());
daemonT.start();
aliveT.start();
System.out.println("Main Thread Over !");

// 4.输出
Main Thread Over !
I'm alive !
INIT HEAP: 64MB
MAX HEAP: 910MB
USE HEAP: 6MB

I'm alive !
INIT HEAP: 64MB
MAX HEAP: 910MB
USE HEAP: 6MB

I'm alive !
INIT HEAP: 64MB
MAX HEAP: 910MB
USE HEAP: 6MB

I'm alive !
INIT HEAP: 64MB
MAX HEAP: 910MB
USE HEAP: 6MB(无限执行下去)

多线程可能产生的问题

活跃性故障

Thread Starvation

如果一个线程一直得不到共享的资源(为什么其他线程都会得到资源,而我一直不能。。),这个线程就很“饥饿”,即 Thread Starvation。导致线程饥饿的原因:

  1. 线程进入同步方法造成的阻塞等待,可能导致线程饥饿
  2. 高优先级的线程抢走资源,可能导致低优先级的线程饥饿
  3. 调用了wait()方法的线程,notify()不能保证所有线程的wait都终止,可能导致线程饥饿

imageimageimage

公平锁

公平锁会公平的分配多线程竞争的资源,避免导致线程饥饿

image

Thread Deadlock

假设当前有线程A和B,线程A获得资源AA的锁并尝试获得资源BB的锁,而此时线程B已经获得了资源BB的锁并尝试获得资源AA的锁 –>产生死锁,死锁状态下的线程状态为BLOCKED。如同不懂退让的两只山羊过桥,两只羊各先占一半桥,同时又都需要对方的那一半桥

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class DeadLockOccur {

private Object sourceALock = new Object();

private Object sourceBLock = new Object();

public void needA_B() {

synchronized (sourceALock) {

try {
TimeUnit.SECONDS.sleep(1);

synchronized (sourceBLock) {
TimeUnit.SECONDS.sleep(2);
}

} catch (InterruptedException e) {
e.printStackTrace();
}

}
}


public void needB_A() {

synchronized (sourceBLock) {

try {
TimeUnit.SECONDS.sleep(1);

synchronized (sourceALock) {
TimeUnit.SECONDS.sleep(2);
}

} catch (InterruptedException e) {
e.printStackTrace();
}

}
}
}


DeadLockOccur deadLock = new DeadLockOccur();
Thread ABT = new Thread(() -> deadLock.needA_B());
Thread BAT = new Thread(() -> deadLock.needB_A());

ABT.start();
BAT.start();

TimeUnit.SECONDS.sleep(2);
System.out.println("ABT will be BLOCKED:" + ABT.getState());
System.out.println("BAT will be BLOCKED:" + BAT.getState());

// 输出
ABT will be BLOCKED:BLOCKED
BAT will be BLOCKED:BLOCKED

image

将上例中所需资源一次性锁定可以解决死锁问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private Object sourceABLock = new Object();

public void needA_B() {

synchronized (sourceABLock) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

public void needB_A() {

synchronized (sourceABLock) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

Thread Livelock

线程的活锁指的是线程间都在“为对方考虑”,对方存活就把资源让给对方,这样互相谦让自己的资源,最后反而谁都得不到资源,活锁状态下的线程状态为RUNNABLE。有没有遇到过这种情况:你走在路上迎面来了个人,你躲让到一边想让对方通过,在这同时对方也躲让到了你躲让的这一边(你们又面对面了),如此反复

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Worker {

private String name;
private boolean active;

public Worker(String name, boolean active) {
this.name = name;
this.active = active;
}


public synchronized void work(Worker otherWorker) {

while (active) {

if (otherWorker.active) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " handover the resource to " + otherWorker.name);
continue;
}

System.out.println(name + " working on the resource");
active = false;
}

}

}


Worker w1 = new Worker("Work1", true);
Worker w2 = new Worker("Work2", true);

Thread w1T = new Thread(() -> w1.work(w2));
Thread w2T = new Thread(() -> w2.work(w1));

w1T.start();
w2T.start();


// 输出
Work1 handover the resource to Work2
Work2 handover the resource to Work1
Work1 handover the resource to Work2
Work2 handover the resource to Work1
Work1 handover the resource to Work2
Work2 handover the resource to Work1(无限执行下去)

资源顺序的执行可以避免上述的活锁

What is the weight of a soul? How much does a heart weigh? - Joan Clarke

The Imitation Game