首页 > 其他 > 详细

SDN实验---Ryu的应用开发(三)流量监控

时间:2019-10-30 13:49:05      阅读:90      评论:0      收藏:0      [点我收藏+]

一:实现流量监控

技术分享图片

 (一)流量监控原理

技术分享图片

其中,控制器周期性地向交换机下发统计请求消息,以获取交换机的统计信息------这是控制器主动下发的过程
流速公式:流速 = (t1时刻的流量 - t0时刻的流量) / (t1 - t0)
剩余带宽公式:剩余带宽 = 链路总带宽 - 流速--------这里指的是单段链路(例如s2-s3)的剩余带宽,而不是整条路径(例如:h1->s1->s2->s3->h2)的剩余带宽
路径有效带宽是指:这一整条路径上各段链路剩余带宽中的最小值

技术分享图片

二:代码实现

(一)代码框架

from ryu.app import simple_switch_13
from ryu.controller.handler import set_ev_cls
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,DEAD_DISPATCHER

class MyMonitor(simple_switch_13.SimpleSwitch13):
    """Skeleton of a traffic monitor layered on the SimpleSwitch13 sample app.

    SimpleSwitch13 provides the same learning-switch behaviour as the
    self-learning switch written in the previous experiment, so it is
    inherited instead of being re-implemented. Every method below is a
    placeholder that only outlines the structure of the monitor.
    """

    def __init__(self, *args, **kwargs):
        super(MyMonitor, self).__init__(*args, **kwargs)

    @set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        """Placeholder: react to a switch entering or leaving service."""
        pass

    def _monitor(self):
        """Placeholder: periodically request port and flow statistics."""
        pass

    def _request_stats(self, datapath):
        """Placeholder: send the statistics requests to ``datapath``."""
        pass

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev):
        """Placeholder: consume the reply to a port statistics request."""
        pass

    # This handler needs its own name: reusing ``_port_stats_reply_handler``
    # would rebind the class attribute and discard the port handler above.
    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        """Placeholder: consume the reply to a flow statistics request."""
        pass

(二)推荐阅读:协程 https://www.cnblogs.com/ssyfj/p/9030165.html

(三)全部代码实现

from operator import attrgetter

from ryu.app import simple_switch_13
from ryu.controller.handler import set_ev_cls
from ryu.controller import ofp_event
from ryu.controller.handler import MAIN_DISPATCHER,DEAD_DISPATCHER
from ryu.lib import hub

class MyMonitor(simple_switch_13.SimpleSwitch13):
    """Learning switch that also polls connected switches for statistics.

    Forwarding behaviour is inherited from the SimpleSwitch13 sample app.
    On top of it, a background loop sends flow and port statistics requests
    to every registered switch every 5 seconds, and two reply handlers log
    the returned counters as tables.
    """

    def __init__(self, *args, **kwargs):
        super(MyMonitor, self).__init__(*args, **kwargs)
        # Maps datapath id -> datapath for every switch currently registered.
        self.datapaths = {}
        # Run the polling loop concurrently with event handling via ryu.lib.hub.
        self.monitor_thread = hub.spawn(self._monitor)

    @set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev):
        """Track which switches are available for polling.

        A datapath in MAIN_DISPATCHER state is added to ``self.datapaths``;
        one in DEAD_DISPATCHER state is removed from it.
        """
        datapath = ev.datapath

        if datapath.state == MAIN_DISPATCHER:
            if datapath.id not in self.datapaths:
                self.datapaths[datapath.id] = datapath
                self.logger.debug("Regist datapath: %16x", datapath.id)
        elif datapath.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                del self.datapaths[datapath.id]
                self.logger.debug("Unregist datapath: %16x", datapath.id)

    def _monitor(self):
        """Request statistics from every registered switch, forever.

        Never returns; intended to run in the green thread started by
        ``__init__``.
        """
        while True:
            # Iterate over a snapshot: _state_change_handler adds and removes
            # entries, and resizing a dict while it is being iterated raises
            # RuntimeError.
            for dp in list(self.datapaths.values()):
                self._request_stats(dp)
            # Sleep between rounds so replies can arrive and other green
            # threads get a chance to run.
            hub.sleep(5)

    def _request_stats(self, datapath):
        """Send one flow-stats and one port-stats request to ``datapath``."""
        self.logger.debug("send stats reques to datapath: %16x for port and flow info", datapath.id)

        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        # All optional arguments are left at their defaults.
        req = parser.OFPFlowStatsRequest(datapath)
        datapath.send_msg(req)

        # flags=0, port_no=OFPP_ANY.
        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
        datapath.send_msg(req)

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev):
        """Log the per-port counters carried by a port statistics reply.

        ``ev.msg.body`` is iterated as a sequence of per-port entries; one
        table row is logged per entry, ordered by ``port_no``.
        """
        body = ev.msg.body
        dpid = ev.msg.datapath.id

        # Header widths mirror the row format below: 16 for the datapath id,
        # then seven columns of width 8.
        self.logger.info("%-16s %8s %8s %8s %8s %8s %8s %8s",
                         "datapath", "port",
                         "rx-pkts", "tx-pkts",
                         "rx-bytes", "tx-bytes",
                         "rx-error", "tx-error")
        self.logger.info(" ".join(["-" * 16] + ["-" * 8] * 7))

        for port_stat in sorted(body, key=attrgetter("port_no")):
            self.logger.info("%016x %8x %8d %8d %8d %8d %8d %8d",
                             dpid, port_stat.port_no,
                             port_stat.rx_packets, port_stat.tx_packets,
                             port_stat.rx_bytes, port_stat.tx_bytes,
                             port_stat.rx_errors, port_stat.tx_errors)

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        """Log the counters of the priority-1 flow entries in a flow stats reply.

        Entries with any other priority are skipped. Rows are ordered by
        (in_port, eth_src), and the output port is read from the first action
        of the first instruction of each entry.
        """
        body = ev.msg.body
        dpid = ev.msg.datapath.id

        # Column widths mirror the row format below.
        widths = (16, 8, 17, 8, 17, 8, 8)
        self.logger.info("%-16s    %8s    %17s    %8s    %17s    %8s    %8s",
                         "datapath", "in_port", "eth_src",
                         "out_port", "eth_dst",
                         "packets", "bytes")
        self.logger.info("    ".join("-" * width for width in widths))

        # The match fields are looked up by key, so only entries known to
        # carry them (priority 1) are kept.
        learned_flows = (flow for flow in body if flow.priority == 1)
        for flow_stat in sorted(learned_flows,
                                key=lambda flow: (flow.match["in_port"],
                                                  flow.match["eth_src"])):
            self.logger.info("%016x    %8x    %17s    %8x    %17s    %8d    %8d",
                             dpid,
                             flow_stat.match["in_port"], flow_stat.match["eth_src"],
                             flow_stat.instructions[0].actions[0].port,
                             flow_stat.match["eth_dst"],
                             flow_stat.packet_count, flow_stat.byte_count)

补充:注意---不同事件携带的属性可能不同,需要我们通过调试(例如打印ev.msg和dir(ev.msg))来确认;例如上面的统计回复事件中就出现了ev.msg.body(之前的hub实现中没有用到)

(四)代码讲解

1.class MyMonitor(simple_switch_13.SimpleSwitch13):

simple_switch_13.SimpleSwitch13是样例代码,其中实现了和我们上一次实验中,自学习交换机类似的功能
(稍微多了个关于交换机是否上传全部packet还是只上传buffer_id),所以我们直接继承,可以减少写代码时间

2.协程实现伪并发self.monitor_thread = hub.spawn(self._monitor)

    def __init__(self, *args, **kwargs):
        """Initialise the inherited switch app, then start the stats poller."""
        super(MyMonitor, self).__init__(*args, **kwargs)
        # Registry of switches; created first because _monitor reads it.
        self.datapaths = dict()
        # Start _monitor through ryu.lib.hub so it runs in the background.
        poller = hub.spawn(self._monitor)
        self.monitor_thread = poller

3.在协程中实现周期请求交换机信息

    def _monitor(self):
        """Request statistics from every registered switch, forever.

        Never returns; intended to be started with ``hub.spawn``.
        """
        while True:
            # Iterate over a snapshot: entries of self.datapaths may be added
            # or removed while this loop runs, and resizing a dict during
            # iteration raises RuntimeError.
            for dp in list(self.datapaths.values()):
                self._request_stats(dp)
            # Sleep between rounds so replies can arrive and other green
            # threads get a chance to run.
            hub.sleep(5)

