常见的复合操作:”没有则加入“,”相等则移除/替换“等,都应该被实现为原子性操作来保证并发安全。
1 循环遍历线程安全容器时需要进行同步,不过这种其他线程访问容器时就必须等待遍历结束,这样由于在比较长时间占有锁,会破坏程序可伸缩性,有产生死锁的风险。有个好的方案是,循环时通过新变量复制容器,这样就形成了线程限制,但是开销也比较大。
synchronized(vector) {
for(int i = 0; i< vector.size(); i++){
fun(vector.get(i));
}
}
Vector vecoterCopy = new Vector(vector);
for(int i = 0; i< vecoterCopy.size(); i++){
fun(vecoterCopy.get(i));
}
注意:迭代器的“及时失败”:Collection在迭代开始后,若被修改,则会抛出异常ConcurrentModificationException,
2 并发容器
HashMap - > ConcurrentHashMap
List -> CopyOnWriteArrayList
Set -> CopyOnWriteSet
Queue -> ConcurrentLinkedQueue
ConcurrentHashMap采用分离锁,允许多线程并发访问读写Map,且提供不会抛出ConcurrentModificationException的迭代器,因此不需要在容器迭代时加锁,即迭代器的”弱一致性“(而不是及时失败)
CopyOnWriteArrayList同样避免了在迭代期间对容器加锁和复制,实现原理是利用不可变对象被正确发布后,则访问它就不需要同步,每次需要修改时,都会重新发布一个新的容器拷贝。其内部是一个不可变数组,每次改变时,复制数组。(因为复制数组也有开销,若对容器迭代操作的频率远高于对容器修改的频率时可以考虑)
BlockingQueue:若Queue已满,put方法会阻塞直到空间可用,反之若为空,则take阻塞。
生产者和消费者模式,生产者和消费者以不同的速度生成和消费任务,当一个任务加入时,不会立刻执行,而是置入队列中,等待处理。比如线程池和工作队列
双端队列:
DeQue(ArrayDeque)
(BlockingDeque)LinkedBlockingDeque
窃取模式:在传统都消费者生产者模式中,所有消费者共享一个工作队列,在窃取模式中,每个消费者都有
自己的一个双端队列,若一个消费者完成了自己双端队列的工作,则可以偷取其他消费者的双端队列中末尾任务。这样有更好的伸缩性。
3 线程阻塞和可中断:
一个线程因为等待I/O,等待获取一个锁,等待从Thread.sleep中唤醒,或等待另一个线程的计算结果时,它则被设置为阻塞状态(BLOCKED,WAITING,TIMED_WAITING),等一个外部事件发生后,才被置回RUNNABLE状态,重新获得调度的机会
被中断:若一个方法能抛出InterruptedException时,则该方法为一个可阻塞方法,若它被中断,则可提前结束阻塞状态。java中线程提供interrupt方法,用来给一个线程设置一个标志,表示它已被中断。在调用sleep方法时,就会抛出InterruptedException异常
4 Synchronizer(同步器):
封装状态,这些状态决定线程是通过还是等待。
4.1 闭锁(latch)
可以延迟线程的执行的进度。就像一个大门,直到闭锁到达终点状态之前,门一直是关闭的,没有任何线程能够通过,在终点状态来了后,门才打开,允许所有线程通过,而且闭锁到达终点状态后就不会再改变状态了。可用来确保特定活动在其他活动完成后才开始发生。
CountDownLatch:
将 CountDownLatch 的计数器初始化为n :new CountDownLatch(n),每当一个任务线程执行完毕,就将计数器减1 countdownlatch.countDown(),当计数器的值变为0时,在CountDownLatch上 await() 的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行
public class CountDownLatchTest {
private static final int THREAD_COUNT = 2;
public static void main(String[] args) {
CountDownLatch gate = new CountDownLatch(THREAD_COUNT);
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("自线程1 执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
gate.countDown();
}
});
thread1.start();
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("自线程2 执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
gate.countDown();
}
});
thread2.start();
System.out.println("主线程 等待中");
try {
gate.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("所有子线程执行完毕,主线程开始执行");
}
}
FutureTask
也可以作为闭锁,它提供一个可取消的携带结果的异步计算,可以通过isDone查询一个任务是否执行完成,以及isCancelled查询是否被取消,若已完成(执行完成/已取消/异常中断)则可直接调用get方法获取结果,若未完成/未启动,调用get方法则会导致调用线程阻塞。
public class FutureTaskTest {
public static void main(String[] args) {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
System.out.println("getResult in futureTask");
return "futureTask";
}
});
Thread thread = new Thread(futureTask);
thread.start();
System.out.println("try get result in main");
try {
String result = futureTask.get();
System.out.println("hello " + result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
4.2 信号量(Semaphore)
用于限制可以同时访问某些资源的线程数量,一个Semaphore管理有限个许可,线程通过调用其acquire方法获取许可,若当前已经没有许可了,则acquire方法会阻塞,直到有可用的许可为止。通过调用release方法,则可以将许可归还。
https://www.jianshu.com/p/0572483ecec7
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
//执行
try {
semaphore.acquire();
System.out.println("线程1,获取到许可,开始执行: ");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//释放
semaphore.release();
System.out.println("线程1执行完释放许可");
}
});
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("线程2,获取到许可,开始执行: ");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//释放
semaphore.release();
System.out.println("线程2执行完释放许可");
}
});
thread1.start();
thread2.start();
try {
semaphore.acquire();
System.out.println("主线程, 获取到许可,开始执行: ");
} catch (InterruptedException e) {
e.printStackTrace();
}
//释放
semaphore.release();
System.out.println("主线程执行完释放许可");
}
}
4.3 关卡(Barrier)
关卡能够阻塞一组线程,只有当该线程组的所有线程都到达关卡点,才能继续处理。
CyclicBarrier(循环关卡):当一个线程到达关卡点时,调用await会阻塞,直到所有线程都到达关卡点,关卡就会被成功突破,所有线程都被释放。而关卡会重置以备下一次使用。可以用于采用多线程计算数据,最后合并计算结果的场景。
public class CyclicBarrierDemo {
static class TaskThread extends Thread {
CyclicBarrier barrier;
public TaskThread(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(getName() + " 到达栅栏 A");
barrier.await();
System.out.println(getName() + " 冲破栅栏 A");
Thread.sleep(2000);
System.out.println(getName() + " 到达栅栏 B");
barrier.await();
System.out.println(getName() + " 冲破栅栏 B");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int threadNum = 5;
CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " 完成最后任务");
}
});
for(int i = 0; i < threadNum; i++) {
new TaskThread(barrier).start();
}
}
}
Exchanger:用于两个工作线程之间交换数据的封装工具类,简单说就是一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据.当一个线程到达 exchange 调用点时,如果其他线程此前已经调用了此方法,则其他线程会被调度唤醒并与之进行对象交换,然后各自返回;如果其他线程还没到达交换点,则当前线程会被挂起,直至其他线程到达才会完成交换并正常返回,或者当前线程被中断或超时返回.
public class ExchangerTest {
static class Producer extends Thread {
private Exchanger<Integer> exchanger;
private static int data = 0;
Producer(String name, Exchanger<Integer> exchanger) {
super("Producer-" + name);
this.exchanger = exchanger;
}
@Override
public void run() {
for (int i=1; i<5; i++) {
try {
TimeUnit.SECONDS.sleep(1);
data = i;
System.out.println(getName()+" 交换前:" + data);
data = exchanger.exchange(data);
System.out.println(getName()+" 交换后:" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Consumer extends Thread {
private Exchanger<Integer> exchanger;
private static int data = 0;
Consumer(String name, Exchanger<Integer> exchanger) {
super("Consumer-" + name);
this.exchanger = exchanger;
}
@Override
public void run() {
while (true) {
data = 0;
System.out.println(getName()+" 交换前:" + data);
try {
TimeUnit.SECONDS.sleep(1);
data = exchanger.exchange(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName()+" 交换后:" + data);
}
}
}
public static void main(String[] args) throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<Integer>();
new Producer("", exchanger).start();
new Consumer("", exchanger).start();
TimeUnit.SECONDS.sleep(7);
System.exit(-1);
}
}
4.4 构建自定义同步器