首页 > 其他 > 详细

自动清理任务

时间:2020-04-04 15:24:08      阅读:76      评论:0      收藏:0      [点我收藏+]
# -*- coding: UTF-8 -*-
import json
import time
from clear_requests import *
from datetime import datetime, timedelta


def get_del_task_names_from_taskversion(searchKeyword="", projectId="", task_num=3, no_del_task_names=None, del_task_names=None, valid_days=7, taskType=""):
    """Select the tasks whose versions should be taken offline and deleted.

    The task list is fetched with ``get_task_info`` and walked in the reverse
    of the order returned by the service. For every row, in this priority:

    1. a task whose name is listed in ``del_task_names`` is always selected;
    2. a task created less than ``valid_days`` days ago is skipped;
    3. a task whose type (case-insensitive) is ``tag`` is skipped;
    4. everything else is selected.

    Args:
        searchKeyword: keyword forwarded to the task search.
        projectId: project id forwarded to the task search.
        task_num: maximum number of rows requested from the service.
        no_del_task_names: accepted for backward compatibility; the exclusion
            check that used it is disabled, so it is currently not consulted.
        del_task_names: task names that are selected regardless of age/type.
        valid_days: minimum age in days before a task may be selected.
        taskType: task type filter forwarded to the task search.

    Returns:
        A list of ``(taskId, taskType, taskName)`` tuples. If fetching or
        parsing the task list fails, the error is logged and whatever was
        collected so far (possibly nothing) is returned.

    Raises:
        ValueError: if both ``projectId`` and ``searchKeyword`` are empty.
    """
    if not projectId and not searchKeyword:
        # The original raised a bare string, which itself fails with TypeError.
        raise ValueError("参数projectId和searchKeyword不能同时为空!")

    forced_names = del_task_names or ()
    protected_types = ("tag",)
    min_age = timedelta(days=valid_days)
    tasks = []

    try:
        task_info = json.loads(get_task_info(
            searchKeyword=searchKeyword,
            task_num=task_num,
            projectId=projectId,
            taskType=taskType,
        ))
        if task_info["code"]["code"] != "0000":
            # No exception is active here, so exc_info would only add noise.
            logging.error('获取任务信息失败:\n%s', json.dumps(task_info)[:1000])
            raise Exception("获取任务信息失败。程序终止...")

        now = datetime.now()
        for row in reversed(task_info["bo"]["rows"]):
            # createdDate is parsed as a naive timestamp and compared with local "now".
            created = datetime.strptime(row["createdDate"], "%Y-%m-%d %H:%M:%S")
            name = row["taskName"]
            task_type = row["taskType"]

            if name in forced_names:
                tasks.append((row["taskId"], task_type, name))
            elif (now - created) < min_age:
                logging.info('Task "%s" created time is %s,less than %s days, can\'t be Dealed!',
                             name, row["createdDate"], valid_days)
            elif task_type.lower() in protected_types:
                logging.info('Task "%s\'s" type is %s , this type can\'t be Dealed!', name, task_type)
            else:
                tasks.append((row["taskId"], task_type, name))
    except Exception as ex:
        # Best effort: log the failure and hand back what was collected so far.
        logging.error("获取任务信息失败。The Error1 is:%s" % ex, exc_info=True)
    return tasks

def get_ods_task_names(projectid='282221398641901568', queryInfo=''):
    """Return the ODS tasks of a project as ``(taskId, taskType, taskName)`` tuples.

    Args:
        projectid: project id forwarded to ``get_ods_task_info``.
        queryInfo: search text forwarded to ``get_ods_task_info``.

    Raises:
        Exception: if the service answers with a result code other than "0000".
    """
    task_info = json.loads(get_ods_task_info(projectid=projectid, queryInfo=queryInfo))
    if task_info["code"]["code"] != "0000":
        # Not inside an except block, so no exc_info is attached to the record.
        logging.error('获取任务信息失败:\n%s', json.dumps(task_info)[:1000])
        raise Exception("获取ods任务信息失败。程序终止...")

    return [(row["taskId"], row["taskType"], row["taskName"]) for row in task_info["bo"]["rows"]]

