Thread-如何从整体上理解线程同步(线程并发同步概述)

在多线程编程中,线程间的通信是确保程序正确性和性能的关键。线程间通信主要有两种方式:资源共享和消息传递。这两种方式各有优势和适用场景,选择合适的通信方式对于开发高效、可靠的并发程序至关重要。因此线程同步总体可分为资源共享同步和消息传递同步。

资源共享

资源共享是线程间通信的一种方式,其中一个或多个线程可以访问和操作共享变量或数据结构。资源共享模型允许线程直接访问内存中的共享数据,从而实现线程间的隐式通信。

特点

  • 直接访问:线程可以直接访问共享资源,无需通过其他机制传递数据。
  • 同步需求:为了确保数据的一致性和防止竞争条件,通常需要同步机制来控制对共享资源的访问。

消息传递

消息传递是线程间通信的另一种方式,线程通过发送和接收消息来交换数据,而不是直接访问共享资源。这种方式可以避免复杂的同步问题,因为每个线程都独立地处理自己的消息。

特点

  • 解耦合:线程通过消息进行通信,不需要知道其他线程的内部状态。
  • 避免同步问题:由于不直接访问共享资源,可以减少同步机制的需求。


资源共享同步模型

资源共享同步方式有两种 锁资源和阻塞访问线程以及混合型

互斥锁(Mutex)

  1. 定义:互斥锁是一种同步机制,用于保护共享资源,确保任何时候只有一个线程可以访问该资源。
  2. 用途:互斥锁通常用于保护那些只能被一个线程修改的数据结构,或者在执行某些不能并行的操作时。
  3. 工作原理
  • 当一个线程想要访问被互斥锁保护的资源时,它必须先尝试获取锁。
  • 如果锁已经被其他线程占用,那么这个线程将被阻塞,直到锁被释放。
  • 一旦线程获取了锁,它就可以安全地访问资源,并且在访问完毕后释放锁,以便其他线程可以获取锁。
  1. 优点:简单易用,能够有效地防止数据竞争和条件竞争。
  2. 缺点:可能导致线程饥饿,特别是在锁竞争激烈的情况下。
代码示例-互斥锁
  1. public class MutexExample {
  2.    // 共享资源
  3.    private int counter = 0;
  4.    // 用于同步的方法
  5.    public synchronized void increment() {
  6.        counter++;  // 原子操作,但为了演示,我们将其拆分为多步
  7.        System.out.println(Thread.currentThread().getName() + " incremented counter to " + counter);
  8.   }
  9.    public int getCounter() {
  10.        return counter;
  11.   }
  12. }

共享锁(Shared Lock)

  1. 定义:共享锁是一种允许多个线程同时访问共享资源的同步机制,但不允许线程独占资源。
  2. 用途:共享锁通常用于读操作频繁的场景,允许多个线程同时读取数据,但写操作需要独占资源。
  3. 工作原理
  • 多个线程可以同时获取共享锁,只要没有线程持有独占锁。
  • 当一个线程想要写入资源时,它必须获取独占锁,这会阻止其他线程获取共享锁,直到独占锁被释放。
  • 共享锁通常与读写锁(Read-Write Lock)一起使用,读写锁是一种特殊的共享锁,它区分了读操作和写操作。
  1. 优点:提高了并发性能,特别是在读操作远多于写操作的场景中。
  2. 缺点:实现复杂,需要仔细处理读-写冲突和写-写冲突。
代码示例-共享锁
  1. import java.util.concurrent.locks.ReadWriteLock;
  2. import java.util.concurrent.locks.ReentrantReadWriteLock;
  3. public class SharedLockExample {
  4.    private int data = 0;
  5.    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  6.    public void increment() {
  7.        // 获取写锁
  8.        readWriteLock.writeLock().lock();
  9.        try {
  10.            data++;
  11.            System.out.println(Thread.currentThread().getName() + " incremented data to " + data);
  12.       } finally {
  13.            // 释放写锁
  14.            readWriteLock.writeLock().unlock();
  15.       }
  16.   }
  17.    public int getData() {
  18.        // 获取读锁
  19.        readWriteLock.readLock().lock();
  20.        try {
  21.            return data;
  22.       } finally {
  23.            // 释放读锁
  24.            readWriteLock.readLock().unlock();
  25.       }
  26.   }
  27. }

