本章节主要介绍多线程中wait-notify相关的知识点.
Wait-Notify正确姿势及虚假唤醒问题 TestBestPracticeStepC_5.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 package com.joshua.waitnotify;import com.joshua.util.Sleeper;import lombok.extern.slf4j.Slf4j;@Slf 4j(topic = "TestBestPracticeStepC_5" )public class TestBestPracticeStepC_5 { public static final Object room = new Object(); private static boolean hasCigar = false ; private static boolean foodDelivered = false ; public static void main (String[] args) { new Thread(() -> { synchronized (room) { Thread thread = Thread.currentThread(); log.debug("有烟没? [{}]" ,hasCigar); if (!hasCigar){ log.debug("没外卖, 先歇会" ); try { room.wait(); }catch (InterruptedException e) { e.printStackTrace(); } } log.debug("有烟没? [{}]" ,hasCigar); if (hasCigar){ log.debug("可以开始干活了" ); }else { log.debug("没干成活" ); } } }, "小男" ).start(); new Thread(() -> { synchronized (room) { Thread thread = Thread.currentThread(); log.debug("外卖送到没? [{}]" ,foodDelivered); if (!foodDelivered){ log.debug("没外卖, 先歇会" ); try { room.wait(); }catch (InterruptedException e) { e.printStackTrace(); } } log.debug("外卖送到没? [{}]" ,foodDelivered); if (foodDelivered){ log.debug("可以开始干活了" ); }else { log.debug("没干成活" ); } } }, "小女" ).start(); Sleeper.sleep(1_000 ); new Thread(()->{ synchronized (room) { foodDelivered = true ; log.debug("外卖到了哦" ); room.notify(); } },"送外卖的" ).start(); } }
上面的代码中room.notify(); 是随机挑一个room里等待的线程唤醒, 如果唤醒的是“小女 ”, 那么小男线程的hasCigar还是false , 所以两个等待的线程还是不能干活, 结果如下:
顺着思路很自然是想如果能全部唤醒是不是就能让等待线程继续干活呢? 所以将代码行room.notify(); 改成room.notifyAll(); ,结果如下, 发现只有“小女 ”能继续干活。
那么问题来了, 要怎么才能唤醒小男线程呢? 或者怎么能避免小男线程被虚假唤醒呢? 将小男线程的if(!hasCigar) 判断换成while(!hasCigar){ 即可。换完执行看结果如下:
发现小男线程 没有往下执行, 而是进入了下一轮等待, 所以避免了虚假唤醒。
正确姿势
1 2 3 4 5 6 7 8 9 synchronized (lock){ while (!condition) { lock.wait(); } } synchronized (lock){ lock.notifyAll(); }
同步模式之保护性暂停 概念 保护性暂停(guarded suspension)用在一个线程等待另一个线程的结果。
要点
有一个结果需要从一个线程传递到另一个线程,让他们挂链同一个GuardedObject
如果有结果不断从一个线程传递到另一个线程, 那么可以使用消息队列MQ
JDK中join和future使用的就是此模式
因为要等待另一方的结果,所以归类到同步模式
演示 为了演示上述图例,编写如下代码
Downloader.download() 获取百度首页返回内容组装成List 返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package com.joshua.util;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.HttpURLConnection;import java.net.URL;import java.nio.charset.StandardCharsets;import java.util.ArrayList;import java.util.List;public class Downloader { public static List<String> download () throws IOException { HttpURLConnection conn = (HttpURLConnection)new URL("https://www.baidu.com/" ).openConnection(); ArrayList<String> lines = new ArrayList<>(); try (BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))){ String line; while ((line = reader.readLine())!=null ) { lines.add(line); } } return lines; } }
测试类: GuardedSuspension
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 package com.joshua.waitnotify;import com.joshua.util.Downloader;import lombok.extern.slf4j.Slf4j;import java.io.IOException;import java.util.List;@Slf 4j(topic = "c.GuardedSuspension" )public class GuardedSuspension { public static void main (String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { log.debug("等待结果" ); List<String> list = (List) guardedObject.get(); log.debug("结果返回啦~~~,结果为:\r\n [{}]" , list.toString()); }, "t1" ).start(); new Thread(() -> { try { List<String> list = Downloader.download(); log.debug("获取到结果,即将传递给需要的线程" ); guardedObject.generate(list); } catch (IOException e) { e.printStackTrace(); } }, "t2" ).start(); } } class GuardedObject { private Object response; public Object get () { synchronized (this ) { while (response == null ) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("有人传递结果来了。" ); return response; } } public void generate (Object response) { synchronized (this ) { this .notifyAll(); } this .response = response; } }
运行结果如下:
但是,如果等待的线程等待另一线程传回结果的时间过长, 则会影响性能。 所以考虑能否让等待有一个超时限制, 超过了时限就自动返回。则在原有GuardedSuspension 类的基础上添加一个超时参数的public Object get(long timeout) {} 方法。
优化后的代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 package com.joshua.waitnotify;import com.joshua.util.Downloader;import lombok.extern.slf4j.Slf4j;import java.io.IOException;import java.util.ArrayList;import java.util.List;@Slf 4j(topic = "c.GuardedSuspension" )public class GuardedSuspension { public static void main (String[] args) { GuardedObject guardedObject = new GuardedObject(); new Thread(() -> { log.debug("等待结果" ); List<String> list = new ArrayList<>(); if (guardedObject.get(100 ) instanceof List) { list = (List) guardedObject.get(100 ); } else { list.add(guardedObject.get(100 ).toString()); } log.debug("结果返回啦~~~,结果为:\r\n [{}]" , list.toString()); }, "t1" ).start(); new Thread(() -> { try { List<String> list = Downloader.download(); log.debug("获取到结果,即将传递给需要的线程" ); guardedObject.generate(list); } catch (IOException e) { e.printStackTrace(); } }, "t2" ).start(); } } @Slf 4j(topic = "c.GuardedObject" )class GuardedObject { private Object response; public Object get () { synchronized (this ) { while (response == null ) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("有人传递结果来了。" ); return response; } } public Object get (long timeout) { synchronized (this ) { long start = System.currentTimeMillis(); long timeElapsed = 0 ; while (response == null ) { if (timeElapsed >= timeout) { break ; } try { long timeToWait = timeout - timeElapsed; log.debug("还需等待 [{}]" , timeToWait); this .wait(timeToWait); } catch (InterruptedException e) { e.printStackTrace(); } timeElapsed = System.currentTimeMillis() - start; } } if (response == null ) { response = "已等待" + timeout + "ms,结果尚未返回。" ; } return response; } public void generate (Object response) { synchronized (this ) { this .notifyAll(); } this .response = response; } }
运行看结果:
情况一: 等待超时了, 返回提示信息。
情况二:未超时, 正常返回给需要的线程。
应用 join方法原理 Thread 类里join方法的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public final void join () throws InterruptedException { join(0 ); }
可以看见底层是调了含参数的重载方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public final synchronized void join (long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0 ; if (millis < 0 ) { throw new IllegalArgumentException("timeout value is negative" ); } if (millis == 0 ) { while (isAlive()) { wait(0 ); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0 ) { break ; } wait(delay); now = System.currentTimeMillis() - base; } } }
可以看出 join源码的模式与保护性暂停类似, 只是join等待的是另一个线程的结束, 而上面代码演示的是等待另一个线程传入的结果。原理一致。
Future原理
当有多对线程两两相互关联应用保护性暂停的需求时。如上图Futures就是保存关联对象的集合。左侧的t0, t2, t4 分别等待右侧t1, t3, t5的结果。 就好比现实生活中不同房号的居民分别等待不同快递公司的快递员送达货品。
那么如何维护这个关联对象集合与线程间的对应关系就变得复杂。因此, 设计一个接偶的中间类, 这样不仅能够解藕 消费者(等待结果方) 和 生产者(结果创建方)。 还能同时支持多个任务的管理。
上面的模型图 可以用如下代码演示: 居民线程要等待对应的邮递员把对应的信送到,即两两有序
1.邮递员(线程)2. 居民(线程)3. 解藕对象 (用来完成上述t0, t2, t4 和t1, t3, t5的两两关联) 4 主线程测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 package com.joshua.waitnotify;import com.joshua.util.Sleeper;import lombok.extern.slf4j.Slf4j;import java.util.Hashtable;import java.util.Map;import java.util.Set;public class GuardedSuspension_Multi_8 { public static void main (String[] args) { for (int i = 0 ; i < 3 ; i++) { new People().start(); } Sleeper.sleep(1_000 ); Mailboxes.getIds().forEach((id) -> { new Postman(id, "内容" + id).start(); }); } } @Slf 4j(topic = "c.People" )class People extends Thread { @Override public void run () { GuardedObject2 go = Mailboxes.createGuardedObject2(); log.debug("开始收信 id [{}]" , go.getId()); Object mail = go.get(5000 ); log.debug("收到信件 ID:{}" , go.getId(), mail); } } @Slf 4j(topic = "c.Postman" )class Postman extends Thread { private int id; private String mail; public Postman (int id, String mail) { this .id = id; this .mail = mail; } @Override public void run () { GuardedObject2 go = Mailboxes.popSentMail(id); log.debug("送信 id: {}" , go.getId()); go.generate(mail); } } @Slf 4j(topic = "c.Mailboxes" )class Mailboxes { private static Map<Integer, GuardedObject2> boxes = new Hashtable<>(); private static int id = 1 ; private static synchronized int generateId () { return id++; } public static GuardedObject2 popSentMail (int id) { return boxes.remove(id); } public static GuardedObject2 createGuardedObject2 () { GuardedObject2 go = new GuardedObject2(generateId()); boxes.put(go.getId(), go); return go; } public static Set<Integer> getIds () { return boxes.keySet(); } } @Slf 4j(topic = "c.GuardedObject2" )class GuardedObject2 { private int id; public GuardedObject2 (int id) { this .id = id; } public int getId () { return id; } private Object response; public Object get () { synchronized (this ) { while (response == null ) { try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("有人传递结果来了。" ); return response; } } public Object get (long timeout) { synchronized (this ) { long start = System.currentTimeMillis(); long timeElapsed = 0 ; while (response == null ) { if (timeElapsed >= timeout) { break ; } try { long timeToWait = timeout - timeElapsed; log.debug("还需等待ms: {}" , timeToWait); this .wait(timeToWait); } catch (InterruptedException e) { e.printStackTrace(); } timeElapsed = System.currentTimeMillis() - start; } } if (response == null ) { response = "已等待" + timeout + "ms,结果尚未返回。" ; } return response; } public void generate (Object response) { synchronized (this ) { this .notify(); } this .response = response; } }
测试结果(如下)可以看出无论是哪个ID都是按照这样的顺序: 开始收信1–> 送信1 –> 收到信 1
异步模式之生产者/消费者 主要特点:
与前面保护性暂停的多关联对象不同, 不需要产生结果和消费结果的线程一一对应
消费队列可以用来平衡生产和消费的线程资源
生产者仅负责产生结果,不关心数据如何处理,而消费者专心处理结果数据
消息队列谁有容量限制的,满载时不会再加入数据,空栽时不会再消费数据
JDK中各种阻塞队列就是采用的这种模式
⚠️ 之所以称之为异步是因为该模式中消费者消费可能延时。 如下图中t3产生的消息t3(d)没有被及时消费,而是先消费队列前面的t1(a).
代码演示 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 package com.joshua.waitnotify;import com.joshua.util.Sleeper;import lombok.extern.slf4j.Slf4j;import java.util.LinkedList;@Slf 4j(topic = "c.ProducerConsumer" )public class MQ_9 { public static void main (String[] args) { MessageQ mq = new MessageQ(3 ); for (int i = 0 ; i < 5 ; i++) { final int id = i; new Thread(() -> { mq.put(new Message(id, String.valueOf(id))); }, "生产者-" + i).start(); } new Thread(() -> { while (true ) { Sleeper.sleep(1_000 ); Message msg = mq.take(); } }, "消费者" ).start(); } } @Slf 4j(topic = "c.MQ" )class MessageQ { private LinkedList<Message> llist = new LinkedList<>(); private int capacity; public MessageQ (int capacity) { this .capacity = capacity; } public Message take () { synchronized (llist) { while (llist.isEmpty()) { try { log.debug("队列为空, 消费者线程需等待" ); llist.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Message msg = llist.removeFirst(); log.debug("已消费消息" + msg); llist.notifyAll(); return msg; } } public void put (Message message) { synchronized (llist) { while (llist.size() == capacity) { try { log.debug("队列已满, 生产者线程需等待" ); llist.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } llist.addLast(message); log.debug("已生产消息" + message); llist.notifyAll(); } } } final class Message { private int id; private String value; public int getId () { return id; } public String getValue () { return value; } public Message (int id, String value) { this .id = id; this .value = value; } @Override public String toString () { return "Message{" + "id=" + id + ", value='" + value + '\'' + '}' ; } }
测试结果: