在Java中,wait()
和notify()
是Object
类的两个方法,它们用于实现线程间的协作。wait()
使一个线程进入等待状态,直到另一个线程发出通知唤醒它。而notify()则用于唤醒正在等待的线程。
wait()
和notify()
必须放在synchronized
块中是因为这些方法依赖于对象的监视器锁(也称为互斥锁)。只有获得了对象的监视器锁(该锁即为调用wait
方法的对象)的线程才能调用wait()
和notify()
方法。如果这些方法不在同步块中使用,就无法保证线程安全性。
为了解决「lost wake up 问题」
两个线程启动,消费者检查 obj.count
的值,发现 obj.count <= 0
条件成立,但这时由于 CPU 的调度,发生上下文切换,生产者开始工作,执行了 count+1
和 obj.notify()
,也就是发出通知,准备唤醒一个阻塞的线程。然后 CPU 调度到消费者,此时消费者开始执行 obj.wait()
,线程进入阻塞。但生产者已经早在消费者阻塞前执行了唤醒动作,也就导致消费者永远无法醒来了。
在Java中,volatile
关键字是用于保证多线程环境下变量的可见性和有序性。
volatile
关键字声明代表变量是共享且不稳定的,每次使用它都到主存中进行读取,并且强制刷新到内存。volatile
关键字会禁止指令重排保证其有序性。volatile
是通过编译器在生成字节码时,在指令序列中添加内存屏障来保证变量可见性。
JMM层面的“内存屏障”:
JVM的实现会在volatile读写前后均加上内存屏障,在一定程度上保证有序性。如下所示:
LoadLoad volatile 读操作 LoadStore
StoreStore volatile 写操作 StoreLoad
禁止指令重排,汇编层面
lock 前缀:lock不是内存屏障,而是一种锁。执行时会锁住内存子系统来确保执行顺序,甚至跨多个CPU
synchronized
是 Java 中的一个关键字,翻译成中文是同步的意思,主要解决的是多个线程之间访问资源的同步性,可以保证被它修饰的方法或者代码块在任意时刻只能有一个线程执行。
在JDK1.6之前synchronized
关键字基于操作系统mutux
互斥变量实现,该操作较为重量级,但是在JDK1.6之后进行的JVM层面的锁优化。效率大大提升,优化后的锁主要分为一下四类
锁可以升级,但不能降级。即:无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁是单向的。首先程序判断该对象是否有锁,如果没有锁则将无锁状态转换为偏向锁状态,这里通过CAS操作进行加锁,如果加锁成功之后,就不会再有解锁等操作了。假如有两个线程来竞争该锁话,那么偏向锁就失效了,进而升级成轻量级锁了。
轻量级之所以是轻量级锁,是因为它仅仅使用 CAS 进行操作来获取锁。如果获取成功那么会直接获取锁,如果失败,当前线程便尝试使用自旋来获取锁。当竞争线程的自旋次数达到界限值(threshold
),轻量级锁将会膨胀为重量级锁。
重量级锁,是使用操作系统互斥量(mutex
)来实现的传统锁。 当所有对锁的优化都失效时,将退回到重量级锁。它与轻量级锁不同竞争的线程不再通过自旋来竞争线程, 而是直接进入堵塞状态,此时不消耗CPU,然后等拥有锁的线程释放锁后,唤醒堵塞的线程, 然后线程再次竞争锁。但是注意,当锁膨胀为重量锁时,就不能再退回到轻量级锁。
volatile
关键字是线程同步的轻量级实现,所以性能肯定比synchronized
关键字要好 。
volatile
关键字只能作用于变量,而 synchronized
关键字可以修饰方法以及代码块 。
volatile
关键字能保证数据的可见性。而synchronized
关键字能保证数据原子性。
volatile
关键字主要用于解决变量在多个线程之间的可见性,而 synchronized
关键字解决的是多个线程之间访问资源的同步性。
ReentrantLock
实现了 Lock
接口,是一个可重入且独占式的锁(默认是非公平锁),他的底层是基于CAS+AQS+LockSupport实现的
CAS是Compare and Swap的缩写,即比较并交换。它是一种无锁算法,在多线程编程中用于实现同步操作。简单来说,CAS操作包括三个操作数:内存位置V、预期值A和新值B。当且仅当内存位置V的值等于预期值A时,才将该位置的值更新为新值B。
AQS抽象队列同步器。它是Java并发包中锁和同步工具的核心实现上面的所说的LOCK锁就是基于AQS实现的。
AQS提供了一种通用的框架,用于实现线程间的协作和同步操作。它的核心思想是使用一个先进先出的等待队列来管理线程状态,同时支持独占模式和共享模式两种同步方式。
基于C语言底层实现,它可以阻塞线程以等待许可证,或者取消线程的阻塞状态,而不需要使用传统的synchronized关键字或Object.wait()/notify()方法。
synchronized
依赖于 JVM 而 ReentrantLock
依赖于 APIReentrantLock
可以指定是公平锁还是非公平锁。而synchronized
只能是非公平锁。synchronized
是以代码块形式实现的,因此只能对整个代码块进行同步操作,无法在代码块内部实现一些细粒度的控制。而ReentrantLock
可以通过Condition
对象实现线程间的协作和控制。AQS中,共享模式获取锁的顶层入口方法是acquireShared
,该方法会获取指定数量的资源,成功的话就直接返回,失败的话就进入等待队列,直到获取资源,
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
该方法里包含了两个方法的调用,
tryAcquireShared:尝试获取一定资源的锁,返回的值代表获取锁的状态。
doAcquireShared:进入等待队列,并循环尝试获取锁,直到成功。
tryAcquireShared
tryAcquireShared
在AQS里没有实现,同样由自定义的同步器去完成具体的逻辑,像一些较为常见的并发工具Semaphore、CountDownLatch里就有对该方法的自定义实现,虽然实现的逻辑不同,但方法的作用是一样的,就是获取一定资源的资源,然后根据返回值判断是否还有剩余资源,从而决定下一步的操作。
返回值有三种定义:
当返回值小于0时,证明此次获取一定数量的锁失败了,然后就会走doAcquireShared
方法
private void doAcquireShared(int arg) {
// 加入队列尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
// CAS自旋
for (;;) {
final Node p = node.predecessor();
// 判断前驱结点是否是head
if (p == head) {
// 尝试获取一定数量的锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取锁成功,而且还有剩余资源,就设置当前结点为head,并继续唤醒下一个线程
setHeadAndPropagate(node, r);
// 让前驱结点去掉引用链,方便被GC
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 跟独占模式一样,改前驱结点waitStatus为-1,并且当前线程挂起,等待被唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// head指向自己
setHead(node);
// 如果还有剩余量,继续唤醒下一个邻居线程
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
按照正常的思维,共享模式是可以多个线程同时执行的才对,所以,多个线程的情况下,如果老大释放完资源,但这部分资源满足不了老二,但能满足老三,那么老三就可以拿到资源。可事实是,从源码设计中可以看出,如果真的发生了这种情况,老三是拿不到资源的,因为等待队列是按顺序排列的,老二的资源需求量大,会把后面量小的老三以及老四、老五等都给卡住。从这一个角度来看,虽然AQS严格保证了顺序,但也降低了并发能力
线程池就是管理一系列线程的资源池。当有任务要处理时,直接从线程池中获取线程来处理,处理完之后线程并不会立即被销毁,而是等待下一个任务。线程池的优点和好处主要有以下三点
降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
FixedThreadPool
:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
SingleThreadExecutor
: 该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
CachedThreadPool
: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
ScheduledThreadPool
:该返回一个用来在给定的延迟后运行任务或者定期执行任务的线程池。
FixedThreadPool
和 SingleThreadExecutor
:使用的是无界的 LinkedBlockingQueue
,任务队列最大长度为 Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。
CachedThreadPool
:使用的是同步队列 SynchronousQueue
, 允许创建的线程数量为 Integer.MAX_VALUE
,可能会创建大量线程,从而导致 OOM。
ScheduledThreadPool
和 SingleThreadScheduledExecutor
: 使用的无界的延迟阻塞队列DelayedWorkQueue
,任务队列最大长度为 Integer.MAX_VALUE
,可能堆积大量的请求,从而导致 OOM。
总之一句话,他们的队列长度都是MAX_VALUE,可能导致OOM
corePoolSize
: 任务队列未达到队列容量时,最大可以同时运行的线程数量。
maximumPoolSize
: 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
workQueue
: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
当线程数小于核心线程数时,创建线程。
当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列
当线程数大于等于核心线程数,且任务队列已满
若线程数小于最大线程数,创建线程。
若线程数等于最大线程数,抛出异常,拒绝任务,执行拒绝策略
ThreadPoolExecutor.AbortPolicy
: 抛出 RejectedExecutionException
来拒绝新任务的处理。ThreadPoolExecutor.CallerRunsPolicy
: 调用执行自己的线程运行任务,也就是直接在调用execute
方法的线程中运行(run
)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。ThreadPoolExecutor.DiscardPolicy
: 不处理新任务,直接丢弃掉。ThreadPoolExecutor.DiscardOldestPolicy
: 此策略将丢弃最早的未处理的任务请求。Runtime.getRuntime().availableProcessors()
CPU 密集型任务(N+1)
I/O 密集型任务(2N)
CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。
private static final int NEW = 0; // 初始状态,FutureTask刚被创建,正在计算中都是该状态。
private static final int COMPLETING = 1; // 中间状态,表示计算已完成正在对结果进行赋值,或正在处理异常
private static final int NORMAL = 2; // 终止状态,表示计算已完成,结果已经被赋值。
private static final int EXCEPTIONAL = 3; // 终止状态,表示计算过程已经被异常打断。
private static final int CANCELLED = 4; // 终止状态,表示计算过程已经被cancel操作终止。
private static final int INTERRUPTING = 5; // 中间状态,表示计算过程已开始并且被中断,正在修改状态。
private static final int INTERRUPTED = 6; // 终止状态,表示计算过程已开始并且被中断,目前已完全停止。
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 初始状态默认为0
if (s <= COMPLETING)
// 如果小于COMPLETING则阻塞
s = awaitDone(false, 0L);
return report(s);
}
如果没有调用run方法改变线程状态则future则会被一直阻塞导致OOM
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)·
// 改变状态
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
通常情况下,我们创建的变量是可以被任何一个线程访问并修改的,想要每一个线程都有自己私有的本地变量就就需要使用ThreadLocal类,ThreadLocal
类主要解决的就是让每个线程绑定自己的值,可以将ThreadLocal
类形象的比喻成存放数据的盒子,盒子中可以存储每个线程的私有数据。
ThreadLocal
底层为每一个线程维护了一个ThreadLocalMap
,他的set方法如下
public void set(T value) {
//获取当前请求的线程
Thread t = Thread.currentThread();
//取出 Thread 类内部的 threadLocals 变量(哈希表结构)
ThreadLocalMap map = getMap(t);
if (map != null)
// 将需要存储的值放入到这个哈希表中
map.set(this, value);
else
createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
ThreadLocalMap
中使用的 key 为 ThreadLocal
的弱引用,而 value 是强引用。所以,如果 ThreadLocal
没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。
这样一来,ThreadLocalMap
中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。ThreadLocalMap
实现中已经考虑了这种情况,使用完 ThreadLocal
方法后最好手动调用remove()
方法
通过时间戳实现无锁编程是一种常见的无锁技术之一,可以用于并发环境下对共享资源的访问控制。下面是一个基本的时间戳无锁编程的实现思路: