首页 > 编程语言 > 详细

python解析soap消息

时间:2021-05-20 00:31:19      阅读:18      评论:0      收藏:0      [点我收藏+]

import datetime
import shutil
import zipfile
import gzip
import os
import re
import time
from lxml import etree
import xml.etree.ElementTree as ET




def un_gz(zfile_name, uzfile_name=None):
    """Decompress a gzip file to disk.

    Args:
        zfile_name: Path of the gzip-compressed file to read.
        uzfile_name: Path of the decompressed output file. When omitted
            (or empty) the output is written to ``soap_log.xml`` in the
            current working directory.

    Returns:
        None. The decompressed bytes are written to ``uzfile_name``; an
        existing file at that path is overwritten.
    """
    uzfile_name = uzfile_name or 'soap_log.xml'
    # Both handles are context-managed so they are closed (and the output
    # flushed) even if decompression fails part-way through.
    with gzip.open(zfile_name, 'rb') as src, open(uzfile_name, 'wb') as dst:
        shutil.copyfileobj(src, dst)


def un_zip(path, zfile,
           targets=('1011_UnitOAM_SOAP_Log.zip', 'UnitOAM_SOAP_Log0.xml.gz')):
    """Extract selected members of a zip archive, recursing into nested zips.

    The archive ``path/zfile`` is opened and every member whose name is in
    ``targets`` is extracted into a directory named after the archive
    (``zfile`` without its ``.zip`` suffix) under ``path``. Each extracted
    member is then passed back to this function, so nested ``.zip`` members
    are unpacked the same way; members that are not ``.zip`` files are left
    as extracted.

    Args:
        path: Directory containing ``zfile``.
        zfile: File name of the archive. Names not ending in ``.zip`` are
            ignored.
        targets: Member names to extract. Defaults to the two SOAP log
            members this script originally looked for.

    Returns:
        None.
    """
    if not zfile.endswith('.zip'):
        return
    file_path = os.path.join(path, zfile)
    # Strip only the trailing suffix; searching for the first '.zip' would
    # truncate names such as 'log.zipped.zip' too early.
    des_dir = os.path.join(path, zfile[:-len('.zip')])
    with zipfile.ZipFile(file_path) as src_file:
        wanted = [name for name in src_file.namelist() if name in targets]
        for filename in wanted:
            src_file.extract(filename, des_dir)
    # Recurse after the outer archive is closed so only one is open at a time.
    for filename in wanted:
        un_zip(des_dir, filename, targets)


# Example usage (uncomment to run):
# un_zip('.', 'rflog_20210517_112739.zip')
# un_gz(r'./rflog_20210517_112739/1011_UnitOAM_SOAP_Log/UnitOAM_SOAP_Log0.xml.gz')
# shutil.rmtree(r'./rflog_20210517_112739/')  # careful: deletes the whole extracted directory tree


# Normalized XML strings, one per SOAP record; filled by the loader loop that
# reads soap_log.xml and later parsed record-by-record in the search loop.
xml_list = []


# Matches the raw ``<timestamp value=...Z/>`` marker. The value is matched
# lazily and may not cross a '>' so that a later "Z/" inside the message body
# can never be swallowed into the timestamp.
_TIMESTAMP_TAG = re.compile(r'<timestamp value=([^>]*?Z)/>')


def normalize_xml(xml_line):
    """Turn one raw log record into a well-formed ``<timestamp>`` element.

    The raw record carries an unquoted, self-closed marker such as
    ``<timestamp value=2021-05-17T03:28:07Z/>`` followed by the message.
    This rewrites the marker into an opening tag with a quoted attribute
    (``<timestamp value="2021-05-17 03:28:07">``) and appends
    ``</timestamp>`` so the message becomes the element's content.

    Args:
        xml_line: One record of the log as a string.

    Returns:
        The record with the timestamp marker rewritten and a closing
        ``</timestamp>`` appended.

    Raises:
        ValueError: If the record has no timestamp marker, or its value is
            not in ``%Y-%m-%dT%H:%M:%SZ`` form.
    """
    match = _TIMESTAMP_TAG.search(xml_line)
    if match is None:
        raise ValueError(
            'no <timestamp value=...Z/> marker in record: %r' % xml_line[:80])
    stamp = datetime.datetime.strptime(match.group(1), '%Y-%m-%dT%H:%M:%SZ')
    # str(datetime) renders as "YYYY-MM-DD HH:MM:SS".
    opening = '<timestamp value="%s">' % stamp
    return (xml_line[:match.start()] + opening
            + xml_line[match.end():] + '</timestamp>')


