多线程-Part04 | Eloise's Paradise
0%

多线程-Part04

本章节主要介绍多线程中wait-notify相关的知识点.

Wait-Notify正确姿势及虚假唤醒问题

TestBestPracticeStepC_5.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.joshua.waitnotify;

import com.joshua.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

/**
* @author Joshua.H.Brooks
* @description
* @date 2022-05-06 21:39
*/
@Slf4j(topic = "TestBestPracticeStepC_5")
public class TestBestPracticeStepC_5 {
public static final Object room = new Object();
private static boolean hasCigar = false;
private static boolean foodDelivered = false;

public static void main(String[] args) {

new Thread(() -> {
synchronized (room) {
Thread thread = Thread.currentThread();
log.debug("有烟没? [{}]",hasCigar);
if(!hasCigar){
log.debug("没外卖, 先歇会");
try {
room.wait();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("有烟没? [{}]",hasCigar);
if(hasCigar){
log.debug("可以开始干活了");
}else{
log.debug("没干成活");
}

}
}, "小男").start();


new Thread(() -> {
synchronized (room) {
Thread thread = Thread.currentThread();
log.debug("外卖送到没? [{}]",foodDelivered);
if(!foodDelivered){
log.debug("没外卖, 先歇会");
try {
room.wait();
}catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("外卖送到没? [{}]",foodDelivered);
if(foodDelivered){
log.debug("可以开始干活了");
}else{
log.debug("没干成活");
}

}
}, "小女").start();


Sleeper.sleep(1_000);
new Thread(()->{
synchronized (room) {
foodDelivered = true;
log.debug("外卖到了哦");
room.notify();
}
},"送外卖的").start();
}

}

上面的代码中room.notify(); 是随机挑一个room里等待的线程唤醒, 如果唤醒的是“小女”, 那么小男线程的hasCigar还是false, 所以两个等待的线程还是不能干活, 结果如下:

Screenshot 2022-05-07 at 09.51.29

顺着思路很自然是想如果能全部唤醒是不是就能让等待线程继续干活呢? 所以将代码行room.notify();改成room.notifyAll();,结果如下, 发现只有“小女”能继续干活。

Screenshot 2022-05-07 at 09.58.33

那么问题来了, 要怎么才能唤醒小男线程呢? 或者怎么能避免小男线程被虚假唤醒呢? 将小男线程的if(!hasCigar)判断换成while(!hasCigar){即可。换完执行看结果如下:

Screenshot 2022-05-07 at 10.07.25

发现小男线程没有往下执行, 而是进入了下一轮等待, 所以避免了虚假唤醒。

正确姿势

1
2
3
4
5
6
7
8
9
synchronized (lock){
while (!condition) {
lock.wait();
}
}
//另一个线程
synchronized (lock){
lock.notifyAll();
}

同步模式之保护性暂停

概念

保护性暂停(guarded suspension)用在一个线程等待另一个线程的结果。

要点

  • 有一个结果需要从一个线程传递到另一个线程,让他们挂链同一个GuardedObject
  • 如果有结果不断从一个线程传递到另一个线程, 那么可以使用消息队列MQ
  • JDK中join和future使用的就是此模式
  • 因为要等待另一方的结果,所以归类到同步模式

Screenshot 2022-05-07 at 10.20.07

演示

为了演示上述图例,编写如下代码

Downloader.download() 获取百度首页返回内容组装成List 返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.joshua.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
* @author Joshua.H.Brooks
* @description 获取百度首页返回内容组装成List<String> 返回
* @date 2022-05-07 10:31
*/
public class Downloader {
public static List<String> download() throws IOException {
HttpURLConnection conn = (HttpURLConnection)new URL("https://www.baidu.com/").openConnection();
ArrayList<String> lines = new ArrayList<>();
try(BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))){
String line;
while ((line = reader.readLine())!=null) {
lines.add(line);
}
}
return lines;
}
}

测试类: GuardedSuspension

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package com.joshua.waitnotify;

import com.joshua.util.Downloader;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;

/**
* @author Joshua.H.Brooks
* @description 同步模式之保护性暂停
* @date 2022-05-07 10:26
*/
@Slf4j(topic = "c.GuardedSuspension")
public class GuardedSuspension {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(() -> {
log.debug("等待结果");
//获取,当结果返回前阻塞
List<String> list = (List) guardedObject.get();
log.debug("结果返回啦~~~,结果为:\r\n [{}]", list.toString());
}, "t1").start();

new Thread(() -> {
try {
//执行下载,返回结果即关联对象
List<String> list = Downloader.download();
//将关联对象传给另一个线程
log.debug("获取到结果,即将传递给需要的线程");
guardedObject.generate(list);
} catch (IOException e) {
e.printStackTrace();
}
}, "t2").start();
}
}

class GuardedObject {
/**
* 关联的保护对象
*/
private Object response;

/**
* @return 要获取的结果
*/
public Object get() {
synchronized (this) {
while (response == null) {
try {

this.wait();

} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("有人传递结果来了。");
return response;
}
}

/**
* 产生结果
*
* @param response
*/
public void generate(Object response) {
synchronized (this) {
this.notifyAll();
}
this.response = response;
}
}

运行结果如下:

Screenshot 2022-05-07 at 11.30.28

但是,如果等待的线程等待另一线程传回结果的时间过长, 则会影响性能。 所以考虑能否让等待有一个超时限制, 超过了时限就自动返回。则在原有GuardedSuspension类的基础上添加一个超时参数的public Object get(long timeout) {}方法。

优化后的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.joshua.waitnotify;

import com.joshua.util.Downloader;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* @author Joshua.H.Brooks
* @description 同步模式之保护性暂停
* @date 2022-05-07 10:26
*/
@Slf4j(topic = "c.GuardedSuspension")
public class GuardedSuspension {
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(() -> {
log.debug("等待结果");
//获取,当结果返回前阻塞
List<String> list = new ArrayList<>();
if (guardedObject.get(100) instanceof List) {
list = (List) guardedObject.get(100);
} else {
list.add(guardedObject.get(100).toString());
}
log.debug("结果返回啦~~~,结果为:\r\n [{}]", list.toString());
}, "t1").start();

new Thread(() -> {
try {
//执行下载,返回结果即关联对象
List<String> list = Downloader.download();
//将关联对象传给另一个线程
log.debug("获取到结果,即将传递给需要的线程");
guardedObject.generate(list);
} catch (IOException e) {
e.printStackTrace();
}
}, "t2").start();
}
}

@Slf4j(topic = "c.GuardedObject")
class GuardedObject {
/**
* 关联的保护对象
*/
private Object response;

/**
* @return 要获取的结果
*/
public Object get() {
synchronized (this) {
while (response == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("有人传递结果来了。");
return response;
}
}

/**
* @param timeout 等待的超时时间 单位毫秒
* @return 传递的关联结果
*/
public Object get(long timeout) {
synchronized (this) {
//开始时间
long start = System.currentTimeMillis();
//经历时间
long timeElapsed = 0;
while (response == null) {
if (timeElapsed >= timeout) {
break;
}
try {
long timeToWait = timeout - timeElapsed;
log.debug("还需等待 [{}]", timeToWait);
this.wait(timeToWait);
} catch (InterruptedException e) {
e.printStackTrace();
}
timeElapsed = System.currentTimeMillis() - start;
}

}
if (response == null) {
response = "已等待" + timeout + "ms,结果尚未返回。";
}
return response;
}

/**
* 产生结果
*
* @param response
*/
public void generate(Object response) {
synchronized (this) {
this.notifyAll();
}
this.response = response;
}
}

运行看结果:

情况一: 等待超时了, 返回提示信息。

Screenshot 2022-05-07 at 12.08.55

情况二:未超时, 正常返回给需要的线程。

Screenshot 2022-05-07 at 13.42.55

应用

join方法原理

Thread 类里join方法的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Waits for this thread to die.
*
* <p> An invocation of this method behaves in exactly the same
* way as the invocation
*
* <blockquote>
* {@linkplain #join(long) join}{@code (0)}
* </blockquote>
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
public final void join() throws InterruptedException {
join(0);
}

可以看见底层是调了含参数的重载方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* Waits at most {@code millis} milliseconds for this thread to
* die. A timeout of {@code 0} means to wait forever.
*
* <p> This implementation uses a loop of {@code this.wait} calls
* conditioned on {@code this.isAlive}. As a thread terminates the
* {@code this.notifyAll} method is invoked. It is recommended that
* applications not use {@code wait}, {@code notify}, or
* {@code notifyAll} on {@code Thread} instances.
*
* @param millis
* the time to wait in milliseconds
*
* @throws IllegalArgumentException
* if the value of {@code millis} is negative
*
* @throws InterruptedException
* if any thread has interrupted the current thread. The
* <i>interrupted status</i> of the current thread is
* cleared when this exception is thrown.
*/
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis(); //记录开始时间
long now = 0;

if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}

if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now; //还需等待多久
if (delay <= 0) { //跳出循环的条件是:无需再等待
break;
}
wait(delay);
now = System.currentTimeMillis() - base; //已经等待多久
}
}
}

