转自:https://blog.csdn.net/MonkeyDCoding/article/details/81369610
0.源代码
github-简易高并发框架
注:本篇博客知识来自于网课。
1.问题来源以及w
对于一个题库系统。考试组要有批量的离线文档要生成。题库组批量的题目要进行排重,且要根据条件批量修改题目内容。对于
痛点:
批量任务完成缓慢
所有的问题都围绕着“查询”,即查询进度影响总体性能
我们希望尽量使用友好(如果用多线程来提高性能,我们希望能屏蔽细节)
因此我们需要一个可以提供查询进度通用的框架。
2.我们该怎么做?
这里先要明确“任务”(Task)和“工作”(Job)的关系。对于一个工作,他内部可能须有许多的任务,任务是他的子元素(属性、字段)。
用并发安全的类确保每个工作的属性和工作下的每个任务信息,也意味着工作和任务的注册机制。
需要并发安全的类保存每个任务的处理结果(TaskResult)。
需要提供查询接口,供外部的使用。
这里我们不处理对于工作的检查。有兴趣的可以实现。
3.总体流程
 
这里不按照流程讲解,而是按照类关系从下而上讲解。
4.目录结构
 
5.TaskResultType
package me.hcFramework.pool.vo;
 
//这个类只是用来作为标志的信息。
public enum TaskResultType {
	SUCCESS,    //表示任务成功
	FAILSURE,   //表示任务失败
	EXCEPTION;  //表示发生了异常,这里我们不去详尽判断,只用这个标示来笼统表示	
}
6.TaskResult
package me.hcFramework.pool.vo;
 
/**
 *
 * @param <R> 业务方法处理后的业务结果数据的类型
 * 
 * 对属性使用final修饰是为了使其不可改
 */
public class TaskResult<R> {
	//用户业务是否成功完成
	private final TaskResultType resultType;
	//业务方法处理后的业务结果数据
	private final R returnType;
	//如果失败,则失败原因
	private final String reason;
	
    //针对任务失败的构造方法
	public TaskResult(TaskResultType resultType , R returnType , String reason) {
		this.resultType = resultType;
		this.returnType = returnType;
		this.reason = reason;
	}
	
    //针对任务成功的构造方法
	public TaskResult(TaskResultType resultType , R returnType) {
		this.resultType = resultType;
		this.returnType = returnType;
		this.reason = "success";
	}
    
    //因为我们希望字段不可改,设置为了final。所以只提供getters
	public TaskResultType getResultType() {
		return resultType;
	}
 
	public R getReturnType() {
		return returnType;
	}
 
	public String getReason() {
		return reason;
	}
 
	@Override
	public String toString() {
		return "TaskResult [resultType=" + resultType + ", returnType=" + returnType + ", reason=" + reason + "]";
	}
	
}
在这里其实可以发生一点小改动。即:把错误信息归并到TaskResultType中。这样一个TaskResultType包括成功,错误/异常以及其原因就完整了。这里不过多介绍。
7.JobInfo
package me.hcFramework.pool.vo;
 
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
 
 
/**
 * 可以看作是一堆Task的打包+信息控制
 * 与TaskResult一样,一旦设置好了就不许再次更改
 */
public class JobInfo<R> {
	//唯一性标志
	private final String jobName;
	//任务处理器,要求业务人员实现接口
	private final ITaskProcessor<?, ?> taskProcessor;
	//工作(Job)中任务(Task)的数量
	private final int jobLength;
 
    //以下两个类保证操作原子性
	//任务总执行成功个数
	private AtomicInteger successCount;
	//已执行的任务总数
	private AtomicInteger taskProcessCount;
 
    //每个任务的处理结果,供查询调用
	private LinkedBlockingDeque<TaskResult<R>> taskDetailQueue;
	
	public JobInfo(String jobName , int jobLength , ITaskProcessor<?,?> taskProcessor) {
		this.jobName = jobName;
		this.jobLength = jobLength;
		this.taskProcessor = taskProcessor;
		this.successCount = new AtomicInteger(0);
		this.taskProcessCount = new AtomicInteger(0);
		this.taskDetailQueue = new LinkedBlockingDeque<TaskResult<R>>(jobLength);
	}
	
	//提供工作的整体进度信息
	public String getTotalProcess() {
		return "success[" + successCount.get()+"]/current[" + taskProcessCount.get() + "],Total=[" + jobLength + "]";
	}
	
	//取得工作中每个任务的详情
	public List<TaskResult<R>> getTaskDetail() {
		List<TaskResult<R>> taskDetailList = new LinkedList<TaskResult<R>>();
		TaskResult<R> taskResult;
        //pollFirst()方法返回双端队列的第一个元素,返回的元素会从列表中移除
		while((taskResult = taskDetailQueue.pollFirst()) != null) {
			taskDetailList.add(taskResult);
		}
		return taskDetailList;
	}
	
