并发编程.md 21 KB

为什么wait和notify必须放在synchronized中?

在Java中,wait()notify()Object类的两个方法,它们用于实现线程间的协作。wait()使一个线程进入等待状态,直到另一个线程发出通知唤醒它。而notify()则用于唤醒正在等待的线程。

wait()notify()必须放在synchronized块中是因为这些方法依赖于对象的监视器锁(也称为互斥锁)。只有获得了对象的监视器锁(该锁即为调用wait方法的对象)的线程才能调用wait()notify()方法。如果这些方法不在同步块中使用,就无法保证线程安全性。

为了解决「lost wake up 问题」

两个线程启动,消费者检查 obj.count 的值,发现 obj.count <= 0 条件成立,但这时由于 CPU 的调度,发生上下文切换,生产者开始工作,执行了 count+1obj.notify(),也就是发出通知,准备唤醒一个阻塞的线程。然后 CPU 调度到消费者,此时消费者开始执行 obj.wait(),线程进入阻塞。但生产者已经早在消费者阻塞前执行了唤醒动作,也就导致消费者永远无法醒来了。

1644918-20190619224558253-730645351.png

volatile 关键字

在Java中,volatile关键字是用于保证多线程环境下变量的可见性和有序性。

  1. 保证变量可见性:被volatile关键字声明代表变量是共享且不稳定的,每次使用它都到主存中进行读取,并且强制刷新到内存。
  2. 保证变量有序性:JVM 具有指令重排的特性,可以保证程序执行效率更高,但是volatile关键字会禁止指令重排保证其有序性。

volatile 关键字底层原理

volatile是通过编译器在生成字节码时,在指令序列中添加内存屏障来保证变量可见性。

JMM层面的“内存屏障”:

  • LoadLoad屏障: 对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。
  • StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。
  • LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作执行前,保证Load1要读取的数据被读取完毕。
  • StoreLoad屏障: 对于这样的语句Store1; StoreLoad; Load2,在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见。

JVM的实现会在volatile读写前后均加上内存屏障,在一定程度上保证有序性。如下所示:

LoadLoad volatile 读操作 LoadStore

StoreStore volatile 写操作 StoreLoad

禁止指令重排,汇编层面

lock 前缀:lock不是内存屏障,而是一种锁。执行时会锁住内存子系统来确保执行顺序,甚至跨多个CPU

synchronized 关键字

synchronized 是 Java 中的一个关键字,翻译成中文是同步的意思,主要解决的是多个线程之间访问资源的同步性,可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行。

在JDK1.6之前synchronized关键字基于操作系统mutux互斥变量实现,该操作较为重量级,但是在JDK1.6之后进行的JVM层面的锁优化。效率大大提升,优化后的锁主要分为一下四类

  1. 无锁状态
  2. 偏向锁状态
  3. 轻量级锁状态
  4. 重量级锁状态

锁可以升级,但不能降级。即:无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁是单向的。首先程序判断该对象是否有锁,如果没有锁则将无锁状态转换为偏向锁状态,这里通过CAS操作进行加锁,如果加锁成功之后,就不会再有解锁等操作了。假如有两个线程来竞争该锁话,那么偏向锁就失效了,进而升级成轻量级锁了。

轻量级之所以是轻量级锁,是因为它仅仅使用 CAS 进行操作来获取锁。如果获取成功那么会直接获取锁,如果失败,当前线程便尝试使用自旋来获取锁。当竞争线程的自旋次数达到界限值(threshold),轻量级锁将会膨胀为重量级锁。

重量级锁,是使用操作系统互斥量(mutex)来实现的传统锁。 当所有对锁的优化都失效时,将退回到重量级锁。它与轻量级锁不同竞争的线程不再通过自旋来竞争线程, 而是直接进入堵塞状态,此时不消耗CPU,然后等拥有锁的线程释放锁后,唤醒堵塞的线程, 然后线程再次竞争锁。但是注意,当锁膨胀为重量锁时,就不能再退回到轻量级锁。

synchronized 和 volatile 有什么区别?

  • volatile 关键字是线程同步的轻量级实现,所以性能肯定比synchronized关键字要好 。

  • volatile 关键字只能作用于变量,而 synchronized 关键字可以修饰方法以及代码块 。

  • volatile 关键字能保证数据的可见性。而synchronized 关键字能保证数据原子性。

  • volatile 关键字主要用于解决变量在多个线程之间的可见性,而 synchronized 关键字解决的是多个线程之间访问资源的同步性。

ReentrantLock锁

ReentrantLock 实现了 Lock 接口,是一个可重入且独占式的锁(默认是非公平锁),他的底层是基于CAS+AQS+LockSupport实现的

  • CAS

CAS是Compare and Swap的缩写,即比较并交换。它是一种无锁算法,在多线程编程中用于实现同步操作。简单来说,CAS操作包括三个操作数:内存位置V、预期值A和新值B。当且仅当内存位置V的值等于预期值A时,才将该位置的值更新为新值B。

  • AQS