条件变量

条件变量 Condition 是一种同步机制,它允许线程在某些条件尚未满足时挂起(等待),直到其他线程通知这些条件已经发生改变

  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.ReentrantLock;
  3. public class ProducerConsumerExample {
  4.    private final ReentrantLock lock = new ReentrantLock();
  5.    private final Condition notEmpty = lock.newCondition();
  6.    private final Condition notFull = lock.newCondition();
  7.    private static final int SIZE = 10;
  8.    private int[] buffer = new int[SIZE];
  9.    private int putPtr, takePtr, count;
  10.    public void put(int v) throws InterruptedException {
  11.        lock.lock();
  12.        try {
  13.            while (count == SIZE) {
  14.                notEmpty.await();
  15.           }
  16.            buffer[putPtr] = v;
  17.            putPtr = (putPtr + 1) % SIZE;
  18.            ++count;
  19.            notFull.signal();
  20.       } finally {
  21.            lock.unlock();
  22.       }
  23.   }
  24.    public int take() throws InterruptedException {
  25.        lock.lock();
  26.        try {
  27.            while (count == 0) {
  28.                notFull.await();
  29.           }
  30.            int v = buffer[takePtr];
  31.            takePtr = (takePtr + 1) % SIZE;
  32.            --count;
  33.            notEmpty.signal();
  34.            return v;
  35.       } finally {
  36.            lock.unlock();
  37.       }
  38.   }
  39. }

信号量

信号量(Semaphore)是一种更为复杂的同步机制,用于控制对有限资源的访问。它允许多个线程或进程访问有限数量的资源,同时防止资源被过载。信号量可以用来控制线程之间的执行顺序,或者用于线程间的计数和通信。

基本概念
  1. 计数器 :信号量内部维护一个计数器,表示可用资源的数量。
  2. 许可(Permission) :信号量中的计数器值通常被称为许可。当一个线程想要进入一个代码段时,它必须首先从信号量中获取一个许可。
  3. P操作(等待/获取) :当线程请求进入受保护的代码段时,它会执行P操作(Proberen,荷兰语中的“测试”),这会尝试减少信号量的计数器。如果计数器值大于0,它将减少一个单位,线程继续执行。如果计数器值为0,则线程将被阻塞,直到信号量中再次有可用的许可。
  4. V操作(释放/信号) :当线程离开受保护的代码段时,它会执行V操作(Verhogen,荷兰语中的“增加”),这会增加信号量的计数器。这可能会释放一个正在等待的线程,允许它继续执行。
  1. import java.util.concurrent.Semaphore;
  2. public class SemaphoreExample {
  3.    private final Semaphore semaphore;
  4.    public SemaphoreExample(int permits) {
  5.        semaphore = new Semaphore(permits);
  6.   }
  7.    public void executeTask() {
  8.        try {
  9.            // 尝试获取一个许可
  10.            semaphore.acquire();
  11.            try {
  12.                Thread.sleep(1000);
  13.           } finally {
  14.                semaphore.release();
  15.           }
  16.       } catch (InterruptedException e) {
  17.            e.printStackTrace();
  18.       }
  19.   }
  20. }

消息传递同步模型

  1. 消息队列: 基于队列的同步模型,队列模型用于在线程或进程之间传递消息。
  2. 管道基于IO的同步模型,IO是管道模型,一个线程的输出成为另一个线程的输入。
  3. 事件基于事件的同步模型,事件是一种信号,用于通知线程某个特定的状态或条件已经发生。

代码示例-消息队列

  1. import java.util.concurrent.BlockingQueue;
  2. import java.util.concurrent.LinkedBlockingQueue;
  3. public class MessageQueueSyncModel {
  4.    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
  5.    // 生产者方法,将消息放入队列
  6.    public void produce(String message) {
  7.        try {
  8.            queue.put(message); // 将消息放入队列,如果队列满了,会阻塞直到队列有空间
  9.            System.out.println("Produced: " + message);
  10.       } catch (InterruptedException e) {
  11.            Thread.currentThread().interrupt(); // 重新设置中断状态
  12.            System.out.println("Producer interrupted.");
  13.       }
  14.   }
  15.    // 消费者方法,从队列中获取消息
  16.    public String consume() {
  17.        try {
  18.            String message = queue.take(); // 从队列中取出消息,如果队列为空,会阻塞直到队列中有消息
  19.            System.out.println("Consumed: " + message);
  20.            return message;
  21.       } catch (InterruptedException e) {
  22.            Thread.currentThread().interrupt(); // 重新设置中断状态
  23.            System.out.println("Consumer interrupted.");
  24.            return null;
  25.       }
  26.   }
  27. }