# Load the decompressed log: keep only lines that start with a <timestamp
# marker and end (ignoring surrounding whitespace) with a closing Envelope
# tag, and store them in normalized, parseable form.
with open('soap_log.xml') as f:
    for line in f:
        record = line.strip()
        if line.startswith('<timestamp') and record.endswith('Envelope>'):
            xml_list.append(normalize_xml(record))


def find_in_header(xml_header, msg_from=None, msg_to=None):
    """Check whether a message header matches the given sender/receiver.

    Matching is by substring: ``msg_from`` must occur in the text of the
    header's ``from`` child and ``msg_to`` in the text of its ``to`` child.
    A filter that is not supplied matches anything.

    Args:
        xml_header: Element whose direct children include ``from`` and ``to``.
        msg_from: Substring expected in the ``from`` text, or None.
        msg_to: Substring expected in the ``to`` text, or None.

    Returns:
        True if no filter was given or all given filters match, else False.
        A missing or empty ``from``/``to`` element is treated as empty text
        rather than raising.
    """
    if not msg_from and not msg_to:
        return True
    wanted_from = '' if msg_from is None else str(msg_from)
    wanted_to = '' if msg_to is None else str(msg_to)
    # findtext() yields None for a missing element; fall back to '' so the
    # substring test below cannot fail on None.
    actual_from = xml_header.findtext('from') or ''
    actual_to = xml_header.findtext('to') or ''
    return wanted_from in actual_from and wanted_to in actual_to


def find_in_body(xml_node,
                 xml_tag_name,
                 xml_tag_attrib=None,
                 managed_parameter_change=None,
                 should_contain_list=None,
                 should_not_contain_list=None):
    """Print (and return) the descendants of ``xml_node`` matching all filters.

    Every element named ``xml_tag_name`` under ``xml_node`` (including
    ``xml_node`` itself) is tested against three independent filters; an
    element is reported only if all of them pass.

    Args:
        xml_node: Element to search.
        xml_tag_name: Tag name of the candidate elements.
        xml_tag_attrib: Mapping of attribute name to expected substring,
            forwarded to ``find_body_attrib``. None means no attribute filter.
        managed_parameter_change: Keyword arguments for
            ``find_body_parameter_change``. None means no parameter filter.
            When given, only ``managedObject`` elements can match.
        should_contain_list: Entries that must all be found in the element
            (see ``find_body_contain``). None means no filter.
        should_not_contain_list: Entries of which none may be found in the
            element (see ``find_body_not_contain``). None means no filter.

    Returns:
        List of matching elements, in document order. Each match is also
        printed (filter results, tag, attributes, text).
    """
    # Unpacking None with ** raises TypeError, so normalize to an empty dict.
    attrib_filter = xml_tag_attrib or {}
    matches = []
    for each_node in xml_node.iter(xml_tag_name):
        bool_attrib = find_body_attrib(each_node, **attrib_filter)
        bool_contain = (find_body_contain(each_node, should_contain_list)
                        and find_body_not_contain(each_node, should_not_contain_list))

        # Always (re)assign per element so the result can neither be unbound
        # nor carried over from a previous iteration.
        if not managed_parameter_change:
            bool_parameter_change = True
        elif each_node.tag == 'managedObject':
            bool_parameter_change = find_body_parameter_change(
                each_node, **managed_parameter_change)
        else:
            bool_parameter_change = False

        if bool_attrib and bool_contain and bool_parameter_change:
            print('  bool_attrib:', bool_attrib)
            print('  bool_contain:', bool_contain)
            print('  bool_parameter_change:', bool_parameter_change)
            print(each_node.tag)
            print(each_node.attrib)
            print(each_node.text)
            matches.append(each_node)
    return matches




def find_body_contain(xml_node, contain_list):
    """Return True when every entry of ``contain_list`` is found in ``xml_node``.

    An empty or None ``contain_list`` imposes no requirement and yields True.
    Otherwise the entries reported by ``get_check_list`` must cover the whole
    list (compared as sets).
    """
    if not contain_list:
        return True
    found = set(get_check_list(xml_node, contain_list))
    return found == set(contain_list)


def find_body_not_contain(xml_node, contain_list):
    """Return True when no entry of ``contain_list`` is found in ``xml_node``.

    An empty or None ``contain_list`` imposes no restriction and yields True.
    Otherwise the entries reported by ``get_check_list`` must share nothing
    with the list.
    """
    if not contain_list:
        return True
    found = set(get_check_list(xml_node, contain_list))
    return found.isdisjoint(set(contain_list))


def get_check_list(xml_node, contain_list):
    """Return the entries of ``contain_list`` that occur in ``xml_node``.

    An entry counts as occurring when ``walk_text`` or ``walk_tag`` reports
    it for ``xml_node``. The result keeps the order of ``contain_list``.
    """
    return [
        candidate
        for candidate in contain_list
        if walk_text(xml_node, candidate) or walk_tag(xml_node, candidate)
    ]


