這篇文章主要從4個角度來講多線程間的通信:

  1. 使用wait/notify實現線程間的通信
  2. 生產者/消費者模式的實現
  3. 方法join的使用
  4. ThreadLocal類的使用

等待/通知機制的實現

(1)wait()方法屬於Object類,作用是讓當前執行代碼的線程進行等待,該方法用來將當前線程置於"預執行隊列"中,並且在wait()所在的代碼行處停止執行,直到接到通知或者被中斷為止。在調用wait()方法之前,線程必須獲得該對象的對象級別鎖,只能在同步方法或者同步塊中調用wait()方法。在執行wait()方法後,當前線程釋放鎖。在從wait()返回前,線程與其他線程競爭重新獲得鎖。如果調用wait()時沒有持有適當的鎖,就會拋出IllegalMonitorStateException異常,它是RuntimeException的一個子類。因此不需要進行異常捕獲。

此處有個面試題,是關於為什麼wait()方法必須在同步中?佔小狼的公眾號給出了答案,lost wake up問題,作者給出了生產和消費的模式距離來說明lost wake up問題,並給出要解決就必須同步,獲取同一對象鎖,連接如下:

阿里面試題,為什麼wait()方法要放在同步塊中??

mp.weixin.qq.com
圖標

(2)notify()方法也需要在同步方法或者同步代碼塊中調用,在調用前,線程必須獲得該對象級別的鎖,如果調用notify()時沒有持有適當的鎖,就會拋出IllegalMonitorStateException異常。該方法用來通知那些可能等待對象的對象鎖的其他線程,如果有多個線程等待,則由線程規劃器隨機挑選其中一個呈wait狀態的線程,對其發出notify,並使它等待獲取該對象的對象鎖。需要說明的是,在執行notify()方法後,當前線程不會馬上釋放鎖,呈wait狀態的線程也不會馬上獲得鎖,必須等到notify()方法的線程執行完,走出同步代碼塊或則方法的時候,當前線程才會釋放鎖,wait狀態的線程纔可以獲得鎖。

notify()方法可以隨機喚醒等待隊列中等待同一共享資源的一個線程,並使線程退出等待隊列,進入可運行的狀態,該方法一次只可以喚醒一個線程

notifyAll()方法是可以喚醒所有正在等待隊列中等待同一共享資源的全部線程,讓其從等待狀態退出,進入可運行狀態,誰的優先順序高,誰將會先被執行,也有可能隨機執行,取決於JVM虛擬機的實現。

下面這段代碼,因為沒有「對象監視器」,沒有同步鎖的原因,所以出現了異常

package com.july.eight;