def del_and_offline_tasks(tasks):
    """Take every version of the given tasks offline (if needed) and delete it.

    Args:
        tasks: iterable of sequences whose first two items are the task id and
            the task type (as produced by the task-selection helpers).

    For each task the version list is fetched with ``get_task_ver``. A task
    whose lookup does not return code "0000" is logged and skipped. For each
    version, a truthy ``taskVerStatus`` triggers ``task_offline`` first; the
    version is then deleted with ``del_task_ver`` in every case.

    Any exception is logged and ends the processing of the remaining tasks;
    nothing is raised to the caller.
    """
    try:
        logging.info("These tasks's version will be dealed are:\n%s\n", tasks)

        for task in tasks:
            task_id, task_type = task[0], task[1]
            ver_info = json.loads(get_task_ver(task_id, task_type))
            if ver_info["code"]["code"] != "0000":
                # Truncate the dump; the original indexed a single character
                # with [500] and crashed on responses shorter than that.
                logging.error('Taskid "%s" get version failed, the error info is:\n%s',
                              task_id, json.dumps(ver_info)[:500])
                continue

            for ver in ver_info["bo"]["rows"]:
                details = (ver["taskName"], ver["taskVerCode"], ver["creator"], ver["createdDate"])
                if ver["taskVerStatus"]:
                    logging.info('Task "%s" will be offlined, the Version is: %s, the creator is: %s, create date is: %s', *details)
                    task_offline(ver["taskVerId"], ver["taskType"])

                logging.info('Task "%s" will be deleted, the Version is: %s, the creator is: %s, create date is: %s', *details)
                del_task_ver(ver["taskVerId"], ver["taskType"])
    except Exception as ex:
        logging.error("The Error2 is:%s" % ex, exc_info=True)

def stop_task_from_adma(otherCondition='edw_tag', status="", limit=2, time=cur_date, taskType=""):
    """Look up ADMA tasks matching the filters and stop each of them.

    The names of all virtual tasks returned by ``get_virtual_task`` are joined
    into one string in which every name is preceded by a comma (so the string
    starts with ","), JSON-encoded, and passed as ``project_name`` to
    ``get_task_name_from_adma``. Every ``taskName`` found under
    ``searchInfo`` in the result is then passed to ``stop_task``.

    Args:
        otherCondition: free-text condition forwarded to the task search.
        status: status filter forwarded to the task search.
        limit: maximum number of tasks requested.
        time: date string forwarded to the task search. The parameter name
            shadows the ``time`` module inside this function; it is kept for
            compatibility with existing keyword callers.
        taskType: task type filter forwarded to the task search (previously
            ignored: an empty string was always sent).
    """
    virtual_tasks = get_virtual_task()
    joined_names = "".join("," + item["featureName"] for item in virtual_tasks)
    virtual_task_names = json.dumps(joined_names)

    task_info = get_task_name_from_adma(
        otherCondition=otherCondition,
        project_name=virtual_task_names,
        status=status,
        limit=limit,
        time=time,
        taskType=taskType,
    )
    task_names = [entry["taskName"] for entry in task_info["searchInfo"]]

    logging.info("要删除的任务总数是:%d", len(task_names))
    for name in task_names:
        logging.info("将要停止adma任务:{}".format(name))
        stop_task(name)

