在开发之前,我们先要结合自己项目的业务场景设计出一个临时任务表,以保证任务的“安全”,然后再开始代码的编写。
以下代码里可能会包含一些伪代码,但是整体的实现步骤和一些容错处理都会有所体现,下面看一下具体的实现:
临时任务表的大概结构:
/**
 * One row of the temporary task table; each row describes a pending piece of work.
 * Accessors are presumably generated by Lombok's {@code @Data} - confirm against the build.
 */
@Data
public class TestTempMo {
/**
 * Identifier of the temporary task row (used as the key for update and delete calls).
 */
private Integer id;
/**
 * Id of the business record that still has to be processed.
 */
private Integer logicId;
/**
 * IP of the machine that owns this task.
 */
private String ip;
/**
 * Whether the task has already been put into the in-memory task queue.
 */
private Boolean isTask;
/**
 * Creation time of the row.
 */
private Date createDate;
}
单例获取阻塞队列方法:
/**
 * Holder of the single in-process task queue shared by the producer and the consumer.
 *
 * NOTE(review): the producer and consumer obtain this class through {@code @Resource},
 * but no bean registration is visible here - confirm how the instance is created.
 */
public class BlockingQueueUtils {
    /**
     * The shared bounded queue. Declared {@code volatile} so that the lazily created
     * instance is safely published to threads that read it without holding the lock.
     */
    public static volatile BlockingQueue<TestTempMo> testTaskQueue;

    private BlockingQueueUtils() {
    }

    /**
     * Returns the shared task queue, creating it on first use.
     * The capacity is ten slots per available processor.
     *
     * @return the process-wide task queue, never {@code null}
     */
    public BlockingQueue<TestTempMo> getInstance() {
        BlockingQueue<TestTempMo> queue = testTaskQueue;
        if (Objects.isNull(queue)) {
            // The field is static, so the lock must be class-wide; a per-instance
            // monitor would not serialize creation across different instances.
            synchronized (BlockingQueueUtils.class) {
                queue = testTaskQueue;
                if (Objects.isNull(queue)) {
                    int cpuCores = Runtime.getRuntime().availableProcessors();
                    queue = new ArrayBlockingQueue<>(cpuCores * 10);
                    testTaskQueue = queue;
                }
            }
        }
        return queue;
    }
}
任务生产者:
/**
 * Task producer: scans the temporary task table for rows owned by this machine and
 * feeds them into the shared blocking queue.
 * Each machine only handles its own tasks (load balancing is done by Nginx).
 * Created by lcy on 2019/10/27.
 */