可以看出 join源码的模式与保护性暂停类似, 只是join等待的是另一个线程的结束, 而上面代码演示的是等待另一个线程传入的结果。原理一致。

Future原理

Screenshot 2022-05-07 at 14.22.45

当有多对线程两两相互关联应用保护性暂停的需求时。如上图Futures就是保存关联对象的集合。左侧的t0, t2, t4 分别等待右侧t1, t3, t5的结果。 就好比现实生活中不同房号的居民分别等待不同快递公司的快递员送达货品。

那么如何维护这个关联对象集合与线程间的对应关系就变得复杂。因此, 设计一个接偶的中间类, 这样不仅能够解藕 消费者(等待结果方) 和 生产者(结果创建方)。 还能同时支持多个任务的管理。

上面的模型图可以用如下代码演示: 居民线程要等待对应的邮递员把对应的信送到,即两两有序

1.邮递员(线程)2. 居民(线程)3. 解藕对象 (用来完成上述t0, t2, t4 和t1, t3, t5的两两关联) 4 主线程测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package com.joshua.waitnotify;

import com.joshua.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.Hashtable;
import java.util.Map;
import java.util.Set;

/**
* @author Joshua.H.Brooks
* @description
* @date 2022-05-07 14:31
*/
public class GuardedSuspension_Multi_8 {
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new People().start();
}
Sleeper.sleep(1_000);
Mailboxes.getIds().forEach((id) -> {
new Postman(id, "内容" + id).start();
});

}
}