4.主动下发消息,请求交换机信息OFPFlowStatsRequest------注意:我们这里请求两类信息(端口统计和流表统计),所以我们要使用两个函数来分别处理port和flow响应

    def _request_stats(self, datapath):
        """Send one flow-stats and one port-stats request to ``datapath``."""
        self.logger.debug("send stats reques to datapath: %16x for port and flow info", datapath.id)

        ofproto = datapath.ofproto
        parser = datapath.ofproto_parser

        # All optional arguments are left at their defaults.
        req = parser.OFPFlowStatsRequest(datapath)
        datapath.send_msg(req)

        # flags=0, port_no=OFPP_ANY; like the flow request above, the
        # arguments that equal the defaults could be omitted.
        req = parser.OFPPortStatsRequest(datapath, 0, ofproto.OFPP_ANY)
        datapath.send_msg(req)

源码查看参数

@_set_stats_type(ofproto.OFPMP_FLOW, OFPFlowStats)
@_set_msg_type(ofproto.OFPT_MULTIPART_REQUEST)
class OFPFlowStatsRequest(OFPFlowStatsRequestBase):
    """
    Individual flow statistics request message

    The controller uses this message to query individual flow statistics.

    ================ ======================================================
    Attribute        Description
    ================ ======================================================
    flags            Zero or ``OFPMPF_REQ_MORE``
    table_id         ID of table to read
    out_port         Require matching entries to include this as an output
                     port
    out_group        Require matching entries to include this as an output
                     group
    cookie           Require matching entries to contain this cookie value
    cookie_mask      Mask used to restrict the cookie bits that must match
    match            Instance of ``OFPMatch``
    ================ ======================================================

    Example::

        def send_flow_stats_request(self, datapath):
            ofp = datapath.ofproto
            ofp_parser = datapath.ofproto_parser

            cookie = cookie_mask = 0
            match = ofp_parser.OFPMatch(in_port=1)
            req = ofp_parser.OFPFlowStatsRequest(datapath, 0,
                                                 ofp.OFPTT_ALL,
                                                 ofp.OFPP_ANY, ofp.OFPG_ANY,
                                                 cookie, cookie_mask,
                                                 match)
            datapath.send_msg(req)
    """

    def __init__(self, datapath, flags=0, table_id=ofproto.OFPTT_ALL,
                 out_port=ofproto.OFPP_ANY,
                 out_group=ofproto.OFPG_ANY,
                 cookie=0, cookie_mask=0, match=None, type_=None):

5.获取端口响应信息ofp_event.EventOFPPortStatsReply

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev):
        """Log the per-port counters carried by a port statistics reply.

        ``ev.msg.body`` is iterated as a sequence of per-port entries; one
        table row is logged per entry, ordered by ``port_no``. For debugging,
        ``print(ev.msg)`` and ``print(dir(ev.msg))`` show the available fields.
        """
        body = ev.msg.body
        dpid = ev.msg.datapath.id

        # Header widths mirror the row format below: 16 for the datapath id,
        # then seven columns of width 8.
        self.logger.info("%-16s %8s %8s %8s %8s %8s %8s %8s",
                         "datapath", "port",
                         "rx-pkts", "tx-pkts",
                         "rx-bytes", "tx-bytes",
                         "rx-error", "tx-error")
        self.logger.info(" ".join(["-" * 16] + ["-" * 8] * 7))

        for port_stat in sorted(body, key=attrgetter("port_no")):
            self.logger.info("%016x %8x %8d %8d %8d %8d %8d %8d",
                             dpid, port_stat.port_no,
                             port_stat.rx_packets, port_stat.tx_packets,
                             port_stat.rx_bytes, port_stat.tx_bytes,
                             port_stat.rx_errors, port_stat.tx_errors)

端口信息:《参考》