AQS抽象队列同步器。它是Java并发包中锁和同步工具的核心实现上面的所说的LOCK锁就是基于AQS实现的。

AQS提供了一种通用的框架,用于实现线程间的协作和同步操作。它的核心思想是使用一个先进先出的等待队列来管理线程状态,同时支持独占模式和共享模式两种同步方式。

image-20230506170616683

img

  • LockSupport

基于C语言底层实现,它可以阻塞线程以等待许可证,或者取消线程的阻塞状态,而不需要使用传统的synchronized关键字或Object.wait()/notify()方法。

synchronized 和 ReentrantLock 有什么区别?

  • synchronized 依赖于 JVM 而 ReentrantLock 依赖于 API
  • ReentrantLock可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。
  • synchronized是以代码块形式实现的,因此只能对整个代码块进行同步操作,无法在代码块内部实现一些细粒度的控制。而ReentrantLock可以通过Condition对象实现线程间的协作和控制。

LockSupport和synchronized的区别

  • LockSupport不需要获取锁对象,因此避免了可能出现的死锁问题。
  • notify只能随机释放一个线程,并不能指定某个特定线程,notifyAll是释放锁对象中的所有线程。而unpark方法可以唤醒指定的线程。
  • LockSupport可以先执行unpark()方法,然后再执行park()方法,而Object.notify()方法必须在对应的wait()方法之前执行。

AQS的共享模式

AQS中,共享模式获取锁的顶层入口方法是acquireShared,该方法会获取指定数量的资源,成功的话就直接返回,失败的话就进入等待队列,直到获取资源,

public final void acquireShared(int arg) {
 if (tryAcquireShared(arg) < 0)
  doAcquireShared(arg);
}

该方法里包含了两个方法的调用,

  • tryAcquireShared:尝试获取一定资源的锁,返回的值代表获取锁的状态。

  • doAcquireShared:进入等待队列,并循环尝试获取锁,直到成功。

tryAcquireShared

tryAcquireShared在AQS里没有实现,同样由自定义的同步器去完成具体的逻辑,像一些较为常见的并发工具Semaphore、CountDownLatch里就有对该方法的自定义实现,虽然实现的逻辑不同,但方法的作用是一样的,就是获取一定资源的资源,然后根据返回值判断是否还有剩余资源,从而决定下一步的操作。

返回值有三种定义:

  • 负值代表获取失败;
  • 0代表获取成功,但没有剩余的资源,也就是state已经为0;
  • 正值代表获取成功,而且state还有剩余,其他线程可以继续领取

当返回值小于0时,证明此次获取一定数量的锁失败了,然后就会走doAcquireShared方法

  • doAcquireShared: 此方法的作用是将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回,这是它的源码:
private void doAcquireShared(int arg) {
 // 加入队列尾部
 final Node node = addWaiter(Node.SHARED);
 boolean failed = true;
 try {
  boolean interrupted = false;
  // CAS自旋
  for (;;) {
   final Node p = node.predecessor();
   // 判断前驱结点是否是head
   if (p == head) {
    // 尝试获取一定数量的锁
    int r = tryAcquireShared(arg);
    if (r >= 0) {
     // 获取锁成功,而且还有剩余资源,就设置当前结点为head,并继续唤醒下一个线程
     setHeadAndPropagate(node, r);
     // 让前驱结点去掉引用链,方便被GC
     p.next = null; // help GC
     if (interrupted)
      selfInterrupt();
     failed = false;
     return;
    }
   }
   // 跟独占模式一样,改前驱结点waitStatus为-1,并且当前线程挂起,等待被唤醒
   if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
  }
 } finally {
  if (failed)
   cancelAcquire(node);
 }
}
 
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // head指向自己
    setHead(node);
     // 如果还有剩余量,继续唤醒下一个邻居线程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

按照正常的思维,共享模式是可以多个线程同时执行的才对,所以,多个线程的情况下,如果老大释放完资源,但这部分资源满足不了老二,但能满足老三,那么老三就可以拿到资源。可事实是,从源码设计中可以看出,如果真的发生了这种情况,老三是拿不到资源的,因为等待队列是按顺序排列的,老二的资源需求量大,会把后面量小的老三以及老四、老五等都给卡住。从这一个角度来看,虽然AQS严格保证了顺序,但也降低了并发能力

await

img

线程池

线程池就是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务。线程池的优点和好处主要有以下三点

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

常见的线程池:

  • FixedThreadPool:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。

  • SingleThreadExecutor 该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。

  • CachedThreadPool 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。

  • ScheduledThreadPool:该返回一个用来在给定的延迟后运行任务或者定期执行任务的线程池。

为什么不推荐使用内置线程池?

  • FixedThreadPoolSingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

  • CachedThreadPool:使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

  • ScheduledThreadPoolSingleThreadScheduledExecutor : 使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

总之一句话,他们的队列长度都是MAX_VALUE,可能导致OOM