@Slf4j(topic = "c.People")
class People extends Thread {
@Override
public void run() {
GuardedObject2 go = Mailboxes.createGuardedObject2();
log.debug("开始收信 id [{}]", go.getId());
//等待邮递员送信5000ms
Object mail = go.get(5000);
log.debug("收到信件 ID:{}", go.getId(), mail);
}
}

@Slf4j(topic = "c.Postman")
class Postman extends Thread {
private int id;
private String mail;

public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}

/**
* 1。获取即将要送的这封信是哪封
* 2。然后打印即将送的日志
* 3。最后唤醒一个people线程。 (可能会虚假唤醒, 但是被虚假唤醒的线程会进入下一轮wait)
*/
@Override
public void run() {
GuardedObject2 go = Mailboxes.popSentMail(id);
log.debug("送信 id: {}", go.getId());
go.generate(mail);
}
}

/**
* 该对象用来解藕
*/
@Slf4j(topic = "c.Mailboxes")
class Mailboxes {
private static Map<Integer, GuardedObject2> boxes = new Hashtable<>();
/**
* 此id就是来将people-id和postman-id对应上的键
*/
private static int id = 1;

/**
* 因为id会被多个线程访问, 也就是临界区变量, 所以要synchronized修饰保证安全性
*
* @return
*/
private static synchronized int generateId() {
return id++;
}

/**
* 用remove是因为场景是一次性的, 邮递员将信递给用户后就应该从原有集合去除, 否则该集合会一直增大最终堆内存溢出
*
* @param id
* @return
*/
public static GuardedObject2 popSentMail(int id) {
return boxes.remove(id);
}

/**
* 多个关联对象放进集合便于管理。
*
* @return
*/
public static GuardedObject2 createGuardedObject2() {
GuardedObject2 go = new GuardedObject2(generateId());
boxes.put(go.getId(), go);
return go;
}

/**
* 用来通知主线程应该创建多少个邮递员送信线程
*
* @return
*/
public static Set<Integer> getIds() {
return boxes.keySet();
}
}