6666666666port info:
version=0x4,msg_type=0x13,msg_len=0x1d0,xid=0x8dcd9187,
OFPPortStatsReply(
    body=[
OFPPortStats(port_no=4294967294,rx_packets=0,tx_packets=0,rx_bytes=0,tx_bytes=0,rx_dropped=65,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=331000000), OFPPortStats(port_no=1,rx_packets=154,tx_packets=225,rx_bytes=11660,tx_bytes=19503,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=333000000), OFPPortStats(port_no=2,rx_packets=186,tx_packets=257,rx_bytes=14516,tx_bytes=22343,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=334000000), OFPPortStats(port_no=3,rx_packets=220,tx_packets=232,rx_bytes=18439,tx_bytes=19311,rx_dropped=0,tx_dropped=0,rx_errors=0,tx_errors=0,rx_frame_err=0,rx_over_err=0,rx_crc_err=0,collisions=0,duration_sec=1912,duration_nsec=333000000)
]
,flags=0,type=4)


OFPPortStats(
port_no=4294967294,                ----------
rx_packets=0,                    ----------
tx_packets=0,                    ----------
rx_bytes=0,                    ----------
tx_bytes=0,                    ----------
rx_dropped=65,
tx_dropped=0,
rx_errors=0,                    ----------
tx_errors=0,                    ----------
rx_frame_err=0,
rx_over_err=0,
rx_crc_err=0,
collisions=0,
duration_sec=1912,
duration_nsec=331000000)

[_STATS_MSG_TYPES, _TYPE, __class__, __delattr__, __dict__, __dir__, __doc__, __eq__, __format__, __ge__, __getattribute__, __gt__, __hash__, __init__, __init_subclass__, __le__, __lt__, __module__, __ne__, __new__, __reduce__, __reduce_ex__, __repr__, __setattr__, __sizeof__, __str__, __subclasshook__, __weakref__, _base_attributes, _class_prefixes, _class_suffixes, _decode_value, _encode_value, _get_decoder, _get_default_decoder, _get_default_encoder, _get_encoder, _get_type, _is_class, _opt_attributes, _restore_args, _serialize_body, _serialize_header, _serialize_pre, body, buf, cls_body_single_struct, cls_from_jsondict_key, cls_msg_type, cls_stats_body_cls, cls_stats_type,
datapath                    ----------
, flags, from_jsondict, msg_len, msg_type, obj_from_jsondict, parser, parser_stats, parser_stats_body, register_stats_type, serialize, set_buf, set_classes, set_headers, set_xid, stringify_attrs, to_jsondict, type, version, xid]

6.获取流表(flow)统计响应信息ofp_event.EventOFPFlowStatsReply

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev):
        """Log the counters of the priority-1 flow entries in a flow stats reply.

        Entries with any other priority are skipped. Rows are ordered by
        (in_port, eth_src), and the output port is read from the first action
        of the first instruction of each entry. For debugging,
        ``print(ev.msg)`` and ``print(dir(ev.msg))`` show the available fields.
        """
        body = ev.msg.body
        dpid = ev.msg.datapath.id

        # Column widths mirror the row format below.
        widths = (16, 8, 17, 8, 17, 8, 8)
        self.logger.info("%-16s    %8s    %17s    %8s    %17s    %8s    %8s",
                         "datapath", "in_port", "eth_src",
                         "out_port", "eth_dst",
                         "packets", "bytes")
        self.logger.info("    ".join("-" * width for width in widths))

        # The match fields are looked up by key, so only entries known to
        # carry them (priority 1) are kept.
        learned_flows = (flow for flow in body if flow.priority == 1)
        for flow_stat in sorted(learned_flows,
                                key=lambda flow: (flow.match["in_port"],
                                                  flow.match["eth_src"])):
            self.logger.info("%016x    %8x    %17s    %8x    %17s    %8d    %8d",
                             dpid,
                             flow_stat.match["in_port"], flow_stat.match["eth_src"],
                             flow_stat.instructions[0].actions[0].port,
                             flow_stat.match["eth_dst"],
                             flow_stat.packet_count, flow_stat.byte_count)

        

协议信息《参考》