	//放入工作详情,只需要保证最终个数正确即可,不需要加锁
	public void addTaskResult(TaskResult<R> result) {
		if(TaskResultType.SUCCESS == result.getResultType()) {
			successCount.getAndIncrement();
		}
		taskProcessCount.getAndIncrement();
		taskDetailQueue.add(result);
	}
 
	public String getJobName() {
		return jobName;
	}
 
	public ITaskProcessor<?, ?> getTaskProcessor() {
		return taskProcessor;
	}
 
	public int getJobLength() {
		return jobLength;
	}
	
	public int getSuccessCount() {
		return successCount.get();
	}
 
	public int getTaskProcessCount() {
		return taskProcessCount.get();
	}
 
	@Override
	public String toString() {
		return "JobInfo [jobName=" + jobName + ", taskProcessor=" + taskProcessor + ", jobLength=" + jobLength
				+ ", successCount=" + successCount + ", taskProcessCount=" + taskProcessCount + ", taskDetailQueue="
				+ taskDetailQueue + "]";
	}
	
}
关于LinkedBlockingDeque的说明:他是线程安全的。他是双端队列,任何一端都可以进行元素的出入。
8.ITaskProcessor
package me.hcFramework.pool.vo;
 
/**
 * 定义接口,所有需要完成的任务都需要实现此接口进行
 *
 * @param <T>	业务方法需要的数据
 * @param <R>	业务方法处理后的业务结果数据的类型
 */
public interface ITaskProcessor<T ,R> {
	TaskResult<R> taskExecute(T data);
}
9.真正的黑箱子:PendingJobPool
package me.hcFramework.pool;
 
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
import me.hcFramework.pool.vo.ITaskProcessor;
import me.hcFramework.pool.vo.JobInfo;
import me.hcFramework.pool.vo.TaskResult;
import me.hcFramework.pool.vo.TaskResultType;
 
/**
 * 
 * 这是框架主体类
 */
public class PendingJobPool {
	//key = 每个工作的名字 jobInfo.jobName
	//工作的存放容器,用于完成工作的注册
	private static ConcurrentHashMap<String,JobInfo<?>> jobInfoMap = 
			new ConcurrentHashMap<String,JobInfo<?>>();
	
    //单例模式组合拳:类内部实例化+私有构造方法+静态get方法
	private static PendingJobPool pool = new PendingJobPool();
	
	private PendingJobPool(){
		
	}
	
	public static PendingJobPool getPool() {
        //这里是为了完善逻辑,且为日后框架加入检查功能预留空间
        //当然这里也可成为后续版本AOP的切点
		//checkJob.initCheck(jobInfoMap);
		return pool;
	}
	