线程池核心参数

  • corePoolSize : 任务队列未达到队列容量时,最大可以同时运行的线程数量。

  • maximumPoolSize : 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。

  • workQueue: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

  1. 当线程数小于核心线程数时,创建线程。

  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列

  3. 当线程数大于等于核心线程数,且任务队列已满

    • 若线程数小于最大线程数,创建线程。

    • 若线程数等于最大线程数,抛出异常,拒绝任务,执行拒绝策略

线程池的饱和策略有哪些?

  • ThreadPoolExecutor.AbortPolicy 抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy 调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy 此策略将丢弃最早的未处理的任务请求。

如何设定线程池的大小?(CPU 核心数 - N)

Runtime.getRuntime().availableProcessors()

  • CPU 密集型任务(N+1)

  • I/O 密集型任务(2N)

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

线程池,如果使用future,则必须重构拒绝策略

  • Future类的get方法
private static final int NEW          = 0; // 初始状态,FutureTask刚被创建,正在计算中都是该状态。
private static final int COMPLETING   = 1; // 中间状态,表示计算已完成正在对结果进行赋值,或正在处理异常
private static final int NORMAL       = 2; // 终止状态,表示计算已完成,结果已经被赋值。
private static final int EXCEPTIONAL  = 3; // 终止状态,表示计算过程已经被异常打断。
private static final int CANCELLED    = 4; // 终止状态,表示计算过程已经被cancel操作终止。
private static final int INTERRUPTING = 5; // 中间状态,表示计算过程已开始并且被中断,正在修改状态。
private static final int INTERRUPTED  = 6; // 终止状态,表示计算过程已开始并且被中断,目前已完全停止。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 初始状态默认为0
    if (s <= COMPLETING)
        // 如果小于COMPLETING则阻塞
        s = awaitDone(false, 0L);
    return report(s);
}

如果没有调用run方法改变线程状态则future则会被一直阻塞导致OOM

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)·
                // 改变状态
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

ThreadLocal

通常情况下,我们创建的变量是可以被任何一个线程访问并修改的,想要每一个线程都有自己私有的本地变量就就需要使用ThreadLocal类,ThreadLocal类主要解决的就是让每个线程绑定自己的值,可以将ThreadLocal类形象的比喻成存放数据的盒子,盒子中可以存储每个线程的私有数据。

ThreadLocal底层为每一个线程维护了一个ThreadLocalMap,他的set方法如下

public void set(T value) {
    //获取当前请求的线程
    Thread t = Thread.currentThread();
    //取出 Thread 类内部的 threadLocals 变量(哈希表结构)
    ThreadLocalMap map = getMap(t);
    if (map != null)
        // 将需要存储的值放入到这个哈希表中
        map.set(this, value);
    else
        createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

ThreadLocal 内存泄露问题是怎么导致的?

ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,而 value 是强引用。所以,如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。

这样一来,ThreadLocalMap 中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。ThreadLocalMap 实现中已经考虑了这种情况,使用完 ThreadLocal方法后最好手动调用remove()方法

无锁化编程有哪些常见方法?

  1. 原子操作(Atomic Operations):使用原子操作可以保证对某个特定变量的操作是原子性的,即不会被其他线程干扰。比如使用原子加(Atomic Add)操作可以实现无锁的计数器。
  2. 无锁队列(Lock-Free Queue):通过使用原子操作和特定的数据结构,如CAS(Compare-and-Swap),可以实现无锁的队列,避免了使用锁带来的性能开销。无锁队列常用于实现高性能的生产者消费者模型。
  3. 无锁缓存(Lock-Free Caches):无锁缓存是一种无锁化设计的数据结构,用于高效地处理并发场景下的数据访问。通过使用原子操作和一些特殊的算法,可以实现对数据的并发读写而无需使用锁。类似于redis
  4. RCU(Read-Copy-Update):RCU 是一种读写锁的变体,适用于读操作远远多于写操作的场景。它通过使用副本机制,允许读操作和写操作同时进行,而不需要相互锁定。对于旧副本,可以采用延迟释放的方式来确保数据的一致性。CopyOnWrite
  5. 多个生产者和多个消费者,一样可以做到免锁访问,但要使用原子操作。这里的意思应该是不用原子操作级别的免锁,理由也很简单,生产者和消费者需要修改的位置是分开的(生产者加在尾部,消费者从头部消费)

通过时间戳实现无锁编程

通过时间戳实现无锁编程是一种常见的无锁技术之一,可以用于并发环境下对共享资源的访问控制。下面是一个基本的时间戳无锁编程的实现思路:

  1. 每个线程或进程维护一个自己的时间戳,代表该线程的操作序号。
  2. 每个共享资源也维护一个时间戳,表示该资源的操作序号。
  3. 线程在对共享资源进行修改时,首先读取该资源的时间戳,然后将自己的时间戳加1,并与资源的时间戳进行比较。
  4. 如果自己的时间戳大于等于资源的时间戳,则可以执行修改操作,并更新资源的时间戳为自己的时间戳。
  5. 如果自己的时间戳小于资源的时间戳,则说明其他线程已经修改了资源,此时需要根据具体的策略来处理冲突,如等待一段时间后重新尝试,或者放弃操作。