@Slf4j(topic = "c.GuardedObject2")
class GuardedObject2 {

/**
* 关联对象的id
*/
private int id;

public GuardedObject2(int id) {
this.id = id;
}

public int getId() {
return id;
}

/**
* 关联的保护对象
*/
private Object response;

/**
* @return 要获取的结果
*/
public Object get() {
synchronized (this) {
while (response == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("有人传递结果来了。");
return response;
}
}

/**
* @param timeout 等待的超时时间 单位毫秒
* @return 传递的关联结果
*/
public Object get(long timeout) {
synchronized (this) {
//开始时间
long start = System.currentTimeMillis();
//经历时间
long timeElapsed = 0;
while (response == null) {
if (timeElapsed >= timeout) {
break;
}
try {
long timeToWait = timeout - timeElapsed;
log.debug("还需等待ms: {}", timeToWait);
this.wait(timeToWait);
} catch (InterruptedException e) {
e.printStackTrace();
}
timeElapsed = System.currentTimeMillis() - start;
}

}
if (response == null) {
response = "已等待" + timeout + "ms,结果尚未返回。";
}
return response;
}

/**
* 产生结果
*
* @param response
*/
public void generate(Object response) {
synchronized (this) {
this.notify();
}
this.response = response;
}
}

测试结果(如下)可以看出无论是哪个ID都是按照这样的顺序: 开始收信1–> 送信1 –> 收到信 1

Screenshot 2022-05-08 at 11.04.57

异步模式之生产者/消费者

主要特点:

  • 与前面保护性暂停的多关联对象不同, 不需要产生结果和消费结果的线程一一对应
  • 消费队列可以用来平衡生产和消费的线程资源
  • 生产者仅负责产生结果,不关心数据如何处理,而消费者专心处理结果数据
  • 消息队列谁有容量限制的,满载时不会再加入数据,空栽时不会再消费数据
  • JDK中各种阻塞队列就是采用的这种模式

⚠️ 之所以称之为异步是因为该模式中消费者消费可能延时。 如下图中t3产生的消息t3(d)没有被及时消费,而是先消费队列前面的t1(a).

Screenshot 2022-05-08 at 11.14.40

代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package com.joshua.waitnotify;

import com.joshua.util.Sleeper;
import lombok.extern.slf4j.Slf4j;

import java.util.LinkedList;

/**
* @author Joshua.H.Brooks
* @description 用来演示生产者/消费者
* @date 2022-05-08 11:23
*/
@Slf4j(topic = "c.ProducerConsumer")
public class MQ_9 {
public static void main(String[] args) {
MessageQ mq = new MessageQ(3);
/**
* 生产者
*/
for (int i = 0; i < 5; i++) {
// lambda表达式引用外部变量必须是final的, 所以在这里定一个id 方便后续使用
final int id = i;
new Thread(() -> {
mq.put(new Message(id, String.valueOf(id)));
}, "生产者-" + i).start();
}
/**
* 消费者
*/

new Thread(() -> {
while (true) {
Sleeper.sleep(1_000);
Message msg = mq.take();
}
}, "消费者").start();


}
}

/**
* 消息队列模型实体
*/
@Slf4j(topic = "c.MQ")
class MessageQ {
private LinkedList<Message> llist = new LinkedList<>();
private int capacity;

public MessageQ(int capacity) {
this.capacity = capacity;
}

/**
* 如果list是空则等待, 否则从队列头部取出第一个元素返回
*
* @return
*/
public Message take() {
synchronized (llist) {
while (llist.isEmpty()) {
try {
log.debug("队列为空, 消费者线程需等待");
llist.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message msg = llist.removeFirst();
log.debug("已消费消息" + msg);
llist.notifyAll();
return msg;
}
}

/**
* 如果list是满则等待, 否则从队列尾部存入一个元素
*
* @return
*/
public void put(Message message) {
synchronized (llist) {
while (llist.size() == capacity) {
try {
log.debug("队列已满, 生产者线程需等待");
llist.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
llist.addLast(message);
log.debug("已生产消息" + message);
llist.notifyAll();
}
}

}

/**
* 消息实体
*/
final class Message {
private int id;
private String value;

public int getId() {
return id;
}

public String getValue() {
return value;
}

public Message(int id, String value) {
this.id = id;
this.value = value;
}

@Override
public String toString() {
return "Message{" +
"id=" + id +
", value='" + value + '\'' +
'}';
}
}

测试结果:

生产者消费者模式测试结果

-------------本文结束感谢您的阅读-------------