@Service
public class TestProducer implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(TestProducer.class);
    /** Interval (ms) between scans of the test_temp table when there is no more work. */
    private static final long SCAN_PERIOD = 1000 * 10;
    private BlockingQueue<TestTempMo> testTaskQueue;
    /** Mapper of the temporary task table. */
    @Resource
    private TestTempMapper testTempMapper;
    /** Mapper holding the hand-written SQL. */
    @Resource
    private SelectForMasterMapper selectForMasterMapper;
    /** Error log recorder. */
    @Resource
    private LogRecord logRecord;
    @Resource
    private BlockingQueueUtils blockingQueueUtils;

    /**
     * Resets rows left over from a previous run, obtains the shared queue and starts
     * the scanning thread.
     *
     * @throws ExceptionInInitializerError if any of the start-up steps fails
     */
    @PostConstruct
    public void init() {
        try {
            // Reset the table state so that tasks left unfinished by a restart are picked up again.
            initTempTaskState();
            testTaskQueue = blockingQueueUtils.getInstance();
            new Thread(this, "ScanTempProducer").start();
        } catch (Throwable e) {
            LogUtils.error(LOG, "初始化test生产者线程异常", e);
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Scan loop: loads a batch of pending rows, queues each one and marks it as queued.
     * Sleeps for {@link #SCAN_PERIOD} when no further work is reported. The loop ends
     * when the thread is interrupted.
     *
     * No {@code @Transactional} here on purpose: the thread is started with the raw
     * {@code this} from {@code @PostConstruct}, so a proxy-based transaction would never
     * be applied, and a transaction around an endless loop could never commit anyway.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // Whether there are still tasks that have not been queued.
            boolean hasMoreTask = false;
            long start = System.currentTimeMillis();
            try {
                List<TestTempMo> taskTempMoList = produceTaskBatch();
                if (CollectionUtils.isNotEmpty(taskTempMoList)) {
                    for (TestTempMo taskTempMo : taskTempMoList) {
                        // Blocks while the queue is full.
                        testTaskQueue.put(taskTempMo);
                        // Mark the row so it is not queued a second time.
                        taskTempMo.setIsTask(true);
                        testTempMapper.updateByPrimaryKeySelective(taskTempMo);
                    }
                    // A result above one is treated as "more work pending" (query is forced to the master).
                    Double count = selectForMasterMapper.selectScanTempCount(ExternalOcrConstant.IP);
                    if (count != null && count > 1) {
                        hasMoreTask = true;
                    }
                }
            } catch (InterruptedException e) {
                LogUtils.error(LOG, "test生产者线程发生异常", e);
                // Restore the flag so the loop condition ends the thread.
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable e) {
                LogUtils.error(LOG, "test生产者线程发生异常", e);
                // Persist the error through the project's log recorder.
                logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "TestTempProducer"+"#"+"run", "test系统", (int)(System.currentTimeMillis()-start), e);
            }
            // Nothing more to do right now: wait before the next scan.
            if (!hasMoreTask) {
                waitAMoment();
            }
        }
    }

    /**
     * Loads one page of pending rows for this machine's IP.
     *
     * @return the rows returned by the mapper
     * @throws BusinessException with {@code SYSTEM_ERROR} if the query fails
     */
    private List<TestTempMo> produceTaskBatch() {
        try {
            // Hand-written SQL forced to the master to avoid master/slave inconsistency.
            return selectForMasterMapper.selectScanTempByPage(ExternalOcrConstant.IP);
        } catch (Throwable e) {
            LogUtils.error(LOG, "获取优先任务列表异常", e);
            throw new BusinessException(TestStatusEnum.SYSTEM_ERROR);
        }
    }

    /** Sleeps for one scan period; an interrupt is logged and the interrupt flag is restored. */
    private void waitAMoment() {
        try {
            Thread.sleep(SCAN_PERIOD);
        } catch (InterruptedException e) {
            LogUtils.error(LOG, "生产者线程休眠异常", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Resets every row of this machine that is flagged as queued back to the initial
     * state (each machine only handles its own tasks).
     */
    private void initTempTaskState() {
        TestTempExample example = new TestTempExample();
        example.createCriteria().andIpEqualTo(ExternalOcrConstant.IP).andIsTaskEqualTo(true);
        List<TestTempMo> testTempMos = testTempMapper.selectByExample(example);
        // Leftover rows exist.
        if (CollectionUtils.isNotEmpty(testTempMos)) {
            for (TestTempMo testTempMo : testTempMos) {
                testTempMo.setIsTask(false);
                testTempMapper.updateByPrimaryKeySelective(testTempMo);
            }
        }
    }
}
任务消费者:
/**
* Created by lcy on 2019/10/27.
*/
@Service
public class TestTempConsumer implements Runnable{
private static Logger LOG = LoggerFactory.getLogger(TestTempConsumer.class);
private BlockingQueue<TestTempMo> testTaskQueue;
@Resource
//错误日志记录类
private LogRecord logRecord;
@Resource
private BlockingQueueUtils blockingQueueUtils;
@Resource
//自定义SQL类
private SelectForMasterMapper selectForMasterMapper;
@PostConstruct
public void init() {
testTaskQueue = blockingQueueUtils.getInstance();
new Thread(this, "TestConsumer").start();
}
@Override
public void run() {
while(true) {
//从阻塞队列里取出任务(如果没有任务这里会阻塞)
TestTempMo taskTempMo = acquireTask();
//使用线程池多线程处理任务
ThreadPoolUtil.TestPool.execute(() -> {
//具体的消费逻辑
consume(taskTempMo);
});
}
}
/**
* 从阻塞队列里取出任务
* @return
*/
private TestTempMo acquireTask() {
try {
TestTempMo testTemp = testTaskQueue.take();
return testTemp;
} catch (InterruptedException e) {
/** 简单记录异常,无需做特殊处理 */
LogUtils.error(LOG, "从队列中获取test任务异常", e);
}
return null;
}
/**
* 消费逻辑(这里的所有SQl都要强制查询主库否则会因为主从延迟而处理失败)
* @param taskTempMo
*/
private void consume(TestTempMo taskTempMo) {
TraceUtils.beginTrace();
long start = System.currentTimeMillis();
try {
LogUtils.info(LOG, "开始处理具体的逻辑");
//开始处理具体的逻辑...
System.out.println("处理完啦");
} catch (Throwable e) {
LogUtils.error(LOG, "处理具体逻辑时发生异常", e);
//记录错误日志
logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "TestTempConsumer"+"#"+"consume", "test系统,什么数据:"+taskTempMo.getTestId(), (int)(System.currentTimeMillis()-start), e);
} finally {
try {
//删除任务表数据
selectForMasterMapper.delScanTemp(taskTempMo.getId());
} catch (Throwable e) {
LogUtils.error(LOG, "删除任务表数据异常", e,"id",taskTempMo.getId());
}
LogUtils.info(LOG, "处理具体逻辑完成", "耗时(ms)", (System.currentTimeMillis() - start));
TraceUtils.endTrace();
}
}
}
当然,仅仅只有上边这些代码,这个模型还是不够可靠的:如果集群中某台机器宕机,则该台机器上所有未处理完成的任务都会“陷入僵局”。因此这个时候就需要其他的兄弟进行“接盘”操作了,这里是使用ZK进行处理的:
ZK的操作类:
/**
 * ZooKeeper connection helper: registers this machine as an ephemeral node under
 * {@code /test/<env>/<ip>} and lists the nodes that are currently registered.
 * Created by lcy on 2019/10/27.
 */
@Component
public class ZooKeeperClient {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class);
    /** Session timeout (ms) passed to the ZooKeeper client. */
    private static final int SESSION_TIMEOUT = 5000;
    /** Root node under which the environment nodes are created. */
    private static final String ROOT_NODE = "/test";
    // ZooKeeper connect string.
    // NOTE(review): the key is spelled "addrss"; confirm it matches the deployed configuration.
    @Value("${dubbo.registry.addrss}")
    private String hosts;
    // Environment name of this machine; used as the second path segment.
    @Value("${dubbo.provider.group}")
    private String env;
    private ZooKeeper zk;
    /** Error log recorder. */
    @Resource
    private LogRecord logRecord;

    /**
     * Connects to ZooKeeper at bean start-up.
     *
     * @throws ExceptionInInitializerError wrapping the failure if the connection or the
     *         node registration fails
     */
    @PostConstruct
    private void init() {
        try {
            initZookeeperClient();
        } catch (Exception e) {
            LogUtils.error(LOG, "初始化ZooKeeperClient错误", e);
            // Keep the cause, consistent with the producer's start-up handling.
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Opens a session, waits until it is connected, makes sure the root and environment
     * nodes exist and creates the ephemeral node for this machine's IP.
     *
     * NOTE(review): the wait has no timeout, and the exists-then-create steps can race
     * with another machine starting at the same moment.
     *
     * @throws Exception if the connection or any node operation fails
     */
    private synchronized void initZookeeperClient() throws Exception {
        LogUtils.info(LOG, "初始化Zookeeper链接", "hosts", hosts);
        // One latch per connection attempt: a latch shared across attempts is already
        // released after the first connect, so a reconnect would not wait at all.
        final CountDownLatch connected = new CountDownLatch(1);
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, event -> {
            LogUtils.info(LOG, "处理ZooKeeper事件", "State", event.getState(), "Type", event.getType());
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                LogUtils.info(LOG, "连接建立");
                connected.countDown();
            }
        });
        // Wait for the session to be established.
        connected.await();
        LogUtils.info(LOG, "成功建立ZooKeeper连接");
        // Create the persistent root node if it is missing.
        if (Objects.isNull(zk.exists(ROOT_NODE, false))) {
            zk.create(ROOT_NODE, "IP_Statistic".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // Create the persistent environment node if it is missing.
        String envNode = ROOT_NODE + "/" + env;
        if (Objects.isNull(zk.exists(envNode, false))) {
            zk.create(envNode, ("environment:" + env).getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        // Ephemeral node named after the same IP constant that the task table uses, so that
        // the health check can compare node names with task owners.
        String childNode = envNode + "/" + ExternalOcrConstant.IP;
        String create = zk.create(childNode, ExternalOcrConstant.IP.getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        LogUtils.info(LOG, "创建IP节点成功", "create", create);
    }

    /**
     * Closes the current session, if any, and always drops the handle so that a failed
     * close does not leave a stale client behind.
     */
    private void close() {
        if (Objects.nonNull(zk)) {
            try {
                zk.close();
            } catch (InterruptedException e) {
                LogUtils.error(LOG, "关闭ZK节点失败", e, "path", ROOT_NODE);
                Thread.currentThread().interrupt();
            } finally {
                zk = null;
            }
        }
    }

    /**
     * Closes the current session and connects again after a short pause.
     *
     * @return {@code true} if the new session was established and the node registered
     */
    private synchronized boolean reConnect() {
        long start = System.currentTimeMillis();
        close();
        try {
            Thread.sleep(1000);
            initZookeeperClient();
            return true;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            LogUtils.error(LOG, "重连ZooKeeper失败", e);
            recordErroLog(e, "reConnect", start);
        }
        return false;
    }

    /**
     * Lists the children of this environment's node, i.e. the machines currently registered.
     * On any failure the error is recorded, a reconnect is attempted and an empty list
     * is returned.
     *
     * @return the child node names, or an empty list if they could not be read
     */
    public synchronized List<String> fetchActiveNode() {
        long start = System.currentTimeMillis();
        try {
            if (Objects.isNull(zk)) {
                // A previous reconnect failed; take the same recovery path as any other error.
                throw new IllegalStateException("ZooKeeper client is not connected");
            }
            return zk.getChildren(ROOT_NODE + "/" + env, false);
        } catch (Throwable e) {
            LogUtils.error(LOG, "获取ZK节点列表失败", e, "path", ROOT_NODE);
            recordErroLog(e, "fetchActiveNode", start);
            reConnect();
            return Lists.newArrayList();
        }
    }

    /**
     * Persists an error through the project's log recorder.
     *
     * @param e          the failure
     * @param methodName name of the method in which it happened
     * @param start      start time (ms) of the failed operation, used to compute the elapsed time
     */
    public void recordErroLog(Throwable e, String methodName, Long start) {
        logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "ZooKeeperClient"+"#" + methodName, "test系统", (int)(System.currentTimeMillis()-start), e);
    }
}
服务器健康检测:
/**
* 服务器健康检测和未处理的任务分配类
* Created by lcy on 2019/10/27.
*/
@Component
public class CheckServerProcess implements Runnable{
private static final Logger LOG = LoggerFactory.getLogger(CheckServerProcess.class);
/** 检查ZK健康状况的间隔 */
private static final long CHECK_ZK = 1000 * 20;
@Resource
//临时任务表Mapper类
private TestTempMapper testTempMapper;
@Resource
//错误日记记录类
private LogRecord logRecord;
@Resource
//自定义SQL类
private SelectForMasterMapper selectForMasterMapper;
@Resource
//ZK的操作类
private ZooKeeperClient zooKeeperClient;
@PostConstruct
public void init() {
new Thread(this, "CheckServerProcess").start();
}
@Override
public void run() {
while(true) {
//检查服务器的健康状态,分配宕机的未完成任务
checkServerHealth();
waitAMoment();
}
}
/**
* 检查服务器的健康状态
*/
public void checkServerHealth() {
long start = System.currentTimeMillis();
List<String> taskIpList=Lists.newArrayList();
try {
//查询任务列表里的全部Ip
taskIpList = selectForMasterMapper.selectIpForOcrScanTemp();
//当前没有临时任务
if (CollectionUtils.isEmpty(taskIpList)){
return;
}
/** 从Zookeeper找到当前活动的机器 */
List<String> activeNodeList = zooKeeperClient.fetchActiveNode();
//活跃ip比任务ip数大于或等于则认为机器正常
if(activeNodeList.containsAll(taskIpList)) {
return;
}
/** 全部IP去掉在线的IP,剩下的就是离线的IP */
taskIpList.removeAll(activeNodeList);
LogUtils.info(LOG, "存在离线机器", "serverIp", taskIpList);
//获取离线机器的未完成任务
TestTempExample testTempExample =new TestTempExample();
testTempExample.createCriteria().andIpIn(taskIpList);
List<TestTempMo> unDealTestTemp = testTempMapper.selectByExample(testTempExample);
if(CollectionUtils.isEmpty(unDealOcrScanTemp)){
//没有未完成的处理任务
return;
}
if (CollectionUtils.isNotEmpty(activeNodeList)){
//平均分配未完成的任务
List<TestTempMo> pendTestTempList = allotTask(unDealTestTemp, activeNodeList);
//批量更新临时表
batchUpdateTemp(pendTestTempList);
LogUtils.info(LOG, "分配未处理test任务结束","deadIp", taskIpList, "task:", pendTestTempList);
}else {
LogUtils.error(LOG, "获取ZK节点列表为空");
}
}catch (Exception e){
LogUtils.error(LOG, "分配未处理test任务失败", e,"serverIpMos",taskIpList);
logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "CheckServerProcess"+"#"+"checkServerHealth", "test系统", (int)(System.currentTimeMillis()-start), e);
}
}
/**
* 平均分配未完成的任务
* @param unDealTestTemp
* @param activeNodeList
*/
public static List<TestTempMo> allotTask(List<TestTempMo> unDealTestTemp, List<String> activeNodeList) {
List<TestTempMo> testTemp=Lists.newArrayList();
//每台机器分配的任务数(平均分配)
int taskCount = unDealTestTemp.size() / activeNodeList.size();
//分配个数奇偶判断
int type = unDealTestTemp.size() % activeNodeList.size();
int count=0;
for (String ip : activeNodeList) {
Iterator<TestTempMo> it = unDealTestTemp.iterator();
while(it.hasNext()){
TestTempMo testTempMo = it.next();
testTempMo.setIp(ip);
//初始化任务状态
testTempMo.setIsTask(false);
testTemp.add(testTempMo);
it.remove();
count++;
//如果任务数大于平均任务数任务数,则分配到下台机器机器
if (type == 0){
if (count == taskCount){
count=0;
break;
}
}else {
if (count>taskCount){
count=0;
break;
}
}
}
}
return testTemp;
}
/**
* 批量更新临时表数据
* @param unDealTestTemp
*/
public void batchUpdateTemp(List<TestTempMo> unDealTestTemp){
for (TestTempMo testTempMo : unDealTestTemp) {
testTempMapper.updateByPrimaryKeySelective(testTempMo);
}
}
private void waitAMoment() {
try {
Thread.sleep(CHECK_ZK);
} catch (InterruptedException e) {
LogUtils.error(LOG, "生产者线程休眠异常", e);
}
}
}
我们在键盘上留下的余温,也将随时代传递到更远的将来。共勉!
原文:https://www.cnblogs.com/wuliaojava/p/11748035.html