import datetime
import shutil
import zipfile
import gzip
import os
import re
import time
from lxml import etree
import xml.etree.ElementTree as ET
def un_gz(zfile_name, uzfile_name=None):
    """Decompress a gzip file to disk.

    Args:
        zfile_name: Path of the gzip-compressed file to read.
        uzfile_name: Path of the decompressed output file. When omitted (or
            falsy) the data is written to ``soap_log.xml`` in the current
            working directory.
    """
    uzfile_name = uzfile_name or 'soap_log.xml'
    # Context managers close both handles even when decompression fails, and
    # copyfileobj streams in chunks instead of holding the whole log in memory.
    with gzip.GzipFile(zfile_name) as g_file, open(uzfile_name, "wb") as out_file:
        shutil.copyfileobj(g_file, out_file)
def un_zip(path, zfile):
    """Recursively extract the SOAP log members from a ``.zip`` archive.

    Only members named ``1011_UnitOAM_SOAP_Log.zip`` or
    ``UnitOAM_SOAP_Log0.xml.gz`` are extracted. Each extracted member is then
    passed back to this function, so a nested zip is unpacked in turn while a
    non-zip member ends the recursion.

    Args:
        path: Directory that contains ``zfile``.
        zfile: File name of the archive, relative to ``path``. Names that do
            not end in ``.zip`` are ignored.

    Returns:
        None. Files are written under ``<path>/<zfile without .zip>``.
    """
    suffix = '.zip'
    if not zfile.endswith(suffix):
        return
    file_path = os.path.join(path, zfile)
    # Strip only the trailing suffix; searching for the first '.zip' would
    # truncate names that contain '.zip' earlier in the string.
    des_dir = os.path.join(path, zfile[:-len(suffix)])
    wanted = ("1011_UnitOAM_SOAP_Log.zip", "UnitOAM_SOAP_Log0.xml.gz")
    # The context manager releases the archive handle even if extraction fails.
    with zipfile.ZipFile(file_path) as src_file:
        for filename in src_file.namelist():
            if filename in wanted:
                src_file.extract(filename, des_dir)
                un_zip(des_dir, filename)
# Example usage of the extraction helpers above (left disabled):
# un_zip('.', 'rflog_20210517_112739.zip')
# un_gz(r'./rflog_20210517_112739/1011_UnitOAM_SOAP_Log/UnitOAM_SOAP_Log0.xml.gz')
# shutil.rmtree(r'./rflog_20210517_112739/')  # careful: removes the whole extracted tree
# Normalized XML records; populated by the loop that reads soap_log.xml below.
xml_list = []
# Matches the unquoted timestamp attribute, e.g. ``<timestamp value=2021-05-17T03:28:07Z/>``.
# The lazy quantifier stops at the first ``Z/>`` so a later ``Z/`` in the same
# line cannot be swallowed into the captured timestamp.
_TIMESTAMP_RE = re.compile(r'<timestamp value=(\d[\s\S]*?Z/)>')


def normalize_xml(xml_line):
    """Turn one raw log line into a well-formed ``<timestamp>`` element.

    The raw ``value=YYYY-MM-DDTHH:MM:SSZ/`` attribute is replaced by a quoted
    ``"YYYY-MM-DD HH:MM:SS"`` value and a closing ``</timestamp>`` tag is
    appended, so the rest of the line becomes the element's content.

    Args:
        xml_line: A single log line containing ``<timestamp value=...Z/>``.

    Returns:
        The rewritten line as a string.

    Raises:
        ValueError: If the line has no timestamp attribute or the timestamp
            does not match ``%Y-%m-%dT%H:%M:%SZ/``.
    """
    match = _TIMESTAMP_RE.search(xml_line)
    if match is None:
        raise ValueError('no <timestamp value=...Z/> found in line: %r' % (xml_line,))
    time_struct = time.strptime(match.group(1), "%Y-%m-%dT%H:%M:%SZ/")
    time_normalized = datetime.datetime(*time_struct[:6])
    # Replace exactly the captured span instead of re-searching the whole line.
    start, end = match.span(1)
    return (xml_line[:start] + '"' + str(time_normalized) + '"'
            + xml_line[end:] + '</timestamp>')
# Read soap_log.xml line by line and keep only the records that start with
# "<timestamp" and whose stripped text ends with "Envelope>"; each kept record
# is normalized and stored in xml_list.
with open('soap_log.xml') as log_file:
    for raw_line in log_file:
        if not raw_line.startswith('<timestamp'):
            continue
        record = raw_line.strip()
        if record.endswith('Envelope>'):
            xml_list.append(normalize_xml(record))