def walk_text(xml_node, node_text):
    """Report whether any text chunk under ``xml_node`` equals ``node_text``.

    The comparison is exact against each string yielded by
    ``xml_node.itertext()``; no stripping or substring matching is done.

    Returns:
        True if an equal chunk exists, otherwise False.
    """
    return any(text == node_text for text in xml_node.itertext())


def walk_tag(xml_node, node_name):
    """Report whether ``xml_node`` or any descendant has the tag ``node_name``.

    Returns:
        True if such an element exists, otherwise False.
    """
    # iter() yields Element objects, so the tag (not the element itself) has
    # to be compared with the name. Checking the tag explicitly also keeps
    # the wildcard '*' from matching every element.
    return any(element.tag == node_name for element in xml_node.iter(node_name))


def find_body_parameter_change(managed_object_node, parameterName, newValue, prevValue=None):
    """Check whether a ``managedObject`` records a given parameter change.

    Each direct ``parameter`` child is inspected; it matches when the text of
    its ``parameterName`` and ``newValue`` children equal the given values
    and, if ``prevValue`` is given (non-empty), the text of its ``prevValue``
    child equals it as well.

    Args:
        managed_object_node: Element with tag ``managedObject``.
        parameterName: Expected parameter name.
        newValue: Expected new value.
        prevValue: Expected previous value; None or empty means "any".

    Returns:
        True if at least one ``parameter`` child matches, otherwise False.
        A ``parameter`` lacking one of the required children simply does not
        match.

    Raises:
        ValueError: If ``managed_object_node`` is not a ``managedObject``
            element.
    """
    if managed_object_node.tag != 'managedObject':
        raise ValueError('agr must be managedObject node!')
    for parameter in managed_object_node.findall('parameter'):
        # findtext() returns None for a missing child, which never equals the
        # expected string, so incomplete entries are skipped, not fatal.
        if parameter.findtext('parameterName') != parameterName:
            continue
        if parameter.findtext('newValue') != newValue:
            continue
        if prevValue and parameter.findtext('prevValue') != prevValue:
            continue
        return True
    return False


def find_body_attrib(xml_node, **expected):
    """Check the attributes of ``xml_node`` against expected substrings.

    Each keyword names an attribute; the attribute must be present on the
    element and its value must contain the keyword's value as a substring.
    With no keywords the check passes.

    Returns:
        True if every expected attribute is present and contains its
        expected substring, otherwise False.
    """
    return all(
        name in xml_node.attrib and wanted in xml_node.attrib[name]
        for name, wanted in expected.items()
    )



# Note on using `&` with booleans (kept for reference):
# def assert_dict(expected, result):
#     for key in expected:
#         if (key in result) & (result[key]==expected[key]):
#             print('test passed')
#         else:
#             raise Exception('assertion failed')



# Search criteria applied to every record's body.
# Attribute filter: each attribute must contain the given substring.
xml_tag_attrib_ = {'class': "TxArrayCarrier", 'distName': 'NR'}
# Parameter-change filter, forwarded to find_body_parameter_change().
managed_parameter_change_ = {'parameterName': "array", 'newValue': 'txArray2'}
# managed_parameter_change = {'parameterName': "TxArrayCarrier", 'newValue': 'NR', 'prevValue': ''}
should_contain_list_ = ['bandwidth']
should_not_contain_list_ = ['bandwidth1']

for line_xml in xml_list:
    root = ET.fromstring(line_xml)
    # Normalized timestamp of the record (the 'value' attribute of the root).
    xml_time = root.get('value')
    # The first child of the root is indexed as [header, body].
    xml_header = root[0][0]
    xml_body = root[0][1]

    # NOTE(review): res_header and xml_time are computed but never used, so
    # the body search below runs for every record regardless of the header
    # match -- confirm whether records should be skipped when it is False.
    res_header = find_in_header(xml_header, msg_from='RMOD')
    find_in_body(xml_body,
                 xml_tag_name='managedObject',
                 xml_tag_attrib=xml_tag_attrib_,
                 managed_parameter_change=managed_parameter_change_,
                 should_contain_list=should_contain_list_,
                 should_not_contain_list=should_not_contain_list_)
    # print(xml_time)

    # def find_in_body(xml_node,
    #                  xml_tag_name,
    #                  xml_tag_attrib=None,
    #                  managed_parameter_change=None,
    #                  should_contain_list=None,
    #                  should_not_contain_list=None):

    # for i in root:
    #     print(‘--‘,i)



# neighbor.text
# neighbor.tag
# neighbor.attrib



# print(xml_list)
#
# for i in xml_list:
#     print(i)
#     print(‘**********‘)








python解析soap消息

原文:https://www.cnblogs.com/amize/p/14786206.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!