def clear_all_tasks_mian():
    """Run one cleanup pass: select the 'edw_tag' tasks and delete their versions.

    Tasks are selected with ``get_del_task_names_from_taskversion`` (keyword
    'edw_tag', up to 300 rows, no minimum age) and handed to
    ``del_and_offline_tasks``. Start and end marker lines are written to the
    log around the run.
    """
    logging.info("\n----------This is the start seperate line----------\n")

    # Tasks that are always deleted, regardless of age or type.
    del_task_names = ["acc_ods_alm_liuyue_test_0905_0323", "acc_ods_alm_liuyue_test_0905_0323_hbase"]
    no_del_task_names = []
    # Project ids noted by the original author:
    #   test environment, "liuyue" project: '282221398641901568'
    #   "0602" project:                     '200292042823204864'

    # Build the list of tasks to delete. Alternatives used previously:
    #   get_ods_task_names(queryInfo='autotest', projectid='')
    #   get_del_task_names_from_taskversion(projectId='', searchKeyword="employee")
    tasks = get_del_task_names_from_taskversion(
        searchKeyword='edw_tag',
        projectId="",
        task_num=300,
        del_task_names=del_task_names,
        no_del_task_names=no_del_task_names,
        valid_days=0,
        taskType="",
    )

    # Operations centre: take versions offline and delete them.
    del_and_offline_tasks(tasks)

    # Deleting the ODS task drafts (del_ods_task(task[0]) for each selected
    # task) is an optional follow-up step that is currently disabled.

    logging.info("\n----------This is the end seperate line----------\n")



if __name__ == "__main__":
    # Only run when executed as a script, so importing this module does not
    # stop live ADMA tasks as a side effect.
    # clear_all_tasks_mian()
    stop_task_from_adma(otherCondition='edw_dws', status="\"initial\"", limit=1, time=cur_date, taskType="")

  

# -*- coding: UTF-8 -*-
import requests
import base64
import json
import time
import random
import logging
import datetime

# Log to deal_log.txt (append mode) and mirror every record to the console.
logging.basicConfig(filename="deal_log.txt", filemode="a",
                    format="%(asctime)s-%(funcName)s-%(lineno)d-%(levelname)s:%(message)s", level=logging.INFO)

console = logging.StreamHandler()
logging.getLogger().addHandler(console)

# Today's date, computed once when the module is imported.
cur_date = datetime.datetime.today().strftime("%Y-%m-%d")
# base_url = 'http://d.zte.com.cn'
base_url = 'http://10.136.142.108'
adma_ip = '10.5.21.133:26180'


def get_token():
    """Request an auth token from the UAC service and return it.

    The login payload is stored base64-encoded, decoded to JSON, and posted to
    the ``createRefreshToken`` endpoint. The token is read from
    ``other.token`` of the JSON response.

    Returns:
        The token value found in the response.
    """
    url = 'http://uac.zte.com.cn/uaccommauth/auth/comm/createRefreshToken.serv'
    auth_headers = {"Content-Type": "application/json"}
    # SECURITY: this blob is a login payload that embeds account credentials.
    # Base64 is not encryption; load it from the environment or a secret store
    # instead of keeping it in source control.
    datastr = (
        "eyJsb2dpbkNsaWVudElwIjogIjEyNy4wLjAuMSIsICJhY2NvdW50IjogIjEwMjM3MjIxIiwgInBhc3NXb3JkIjogIjI5OTgxMDA2YS0iLCAibG9naW5TeXN0ZW"
        "1Db2RlIjogIlVBQyIsICJ2ZXJpZnlDb2RlIjogIjJkNz"
        "EzODQ2N2I3OWRlYTZlNTI2NWQ1ZTIzMDA3YTg4In0="
    )

    login_payload = json.loads(base64.b64decode(datastr))
    r = requests.post(url=url, headers=auth_headers, data=json.dumps(login_payload))

    r_dict = json.loads(r.content)
    return r_dict["other"]["token"]


# Shared request headers for the data-platform endpoints. This must come after
# get_token() is defined: building it earlier raises NameError at import time.
header = {"Content-Type": "application/json", "X-Emp-No": "10237221", "X-Lang-Id": "zh_CN", "X-Auth-Value": get_token()}

def get_task_info(searchKeyword="", projectId="", task_num=3, taskType=""):
    """Query task information from the development-version search page.

    Args:
        searchKeyword: keyword to search for.
        projectId: project id; sent as the single element of ``projectIds``.
        task_num: number of rows requested (first page only).
        taskType: task type filter.

    Returns:
        The raw response body; callers decode it with ``json.loads``.
    """
    logging.info("The params is:searchKeyword = %s,projectId= %s ,task_num= %s ,taskType=%s",
                 searchKeyword, projectId, task_num, taskType)
    url = base_url + '/zte-itp-dcp-dataoperation/task/getpage'
    payload = {
        "page": 1,
        "rows": task_num,
        "bo": {
            "projectIds": [projectId],
            "schedulingType": "",
            "searchKeyword": searchKeyword,
            "taskType": taskType,
        },
    }

    response = requests.post(url=url, headers=header, data=json.dumps(payload))
    return response.content

def get_task_ver(taskid, tasktype):
    """Fetch the version list of one task (the "view task versions" action).

    Args:
        taskid: id of the task.
        tasktype: type of the task.

    Returns:
        The raw response body for the first page (10 rows); callers decode it
        with ``json.loads``.
    """
    url = base_url + '/zte-itp-dcp-dataoperation/dobaselineinfo/gettaskverinfos'
    payload = {
        "bo": {"baselineExtractStatus": "", "searchKeyword": "", "taskType": tasktype, "taskId": taskid},
        "page": 1,
        "rows": 10,
        "sort": "",
        "order": "",
        "other": {},
    }

    response = requests.post(url=url, headers=header, data=json.dumps(payload))
    return response.content

def task_offline(taskverid, tasktype):
    """Take one task version offline (the "offline" action).

    Args:
        taskverid: id of the task version to take offline.
        tasktype: type of the task.

    The response is not inspected and nothing is returned.
    """
    url = base_url + '/zte-itp-dcp-dataoperation/task/offlinetaskbytaskverid'
    # NOTE(review): taskName, taskId and taskVerCode are fixed sample values;
    # presumably the endpoint only keys on taskVerId - confirm with the API owner.
    payload = {
        "taskType": tasktype,
        "taskName": "ods_test_ods_1_global_privs_xl_1112_21522",
        "taskVerId": taskverid,
        "taskId": "3917491946274488322",
        "taskVerCode": "V1.0.02",
    }

    requests.post(url=url, headers=header, data=json.dumps(payload))
	
def del_task_ver(taskverid, tasktype):
    """Delete one task version (the "delete version" action).

    Versions of type ``tag`` or ``profile`` (case-insensitive) are not sent to
    the service; a log line explains that they must be removed elsewhere.

    Args:
        taskverid: id of the task version to delete.
        tasktype: type of the task.

    The response is not inspected, so the "deleted" log line only means the
    request was sent.
    """
    if tasktype.lower() in ('tag', 'profile'):
        logging.info("标签和画像类型的任务无法在运维中心删除版本,需要在标签工厂界面删除!")
        return

    url = base_url + '/zte-itp-dcp-dataoperation/task/deletetaskbytaskverid'
    # NOTE(review): taskName, taskId and taskVerCode are fixed sample values;
    # presumably the endpoint only keys on taskVerId - confirm with the API owner.
    payload = [{
        "taskType": tasktype,
        "taskName": "edw_test_full_123102",
        "taskVerId": taskverid,
        "taskId": "409460023074258944",
        "taskVerCode": "V1.0.0",
    }]
    requests.post(url=url, headers=header, data=json.dumps(payload))
    logging.info("任务:%s已被删除!", taskverid)