代码示例-管道

pip 管道是单向的

  1. import java.io.IOException;
  2. import java.io.PipedInputStream;
  3. import java.io.PipedOutputStream;
  4. public class PipedExample {
  5.    public static void main(String[] args) {
  6.        // 创建管道输出流和管道输入流
  7.        PipedOutputStream pos = new PipedOutputStream();
  8.        PipedInputStream pis = new PipedInputStream();
  9.        try {
  10.            // 将输入流和输出流连接起来
  11.            pis.connect(pos);
  12.            // 创建生产者线程
  13.            Thread producerThread = new Thread(new Producer(pis));
  14.            producerThread.start();
  15.            // 创建消费者线程
  16.            Thread consumerThread = new Thread(new Consumer(pos));
  17.            consumerThread.start();
  18.            // 等待线程结束
  19.            producerThread.join();
  20.            consumerThread.join();
  21.       } catch (IOException e) {
  22.            e.printStackTrace();
  23.       } catch (InterruptedException e) {
  24.            e.printStackTrace();
  25.       }
  26.   }
  27. }

代码示例-事件同步

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. // 定义事件对象
  6. class CustomEvent extends java.util.EventObject {
  7.    private String message;
  8.    public CustomEvent(Object source, String message) {
  9.        super(source);
  10.        this.message = message;
  11.   }
  12.    public String getMessage() {
  13.        return message;
  14.   }
  15. }
  16. // 定义事件监听器接口
  17. interface CustomEventListener {
  18.    void onCustomEvent(CustomEvent event);
  19. }
  20. // 事件源类
  21. class EventSource {
  22.    private final List<CustomEventListener> listeners = new ArrayList<>();
  23.    private final ExecutorService executor = Executors.newCachedThreadPool();
  24.    public void addCustomEventListener(CustomEventListener listener) {
  25.        listeners.add(listener);
  26.   }
  27.    public void removeCustomEventListener(CustomEventListener listener) {
  28.        listeners.remove(listener);
  29.   }
  30.    public void fireEvent(String message) {
  31.        CustomEvent event = new CustomEvent(this, message);
  32.        for (CustomEventListener listener : listeners) {
  33.            executor.submit(() -> listener.onCustomEvent(event));
  34.       }
  35.   }
  36.  
  37.    public void shutdown() {
  38.        executor.shutdown();
  39.   }
  40. }
  41. // 事件监听器实现
  42. class EventListenerImpl implements CustomEventListener {
  43.    @Override
  44.    public void onCustomEvent(CustomEvent event) {
  45.        System.out.println("Event received: " + event.getMessage());
  46.        // 处理事件...
  47.   }
  48. }
  49. public class EventBasedCommunicationExample {
  50.    public static void main(String[] args) {
  51.        EventSource eventSource = new EventSource();
  52.        EventListenerImpl listener = new EventListenerImpl();
  53.        eventSource.addCustomEventListener(listener);
  54.        // 在单独的线程中发送事件
  55.        new Thread(() -> {
  56.            eventSource.fireEvent("Event from Thread 1");
  57.            eventSource.fireEvent("Event from Thread 2");
  58.       }).start();
  59.        eventSource.shutdown(); // 关闭ExecutorService
  60.   }
  61. }

混合型同步模型

混合模型结合了共享内存和消息传递的优点,通过共享内存和消息传递的结合使用,提供了更灵活的同步机制。

代码 略

总结

本文只是线程同步的一个简单概述,简单的梳理下同步的模型。在多线程编程中,线程间的通信是确保程序正确性和性能的关键。线程间通信主要有两种方式:资源共享和消息传递。资源共享允许线程直接访问内存中的共享数据,而消息传递则通过发送和接收消息来交换数据。这两种方式各有优势和适用场景,选择合适的通信方式对于开发高效、可靠的并发程序至关重要。