Thread-如何从整体上理解线程同步(线程并发同步概述)
在多线程编程中,线程间的通信是确保程序正确性和性能的关键。线程间通信主要有两种方式:资源共享和消息传递。这两种方式各有优势和适用场景,选择合适的通信方式对于开发高效、可靠的并发程序至关重要。因此线程同步总体可分为资源共享同步和消息传递同步。
资源共享
资源共享是线程间通信的一种方式,其中一个或多个线程可以访问和操作共享变量或数据结构。资源共享模型允许线程直接访问内存中的共享数据,从而实现线程间的隐式通信。
特点
- 直接访问:线程可以直接访问共享资源,无需通过其他机制传递数据。
- 同步需求:为了确保数据的一致性和防止竞争条件,通常需要同步机制来控制对共享资源的访问。
消息传递
消息传递是线程间通信的另一种方式,线程通过发送和接收消息来交换数据,而不是直接访问共享资源。这种方式可以避免复杂的同步问题,因为每个线程都独立地处理自己的消息。
特点
- 解耦合:线程通过消息进行通信,不需要知道其他线程的内部状态。
- 避免同步问题:由于不直接访问共享资源,可以减少同步机制的需求。
资源共享同步模型
资源共享同步方式有两种 锁资源和阻塞访问线程以及混合型
锁
互斥锁(Mutex)
- 定义:互斥锁是一种同步机制,用于保护共享资源,确保任何时候只有一个线程可以访问该资源。
- 用途:互斥锁通常用于保护那些只能被一个线程修改的数据结构,或者在执行某些不能并行的操作时。
- 工作原理
- 当一个线程想要访问被互斥锁保护的资源时,它必须先尝试获取锁。
- 如果锁已经被其他线程占用,那么这个线程将被阻塞,直到锁被释放。
- 一旦线程获取了锁,它就可以安全地访问资源,并且在访问完毕后释放锁,以便其他线程可以获取锁。
- 优点:简单易用,能够有效地防止数据竞争和条件竞争。
- 缺点:可能导致线程饥饿,特别是在锁竞争激烈的情况下。
代码示例-互斥锁
public class MutexExample {
// 共享资源
private int counter = 0;
// 用于同步的方法
public synchronized void increment() {
counter++; // 原子操作,但为了演示,我们将其拆分为多步
System.out.println(Thread.currentThread().getName() + " incremented counter to " + counter);
}
public int getCounter() {
return counter;
}
}
共享锁(Shared Lock)
- 定义:共享锁是一种允许多个线程同时访问共享资源的同步机制,但不允许线程独占资源。
- 用途:共享锁通常用于读操作频繁的场景,允许多个线程同时读取数据,但写操作需要独占资源。
- 工作原理
- 多个线程可以同时获取共享锁,只要没有线程持有独占锁。
- 当一个线程想要写入资源时,它必须获取独占锁,这会阻止其他线程获取共享锁,直到独占锁被释放。
- 共享锁通常与读写锁(Read-Write Lock)一起使用,读写锁是一种特殊的共享锁,它区分了读操作和写操作。
- 优点:提高了并发性能,特别是在读操作远多于写操作的场景中。
- 缺点:实现复杂,需要仔细处理读-写冲突和写-写冲突。
代码示例-共享锁
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SharedLockExample {
private int data = 0;
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void increment() {
// 获取写锁
readWriteLock.writeLock().lock();
try {
data++;
System.out.println(Thread.currentThread().getName() + " incremented data to " + data);
} finally {
// 释放写锁
readWriteLock.writeLock().unlock();
}
}
public int getData() {
// 获取读锁
readWriteLock.readLock().lock();
try {
return data;
} finally {
// 释放读锁
readWriteLock.readLock().unlock();
}
}
}
条件变量
条件变量 Condition
是一种同步机制,它允许线程在某些条件尚未满足时挂起(等待),直到其他线程通知这些条件已经发生改变
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumerExample {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
private static final int SIZE = 10;
private int[] buffer = new int[SIZE];
private int putPtr, takePtr, count;
public void put(int v) throws InterruptedException {
lock.lock();
try {
while (count == SIZE) {
notEmpty.await();
}
buffer[putPtr] = v;
putPtr = (putPtr + 1) % SIZE;
++count;
notFull.signal();
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notFull.await();
}
int v = buffer[takePtr];
takePtr = (takePtr + 1) % SIZE;
--count;
notEmpty.signal();
return v;
} finally {
lock.unlock();
}
}
}
信号量
信号量(Semaphore)是一种更为复杂的同步机制,用于控制对有限资源的访问。它允许多个线程或进程访问有限数量的资源,同时防止资源被过载。信号量可以用来控制线程之间的执行顺序,或者用于线程间的计数和通信。
基本概念
- 计数器 :信号量内部维护一个计数器,表示可用资源的数量。
- 许可(Permission) :信号量中的计数器值通常被称为许可。当一个线程想要进入一个代码段时,它必须首先从信号量中获取一个许可。
- P操作(等待/获取) :当线程请求进入受保护的代码段时,它会执行P操作(Proberen,荷兰语中的“测试”),这会尝试减少信号量的计数器。如果计数器值大于0,它将减少一个单位,线程继续执行。如果计数器值为0,则线程将被阻塞,直到信号量中再次有可用的许可。
- V操作(释放/信号) :当线程离开受保护的代码段时,它会执行V操作(Verhogen,荷兰语中的“增加”),这会增加信号量的计数器。这可能会释放一个正在等待的线程,允许它继续执行。
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
private final Semaphore semaphore;
public SemaphoreExample(int permits) {
semaphore = new Semaphore(permits);
}
public void executeTask() {
try {
// 尝试获取一个许可
semaphore.acquire();
try {
Thread.sleep(1000);
} finally {
semaphore.release();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消息传递同步模型
- 消息队列: 基于队列的同步模型,队列模型用于在线程或进程之间传递消息。
- 管道:基于IO的同步模型,IO是管道模型,一个线程的输出成为另一个线程的输入。
- 事件:基于事件的同步模型,事件是一种信号,用于通知线程某个特定的状态或条件已经发生。
代码示例-消息队列
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class MessageQueueSyncModel {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 生产者方法,将消息放入队列
public void produce(String message) {
try {
queue.put(message); // 将消息放入队列,如果队列满了,会阻塞直到队列有空间
System.out.println("Produced: " + message);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断状态
System.out.println("Producer interrupted.");
}
}
// 消费者方法,从队列中获取消息
public String consume() {
try {
String message = queue.take(); // 从队列中取出消息,如果队列为空,会阻塞直到队列中有消息
System.out.println("Consumed: " + message);
return message;
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 重新设置中断状态
System.out.println("Consumer interrupted.");
return null;
}
}
}
代码示例-管道
pip 管道是单向的
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class PipedExample {
public static void main(String[] args) {
// 创建管道输出流和管道输入流
PipedOutputStream pos = new PipedOutputStream();
PipedInputStream pis = new PipedInputStream();
try {
// 将输入流和输出流连接起来
pis.connect(pos);
// 创建生产者线程
Thread producerThread = new Thread(new Producer(pis));
producerThread.start();
// 创建消费者线程
Thread consumerThread = new Thread(new Consumer(pos));
consumerThread.start();
// 等待线程结束
producerThread.join();
consumerThread.join();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
代码示例-事件同步
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
// 定义事件对象
class CustomEvent extends java.util.EventObject {
private String message;
public CustomEvent(Object source, String message) {
super(source);
this.message = message;
}
public String getMessage() {
return message;
}
}
// 定义事件监听器接口
interface CustomEventListener {
void onCustomEvent(CustomEvent event);
}
// 事件源类
class EventSource {
private final List<CustomEventListener> listeners = new ArrayList<>();
private final ExecutorService executor = Executors.newCachedThreadPool();
public void addCustomEventListener(CustomEventListener listener) {
listeners.add(listener);
}
public void removeCustomEventListener(CustomEventListener listener) {
listeners.remove(listener);
}
public void fireEvent(String message) {
CustomEvent event = new CustomEvent(this, message);
for (CustomEventListener listener : listeners) {
executor.submit(() -> listener.onCustomEvent(event));
}
}
public void shutdown() {
executor.shutdown();
}
}
// 事件监听器实现
class EventListenerImpl implements CustomEventListener {
@Override
public void onCustomEvent(CustomEvent event) {
System.out.println("Event received: " + event.getMessage());
// 处理事件...
}
}
public class EventBasedCommunicationExample {
public static void main(String[] args) {
EventSource eventSource = new EventSource();
EventListenerImpl listener = new EventListenerImpl();
eventSource.addCustomEventListener(listener);
// 在单独的线程中发送事件
new Thread(() -> {
eventSource.fireEvent("Event from Thread 1");
eventSource.fireEvent("Event from Thread 2");
}).start();
eventSource.shutdown(); // 关闭ExecutorService
}
}
混合型同步模型
混合模型结合了共享内存和消息传递的优点,通过共享内存和消息传递的结合使用,提供了更灵活的同步机制。
代码 略
总结
本文只是线程同步的一个简单概述,简单的梳理下同步的模型。在多线程编程中,线程间的通信是确保程序正确性和性能的关键。线程间通信主要有两种方式:资源共享和消息传递。资源共享允许线程直接访问内存中的共享数据,而消息传递则通过发送和接收消息来交换数据。这两种方式各有优势和适用场景,选择合适的通信方式对于开发高效、可靠的并发程序至关重要。