777777777flow info:
version=0x4,msg_type=0x13,msg_len=0x200,xid=0x9e448a1a,
OFPFlowStatsReply(
    body=[
        OFPFlowStats(byte_count=5446,cookie=0,duration_nsec=552000000,duration_sec=1893,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=1,type=0)],len=24,type=4)],
        length=104,match=OFPMatch(oxm_fields={in_port: 2, eth_src: 8a:06:6a:2c:10:fc, eth_dst: 26:20:2f:85:5a:9a}),packet_count=71,priority=1,table_id=0),     OFPFlowStats(byte_count=5348,cookie=0,duration_nsec=549000000,duration_sec=1893,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=2,type=0)],len=24,type=4)],
        length=104,match=OFPMatch(oxm_fields={in_port: 1, eth_src: 26:20:2f:85:5a:9a, eth_dst: 8a:06:6a:2c:10:fc}),packet_count=70,priority=1,table_id=0), OFPFlowStats(byte_count=8302,cookie=0,duration_nsec=438000000,duration_sec=1887,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=1,type=0)],len=24,type=4)],
        length=104,match=OFPMatch(oxm_fields={in_port: 2, eth_src: ca:9e:a1:af:b9:5f, eth_dst: 26:20:2f:85:5a:9a}),packet_count=103,priority=1,table_id=0), OFPFlowStats(byte_count=8204,cookie=0,duration_nsec=436000000,duration_sec=1887,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65509,port=2,type=0)],len=24,type=4)]
        ,length=104,match=OFPMatch(oxm_fields={in_port: 1, eth_src: 26:20:2f:85:5a:9a, eth_dst: ca:9e:a1:af:b9:5f}),packet_count=102,priority=1,table_id=0), OFPFlowStats(byte_count=6739,cookie=0,duration_nsec=807000000,duration_sec=9,flags=0,hard_timeout=0,idle_timeout=0,instructions=[OFPInstructionActions(actions=[OFPActionOutput(len=16,max_len=65535,port=4294967293,type=0)],len=24,type=4)],
        length=80,match=OFPMatch(oxm_fields={}),packet_count=74,priority=0,table_id=0)
]
,flags=0,type=1)


OFPFlowStats(
byte_count=5446,                ----------
cookie=0,
duration_nsec=552000000,
duration_sec=1893,
flags=0,
hard_timeout=0,
idle_timeout=0,
instructions=[
    OFPInstructionActions(
        actions=[
            OFPActionOutput(
                len=16,
                max_len=65509,
                port=1,        ----------
                type=0)
            ],
            len=24,
            type=4
    )
],
length=104,
match=OFPMatch(oxm_fields={
    in_port: 2,                 ----------
    eth_src: 8a:06:6a:2c:10:fc,     ----------
    eth_dst: 26:20:2f:85:5a:9a        ----------
}),
packet_count=71,                ----------
priority=1,
table_id=0
)

[_STATS_MSG_TYPES, _TYPE, __class__, __delattr__, __dict__, __dir__, __doc__, __eq__, __format__, __ge__, __getattribute__, __gt__, __hash__, __init__, __init_subclass__, __le__, __lt__, __module__, __ne__, __new__, __reduce__, __reduce_ex__, __repr__, __setattr__, __sizeof__, __str__, __subclasshook__, __weakref__, _base_attributes, _class_prefixes, _class_suffixes, _decode_value, _encode_value, _get_decoder, _get_default_decoder, _get_default_encoder, _get_encoder, _get_type, _is_class, _opt_attributes, _restore_args, _serialize_body, _serialize_header, _serialize_pre, body, buf, cls_body_single_struct, cls_from_jsondict_key, cls_msg_type, cls_stats_body_cls, cls_stats_type, 
datapath                    ----------
, flags, from_jsondict, msg_len, msg_type, obj_from_jsondict, parser, parser_stats, parser_stats_body, register_stats_type, serialize, set_buf, set_classes, set_headers, set_xid, stringify_attrs, to_jsondict, type, version, xid]

三:实验演示

(一)开启Ryu

ryu-manager my_monitor.py

技术分享图片 

(二)开启Mininet

sudo mn --topo=tree,2,2 --controller=remote --mac 

技术分享图片

技术分享图片 

(三)Ryu显示结果

技术分享图片 

(四)还需要去了解返回的字段含义才可以---以后做,最近没时间了 

SDN实验---Ryu的应用开发(三)流量监控

原文:https://www.cnblogs.com/ssyfj/p/11755773.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!