集合类不安全
list不安全
// java.util.ConcurrentModificationException 并发修改异常!
public class ListTest {
public static void main(String[] args) {
//并发下 ArrayList 不安全的 Synchronized;
/**
* 解决方案
* 1、List<String> list = new Vector<>();
* 2、List<String> list = Collections.synchronizedList(new ArrayList<>());
* 3、List<String> list = new CopyOnWriteArrayList<>();
*/
//CopyOnWrite 写入时复制 COW 计算机程序设计领域的一种优化策略;
//多个线程调用的时候,list,读取的时候,固定的,写入(覆盖)
//在写入的时候避免覆盖,造成数据问题!
//读写分离 Mycat
//CopyOnWriteArrayList 比 Vector Nb 在哪里?
List<String> list = new CopyOnWriteArrayList<>();
//List<String> list = Collections.synchronizedList(new ArrayList<>());
//List<String> list = new ArrayList<>();
//List<String> list = new Vector<>();
for (int i = 0; i < 10; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
set 不安全
/**
* 同理可证:java.util.ConcurrentModificationException 并发修改异常
* //1、Set<String> set = Collections.synchronizedSet(new HashSet<>());
* //2、Set<String> set = new CopyOnWriteArraySet<>();
*/
public class SetTest {
public static void main(String[] args) {
//Set<String> set = new HashSet<>();
//Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 10; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
hashSet 底层是什么?
public HashSet() {
map = new HashMap<>();
}
//add 本质就是map key是无法重复的!
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
private static final Object PRESENT = new Object();//不变的值
Map 不安全
//java.util.ConcurrentModificationException
public class MapTest {
public static void main(String[] args) {
//map 是这样用的吗? 不是,工作中不用 HashMap
// 默认等价于什么? new HashMap<>(16,0.75)
//加载因子,初始化容量
//Map<String, String> map = new HashMap<>();
//Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
//private static final float LOAD_FACTOR = 0.75f; 默认加载因子
//private static final int DEFAULT_CAPACITY = 16; 默认容量
//private static final int MAXIMUM_CAPACITY = 1 << 30; 最大值
Map<String, String> map = new ConcurrentHashMap<>();
for (int i = 1; i <= 30; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
Callable
接口类似于Runnable
,因为它们都是为其实例可能由另一个线程执行的类设计的。 然而,A Runnable
不返回结果,也不能抛出被检查的异常。
1、可以有返回值
2、可以抛出异常
细节:
1、有缓存
2、结果可能需要等待,会阻塞!
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//new Thread(new Runnable()).start();
//new Thread(new FutureTask<V>()).start();
//new Thread(new FutureTask<V>( Callable )).start();
new Thread().start();//怎么启动Callable
MyThread thread = new MyThread();
FutureTask futureTask = new FutureTask(thread);//适配类
new Thread(futureTask,"A").start();
Integer o = (Integer) futureTask.get();//获取 Callable 的返回值结果
System.out.println(o);
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call() {
System.out.println("call()");
return 1024;
}
}
countDownLatch.countDown();//数量-1
countDownLatch.await();//等待计数器归零,然后在向下执行
每次有线程调用countDown()数量-1,假设计数器变为0,countDownLatch.await()就会被唤醒,继续执行!
//计数器
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//总数是6 必须要执行任务的时候,再使用!
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName() + "Go out");
countDownLatch.countDown();//数量-1
},String.valueOf(i)).start();//减法计数器
}
countDownLatch.await();//等待计数器归零,然后在向下执行
//countDownLatch.countDown();//-1
System.out.println("Close Door");
}
}
允许一组线程全部等待彼此达到共同屏障点的同步辅助。 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 集齐7颗龙珠召唤神龙
*/
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
//lambda能操作到i吗?
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集"+temp+"颗龙珠");
try {
cyclicBarrier.await();//等待
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();//加法计数器
}
}
}
一个计数信号量。 在概念上,信号量维持一组许可证。 如果有必要,每个acquire()
都会阻塞,直到许可证可用,然后才能使用它。
原理:
semaphore.acquire() 获得,假设如果已经满了,等待,等待被释放为止!
semaphore.release() 释放,会将当前的信号量释放+1,然后唤醒等待的线程!
作用:多个共享资源互斥的使用!并发限流,控制最大的线程数!
public class SemaphoreDemo {
public static void main(String[] args) {
//线程数量:停车位! 限流!
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
//acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName() + "离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();//release() 释放
}
},String.valueOf(i)).start();
}
}
}
四组API: 1、抛出异常 2、有返回值,不会抛出异常 3、阻塞等待 4、超时等待
public class Test {
public static void main(String[] args) throws InterruptedException {
test1();
test2();
test3();
test4();
}
/**
* 抛出异常
*/
public static void test1(){
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
//IllegalStateException: Queue full 抛出异常!
//System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.element());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//NoSuchElementException 抛出异常!
//System.out.println(blockingQueue.remove());
}
/**
* 有返回值,没有异常
*/
public static void test2(){
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//System.out.println(blockingQueue.offer("d"));//false 不抛出异常!
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//System.out.println(blockingQueue.poll());//null 不抛出异常!
}
/**
* 等待,阻塞(一直阻塞)
*/
public static void test3() throws InterruptedException {
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
//一直阻塞
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
//blockingQueue.put("d");//队列没有位置了,一直阻塞
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
//System.out.println(blockingQueue.take());//没有这个元素,一直阻塞
}
/**
* 等待,阻塞(等待超时)
*/
public static void test4() throws InterruptedException {
//队列的大小
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a");
blockingQueue.offer("b");
blockingQueue.offer("c");
//blockingQueue.offer("d",2, TimeUnit.SECONDS);//等待超过2秒就退出
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS));//等待超过2秒就退出
}
}
SynchronousQueue 同步队列: 进去一个元素,必须等待取出来之后,才能再往里面放一个元素
/**
* 同步队列
* 和其他的 BlockingQueue 不一样,SynchronousQueue 不存储元素
* put 了一个元素,必须从里面先 take 取出来,否则不能在 put 进去值!
*/
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();//同步队列
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName() + "put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + "put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + "put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "=>" + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "=>" + blockingQueue.take());
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "=>" + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
线程池:三大方法、7大参数、4种拒绝策略
程序的运行,本质:占用系统的资源!优化资源的使用!=>池化技术
线程池、链接池、内存池、对象池///......创建、销毁十分浪费资源
池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我
线程池的好处:
1、降低资源的消耗
2、提高相应的速度
3、方便管理
线程复用、可以控制最大并发数、管理线程
// Executors 工具类、3大方法
//使用了线程池之后,使用线程池来创建线程
public class Demo01 {
public static void main(String[] args) {
// ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
// ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个固定的线程池大小
ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩的,遇强则强,遇弱则弱
try {
for (int i = 0; i < 100; i++) {
//使用了线程池之后,使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + "oK");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadPool.shutdown();
}
}
}
//七大参数源码
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,//21亿
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
//本质:ThreadPoolExecutor()
public ThreadPoolExecutor(int corePoolSize,//核心线程池大小
int maximumPoolSize,//最大核心线程池大小
long keepAliveTime,//超时了没有人调用就会释放
TimeUnit unit,//超时单位
BlockingQueue<Runnable> workQueue,//阻塞队列
ThreadFactory threadFactory,//线程工厂,创建线程的,一般不用
RejectedExecutionHandler handler) {//拒绝策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
// Executors 工具类、3大方法
//使用了线程池之后,使用线程池来创建线程
/**
* new ThreadPoolExecutor.AbortPolicy() //银行满了,还有人进来,不处理这个人的,抛出异常
* new ThreadPoolExecutor.CallerRunsPolicy() //哪来的去哪里!
* new ThreadPoolExecutor.DiscardPolicy() //队列满了不会抛出异常,丢掉任务,不会抛出异常!
* new ThreadPoolExecutor.DiscardOldestPolicy() //队列满了尝试和最早的竞争,也不会抛出异常!
*/
public class Demo01 {
public static void main(String[] args) {
//ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程
//ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个固定的线程池大小
//ExecutorService threadPool = Executors.newCachedThreadPool();//可伸缩的,遇强则强,遇弱则弱
//自定义线程池!工作中ThreadPoolExecutor
//最大线程到底该如何定义 一般为核数*2
//1、CPU 密集型,几核,就是几,可以保证CPU的效率最高!
//2、IO 密集型,判断你程序中十分耗IO的线程
//程序 15个大型任务 IO十分占用资源!
//获取CPU的核数
System.out.println(Runtime.getRuntime().availableProcessors());
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了尝试和最早的竞争,也不会抛出异常!
try {
//最大承载:Deque + max
//超过 RejectedExecutionException 拒绝策略
for (int i = 1; i <= 9; i++) {
//使用了线程池之后,使用线程池来创建线程
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName() + "oK");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//线程池用完,程序结束,关闭线程池
threadPool.shutdown();
}
}
}
原文:https://www.cnblogs.com/linhtx212318293/p/14598205.html