def get_ods_task_info(queryInfo="", projectid='282221398641901568', tasktype=""):
    """Query ODS task information from the data-synchronisation search page.

    Args:
        queryInfo: search text.
        projectid: project id to search in.
        tasktype: task type filter.

    Returns:
        The raw response body (first page, up to 1000 rows, sorted by
        ``lastUpdatedDate`` descending); callers decode it with ``json.loads``.
    """
    url = base_url + '/zte-itp-dcp-datamodeling/detaskconf/gettaskquery'
    payload = {
        "bo": {
            "goalDomainSubId": "",
            "projId": projectid,
            "queryInfo": queryInfo,
            "lastUpdatedBy": "",
            "taskType": tasktype,
        },
        "order": "desc",
        "other": {},
        "page": 1,
        "rows": 1000,
        "sort": "lastUpdatedDate",
    }

    response = requests.post(url=url, headers=header, data=json.dumps(payload))
    return response.content

def del_ods_task(taskid):
    """Delete the draft of one ODS task on the data-synchronisation page.

    Args:
        taskid: id of the ODS task whose draft is deleted.

    The response is not inspected and nothing is returned.
    """
    url = base_url + '/zte-itp-dcp-datamodeling/detaskconf/deletetask'
    payload = {"bo": {"taskId": taskid}}

    requests.post(url=url, headers=header, data=json.dumps(payload))

def add_datasource(sourceName, dbJdbcUrl, dbAccount, dbPwd, sourceEnvType, dbType):
    """Register a new data source.

    Args:
        sourceName: display name of the data source.
        dbJdbcUrl: JDBC URL of the database.
        dbAccount: database account name.
        dbPwd: database password (text or bytes); sent base64-encoded.
        sourceEnvType: environment type of the source.
        dbType: database type.

    The response is not inspected and nothing is returned.
    """
    url = base_url + '/zte-itp-dcp-datamodeling/dedatasourceconf/add'
    # b64encode needs bytes and returns bytes; JSON needs text.
    raw_pwd = dbPwd if isinstance(dbPwd, bytes) else dbPwd.encode("utf-8")
    encoded_pwd = base64.b64encode(raw_pwd).decode("ascii")
    payload = {
        "dbAccount": dbAccount,
        "dbJdbcUrl": dbJdbcUrl,
        "dbNames": "",
        # JSON null; the original used the undefined name `null` (NameError).
        "portNumber": None,
        "dbPwd": encoded_pwd,
        "sourceEnvType": sourceEnvType,
        "dbType": dbType,
        "switchType": "1",
        "domainSubId": "200247186260262912",
        "dbStatus": 0,
        "domainSubName": "ods/alm",
        "projId": "282221398641901568",
        "region": "shenzhen",
        "sourceDesc": "",
        "sourceName": sourceName,
        "clusterName": "",
        "maxConnectionQty": 20,
    }

    requests.post(url=url, headers=header, data=json.dumps(payload))

def del_datasource(sourceid):
    """Delete a data source.

    Args:
        sourceid: identifier of the data source; passed to ``requests`` as the
            ``params`` value of the DELETE request.

    The response is not inspected and nothing is returned.
    """
    url = base_url + '/zte-itp-dcp-datamodeling/dedatasourceconf/delete/'

    # NOTE(review): the URL ends in "/delete/", which suggests the id may belong
    # in the path rather than the query string - confirm against the API.
    requests.delete(url=url, headers=header, params=sourceid)

def search_datasource(sourceName):
    """Search data sources by name.

    Args:
        sourceName: name of the data source to look for.

    Returns:
        The raw response body (first page, 10 rows, sorted by
        ``lastUpdatedDate`` descending). The original discarded the response,
        which made the search result unreachable.
    """
    url = base_url + '/zte-itp-dcp-datamodeling/dedatasourceconf/getlist'
    payload = {
        "bo": {
            "dbType": "",
            "domainSubName": "",
            "sourceEnvType": "",
            "sourceName": sourceName,
            "projId": "282221398641901568",
        },
        "page": 1,
        "rows": 10,
        "sort": "lastUpdatedDate",
        "order": "desc",
    }
    # The shared header declares application/json, so serialise the body like
    # every other request here instead of letting requests form-encode the dict.
    response = requests.post(url=url, headers=header, data=json.dumps(payload))
    return response.content