	//根据工作名称,拿工作的实体
	@SuppressWarnings("unchecked")
	public <R> JobInfo<R> getJob(String jobName) {
		JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName);
		if(null == jobInfo) {
			throw new RuntimeException(jobName + "是非法任务!");
		}
		return jobInfo;
	}
	
	//获得处理详情,这里不对与jobName作出检查
	public <R> List<TaskResult<R>> getTaskDetail(String jobName) {
		JobInfo<R> jobInfo = getJob(jobName);
		return jobInfo.getTaskDetail();
	}
	
	//获得处理进度
	public String getTaskProgess(String jobName) {
		return getJob(jobName).getTotalProcess();
	}
	
	//获得当前已处理多少个任务
	public int getDoneCount(String jobName) {
		return getJob(jobName).getTaskProcessCount();
	}
 
	/**
	 * 注册方法:注册工作(job)
	 * @param jobName 名字
	 * @param jobLength 工作中任务的长度
	 * @param taskProcessor 业务处理器
	 */
	public <R> void registerJob(String jobName , int jobLength , ITaskProcessor<?,?> taskProcessor) {
		JobInfo<R> jobInfo = new JobInfo<R>(jobName, jobLength, taskProcessor);
        //putIfAbsent()如果map中没有该工作,则放入且返回null;如果已有会返回对象
		if(jobInfoMap.putIfAbsent(jobName, jobInfo) != null) {
			throw new RuntimeException(jobName + "已经注册过了");
		}
	}
	
	/**
	 * 提交任务
	 * @param jobName 任务所对应的工作名
	 * @param t	任务数据
	 */
	public <T ,R> void putTask(String jobName , T t) {
		JobInfo<R> jobInfo = getJob(jobName);
		PendingTask<T ,R> task = new PendingTask<T ,R>(jobInfo , t);
		taskExecutor.execute(task);
	}
	
	//取得当前机器上的CPU数量
	private static final int THREAD_COUNTS = Runtime.getRuntime().availableProcessors();
	//阻塞队列,线程池使用,用以存放待处理的任务
	private static BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(5000);
	//线程池,固定大小,有界队列
	private static ExecutorService taskExecutor = new ThreadPoolExecutor(THREAD_COUNTS, THREAD_COUNTS, 60, TimeUnit.SECONDS, taskQueue);
	
	public void closePool() {
		taskExecutor.shutdown();
	}
	
	//交给我们框架执行的任务
	private static class PendingTask<T , R> implements Runnable {
		private JobInfo<R> jobInfo;
		private T processData;
		
		public PendingTask(JobInfo<R> jobInfo , T processData) {
			this.jobInfo = jobInfo;
			this.processData = processData;
		}
		
		@SuppressWarnings("unchecked")
		@Override
		public void run() {
			ITaskProcessor<T, R> taskProcessor = (ITaskProcessor<T, R>) jobInfo.getTaskProcessor();
			TaskResult<R> result = null;
			try{
				result = taskProcessor.taskExecute(processData);
				if(result== null) {
					result = new TaskResult<R>(TaskResultType.EXCEPTION, null , "is null");
				}else if(result.getResultType() == null) {
                    //如果你看懂这个判断,就会觉得很厉害同时又会感到羞辱
					if(result.getReason() == null) {
						result = new TaskResult<R>(TaskResultType.EXCEPTION, null , "reason is null");
					} else {
						result = new TaskResult<R>(TaskResultType.EXCEPTION, null , "type is null");
					}
				}
			} catch (Exception e) {
				result = new TaskResult<R>(TaskResultType.EXCEPTION, null , 
						"task exception" + e.getMessage());
			} finally {
				jobInfo.addTaskResult(result);
			}
			
		}
	}
}
如果读者了解Spring的实现,会知道bean的注册过程其实也就是放入了Map中。或者读者也曾经开发过一些需要注册功能的应用,无疑都是使用了Map。除了Map的高性能,真的可以说是:聪明人都只用一种聪明法。
10.测试
自己实现ITaskProcessor接口
 
public class MyTask implements ITaskProcessor<Integer, Integer>{
 
	@Override
	public TaskResult<Integer> taskExecute(Integer data) {
		Random r = new Random();
		int flag = r.nextInt(500);
		try {
			Thread.sleep(flag);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		if(flag <= 300) {//正常处理的情况
			Integer returnValue = data.intValue() + flag;
			return new TaskResult<Integer>(TaskResultType.SUCCESS, returnValue);
		} else if(flag > 300 && flag <= 400) {//处理失败的情况
			return new TaskResult<Integer>(TaskResultType.FAILSURE, -1 , "Failsure");
		} else {
			try {
				throw new RuntimeException("异常发生了!!");
			} catch(Exception e) {
				return new TaskResult<Integer>(TaskResultType.EXCEPTION, -1 ,e.getMessage());
			}
		}
	}
 
}
Test类
public class AppTest {
	private final static String JOB_NAME="计算数值";
	//private final static String JOB_OTHER_NAME = "字符串";
	private final static int JOB_LENGTH = 150;
	
	private static class QueryResult implements Runnable {
		private PendingJobPool pool;
		private String jobName;
	
		public QueryResult(PendingJobPool pool , String jobName) {
			this.pool = pool;
			this.jobName = jobName;
		}
		
		@Override
		public void run() {
			while(pool.getDoneCount(jobName) <= JOB_LENGTH) {
				List<TaskResult<String>> taskDetail = pool.getTaskDetail(jobName);
				if(!taskDetail.isEmpty()) {
					System.out.println(pool.getTaskProgess(jobName));
					System.out.println(taskDetail);
				}
				if(pool.getDoneCount(jobName) == JOB_LENGTH) {
					break;
				}
				try {
					Thread.sleep(100);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}
 
	public static void main(String[] args) {
		PendingJobPool pool = PendingJobPool.getPool();
		MyTask myTask = new MyTask();
		
		pool.registerJob(JOB_NAME, JOB_LENGTH, myTask);
		
		Random r = new Random();
		for(int i = 0 ; i < JOB_LENGTH ; i++) {
			pool.putTask(JOB_NAME, r.nextInt(1000));
		}
		
		new Thread(new QueryResult(pool, JOB_NAME)).start();
	}
}
Test类中实现了一个用来查询的线程。
--------------------- 
作者:MonkeyDCoding 
来源:CSDN 
原文:https://blog.csdn.net/MonkeyDCoding/article/details/81369610 
版权声明:本文为博主原创文章,转载请附上博文链接!
原文:https://www.cnblogs.com/sharpest/p/10660695.html