def find_in_header(xml_header, msg_from=None, msg_to=None):
    """Check whether a header's ``from``/``to`` fields contain the given text.

    Args:
        xml_header: Element whose ``from`` and ``to`` children are inspected.
        msg_from: Substring expected in the ``from`` text; ``None`` means no
            constraint on the sender.
        msg_to: Substring expected in the ``to`` text; ``None`` means no
            constraint on the receiver.

    Returns:
        True when both filters are falsy, or when every supplied filter is a
        substring of the corresponding field. A missing or empty field is
        treated as an empty string, so it only satisfies an empty filter.
    """
    if not msg_from and not msg_to:
        return True
    msg_from = '' if msg_from is None else str(msg_from)
    msg_to = '' if msg_to is None else str(msg_to)
    # findtext yields None for a missing child; normalise to '' so the
    # substring tests below cannot raise on incomplete headers.
    from_ = xml_header.findtext('from') or ''
    to_ = xml_header.findtext('to') or ''
    return msg_from in from_ and msg_to in to_
def find_in_body(xml_node,
                 xml_tag_name,
                 xml_tag_attrib=None,
                 managed_parameter_change=None,
                 should_contain_list=None,
                 should_not_contain_list=None):
    """Print every ``xml_tag_name`` element under ``xml_node`` that passes all filters.

    Args:
        xml_node: Element to search (the element itself and its descendants).
        xml_tag_name: Tag name of the candidate elements.
        xml_tag_attrib: Optional mapping of attribute name to expected
            substring, forwarded to ``find_body_attrib``. ``None`` or an empty
            mapping means no attribute constraint.
        managed_parameter_change: Optional keyword mapping forwarded to
            ``find_body_parameter_change``. It is only evaluated for
            ``managedObject`` elements; other elements skip this filter.
        should_contain_list: Optional list passed to ``find_body_contain``.
        should_not_contain_list: Optional list passed to
            ``find_body_not_contain``.

    Returns:
        None. For each matching element the three filter results and the
        element's tag, attributes and text are printed.
    """
    # The default is None, which cannot be unpacked with ** directly.
    attrib_filter = xml_tag_attrib or {}
    for each_node in xml_node.iter(xml_tag_name):
        bool_attrib = find_body_attrib(each_node, **attrib_filter)
        bool_contain = (find_body_contain(each_node, should_contain_list)
                        and find_body_not_contain(each_node, should_not_contain_list))
        if managed_parameter_change and each_node.tag == 'managedObject':
            # Look for the requested parameter change on this managedObject.
            bool_parameter_change = find_body_parameter_change(each_node, **managed_parameter_change)
        else:
            # Reset on every iteration so an unbound or stale value from a
            # previous element can never leak into this element's verdict.
            bool_parameter_change = True
        if bool_attrib and bool_contain and bool_parameter_change:
            print(' bool_attrib:', bool_attrib)
            print(' bool_contain:', bool_contain)
            print(' bool_parameter_change:', bool_parameter_change)
            print(each_node.tag)
            print(each_node.attrib)
            print(each_node.text)
def find_body_contain(xml_node, contain_list):
    """Return True when every entry of ``contain_list`` is found under ``xml_node``.

    An empty or ``None`` ``contain_list`` imposes no constraint and yields
    True. Otherwise the entries reported by ``get_check_list`` must cover the
    whole list.
    """
    if not contain_list:
        return True
    found = set(get_check_list(xml_node, contain_list))
    return found == set(contain_list)
def find_body_not_contain(xml_node, contain_list):
    """Return True when no entry of ``contain_list`` is found under ``xml_node``.

    An empty or ``None`` ``contain_list`` imposes no constraint and yields
    True. Otherwise the entries reported by ``get_check_list`` must share
    nothing with the list.
    """
    if not contain_list:
        return True
    found = set(get_check_list(xml_node, contain_list))
    return found.isdisjoint(set(contain_list))
def get_check_list(xml_node, contain_list):
    """Return the entries of ``contain_list`` that occur under ``xml_node``.

    An entry counts as present when ``walk_text`` or ``walk_tag`` reports it.
    The order of ``contain_list`` is preserved in the result.
    """
    return [
        wanted
        for wanted in contain_list
        if walk_text(xml_node, wanted) or walk_tag(xml_node, wanted)
    ]