def get_task_name_from_adma(otherCondition='', project_name='test_0602', status="", limit=100, time=cur_date, taskType=""):
    """Search tasks on the ADMA statistics/analysis page.

    Args:
        otherCondition: free-text condition for the search.
        project_name: sent as ``virtualTaskName``.
        status: status filter.
        limit: maximum number of tasks requested.
        time: date string for the search; the default is the module's
            ``cur_date``, fixed when the module was imported. The name shadows
            the ``time`` module inside this function.
        taskType: task type filter.

    Returns:
        The decoded JSON response.
    """
    url = 'https://' + adma_ip + '/vmaxmetadata/metadatamanage/analysis/tasksearch'
    json_headers = {"Content-Type": "application/json"}
    payload = {
        "taskSearchCondition": {
            "virtualTaskName": project_name,
            "status": status,
            "repeat": "",
            "taskTriggerType": "",
            "taskType": taskType,
            "executeType": "",
            "executeTime": {"executeTimeSecondStart": 0, "executeTimeSecondEnd": 86400},
            "dbType": "",
            "reasonCategory": "",
            "otherCondition": otherCondition,
            "durationAsc": "",
        },
        "draw": 20,
        "limit": limit,
        "page": 1,
        "provincecode": "all",
        "time": time,
        "classify": "all",
        "order": {"column": "taskId", "dir": "asc"},
    }

    # NOTE(review): verify=False disables TLS certificate verification.
    response = requests.post(url=url, headers=json_headers, data=json.dumps(payload), verify=False)

    return json.loads(response.content)

def stop_task(task_name=''):
    """Ask the ADMA API to stop a task.

    Args:
        task_name: name of the task; sent as ``tasksInfo.algorithmNames``.

    Returns:
        The decoded JSON response.
    """
    url = 'https://' + adma_ip + '/vmaxmetadata/metadatamanage/visualrestapi/add'
    json_headers = {"Content-Type": "application/json"}
    payload = {
        "taskType": "302",
        "userName": "admin",
        "tasksInfo": {
            "algorithmNames": task_name,
        },
    }

    # NOTE(review): verify=False disables TLS certificate verification.
    response = requests.post(url=url, headers=json_headers, data=json.dumps(payload), verify=False)

    return json.loads(response.content)
def get_virtual_task(time=cur_date):
    """Query the virtual tasks from the ADMA API.

    Args:
        time: date string for the query; the default is the module's
            ``cur_date``, fixed when the module was imported. The name shadows
            the ``time`` module inside this function.

    Returns:
        The decoded JSON response.
    """
    url = 'https://' + adma_ip + '/vmaxmetadata/metadatamanage/analysis/featuresearch'
    json_headers = {"Content-Type": "application/json"}
    # NOTE(review): "_" looks like a captured cache-buster timestamp; it is sent
    # unchanged on every call.
    query = dict(provincecode="all", time=time, classify="all", _="1585277235758")
    # NOTE(review): verify=False disables TLS certificate verification.
    response = requests.get(url=url, headers=json_headers, params=query, verify=False)

    return json.loads(response.content)

# print(get_token())
# get_task_info("200292042823204864")
# get_task_info(searchKeyword="employee")
# get_task_ver(‘409470761318776832‘,‘ACC‘)
# task_offline(‘409470761318776832‘)
# del_task_ver(‘409470761318776832‘)
# print(get_task_name_from_adma())
# logging.error("This is a error log.")
# get_ods_task_info("test")
# del_datasource("123")
# stop_task("edw_tag_profile_autotest_tu8r_1580912760_cf064_hb")

  

自动清理任务

原文:https://www.cnblogs.com/yahutiaotiao/p/12631936.html

(0)
(0)
   
举报
评论 一句话评论(0)
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!