import datetime
import threading
from threading import Thread,Lock
from time import sleep
options.connect_timeout=60
# 表名获取
def check_list(prefixName,sysName,projectName):
table_list_in=[]
for table in o.list_tables(project=projectName,prefix=prefixName):
table_list_in.append([table.name,table.size,projectName,sysName])
return table_list_in
# 单表查询
def check_sql(table_name,table_size,projectName,sysName):
searchtime = datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘)
if table_size!=0:
odsSql=‘select count(*) as cnt from ‘+projectName+‘.‘+table_name +‘ where ds=‘+datadate
# print(odsSql+"\n")
with o.execute_sql(odsSql).open_reader() as reader:
for record in reader:
cnt=record._values[0]
Myrecord=[sysName,table_name,cnt,datadate,searchtime,table_size]
# print(Myrecord)
records.append(Myrecord)
else:
blank_record=[sysName,table_name,0,datadate,searchtime,0,0]
# print(blank_record)
records.append(blank_record)
# print(‘==查询完成==‘)
# 线程管理
def check_thread(table_list):
print("线程管理开始")
for nn in table_list:
print(nn)
# print(‘============‘)
# print(table_list)
threads = []
for table in table_list:
# print(table[0])
t = threading.Thread(target=check_sql,args=(table[0],table[1],table[2],table[3]))
t.setDaemon(True)
t.start()
sleep(0.01)
threads.append(t)
# print("线程共:"+ str(len(threads))+"\n")
while True:
thread_num = len(threading.enumerate())
# print("剩余线程数:"+ str(thread_num)+"\n")
if thread_num <= 1:
break
sleep(10)
# 程序入口
if __name__ == ‘__main__‘:
print("程序开始")
# 保存表名集合
table_list = []
Mylist=[[‘dis_uep_bs_‘,‘95598业务支持系统‘,‘PRO_DWD_EXCHANGE_ZB_DOWN‘],
[‘dis_uep_icms_cisd‘,‘基建管理系统‘,‘PRO_DWD_EXCHANGE_ZB_DOWN‘],
[‘dis_uep_prs_uap‘,‘统一项目储备库管理系统‘,‘PRO_DWD_EXCHANGE_ZB_DOWN‘],
[‘dis_uep_uvm_tycldb‘,‘统一车辆管理平台‘,‘PRO_DWD_EXCHANGE_ZB_DOWN‘],
[‘dis_uep_ecp1_ebiz_bidpro‘,‘电子商务平台‘,‘PRO_DWD_EXCHANGE_ZB_DOWN‘],
[‘dis_uep_elms_htgl‘,‘经济法律管理业务应用系统‘,‘PRO_DWD_EXCHANGE_ZB_DOWN‘],
[‘dis_uep_fcm_cd_cwgk‘,‘集中部署财务管控‘,‘PRO_DWD_EXCHANGE_ZB_DOWN‘],
[‘dis_uep_hrcs_‘,‘人力资源管理系统‘,‘PRO_DWD_EXCHANGE_ZB_DOWN‘],
[‘dis_uep_mdm_sgmdm‘,‘主数据管理系统‘,‘PRO_DWD_EXCHANGE_ZB_DOWN‘],
[‘dis_uep_erp_cd_sgp_sapsr3‘,‘集中部署ERP‘,‘PRO_DWD_EXCHANGE_ZB_DOWN‘],
[‘dis_uep_pis_pps_‘,‘规划计划信息管理系统‘,‘PRO_DWD_EXCHANGE_ZB_DOWN‘]]
# 获取业务时间
datadate=str(${bdp.system.bizdate})
# 获取表名
before_time = datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘)
# print("获取表名开始时间"+before_time)
for i in range(len(Mylist)):
# print(i)
table_list+=check_list(Mylist[i][0],Mylist[i][1],Mylist[i][2])
# print(len(table_list))
after_time = datetime.datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘)
# print("获取表名结束时间"+after_time)
# 保存查询结果集合
records=[]
# 线程规划
# 线程数
magicNum=800
# 表名集合长度
lens=len(table_list)
for i in range(lens/magicNum+1):
print("第"+ str(i+1) +"次循环")
m=i*magicNum
j = m + magicNum
if j < lens:
check_thread(table_list[m:j])
else:
check_thread(table_list[m:])
if len(records)!=0:
# print(records)
print(len(records))
print(‘开始将结果写入数据表‘)
o.write_table(‘bdc_jishu_ads_pro.check_result_dis_all_thread‘,records,partition=‘ds=‘+datadate,create_partition=True)
print(‘写入结束!!!‘)
原文:https://www.cnblogs.com/jycjy/p/15183175.html