def walk_text(xml_node, node_text):
    """Return True if any text fragment under ``xml_node`` equals ``node_text``.

    The comparison is an exact match against each fragment yielded by
    ``xml_node.itertext()``; substrings do not count.

    Returns:
        An explicit bool (``False`` rather than an implicit ``None`` when
        nothing matches).
    """
    return any(node_text == text for text in xml_node.itertext())
def walk_tag(xml_node, node_name):
    """Return True if ``xml_node`` or any descendant has the tag ``node_name``.

    ``xml_node.iter(node_name)`` already yields only elements with that tag,
    so finding any element is enough. Comparing the tag string with the
    yielded element object can never succeed.
    """
    return any(True for _ in xml_node.iter(node_name))
def find_body_parameter_change(managed_object_node, parameterName, newValue, prevValue=None):
    """Check whether a ``managedObject`` records a given parameter change.

    Args:
        managed_object_node: Element whose tag must be ``managedObject``.
        parameterName: Expected text of a ``parameter/parameterName`` child.
        newValue: Expected text of the same parameter's ``newValue`` child.
        prevValue: Optional expected text of the ``prevValue`` child; a falsy
            value disables this check.

    Returns:
        True if any direct ``parameter`` child matches all supplied values,
        otherwise False.

    Raises:
        ValueError: If ``managed_object_node`` is not a ``managedObject``
            element.
    """
    if managed_object_node.tag != 'managedObject':
        raise ValueError('agr must be managedObject node!')

    def _child_text(parent, tag):
        # A parameter may lack a child; report None instead of raising.
        child = parent.find(tag)
        return None if child is None else child.text

    for parameter in managed_object_node.findall('parameter'):
        if parameterName != _child_text(parameter, 'parameterName'):
            continue
        if newValue != _child_text(parameter, 'newValue'):
            continue
        if prevValue and prevValue != _child_text(parameter, 'prevValue'):
            continue
        return True
    return False
def find_body_attrib(xml_node, **expected):
    """Check that ``xml_node`` carries every expected attribute.

    Args:
        xml_node: Element whose ``attrib`` mapping is inspected.
        **expected: Attribute name to expected value. A value matches when its
            string form is a substring of the element's attribute value.

    Returns:
        True when no expectations are given or all of them match; False as
        soon as an attribute is missing or does not contain the expected text.
    """
    attributes = xml_node.attrib
    # str() keeps non-string expectations (e.g. ints) from raising TypeError
    # in the substring test.
    return all(
        key in attributes and str(wanted) in attributes[key]
        for key, wanted in expected.items()
    )
# Note on using `&` in conditions:
# def assert_dict(expected, result):
#     for key in expected:
#         if (key in result) & (result[key] == expected[key]):
#             print('test passed')
#         else:
#             raise Exception('assertion failed')
# Filters for the demo search below.
xml_tag_attrib_ = {
    'class': "TxArrayCarrier",
    'distName': 'NR',
}
managed_parameter_change_ = {
    'parameterName': "array",
    'newValue': 'txArray2',
}
# managed_parameter_change = {'parameterName': "TxArrayCarrier", 'newValue': 'NR', 'prevValue': ''}
should_contain_list_ = ['bandwidth']
should_not_contain_list_ = ['bandwidth1']

# Parse each normalized record and search its body for matching managedObject
# elements; matches are printed by find_in_body.
for line_xml in xml_list:
    root = ET.fromstring(line_xml)
    xml_time = root.get('value')
    # print(find_body_attrib(root, value='2021-05-17 03:28:07'))
    # The first child of the timestamp element is indexed for its first two
    # children, which are taken as header and body.
    xml_header = root[0][0]
    xml_body = root[0][1]
    # NOTE(review): res_header is computed but does not gate the body search.
    res_header = find_in_header(xml_header, msg_from='RMOD')
    find_in_body(
        xml_body,
        xml_tag_name='managedObject',
        xml_tag_attrib=xml_tag_attrib_,
        managed_parameter_change=managed_parameter_change_,
        should_contain_list=should_contain_list_,
        should_not_contain_list=should_not_contain_list_,
    )
# print(xml_time)
# def find_in_body(xml_node,
# xml_tag_name,
# xml_tag_attrib=None,
# managed_parameter_change=None,
# should_contain_list=None,
# should_not_contain_list=None):
# for i in root:
# print(‘--‘,i)
# neighbor.text
# neighbor.tag
# neighbor.attrib
# print(xml_list)
#
# for i in xml_list:
# print(i)
# print(‘**********‘)
# Original article: https://www.cnblogs.com/amize/p/14786206.html