public class Test1 {
public static void main(String[] args) {
try {
String str = new String("");
str.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

異常:

Exception in thread "main" java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at com.july.eight.Test1.main(Test1.java:7)

下面這段代碼,雖然使用了synchronized關鍵字,而且wait方法也在同步塊中,但是因為當前線程main被掛起,一直處於等待,所以wait()方法後面的代碼都沒有執行機會。

package com.july.eight;

public class Test2 {
public static void main(String[] args) {
try {
String lock = new String();
System.out.println("sync 上面");
synchronized (lock){
System.out.println("come into synchronized");
lock.wait();
System.out.println("under wait method");
}
System.out.println("come out of synchronized");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

運行結果:

sync 上面
come into synchronized

下面這段代碼實現了線程間的通信,線程A先啟動執行,然後調用wait方法,線程睡眠3秒,然後線程B啟動執行,並執行了notify()方法,通知喚醒wait()的線程,當線程B執行完synchronized同步代碼塊,然後釋放了對象鎖,wait()的線程獲取到了對象鎖,然後繼續執行。

package com.july.eight;

public class Test3 {
public static void main(String[] args) {
try {
Object lock = new Object();
MyThread1 myThread1 = new MyThread1(lock);
Thread t1 = new Thread(myThread1);
t1.start();
Thread.sleep(3000);
MyThread2 myThread2 = new MyThread2(lock);
Thread t2 = new Thread(myThread2);
t2.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class MyThread1 implements Runnable{

private Object lock;

public MyThread1(Object lock) {
this.lock = lock;
}

@Override
public void run() {
try {
synchronized (lock){
System.out.println(" begin wait time :" + System.currentTimeMillis());
lock.wait();
System.out.println(" end wait time : " + System.currentTimeMillis());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class MyThread2 implements Runnable{
private Object lock;

public MyThread2(Object lock) {
this.lock = lock;
}

@Override
public void run() {
synchronized (lock){
System.out.println("notify begin time :" + System.currentTimeMillis());
lock.notify();
System.out.println("notify end time: " +System.currentTimeMillis());
}
}
}

運行結果:

begin wait time :1559960722343
notify begin time :1559960725347
notify end time: 1559960725347
end wait time : 1559960725348

下面這段代碼,是當list中添加元素5個的時候,然後就notify()另外一個wait()的線程。

package com.july.eight;

import java.util.ArrayList;
import java.util.List;

public class Test4 {
public static void main(String[] args) {
try {
Object lock = new Object();
ThreadA a = new ThreadA(lock);
a.start();
Thread.sleep(50);
ThreadB b = new ThreadB(lock);
b.start();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

class MyList{
private static List list = new ArrayList();
public static void add(){
list.add("abc");
}
public static int size(){
return list.size();
}
}

class ThreadA extends Thread{
private Object lock;

public ThreadA(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
try {
synchronized (lock) {
if (MyList.size() != 5) {
System.out.println("wait begin " + System.currentTimeMillis());
lock.wait();
System.out.println("wait end " + System.currentTimeMillis());
}
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

class ThreadB extends Thread{
private Object lock;

public ThreadB(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
try {
synchronized (lock) {
for (int i = 0; i <10; i++) {
MyList.add();
if (MyList.size() == 5) {
lock.notify();
System.out.println("notify has been sent");
}
System.out.println("has add elements " + (i+1) );
Thread.sleep(1000);
}
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

運行結果:

wait begin 1559961654069
has add elements 1
has add elements 2
has add elements 3
has add elements 4
notify has been sent
has add elements 5
has add elements 6
has add elements 7
has add elements 8
has add elements 9
has add elements 10
wait end 1559961664126

從運行結果可以看出,最開始是wait begin,結束是wait end。

線程的生命週期:

大致分為創建、可運行、運行、阻塞、銷毀等五個狀態。

其中可運行狀態和運行狀態可以相互轉換,阻塞狀態和可運行狀態可以相互轉換。

線程進入Runnable狀態的情況

(1)調用sleep()方法後經過的時間超過了指定的休眠時間

(2)線程調用的阻塞IO已經返回,阻塞方法執行完畢

(3)線程成功得獲得了試圖同步的監視器

(4)線程正在等待某個通知,其他線程發出了通知

(5)處於掛起狀態的線程調用了resume方法恢複線程

線程出現阻塞狀態的情況

(1)線程調用sleep方法,主動放棄佔用的處理器資源

(2)線程調用了阻塞式IO方法,在方法返回前,該線程被阻塞

(3)線程試圖獲得一個同步監視器,但是該同步監視器正被其他線程所持有

(4)線程等待某個通知

(5)程序調用了suspend()方法,掛起該線程,此方法容易導致死鎖,應該避免調用。

wait方法執行後,鎖會自動釋放;notify()方法執行後,鎖不會自動釋放,除非執行完對應的synchronized方法,才會釋放鎖;sleep方法也是不釋放鎖的(????sleep方法不是指定線程休眠時間,休眠時間過後,線程就i 重新獲得CPU,去執行嗎????我的理解:多線程情況下,其中一個線程在同步塊中被指定休眠時間,這個線程是在休眠期間不會釋放鎖,在休眠結束,同步塊執行完以後就會釋放對象鎖

執行完同步代碼塊會釋放對象鎖;

在執行同步代碼塊的過程中,如果遇到異常導致線程終止,鎖也會被釋放;

在執行同步代碼塊的過程中,執行了鎖所屬對象的wait()方法,這個線程會釋放對象鎖,而此線程對象會進入線程等待池中,等待被喚醒

wait(long)方法:等待某一時間內是否有線程對鎖進行喚醒,如果超過這個時間則自動喚醒。

下面代碼使用wait,分別有兩個線程執行,都執行了同步方法,從控制檯可以看出,程序依然沒有執行完,正是因為A線程執行了wait()方法後,釋放了對象鎖,B線程纔可以獲取對象鎖,然後執行同步代碼塊,執行wait。

package com.july.eight;

public class Test5 {
public static void main(String[] args) {
Object lock = new Object();
ThreadA1 a = new ThreadA1(lock);
a.start();
ThreadA2 b = new ThreadA2(lock);
b.start();
}
}

class Service{
public void testMethod(Object lock){
try {
synchronized (lock){
System.out.println("begin wait()");
lock.wait();
System.out.println("end wait()");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class ThreadA1 extends Thread{
private Object lock;

public ThreadA1(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}

class ThreadA2 extends Thread{
private Object lock;

public ThreadA2(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
Service service = new Service();
service.testMethod(lock);
}
}

運行結果:

begin wait()
begin wait()

下面的程序中使用了wait和notify結合,其中wait執行完,其他線程還可以繼續執行同步代碼塊,但是執行到notify的時候,程序就一直不能結束,原因是notify()方法指定完後沒有釋放對象鎖,所以才導致這個問題發生,代碼和運行結果如下所示:

package com.july.eight;

public class Test6 {
public static void main(String[] args) {
Object lock = new Object();
ThreadA6 a = new ThreadA6(lock);
a.start();
ThreadB6 notifyThread = new ThreadB6(lock);
notifyThread.start();
SyncNotifyMethodThread c = new SyncNotifyMethodThread(lock);
c.start();
}
}

class Test6Service{
public void testMethod(Object lock){
try{
synchronized (lock){
System.out.println("begin wait() thread name = " + Thread.currentThread().getName());
lock.wait();
System.out.println("end wait() thread name = " + Thread.currentThread().getName());
}
}catch (InterruptedException e){
e.printStackTrace();
}
}

public void synNotifyMethod(Object lock){
try{
synchronized (lock){
System.out.println("begin notify() thread name = " + Thread.currentThread().getName()
+ " time : " + System.currentTimeMillis());
lock.wait();
System.out.println("end notify() thread name = " + Thread.currentThread().getName()
+ " time : " + System.currentTimeMillis());
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

class ThreadA6 extends Thread{
private Object lock;

public ThreadA6(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
Test6Service service = new Test6Service();
//調用wait
service.testMethod(lock);
}
}

class ThreadB6 extends Thread{
private Object lock;

public ThreadB6(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
Test6Service service = new Test6Service();
//調用notify
service.synNotifyMethod(lock);
}
}

class SyncNotifyMethodThread extends Thread{
private Object lock;

public SyncNotifyMethodThread(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
Test6Service service = new Test6Service();
service.synNotifyMethod(lock);
}
}

begin wait() thread name = Thread-0
begin notify() thread name = Thread-1 time : 1559967797417
begin notify() thread name = Thread-2 time : 1559967797417

當interrupt()方法和wait()方法相遇,下面是示例代碼和運行結果:

package com.july.eight;

public class Test7 {
public static void main(String[] args) {
try {
Object lock = new Object();
ThreadSeven a = new ThreadSeven(lock);
a.start();
Thread.sleep(5000);
a.interrupt();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

class ServiceSeven{
public void testMethod(Object lock){
try {
synchronized (lock){
System.out.println("begin wait()");
lock.wait();
System.out.println("end wait()");
}
}catch (InterruptedException e){
e.printStackTrace();
System.out.println("throws exception ,beacuse of the wait() method is interrupted by interrupt()");
}
}
}

class ThreadSeven extends Thread{
private Object lock;

public ThreadSeven(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
ServiceSeven serviceSeven = new ServiceSeven();
serviceSeven.testMethod(lock);
}
}

begin wait()
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at com.july.eight.ServiceSeven.testMethod(Test7.java:22)
at com.july.eight.ThreadSeven.run(Test7.java:43)
throws exception ,beacuse of the wait() method is interrupted by interrupt()

下面這段代碼是關於notify()方法,一次只可以喚醒一個線程(隨機)的驗證:

package com.july.eight;

public class Test8 {
public static void main(String[] args) {
try {
Object lock = new Object();
EightThreadOne a = new EightThreadOne(lock);
a.start();
EightThreadTwo b = new EightThreadTwo(lock);
b.start();
EightThreadThree c = new EightThreadThree(lock);
c.start();
Thread.sleep(1000);
NotifyThread notifyThread = new NotifyThread(lock);
notifyThread.start();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

class ServiceEight{
public void testMethod(Object lock){
try {
synchronized (lock){
System.out.println("begin wait() thread name = " + Thread.currentThread().getName());
lock.wait();
System.out.println("end wait() thread name = " + Thread.currentThread().getName());
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

class EightThreadOne extends Thread{
private Object lock;

public EightThreadOne(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
ServiceEight serviceEight = new ServiceEight();
serviceEight.testMethod(lock);
}
}

class EightThreadTwo extends Thread{
private Object lock;

public EightThreadTwo(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
ServiceEight serviceEight = new ServiceEight();
serviceEight.testMethod(lock);
}
}

class EightThreadThree extends Thread{
private Object lock;

public EightThreadThree(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
ServiceEight serviceEight = new ServiceEight();
serviceEight.testMethod(lock);
}
}

class NotifyThread extends Thread{
private Object lock;

public NotifyThread(Object lock) {
super();
this.lock = lock;
}

@Override
public void run() {
synchronized (lock){
//如果要喚醒兩個或者更多,那麼下面就要追加lock.notify()
lock.notify();
}
}
}

程序運行結果:

begin wait() thread name = Thread-0
begin wait() thread name = Thread-1
begin wait() thread name = Thread-2
end wait() thread name = Thread-0

多線程中的生產者和消費者模式

(1)一個生產者和一個消費者:操作值

package com.july.eight.pc.one;

public class TestProductorConsumerOne {
public static void main(String[] args) {
String lock = new String("");
Productor productor = new Productor(lock);
Consumer consumer = new Consumer(lock);
ProductThread productThread = new ProductThread(productor);
ConsumerThread consumerThread = new ConsumerThread(consumer);
productThread.start();
consumerThread.start();
}
}

/**
* 生產者
*/
class Productor{
private String lock;

public Productor(String lock) {
super();
this.lock = lock;
}

public void setValue(){
try {
synchronized (lock){
if (!ValueObject.value.equals("")){
lock.wait();
}
String value = System.currentTimeMillis() + "_" + System.nanoTime();
System.out.println("set value is : " + value);
ValueObject.value = value;
lock.notify();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

/**
* 消費者
*/
class Consumer{
private String lock;

public Consumer(String lock) {
this.lock = lock;
}

public void getValue(){
try {
synchronized (lock){
if (ValueObject.value.equals("")){
lock.wait();
}
String value = System.currentTimeMillis() + "_" + System.nanoTime();
System.out.println("get value is : " + ValueObject.value);
ValueObject.value = "";
lock.notify();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

class ValueObject {
public static String value = "";
}

class ProductThread extends Thread{
private Productor productor;

public ProductThread(Productor productor) {
super();
this.productor = productor;
}

@Override
public void run() {
while (true){
productor.setValue();
}
}
}

class ConsumerThread extends Thread{
private Consumer consumer;

public ConsumerThread(Consumer consumer) {
super();
this.consumer = consumer;
}

@Override
public void run() {
while (true){
consumer.getValue();
}
}
}

運行結果:set和get交替執行

set value is : 1559980830622_188397570568700
get value is : 1559980830622_188397570568700
set value is : 1559980830622_188397570583300
get value is : 1559980830622_188397570583300

(2)多生產者與多消費者:操作值:假死

下面的這段代碼,因為存在使用notify(),可能生產喚醒生產者,消費者喚醒消費者,所以會造成假死狀態。

package com.july.eight.pc.two;

public class TestProductorConsumerTwo {
public static void main(String[] args) {
try{
String lock = new String("");
Productor productor = new Productor(lock);
Consumer consumer = new Consumer(lock);
ProductThread[] productThread = new ProductThread[2];
ConsumerThread[] consumerThread = new ConsumerThread[2];
for (int i = 0; i < 2; i++) {
productThread[i] = new ProductThread(productor);
productThread[i].setName("productor " + (i+1));
consumerThread[i] = new ConsumerThread(consumer);
consumerThread[i].setName("consumer " + (i+1));
productThread[i].start();
consumerThread[i].start();
}
Thread.sleep(5000);
Thread[] threadArray = new Thread[Thread.currentThread().getThreadGroup().activeCount()];
Thread.currentThread().getThreadGroup().enumerate(threadArray);
for (int i = 0; i < threadArray.length; i++) {
System.out.println(threadArray[i].getName() + " " + threadArray[i].getState());
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

/**
* 生產者
*/
class Productor{
private String lock;

public Productor(String lock) {
super();
this.lock = lock;
}

public void setValue(){
try {
synchronized (lock){
while (!ValueObject.value.equals("")){
System.out.println("productor " + Thread.currentThread().getName() + " warning ※");
lock.wait();
}
System.out.println("productor " +Thread.currentThread().getName() + " runnable");
String value = System.currentTimeMillis() + "_" + System.nanoTime();
ValueObject.value = value;
lock.notify();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

/**
* 消費者
*/
class Consumer{
private String lock;

public Consumer(String lock) {
this.lock = lock;
}

public void getValue(){
try {
synchronized (lock){
while (ValueObject.value.equals("")){
System.out.println("consumer " + Thread.currentThread().getName() + " warning △");
lock.wait();
}
System.out.println("consumer " +Thread.currentThread().getName() + " runnable");
ValueObject.value = "";
lock.notify();
}
}catch (InterruptedException e){
e.printStackTrace();
}
}
}

class ValueObject {
public static String value = "";
}

class ProductThread extends Thread{
private Productor productor;

public ProductThread(Productor productor) {
super();
this.productor = productor;
}

@Override
public void run() {
while (true){
productor.setValue();
}
}
}

class ConsumerThread extends Thread{
private Consumer consumer;

public ConsumerThread(Consumer consumer) {
super();
this.consumer = consumer;
}

@Override
public void run() {
while (true){
consumer.getValue();
}
}
}

運行結果:

productor productor 1 runnable
productor productor 1 warning
consumer consumer 1 runnable
consumer consumer 1 warning
productor productor 1 runnable
productor productor 1 warning
consumer consumer 1 runnable
consumer consumer 1 warning
productor productor 1 runnable
productor productor 1 warning
consumer consumer 1 runnable
consumer consumer 1 warning
productor productor 1 runnable
productor productor 1 warning
consumer consumer 1 runnable
consumer consumer 1 warning
productor productor 1 runnable
productor productor 1 warning
consumer consumer 1 runnable
consumer consumer 1 warning
productor productor 1 runnable
productor productor 1 warning
consumer consumer 1 runnable
consumer consumer 1 warning
productor productor 1 runnable
productor productor 1 warning
consumer consumer 1 runnable
consumer consumer 1 warning
productor productor 1 runnable
productor productor 1 warning
consumer consumer 1 runnable
consumer consumer 1 warning
productor productor 1 runnable
productor productor 1 warning
consumer consumer 1 runnable
consumer consumer 1 warning
productor productor 1 runnable
productor productor 1 warning
consumer consumer 2 runnable
consumer consumer 2 warning
consumer consumer 1 warning
productor productor 1 runnable
productor productor 1 warning
productor productor 2 warning
consumer consumer 2 runnable
consumer consumer 2 warning
consumer consumer 1 warning
main RUNNABLE
Monitor Ctrl-Break RUNNABLE
productor 1 WAITING
consumer 1 WAITING
productor 2 WAITING
consumer 2 WAITING

推薦閱讀:

相關文章