线程间有哪些通信方式?
线程间通信是指在多线程编程中,各个线程之间共享信息或者协同完成某一任务的过程。常用的线程间通信方式有以下几种:
- 共享变量:共享变量是指多个线程都可以访问和修改的变量,它们通常是在主线程中创建的。多个线程对同一个共享变量进行读写操作时,可能会出现竞态条件导致数据错误或程序异常。因此需要使用同步机制比如synchronized、Lock等来保证线程安全
- 管道通信:管道是一种基于文件描述符的通信机制,形成一个单向通信的数据流管道。它通常用于只有两个进程或线程之间的通信。其中一个进程将数据写入到管道(管道的输出端口),而另一个进程从管道的输入端口读取数据
- 信号量:信号量是一种计数器,用于控制多个线程对资源的访问。当一个线程需要访问资源时,它需要申请获取信号量,如果信号量的计数器值大于 0,则可以访问资源,否则该线程就会等待。当线程结束访问资源后,需要释放信号量,并将计数器加1
- 条件变量:条件变量是一种通知机制,用于在多个线程之间传递状态信息和控制信息。当某个线程需要等待某个条件变量发生改变时,它可以调用 wait() 方法挂起,并且释放所占用的锁。当某个线程满足条件后,可以调用 notify() 或者 signal() 方法来通知等待该条件变量的线程继续执行
/**
* 共享变量
* 创建人:百里
*/
public class BaiLiSharedMemoryDemo {
public static void main(string[] args) {
ArrayList<Integer> integers = new ArrayList<>();
Thread producerThread = new Thread(() -> {
for (int i = 0; i < 5; i++) {
synchronized (integers) {
integers.add(i);
System.out.println(Thread.currentThread().getName() + "_Producer:" + i);
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "ProducerThread");
Thread consumeThread = new Thread(() -> {
while (true) {
synchronized (integers) {
if (!integers.isEmpty()) {
Integer integer = integers.remove(0);
System.out.println(Thread.currentThread().getName() + "_Consume:" + integer);
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "ConsumeThread");
producerThread.start();
consumeThread.start();
}
}
/**
* 管道通信模式
* 创建人:百里
*/
public class BaiLiPipedStreamDemo {
public static void main(String[] args) throws IOException {
//输出管道
PipedOutputStream pipedOutputStream = new PipedOutputStream();
//输入管道
PipedInputStream pipedInputStream = new PipedInputStream();
pipedInputStream.connect(pipedOutputStream);
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
pipedOutputStream.write(i);
System.out.println(Thread.currentThread().getName() + "_Produce: " + i);
Thread.sleep(2000);
}
pipedOutputStream.close();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}, "ProducerThread");
Thread consumeThread = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
while (true) {
int read = pipedInputStream.read();
if (read != -1) {
System.out.println(Thread.currentThread().getName() + "_Consume: " + read);
} else {
break;
}
Thread.sleep(1000);
}
}
pipedInputStream.close();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}, "ConsumeThread");
producerThread.start();
consumeThread.start();
}
}
/**
* 信号量
* 创建人:百里
*/
public class BaiLiSemaphoreDemo {
public static void main(String[] args) {
// 实例化一个信号量对象,初始值为 0
Semaphore semaphore = new Semaphore(0);
// 创建生产者线程
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "_Producer:" + i);
semaphore.release(); // 把信号量的计数器加 1
Thread.sleep(1000); //模拟停顿
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "ProducerThread");
// 创建消费者线程
Thread consumeThread = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
semaphore.acquire(); // 请求占有信号量,如果计数器不为 0,计数器减 1,否则线程阻塞等待
System.out.println(Thread.currentThread().getName() + "_Consume:" + i);
Thread.sleep(1000); //模拟停顿
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "ConsumeThread");
producerThread.start();
consumeThread.start();
}
}
/**
* 条件变量|可重入锁
* 创建人:百里
*/
public class BaiLIConditionDemo {
public static void main(String[] args) {
// 实例化一个可重入锁对象
ReentrantLock lock = new ReentrantLock();
// 获取该锁对象的条件变量
Condition condition = lock.newCondition();
// 创建生产者线程
Thread producerThread = new Thread(() -> {
try {
lock.lock(); // 获取锁对象
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + " produce: " + i);
condition.signal(); // 唤醒处于等待状态下的消费者线程
condition.await(); // 使当前线程处于等待状态,并释放锁对象
Thread.sleep(1000);
}
condition.signal(); // 避免消费者线程一直等待
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁对象
}
}, "producer");
// 创建消费者线程
Thread consumerThread = new Thread(() -> {
try {
lock.lock(); // 获取锁对象
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + " consume: " + i);
condition.signal(); // 唤醒处于等待状态下的生产者线程
condition.await(); // 使当前线程处于等待状态,并释放锁对象
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock(); // 释放锁对象
}
}, "consumer");
// 启动生产者和消费者线程
producerThread.start();
consumerThread.start();
}
}