网络编程就是要通过网络来访问另一台计算机的数据,这样必然需要至少两台计算机,一台计算机存放用于分享的数据和用于分享数据的程序,另一台计算机上运行访问数据的程序。提供数据的一方称为服务器(Server),访问数据的一方称为客户端(Client).
指的是客户端/服务器架构。(B/S架构也是C/S架构的一种,只不过客户端是浏览器)
1.物理连接介质
包括网线,无线电,光纤等。
2.通信协议
什么是协议?协议指的是标准,大家要遵循相同的标准才能正常交流通讯。
OSI七层模型,开放式系统互联参考模型.把整个通信过程分为七层.
计算机之间必须完成组网
物理层功能:主要是基于电器特性发送高低电压(电信号),高电压对应数字1,低电压对应数字0。
数据链路层由来:单纯的电信号0和1没有任何意义,必须规定电信号多少位一组,每组什么意思。
数据链路层的功能:定义了电信号的分组方式。
定义了统一的分组方式,及以太网协议ethernet
:
ethernet 规定:
head包含:(固定18个字节)
data包含:(最短46字节,最长1500字节)
head长度+data长度=最短64字节,最长1518字节,超过最大限制就分片发送
head中包含的源和目标地址由来:ethernet规定接入internet的设备都必须具备网卡,发送端和接收端的地址便是指网卡的地址,即mac地址
mac地址:每块网卡出厂时都被烧制上一个世界唯一的mac地址,长度为48位2进制,通常由12位16进制数表示(前六位是厂商编号,后六位是流水线号)
有了mac地址,同一网络内的两台主机就可以通信了(一台主机通过arp协议获取另外一台主机的mac地址)
ethernet采用最原始的方式,广播的方式进行通信,即计算机通信基本靠吼
网络层由来:有了ethernet、mac地址、广播的发送方式,世界上的计算机就可以彼此通信了,问题是世界范围的互联网是由 一个个彼此隔离的小的局域网组成的,那么如果所有的通信都采用以太网的广播方式,那么一台机器发送的包全世界都会收到, 引发广播风暴
上图结论:必须找出一种方法来区分哪些计算机属于同一广播域,哪些不是,如果是就采用广播的方式发送,如果不是,就采用路由的方式(向不同广播域/子网分发数据包),mac地址是无法区分的,它只跟厂商有关
网络层功能:引入一套新的地址来区分不同的广播域/子网,这套地址即网络地址.
ip地址分成两部分
注意:单纯的ip地址段只是标识了ip地址的种类,从网络部分或主机部分都无法辨识一个ip所处的子网
例:172.16.10.1与172.16.10.2并不能确定二者处于同一子网
子网掩码
所谓”子网掩码”,就是表示子网络特征的一个参数。它在形式上等同于IP地址,也是一个32位二进制数字,它的网络部分全部为1,主机部分全部为0。比如,IP地址172.16.10.1,如果已知网络部分是前24位,主机部分是后8位,那么子网络掩码就是11111111.11111111.11111111.00000000,写成十进制就是255.255.255.0。
知道”子网掩码”,我们就能判断,任意两个IP地址是否处在同一个子网络。方法是将两个IP地址与子网掩码分别进行AND运算(两个数位都为1,运算结果为1,否则为0),然后比较结果是否相同,如果是的话,就表明它们在同一个子网络中,否则就不是。
比如,已知IP地址172.16.10.1和172.16.10.2的子网掩码都是255.255.255.0,请问它们是否在同一个子网络?两者与子网掩码分别进行AND运算,
172.16.10.1:10101100.00010000.00001010.000000001
255255.255.255.0:11111111.11111111.11111111.00000000
AND运算得网络地址结果:10101100.00010000.00001010.000000001->172.16.10.0
172.16.10.2:10101100.00010000.00001010.000000010
255255.255.255.0:11111111.11111111.11111111.00000000
AND运算得网络地址结果:10101100.00010000.00001010.000000001->172.16.10.0
结果都是172.16.10.0,因此它们在同一个子网络。
总结一下,IP协议的作用主要有两个,一个是为每一台计算机分配IP地址,另一个是确定哪些地址在同一个子网络。
ip数据包
ip数据包也分为head和data部分,无须为ip包定义单独的栏位,直接放入以太网包的data部分
head:长度为20到60字节
data:最长为65,515字节。
而以太网数据包的”数据”部分,最长只有1500字节。因此,如果IP数据包超过了1500字节,它就需要分割成几个以太网数据包,分开发送了。
以太网头 | ip 头 | ip数据 |
---|---|---|
arp协议由来:计算机通信基本靠吼,即广播的方式,所有上层的包到最后都要封装上以太网头,然后通过以太网协议发送,在谈及以太网协议时候,我们了解到
通信是基于mac的广播方式实现,计算机在发包时,获取自身的mac是容易的,如何获取目标主机的mac,就需要通过arp协议
arp协议功能:广播的方式发送数据包,获取目标主机的mac地址
协议工作方式
每台主机ip都是已知的
例如:主机172.16.10.10/24访问172.16.10.11/24
a:首先通过ip地址和子网掩码区分出自己所处的子网
场景 | 数据包地址 |
---|---|
同一子网 | 目标主机mac,目标主机ip |
不同子网 | 网关mac,目标主机ip |
b:分析172.16.10.10/24与172.16.10.11/24处于同一网络(如果不是同一网络,那么下表中目标ip为172.16.10.1,通过arp获取的是网关的mac)
源mac | 目标mac | 源ip | 目标ip | 数据部分 | |
---|---|---|---|---|---|
发送端主机 | 发送端mac | FF:FF:FF:FF:FF:FF | 172.16.10.10/24 | 172.16.10.11/24 | 数据 |
c:这个包会以广播的方式在发送端所处的自网内传输,所有主机接收后拆开包,发现目标ip为自己的,就响应,返回自己的mac。
传输层的由来:网络层的ip帮我们区分子网,以太网层的mac帮我们找到主机,然后大家使用的都是应用程序,你的电脑上可能同时开启qq,暴风影音,等多个应用程序,
那么我们通过ip和mac找到了一台特定的主机,如何标识这台主机上的应用程序,答案就是端口,端口即应用程序与网卡关联的编号。
传输层功能:建立端口到端口的通信
补充:端口范围0-65535,0-1023为系统占用端口
开发中常用软件的默认端口号:
mysql: 3306
mongodb: 27017
Django: 8000
Tomcat: 8080
Flask: 5000
Redis: 6379
可靠传输,TCP数据包没有长度限制,理论上可以无限长,但是为了保证网络的效率,通常TCP数据包的长度不会超过IP数据包的长度,以确保单个TCP数据包不必再分割。
以太网头 | ip 头 | tcp头 | 数据 |
---|---|---|---|
-建立双向通道,建立好连接。
- listen: 监听
- established: 确认请求建立连接
- 发送数据:
write
read
客户端往服务端发送数据,数据存放在内存中,需要服务端确认收到,数据才会在内存中释放掉。
否则,会隔一段时间发送一次,让服务端返回确认收到。
在一段时间内,若服务端还是不返回确认收到,则取消发送。并释放内存中的数据。
为什么只有三次握手才能确认双方接收发送正常
第一次握手:客户端发送网络包,服务端收到了。
这样服务端就能得出结论:客户端的发送能力、服务端的接收能力 是正常的。
第二次握手:服务端发包,客户端收到了。
这样客户端就能得出结论:服务端的接收、发送能力,客户端的接 收、发送能力是正常的。 不过此时服务器并不能确认客户端的接收能力是否正常。
第三次握手:客户端发包,服务端收到了。
这样服务端就能得出结论:客户端的接收、发送能力正常,服务器 自己的发送、接收能力也正常。
因此,需要三次握手才能确认双方的接收与发送能力是否正常。
刚开始双方都处于 establised 状态,假如是客户端先发起关闭请求,则:
- 第一次挥手:客户端发送一个 FIN 报文,报文中会指定一个序列号。此时客户端处于 FIN_WAIT1 状态。
- 第二次握手:服务端收到 FIN 之后,会发送 ACK 报文,且把客户端的序列号值 +1 作为 ACK 报文的序列号值,表明已经收到客户端的报文了,此时服务端处于 CLOSE_WAIT 状态。
- 第三次挥手:如果服务端也想断开连接了,和客户端的第一次挥手一样,发给 FIN 报文,且指定一个序列号。此时服务端处于 LAST_ACK 的状态。
- 第四次挥手:客户端收到 FIN 之后,一样发送一个 ACK 报文作为应答,且把服务端的序列号值 +1 作为自己 ACK 报文的序列号值,此时客户端处于 TIME_WAIT 状态。
需要过一阵子以确保服务端收到自己的 ACK 报文之后才会进入 CLOSED 状态
- 服务端收到 ACK 报文之后,就处于关闭连接了,处于 CLOSED 状态。
为什么客户端发送 ACK 之后不直接关闭,而是要等一阵子才关闭。
要确保服务器是否已经收到了我们的 ACK 报文,如果没有收到的话,服务器会重新发 FIN 报文给客户端,客户端再次收到 ACK 报文之后,就知道之前的 ACK 报文丢失了,然后再次发送 ACK 报文。
至于 TIME_WAIT 持续的时间至少是一个报文的来回时间。一般会设置一个计时,如果过了这个计时没有再次收到 FIN 报文,则代表对方成功,就是 ACK 报文,此时处于 CLOSED 状态。
不可靠传输,”报头”部分一共只有8个字节,总长度不超过65,535字节,正好放进一个IP数据包。
以太网头 | ip头 | udp头 | 数据 |
---|---|---|---|
应用层由来:用户使用的都是应用程序,均工作于应用层,互联网是开发的,大家都可以开发自己的应用程序,数据多种多样,必须规定好数据的组织形式
应用层功能:规定应用程序的数据格式。
例:TCP协议可以为各种各样的程序传递数据,比如Email、WWW、FTP等等。那么,必须有不同协议规定电子邮件、网页、FTP数据的格式,这些应用程序协议就构成了”应用层”。
Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。
在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议。
所以,我们无需深入理解tcp/udp协议,socket已经为我们封装好了,我们只需要遵循socket的规定去编程,写出的程序自然就是遵循tcp/udp标准的。
我们知道两个进程如果需要进行通讯最基本的一个前提能能够唯一的标示一个进程,在本地进程通讯中我们可以使用PID来唯一标示一个进程,但PID只在本地唯一,网络中的两个进程PID冲突几率很大,这时候我们需要另辟它径了,我们知道IP层的ip地址可以唯一标示主机,而TCP层协议和端口号可以唯一标示主机的一个进程,这样我们可以利用ip地址+协议+端口号
唯一标示网络中的一个进程。
能够唯一标示网络中的进程后,它们就可以利用socket进行通信了,什么是socket呢?我们经常把socket翻译为套接字,socket是在应用层和传输层之间的一个抽象层,它把TCP/IP层复杂的操作抽象为几个简单的接口供应用层调用已实现进程在网络中通信。
socket起源于UNIX,在Unix一切皆文件哲学的思想下,socket是一种"打开—读/写—关闭"模式的实现,服务器和客户端各自维护一个"文件",在建立连接打开后,可以向自己文件写入内容供对方读取或者读取对方内容,通讯结束时关闭文件。
服务端套接字函数
s.bind() 绑定(主机,端口号)到套接字
s.listen() 开始TCP监听
s.accept() 被动接受TCP客户的连接,(阻塞式)等待连接的到来
客户端套接字函数
s.connect() 主动初始化TCP服务器连接
s.connect_ex() connect()函数的扩展版本,出错时返回出错码,而不是抛出异常
公共用途的套接字函数
s.recv() 接收TCP数据
s.send() 发送TCP数据(send在待发送数据量大于己端缓存区剩余空间时,数据丢失,不会发完)
s.sendall() 发送完整的TCP数据(本质就是循环调用send,sendall在待发送数据量大于己端缓存区剩余空间时,数据不丢失,循环调用send直到发完)
s.recvfrom() 接收UDP数据
s.sendto() 发送UDP数据
s.getpeername() 连接到当前套接字的远端的地址
s.getsockname() 当前套接字的地址
s.getsockopt() 返回指定套接字的参数
s.setsockopt() 设置指定套接字的参数
s.close() 关闭套接字
面向锁的套接字方法
s.setblocking() 设置套接字的阻塞与非阻塞模式
s.settimeout() 设置阻塞套接字操作的超时时间
s.gettimeout() 得到阻塞套接字操作的超时时间
面向文件的套接字的函数
s.fileno() 套接字的文件描述符
s.makefile() 创建一个与该套接字相关的文件
server.py
import socket
soc = socket.socket()
soc.bind(('192.168.12.64', 50000)) # 指定ip和端口号,其中这两个元素是一个元组
soc.listen() # 监听接口
client, address = soc.accept() # 接收客户端的连接请求
data = client.recv(1024) # 接收数据,都是bytes类型
print(data.decode('utf-8')) # 打印接收到的数据信息
client.send('hello'.encode('utf-8')) # 发送数据
soc.close() # 关闭socket,回收资源
# 在服务器中接收和发送数据都是由客户端的socket来完成,服务器的socket只是用来处理连接
client.py
import socket
client = socket.socket() # 创建socket对象
client.connect(('192.168.12.64', 50000)) # 连接服务器,就是在做三次握手
client.send('1234456'.encode('utf-8')) # 发送数据
data = client.recv(1024) # 接收数据
print(data.decode('utf-8')) # 打印数据
client.close() # 关闭连接
'''server服务端'''
import socket
# 1.获得socket对象server,默认指定TCP协议
server = socket.socket()
# 2.传入服务端的(IP + 端口)
server.bind(
('127.0.0.1',8848) #(用户回环地址,自定义端口值)
)
# 3.开始监听
server.listen(5) # listen(5) 半连接池
# 4.监听是否发送消息,并查看客户端的地址
# conn :服务端至客户端的管道 addr:客户端的地址
conn,addr= server.accept()
# print(addr)
# 5.服务端接受客户端消息,需解码(设定可以接受字节大小数据)
data = conn.recv(1024).decode('utf-8')
print(data)
# 6.服务端通过单向管道向客户端发送消息
conn.send(('我也好').encode('utf-8'))
# 7.服务端通道关闭
conn.close()
# 8.关闭服务器
server.close()
'''
('127.0.0.1', 49941)
你好'''
'''client客户端'''
import socket
# 1.获得socket对象client
client = socket.socket()
# 2.寻找服务端地址 (服务端的IP + 端口号)
# client:相当于客户端至服务端的单向通道
client.connect(
('127.0.0.1',8848) # (IP + port )寻找服务端
)
# 3.客户端向服务端发送消息(客户端主动)
client.send(('你好').encode('utf-8'))
# 4.客户端接受服务端的消息,设定接受数据字节大小
data = client.recv(1024).decode('utf-8')
print(data)
# 5.客户端主动断开连接
client.close()
'''我也好'''
'''server服务端'''
import socket
# 1.获得socket对象server,默认指定TCP协议
server = socket.socket()
# 2.传入服务端的(IP + 端口)
server.bind(
('127.0.0.1',8848) #(用户回环地址,自定义端口值)
)
# 3.开始监听
server.listen(5) # listen(5) 半连接池
# 4.监听是否发送消息,并查看客户端的地址
# conn :服务至客户的管道 addr:客户端的地址
conn,addr= server.accept()
print(addr)
# 5.打印时进行循环
while True:
# 服务端接受客户端消息,需解码(设定可以接受字节大小数据)
data = conn.recv(1024).decode('utf-8')
print(data)
# 判断客户端发送的是否为q 就退出
if data == 'q':
break
# 6.服务端通过单向管道向客户端发送消息,自定义人工输入
say = input('请输入向客户端发送的消息:').encode('utf-8')
conn.send(say)
# 7.服务端通道关闭
conn.close()
'''client客户端'''
import socket
# 1.获得socket对象client
client = socket.socket()
# 2.寻找服务端地址 (服务端的IP + 端口号)
# client:相当于客户端至服务端的单向通道
client.connect(
('127.0.0.1',8848) # (IP + port )寻找服务端
)
# 3.发送消息时进行循环
while True:
# 自定义输入消息
msg = input('请输入客户端向服务端发送的消息:')
# 4.客户端向服务端发送消息(客户端主动)
client.send(msg.encode('utf-8'))
# 判断输入为q,则主动退出
if msg == 'q':
break
# 5.不是q,则客户端接受服务端的消息,设定接受数据字节大小
data = client.recv(1024).decode('utf-8')
print(data)
# 6.客户端主动断开连接
client.close()
import socket
# 1获取对象
s = socket.socket()
# 2 地标
s.bind(
('127.0.0.1',8848)
)
# 3.监听
s.listen(5)
while True:
# 4.建立管道
conn, addr = s.accept()
# 5.接受信息
while True:
try:
data = conn.recv(1024).decode('utf-8')
print(data)
# 判断bug
if len(data) == 0:
continue
if data == 'q':
break
# 6.发送信息
conn.send(data.encode('utf-8'))
except Exception as e:
print(e)
break
# 7.关闭通道
conn.close()
import socket
# 1.对象
c = socket.socket()
# 2.获取地址,建立通道
c.connect(
('127.0.0.1',8848)
)
while True:
# 3.发送信息
msg = input('客户端到用户端:')
c.send(msg.encode('utf-8'))
if msg =='q':
break
# 4.接收信息
data = c.recv(1024).decode('utf-8')
print(data)
c.close()
-------------------socket.serve-----------
import socket
?
# 买手机 默认得到是一个TCP的socket
server = socket.socket()
?
# 两行代码的效果是一样的
# socket的家族 AF_INET
# socket的类型
# SOCK_STREAM 对应的是TCP SOCK_DGRAM 对应的是UDP
?
# server = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
# 创建基于UDP的socket
# server = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
?
?
server.bind(("127.0.0.1",16888)) # 绑定手机卡
?
server.listen() # 开始待机
?
# 连接循环 可以不断接受新连接
while True:
client, addr = server.accept()
?
# 通讯循环 可以不断的收发数据
while True:
try:
# 如果是windows 对方强行关闭连接 会抛出异常
# 如果是linux 不会抛出异常 会死循环收到空的数据包
data = client.recv(1024)
if not data:
client.close()
break
?
print("收到客户端发来的数据:%s" % data.decode("utf-8"))
client.send(data)
except ConnectionResetError:
print("客户端强行关闭了连接")
client.close()
break
client.close() #挂断电话
server.close() # 关机
?
---------------socket.client--------------
import socket
?
client = socket.socket()
client.connect(("127.0.0.1",16888))
while True:
msg = input(">:")
client.send(msg.encode("utf-8"))
data = client.recv(1024)
print("收到服务器:%s" % data.decode("utf-8"))
client.close()
------------------socket.serve--------------
import socket
?
# UDP协议 在创建socket是 只有一个类型不同
server = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,proto=0)
server.bind(("127.0.0.1",8888))
?
while True:
data,addr = server.recvfrom(1024) # 阻塞 直到收到数据为止
print("收到来自%s的消息:%s" % (data.decode("utf-8"),addr[0]))
# 返回值为 数据 和 对方ip地址 和端口号
server.sendto(data.upper(),addr)
?
print(res)
server.close()
?
---------------socket.client1--------------
import socket
?
client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
?
while True:
data = input(">>:").encode("utf-8")
client.sendto(data,("127.0.0.1",8888))
d,addr = client.recvfrom(1024)
print(d.decode("utf-8"))
?
client.close()
?
------------socket.client2--------------
import socket
?
client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
?
while True:
data = input(">>:").encode("utf-8")
client.sendto(data,("127.0.0.1",8888))
d,addr = client.recvfrom(1024)
print(d.decode("utf-8"))
?
client.close()
1.可以帮你通过代码执行操作系统的终端命令
2.并返回终端执行命令后的结果
用subprocess
模块来运行系统命令.subprocess
模块允许我们创建子进程,连接他们的输入/输出/错误管道,还有获得返回值。
1.subprocess模块中只定义了一个类: Popen。可以使用Popen来创建进程,并与进程进行复杂的交互。
2.如果参数shell设为true,程序将通过shell来执行。
3.subprocess.PIPE
在创建Popen对象时,subprocess.PIPE可以初始化stdin, stdout或stderr参数。表示与子进程通信的标准流。
初始版
import subprocess
# 执行系统dir命令,把执行的正确结果放到管道中
obj = subprocess.Popen(
'tasklist', # cmd 命令 /dir/tasklist
shell= True, # Shell=True
stderr=subprocess.PIPE, # 返回错误结果参数 error
stdout=subprocess.PIPE # 返回正确结果参数
)
# 拿到正确结果的管道,读出里面的内容
data = obj.stdout.read() + obj.stderr.read()
# cmd中默认为gkb,解码需要gbk
print(data.decode('gbk'))
循环打印
客户端与服务端交互,cmd命令在客户端打印
'''服务端'''
import socket
import subprocess
'''客户端输入cmd命令,服务端接受命令并传给cmd,得到正确的数据,利用subprocess返回数据'''
s = socket.socket()
s.bind(
('127.0.0.1',8848)
)
s.listen(5)
print('等待客户端连接')
while True:
conn,addr = s.accept()
print(f'有客户端{addr}成功连接')
while True:
try:
# 1.接受用户输入的cmd命令
cmd = conn.recv(1024).decode('utf-8')
if cmd == 'q':
break
# 2.将用户输入命令利用subprocess得到正确返回
obj = subprocess.Popen(
cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 3.取出正确和错误结果
data = obj.stdout.read() + obj.stderr.read()
# 将结果发送给客户端
conn.send(data)
except Exception:
break
conn.close()
==========================================================
'''客户端'''
import socket
import subprocess
c = socket.socket()
c.connect(
('127.0.0.1',8848)
)
while True:
data = input('请输入CMD命令:')
# 发送
c.send(data.encode('utf-8'))
if data == 'q':
break
# 接收 进行解码,cmd默认gbk形
msg = c.recv(1024).decode('gbk')
print(msg)
粘包是指基于TCP协议传输数据时,一次传输的数据与接受的数据不匹配的情况,粘包问题分为两种,一种是由发送端引起的,一种是由接受方引起的。例如:使用socket实现一个远程控制cmd的命令程序,输入一个执行命令(如dir)得到的信息与在终端输入该命令得到的结果不同(通常提现为不全(数据过大,接受有限,未全部取走)或者不匹配(上一个命令没有拿完剩下的数据信息))
注:只有TCP有粘包现象,UDP永远不会粘包。
发送端发送数据的大小与接受端接受数据的大小是可以由程序员随意设置的(发送端可以是一K一K地发送数据,而接收端的应用程序可以两K两K地提走数据,当然也有可能一次提走3K或6K数据,或者一次只提走几个字节的数据),也就是说,应用程序所看到的数据是一个整体,或说是一个流(stream),应用程序无法得知一个消息对应由多少个字节,因此TCP协议是面向流的协议,这也是容易出现粘包问题的原因。
而UDP是面向消息的协议,每个UDP段都是一条消息(以包的形式存在系统缓冲区),应用程序必须以消息为单位提取数据,不能一次提取任意字节的数据,这一点和TCP是很不同的。怎样定义消息呢?可以认为对方一次性write/send的数据为一个消息,需要明白的是当对方send一条信息的时候,无论底层怎样分段分片,TCP协议层会把构成整条消息的数据段排序完成后才呈现在内核缓冲区。
例如基于tcp的套接字客户端往服务端上传文件,发送时文件内容是按照一段一段的字节流发送的,在接收方看了,根本不知道该文件的字节流从何处开始,在何处结束。
所谓粘包问题主要还是因为接收方不知道消息之间的界限,不知道一次性提取多少字节的数据所造成的。
此外,发送方引起的粘包是由TCP协议本身造成的,TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一个TCP段。若连续几次需要send的数据都很少,通常TCP会根据优化算法把这些数据合成一个TCP段后一次发送出去,这样接收方就收到了粘包数据。
udp的recvfrom是阻塞的,一个recvfrom(x)必须对唯一一个sendinto(y),收完了x个字节的数据就算完成,若是y>x数据就丢失,这意味着udp根本不会粘包,但是会丢数据,不可靠。
tcp的协议数据不会丢,没有收完包,下次接收,会继续上次继续接收,己端总是在收到ack时才会清除缓冲区内容。数据是可靠的,但是会粘包。
为了避免粘包现象,可采取以下几种措施:
问题的根源在于,接收端不知道发送端将要传送的字节流的长度,所以解决粘包的方法就是围绕,如何让发送端在发送数据前,把自己将要发送的字节流总大小让接收端知晓,然后接收端来一个死循环接收完所有数据。
a、可以通过导入time模块让程序在发完一段数据后睡眠一段时间,让另一端接受玩后在发送下一段数据,这种方法严重影响程序的运行速度,因此不建议使用。
b、为字节流加上自定义固定长度报头(用struct模块来pack个定长的报头),报头中包含字节流长度,然后一次send到对端,对端在接收时,先从缓存中取出定长的报头,然后再取真实数据
(1)对于发送方引起的粘包现象,用户可通过编程设置来避免,TCP提供了强制数据立即传送的操作指令push,TCP软件收到该操作指令后,就立即将本段数据发送出去,而不必等待发送缓冲区满;
(2)对于接收方引起的粘包,则可通过优化程序设计、精简接收进程工作量、提高接收进程优先级等措施,使其及时接收数据,从而尽量避免出现粘包现象;
(3)由接收方控制,将一包数据按结构字段,人为控制分多次接收,然后合并,通过这种手段来避免粘包。
代码编写思路:
我们可以把报头做成字典,字典里包含将要发送的真实数据的详细信息,然后json序列化,然后用struct将序列化后的数据长度打包成4个字节(4个字节已够用)
发送时:
1、先发送报头长度;
2、再编码报头内容然后发送;
3、最后发真实内容 。
接收时:
1、先收取报头长度,用struct取出来;
2、根据取出的长度收取报头内容,然后解码,反序列化;
3、从反序列化的结果中取出待取数据的详细信息,然后去取真实的数据内容。
可以将发送的数据长度提前发送至服务端,服务端接受到数据长度,自定义接受.
必须先定义报头,发送报头,再发送真实数据.
是一个可以将很长的数据的长度,压缩成固定的长度的一个标记(数据报头)
`i:模式`,会将数据长度压缩成4个bytes
'''服务端'''
import socket
import struct
import subprocess
s= socket.socket()
s.bind(
('127.0.0.1',8848)
)
s.listen(5)
print('等待客户端连接')
while True :
conn,addr= s.accept()
print(f'客户端{addr}已连接')
while True:
try:
# 1.接受用户输入的cmd命令
cmd = conn.recv(1024).decode('utf-8')
if cmd == 'q':
break
# 2.将用户输入命令利用subprocess得到正确返回
obj = subprocess.Popen(
cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
# 3.取出正确和错误结果
data = obj.stdout.read() + obj.stderr.read()
# 4.打包压缩,成只有4位的报头信息,i 模式
headers = struct.pack('i',len(data))
# 5.将报头先行发送,让客户端准备接受的大小
conn.send(headers)
# 6.再发送真实数据
conn.send(data)
except Exception as e:
print(e)
break
conn.close()
=========================================================
'''客户端'''
import socket
import subprocess
import struct
c = socket.socket()
c.connect(
('127.0.0.1',8848)
)
while True:
msg = input('请输入cmd命令')
if msg == 'q':
break
# 1.将cmd 命令传至服务端处理
c.send(msg.encode('utf-8'))
# 2.接受服务端发送的4位报头
headers = c.recv(4)
# 3.将报头解包(unpack),获得元组,索引取数据长度
data_len = struct.unpack('i',headers)[0]
# 4.接受真实的数据信息
data = c.recv(data_len)
print(data.decode('gbk'))
c.close()
发送文件的描述信息(字典)
'''客户端发送字典给服务端
send_dic:{
file_name : 文件名
file_size : 文件的真实长度}
服务端接受到字典,并接受文件的真实数据'''
服务端
import socket
import struct
import json
s = socket.socket()
s.bind(
('127.0.0.1',8848)
)
s.listen(5)
print('等待客户端连接')
while True:
conn,addr = s.accept()
print(f'用户{addr}已连接')
while True:
try:
# 1.接受到客户端发送的报头
beaders = conn.recv(4)
# print(beaders) # b'M\x00\x00\x00'
# 2.报头解压缩,索引获得数据长度
data__len = struct.unpack('i',beaders)[0]
# print(data__len) # 77
# 3.接受真实数据(序列化的数据)
bytes_data = conn.recv(data__len)
# print(bytes_data) # b'{"file_name": "abc\\u7684\\u9017\\u6bd4\\u4eba\\u751f.txt", "file_size": 12345678}'
# 4.反序列化获得数据
dic = json.loads(bytes_data.decode('utf-8'))
# print(dic) # {'file_name': 'abc的逗比人生.txt', 'file_size': 12345678}
except Exception as e :
print(e)
break
conn.close()
=========================================================
'''客户端'''
import socket
import struct
import json
import time
c = socket.socket()
c.connect(
('127.0.0.1',8848)
)
while True:
# 1.用户文件的字典
send_dic = {
'file_name':'abc的逗比人生.txt',
'file_size':12345678
}
# 2.json序列化,并转码成bytes类型数据(为的是struct的len长度)
json_data = json.dumps(send_dic)
bytes_data = json_data.encode('utf-8')
# 3.压缩数据做成报头,发送至服务端
headers = struct.pack('i',len(bytes_data))
# print(bytes_data) # b'{"file_name": "abc\\u7684\\u9017\\u6bd4\\u4eba\\u751f.txt", "file_size": 12345678}'
# print(len(bytes_data)) # 77
# print(headers) # b'M\x00\x00\x00'
c.send(headers)
# 4.发送真实字典
c.send(bytes_data)
time.sleep(10)
# {'file_name': 'abc的逗比人生.txt', 'file_size': 12345678}
# 会一直打印,所以手动停止5秒
利用while
循环进行一段一段的上传防止粘包.
服务端
import socket
import struct
import json
s = socket.socket()
s.bind(
('127.0.0.1',8848)
)
s.listen(5)
print('等待客户端连接')
while True:
conn, addr = s.accept()
print(f'客户端{addr}已连接')
try:
# 1.接受客户端传来的字典报头
headers = conn.recv(4)
# 2.解压索引获得字典的长度
data_len = struct.unpack('i',headers)[0]
# 3.接受文件字典的bytes信息
bytes_data = conn.recv(data_len)
# 4.反序列化得到字典数据
data_dic = json.loads(bytes_data.decode('utf-8'))
print(data_dic)
# 5.获得文件字典的名字与大小
file_name = data_dic.get('file_name')
file_size = data_dic.get('file_size')
# 6.以文件名打开文件(循环控制打开资源占用)
size = 0
with open(file_name,'wb') as f:
while size < file_size:
# 每次接受1024大小
data = conn.recv(1024)
# 每次写入data大小
f.write(data)
# 写完进行追加
size += len(data)
print(f'{file_name}接受完毕')
except Exception as e:
print(e)
break
conn.close()
客户端
import socket
import struct
import json
c= socket.socket()
c.connect(
('127.0.0.1',8848)
)
# 1.打开一个视频文件,获取数据大小
with open(r'F:\老男孩12期开课视频\day 27\5 上传大文件.mp4','rb') as f :
movie_bytes = f.read() #获得的是二进制流
# 文件自动关闭
# 2.为视频文件组织一个信息字典,字典有名称大小
movie_info = {
'file_name':'大视频.mp4',
'file_size':len(movie_bytes)
}
# 3.序列化字典,发送文件字典的报头(客户端获得文件的名字与大小)
json_data = json.dumps(movie_info)
bytes_data = json_data.encode('utf-8')
# 获得字典的报头
headers = struct.pack('i',len(bytes_data))
# 发送报头
c.send(headers)
# 发送真实文件的字典
c.send(bytes_data)
# 4.发送真实的文件数据(大文件循环发送减少占用)
size = 0
num = 1
with open(r'F:\老男孩12期开课视频\day 27\5 上传大文件.mp4','rb') as f :
while size < len(movie_bytes):
# 打开时每次读取1024大小数据
send_data = f.read(1024) # 获得的是二进制流
print(send_data,num)
num += 1
# 每次发送都是1024大小数据
c.send(send_data)
# 为初始数据增加发送的大小,控制循环
size += len(send_data)
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础
进程指的是正在运行的程序,是一系列过程的统称,也是操作系统调度和进行资源分配的基本单位。
进程是实现并发的一种方式,在学习并发编程之前要先了解进程的基本概念以及多进程的实现原理,这就必须提到操作系统了,因为进程这个概念来自于操作系统,没有操作系统就没有进程。
并发指的是多个任务(看起来像是)同时被执行
在之前的TCP通讯中,服务器在建立连接后需要一个循环来与客户端循环的收发数据,但服务器并不知道客户端什么时候会发来数据,导致没有数时服务器进入了一个等待状态,此时其他客户端也无法链接服务器,很明显这是不合理的,学习并发编程就是要找到一种方案,让一个程序中的的多个任务可以同时被处理。
下图是操作系统在整个计算机中所在的位置:
操作系统位于应用软件和硬件设备之间,本质上也是一个软件,
它由系统内核(管理所有硬件资源)与系统接口(提供给程序员使用的接口)组成
操作系统是为了方便用户操作计算机而提供的一个运行在硬件之上的软件
1、为用户屏蔽了复杂繁琐的硬件接口,为应用程序提供了,清晰易用的系统接口。
有了这些接口以后程序员不用再直接与硬件打交道了
例子:有了操作系统后我们就可以使用资源管理器来操作硬盘上的数据,而不用操心,磁头的移动啊,数据的读写等等
2、操作系统将应用程序对硬件资源的竞争变成了有序的使用。
例子:所有软件 qq啊 微信啊 吃鸡啊都共用一套硬件设备 假设现有三个程序都在使用打印机,如果不能妥善管理竞争问题,可能一个程序打印了一半图片后,另一个程序抢到了打印机执行权于是打印了一半文本,导致两个程序的任务都没能完成,操作系统的任务就是将这些无序的操作变得有序
二者的区别不在于的地位,它们都是软件,而操作系统可以看做一款特殊的软件
1.操作系统是受保护的:无法被用户修改(应用软件如qq不属于操作系统可以随便卸载)。
2.大型:linux或widows源代码都在五百万行以上,这仅仅是内核,不包括用户程序,如GUI,库以及基本应用软件(如windows Explorer等),很容易就能达到这个数量的10倍或者20倍之多。
3.长寿:由于操作系统源码量巨大,编写是非常耗时耗力的,一旦完成,操作系统所有者便不会轻易的放弃重写,二是在原有基础上改进,基本上可以把windows95/98/Me看成一个操作系统。
多道技术出现在第三代操作系统中,是为了解决前两代操作系统存在的种种问题而出现,那么前两代操作系统都有哪些问题呢?一起来看看操作系统的发展历史:
第一代计算机的产生背景:
第一代之前人类是想用机械取代人力,第一代计算机的产生是计算机由机械时代进入电子时代的标志,从Babbage失败之后一直到第二次世界大战,数字计算机的建造几乎没有什么进展,第二次世界大战刺激了有关计算机研究的爆炸性进展。
lowa州立大学的john Atanasoff教授和他的学生Clifford Berry建造了据认为是第一台可工作的数字计算机。该机器使用300个真空管。大约在同时,Konrad Zuse在柏林用继电器构建了Z3计算机,英格兰布莱切利园的一个小组在1944年构建了Colossus,Howard Aiken在哈佛大学建造了Mark 1,宾夕法尼亚大学的William Mauchley和他的学生J.Presper Eckert建造了ENIAC。这些机器有的是二进制的,有的使用真空管,有的是可编程的,但都非常原始,设置需要花费数秒钟时间才能完成最简单的运算。
在这个时期,同一个小组里的工程师们,设计、建造、编程、操作及维护同一台机器,所有的程序设计是用纯粹的机器语言编写的,甚至更糟糕,需要通过成千上万根电缆接到插件板上连成电路来控制机器的基本功能。没有程序设计语言(汇编也没有),操作系统则是从来都没听说过。使用机器的过程更加原始,详见下‘工作过程’
特点: 没有操作系统的概念 所有的程序设计都是直接操控硬件
工作过程: 程序员在墙上的机时表预约一段时间,然后程序员拿着他的插件版到机房里,将自己的插件板街道计算机里,这几个小时内他独享整个计算机资源,后面的一批人都得等着(两万多个真空管经常会有被烧坏的情况出现)。
后来出现了穿孔卡片,可以将程序写在卡片上,然后读入机而不用插件板
优点:
程序员在申请的时间段内独享整个资源,可以即时地调试自己的程序(有bug可以立刻处理)
缺点:
浪费计算机资源,一个时间段内只有一个人用。 注意:同一时刻只有一个程序在内存中,被cpu调用执行,比方说10个程序的执行,是串行的
第二代计算机的产生背景:
由于当时的计算机非常昂贵,自认很自然的想办法较少机时的浪费。通常采用的方法就是批处理系统。
特点: 设计人员、生产人员、操作人员、程序人员和维护人员直接有了明确的分工,计算机被锁在专用空调房间中,由专业操作人员运行,这便是‘大型机’。
有了操作系统的概念
有了程序设计语言:FORTRAN语言或汇编语言,写到纸上,然后穿孔打成卡片,再将卡片盒带到输入室,交给操作员,然后喝着咖啡等待输出接口
工作过程:
第二代如何解决第一代的问题/缺点: 1.把一堆人的输入攒成一大波输入, 2.然后顺序计算(这是有问题的,但是第二代计算也没有解决) 3.把一堆人的输出攒成一大波输出
现代操作系统的前身:(见图)
优点:批处理,节省了机时
缺点:1.整个流程需要人参与控制,将磁带搬来搬去(中间俩小人)
2.计算的过程仍然是顺序计算-》串行
3.程序员原来独享一段时间的计算机,现在必须被统一规划到一批作业中,等待结果和重新调试的过程都需要等同批次的其他程序都运作完才可以(这极大的影响了程序的开发效率,无法及时调试程序)
第三代计算机的产生背景:
20世纪60年代初期,大多数计算机厂商都有两条完全不兼容的生产线。
一条是面向字的:大型的科学计算机,如IBM 7094,见上图,主要用于科学计算和工程计算
另外一条是面向字符的:商用计算机,如IBM 1401,见上图,主要用于银行和保险公司从事磁带归档和打印服务
开发和维护完全不同的产品是昂贵的,同时不同的用户对计算机的用途不同。
IBM公司试图通过引入system/360系列来同时满足科学计算和商业计算,360系列低档机与1401相当,高档机比7094功能强很多,不同的性能卖不同的价格
360是第一个采用了(小规模)芯片(集成电路)的主流机型,与采用晶体管的第二代计算机相比,性价比有了很大的提高。这些计算机的后代仍在大型的计算机中心里使用,此乃现在服务器的前身,这些服务器每秒处理不小于千次的请求。
如何解决第二代计算机的问题1: 卡片被拿到机房后能够很快的将作业从卡片读入磁盘,于是任何时刻当一个作业结束时,操作系统就能将一个作业从磁带读出,装进空出来的内存区域运行,这种技术叫做 同时的外部设备联机操作:SPOOLING,该技术同时用于输出。当采用了这种技术后,就不在需要IBM1401机了,也不必将磁带搬来搬去了(中间俩小人不再需要)
如何解决第二代计算机的问题2:
第三代计算机的操作系统广泛应用了第二代计算机的操作系统没有的关键技术:多道技术
cpu在执行一个任务的过程中,若需要操作硬盘,则发送操作硬盘的指令,指令一旦发出,硬盘上的机械手臂滑动读取数据到内存中,这一段时间,cpu需要等待,时间可能很短,但对于cpu来说已经很长很长,长到可以让cpu做很多其他的任务,如果我们让cpu在这段时间内切换到去做其他的任务,这样cpu不就充分利用了吗。这正是多道技术产生的技术背景
多道技术中的多道指的是多个程序,多道技术的实现是为了解决多个程序竞争或者说共享同一个资源(比如cpu)的有序调度问题,解决方式即多路复用,多路复用分为时间上的复用和空间上的复用。
空间上的复用:将内存分为几部分,每个部分放入一个程序,这样,同一时间内存中就有了多道程序。
时间上的复用:当一个程序在等待I/O时,另一个程序可以使用cpu,如果内存中可以同时存放足够多的作业,则cpu的利用率可以接近100%,类似于我们小学数学所学的统筹方法。(操作系统采用了多道技术后,可以控制进程的切换,或者说进程之间去争抢cpu的执行权限。这种切换不仅会在一个进程遇到io时进行,一个进程占用cpu时间过长也会切换,或者说被操作系统夺走cpu的执行权限)
空间上的复用最大的问题是:程序之间的内存必须分割,这种分割需要在硬件层面实现,由操作系统控制。如果内存彼此不分割,则一个程序可以访问另外一个程序的内存,
首先丧失的是安全性,比如你的qq程序可以访问操作系统的内存,这意味着你的qq可以拿到操作系统的所有权限。
其次丧失的是稳定性,某个程序崩溃时有可能把别的程序的内存也给回收了,比方说把操作系统的内存给回收了,则操作系统崩溃。
多道技术案例:
生活中我们进程会同时做多个任务,但是本质上一个人是不可能同时做执行多个任务的,
例1:吃饭和打游戏,同时执行,本质上是在两个任务之间切换执行,吃一口饭然后打打游戏,打会儿游戏再吃一口饭;
例2:做饭和洗衣服,如果没有多道技术,在电饭煲做饭的时候我们就只能等着,假设洗米花费5分钟,煮饭花费40分钟,相当于40分钟是被浪费的时间。那就可以在煮饭的等待过程中去洗衣服,假设把衣服装进洗衣机花费5分钟,洗衣服花费40分钟,那么总耗时为 5(洗米)+5(装衣服)+40(最长等待时间) 大大提高了工作效率
多道技术也是在不同任务间切换执行,由于计算机的切换速度非常快,所以用户是没有任何感觉的,看起来就像是两个任务都在执行,但是另一个问题是,仅仅是切换还不行,还需要在切换前保存当前状态,切换回来时恢复状态,这些切换和保存都是需要花费时间的!在上述案例中由于任务过程中出现了等待即IO操作所以进行了切换,而对于一些不会出现IO操作的程序而言,切换不仅不能提高效率,反而会降低效率
例如:做一百道乘法题和做一百道除法题,两个任务都是计算任务是不需要等待的,此时的切换反而降低了运行效率!
第三代计算机的操作系统仍然是批处理
许多程序员怀念第一代独享的计算机,可以即时调试自己的程序。为了满足程序员们很快可以得到响应,出现了分时操作系统
如何解决第二代计算机的问题3:
分时操作系统: 多个联机终端+多道技术
20个客户端同时加载到内存,有17在思考,3个在运行,cpu就采用多道的方式处理内存中的这3个程序,由于客户提交的一般都是简短的指令而且很少有耗时长的,索引计算机能够为许多用户提供快速的交互式服务,所有的用户都以为自己独享了计算机资源
CTTS:麻省理工(MIT)在一台改装过的7094机上开发成功的,CTSS兼容分时系统,第三代计算机广泛采用了必须的保护硬件(程序之间的内存彼此隔离)之后,分时系统才开始流行
MIT,贝尔实验室和通用电气在CTTS成功研制后决定开发能够同时支持上百终端的MULTICS(其设计者着眼于建造满足波士顿地区所有用户计算需求的一台机器),很明显真是要上天啊,最后摔死了。
后来一位参加过MULTICS研制的贝尔实验室计算机科学家Ken Thompson开发了一个简易的,单用户版本的MULTICS,这就是后来的UNIX系统。基于它衍生了很多其他的Unix版本,为了使程序能在任何版本的unix上运行,IEEE提出了一个unix标准,即posix(可移植的操作系统接口Portable Operating System Interface)
后来,在1987年,出现了一个UNIX的小型克隆,即minix,用于教学使用。芬兰学生Linus Torvalds基于它编写了Linux
第四代也就是我们常见的操作系统,大多是具备图形化界面的,例如:Windows,macOS ,CentOS等
由于采用了IC设计,计算机的体积下降,性能增长,并且成本以及可以被普通消费者接受,而第三代操作系统大都需要进行专业的学习才能使用,于是各个大佬公司开始开发那种不需要专业学习也可以快速上手的操作系统,即上述操作系统!
它们都是用了GUI 图形化用户接口,用户只需要通过鼠标点击拖拽界面上的元素即可完成大部分操作
from multiprocessing import Process
import time
def task(name):
print('%s is running' %name)
time.sleep(3)
print('%s is done' %name)
if __name__ == '__main__':
# 在windows系统之上,开启子进程的操作一定要放到这下面
# Process(target=task,kwargs={'name':'egon'})
p=Process(target=task,args=('jack',))
p.start() # 向操作系统发送请求,操作系统会申请内存空间,然后把父进程的数据拷贝给子进程,作为子进程的初始状态
print('======主')
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super(MyProcess,self).__init__()
self.name=name
def run(self):
print('%s is running' %self.name)
time.sleep(3)
print('%s is done' %self.name)
if __name__ == '__main__':
p=MyProcess('jack')
p.start()
print('主')
需要注意的是 在windows下 开启子进程必须放到__main__
下面,因为windows在开启子进程时会重新加载所有的代码造成递归
进程是正在运行的程序,程序是程序员编写的一堆代码,也就是一堆字符,当这堆代码被系统加载到内存中并执行时,就有了进程 ,一个程序可以产生多个进程.
current_process().pid
在子进程内使用,可以获取子进程号
from multiprocessing import Process
from multiprocessing import current_process
def task():
print('开始执行...',current_process().pid)
print('结束执行')
if __name__ == '__main__':
p = Process(target=task) # 获得process对象
p.start() # 创建子进程
p.join() # 执行完子进程在执行主进程
print('主进程')
'''开始执行... 2092
结束执行
主进程'''
os.getpid()
在主进程中获得主进程号. 系统会给每一个进程分配一个进程编号即PID.
tasklist 用于查看所有的进程信息
# 在python中可以使用os模块来获取pid
import os
print(os.getpid())
#还可以通过current_process模块来获得
current_process().pid #可获得当前运行的程序的pid
from multiprocessing import Process
from multiprocessing import current_process
import os
def task():
print('开始执行...',current_process().pid)
print('结束执行')
if __name__ == '__main__':
p = Process(target=task) # 获得process对象
p.start() # 创建子进程
p.join() # 执行完子进程在执行主进程
print('主进程',os.getpid())
'''
开始执行... 11072
结束执行
主进程 13892
'''
os.getppid
可以查看主主进程的进程号
当一个进程a开启了另一个进程b时,a称为b的父进程,b称为a的子进程 ,在python中可以通过os模块来获取父进程的pid
# 在python中可以使用os模块来获取ppid
import os
print("self",os.getpid()) # 当前进程自己的pid
print("parent",os.getppid()) # 当前进程的父进程的pid
如果是在pycharm中运行的py文件,那pycahrm就是这个python.exe的父进程,当然你可以从cmd中来运行py文件,那此时cmd就是python.exe的父进程
def task():
print('开始执行...',current_process().pid)
time.sleep(1)
print('结束执行')
if __name__ == '__main__':
p = Process(target=task) # 获得process对象
p.start() # 创建子进程
p.join() # 执行完子进程在执行主进程
print('主进程',os.getpid())
print('主主进程',os.getppid())
time.sleep(1000)
'''
开始执行... 12368
结束执行
主进程 10332 # python.exe
主主进程 4152 # pycharm64.exe
'''
join()方法
join函数就可以是父进程等待子进程结束后继续执行
# 让子进程结束后,父进程才结束
from multiprocessing import Process
import time
def task(name):
print('任务开始')
time.sleep(1)
print('任务结束')
if __name__ == '__main__':
p = Process(target= task,args=('你好',))
p.start() # 告诉操作系统,开启子进程
print('join上面的不算')
p.join() # 告诉操作系统,等子进程结束后,父进程再结束
print('主进程')
'''
join上面的不算
任务开始
任务结束
主进程'''
多个子程序的运行
# 多个子进程的运行
from multiprocessing import Process
import time
def task(name,n):
print(f'{name}任务{n}开始')
time.sleep(1)
print(f'{name}任务{n}结束')
if __name__ == '__main__':
p1 = Process(target= task,args=('AAA',1))
p2 = Process(target= task,args=('BBB',2))
p1.start() # 告诉操作系统,开启子进程
p2.start() # 告诉操作系统,开启子进程
print('join上面的不算')
p1.join() # 告诉操作系统,等子进程结束后,父进程再结束
p2.join() # 告诉操作系统,等子进程结束后,父进程再结束
print('主进程')
'''
join上面的不算
AAA任务1开始
BBB任务2开始
AAA任务1结束
BBB任务2结束
主进程
'''
打印时会出现任务1,2顺序的不一致,貌似是因为程序并行导致cpu分配执行打印速度导致
is_alive()
判断子进程是否存活
def task():
print('开始执行...',current_process().pid)
if __name__ == '__main__':
p = Process(target=task) # 获得process对象
p.start() # 创建子进程
print(p.is_alive()) # 判断子进程是否存活
print('主进程',os.getpid())
'''
True
主进程 3740
开始执行... 7004'''
.terminate()
直接告诉操作系统,终止子程序
def task():
print('开始执行...',current_process().pid)
if __name__ == '__main__':
p = Process(target=task) # 获得process对象
p.start() # 创建子进程
# 判断子进程是否存活
print(p.is_alive())
# 告诉操作系统直接终止掉子进程
p.terminate()
time.sleep(0.1)
# 判断子进程是否存活
print(p.is_alive())
print('主进程',os.getpid())
'''
True
False
主进程 7976'''
正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)
出错退出(自愿,python a.py中a.py不存在)
严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)
被其他进程杀死(非自愿,如kill -9)
指的是子进程已经结束,但PID号还存在,但是父进程 没有去处理这些残留信息,就导致残留信息占用系统内存 .
缺点: 占用PID号,占用操作系统资源
PID号:PID是各进程的代号,运行时系统随机分配,但是进程终止后PID标识符就会被系统回收,进程号具有固定数量.
指的是子进程还在执行,但父进程意外结束.
操作系统优化机制:自动回收此类子进程
指的是主进程结束后,该主进程产生的所有子进程跟着结束,并回收.
1、定义:一个进程守护另一个进程,当被守护进程运行结束后,守护进程不管是否运行结束,也都跟着结束。
特点:a、守护进程一般为子进程,进程之间是相互独立的,守护进程会在主进程执行结束后就终止;
b、守护进程内无法再开启子进程,否则会抛出异常。
p.daemon = true
设置为守护进程
p.daemon = true (设置为守护进程)
p.start
# 必须在p.start之前设置,如果在之后会报错
例如洗衣服和做饭,同时发生了,但本质上是两个任务在切换,给人的感觉是同时在进行,也被称为伪并行
例如一个人在写代码另一个人在写书,这两件事件是同时在进行的,要注意的是一个人是无法真正的并行执行任务的,在计算机中单核CPU也是无法真正并行的,之所以单核CPU也能同时运行qq和微信其实就是并发执行
阻塞状态是因为程序遇到了IO操作,或是sleep,导致后续的代码不能被CPU执行
非阻塞与之相反,表示程序正在正常被CPU执行
就绪态,运行态,和阻塞态
多道技术会在进程执行时间过长或遇到IO时自动切换其他进程,意味着IO操作与,进程被剥夺CPU执行权都会造成进程阻塞
from multiprocessing import Lock
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,但是共享带来的是竞争,竞争带来的结果就是错乱,那么如何控制错乱呢,此时就用到了加锁处理。
让并发变成串行,牺牲了执行效率,保证了数据的安全在程序并发执行时,需要修改数据时使用.
运行时出现全部购票成功的情况(余票为1),是因为并发编程,每个子进程都获得res为1,每个都以自己的res执行get,最后成功购票,可以在p.start后面加上p.join使其变为串行,前面结束才能运行后面的
from multiprocessing import Process
import json,time
def search():
'''查询余票'''
time.sleep(1)
with open('db.txt','r',encoding='utf-8') as f:
res = json.load(f)
print(f"还剩{res.get('count')}")
def get():
'''购票'''
with open('db.txt','r',encoding='utf-8') as f:
res = json.load(f)
time.sleep(1) # 模拟网络io延迟
if res['count'] > 0:
res['count'] -= 1
with open('db.txt', 'w', encoding='utf-8') as f:
json.dump(res,f)
print('抢票成功')
time.sleep(1)
else:
print('车票已经售罄')
def task():
search()
get()
if __name__ == '__main__':
for i in range(10):
p =Process(target=task)
p.start()
# 加上join运行结果正确
p.join()
lock
进程锁使第一个进程进去第二个必须等待结束才能进,把锁住的代码变成了串行.
lock.acquire()
加锁, lock.release()
解锁
# 设置进程串行,进程锁/互斥锁,使第一个进程进去第二个必须等待结束才能进
from multiprocessing import Process
import json,time
from multiprocessing import Lock
def search():
time.sleep(1)
with open('db.txt','r',encoding='utf-8') as f:
res = json.load(f)
print(f"还剩{res.get('count')}")
def get():
with open('db.txt','r',encoding='utf-8') as f:
res = json.load(f)
time.sleep(1) # 模拟网络io
if res['count'] > 0:
res['count'] -= 1
with open('db.txt', 'w', encoding='utf-8') as f:
json.dump(res,f)
print('抢票成功')
time.sleep(1)
else:
print('车票已经售罄')
def task(lock):
search()
# lock = Lock() # 写在主进程时为了让子进程拿到同一把锁
# 锁住
lock.acquire()
get() # 同一时间只能一个进程执行get()
# 释放锁
lock.release()
if __name__ == '__main__':
lock = Lock() # 写在主进程时为了让子进程拿到同一把锁
for i in range(10):
p =Process(target=task,args=(lock,)) # 将lock当做参数传入
p.start()
总结
进程锁: 是把锁住的代码变成了串行
join: 是把所有的子进程变成了串行
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,速度是慢了,但牺牲了速度却保证了数据安全。虽然可以用文件共享数据实现进程间通信,但问题是:
1.效率低(共享数据基于文件,而文件是硬盘上的数据)
2.需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:
1、效率高(多个进程共享一块内存的数据)
2、帮我们处理好锁问题。
这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中
队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
jion和锁的区别
1、join中顺序是固定的;
2、join是完全串行,而锁可以使部分代码串行,其他代码仍是并发的。
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。
1、实现进程之间相互通讯的几种方式:
a、使用共享文件,多个进程同时读写同一个文件(IO速度慢,传输数据大小不受限制);
b、管道,管道是基于内存的额,速度快,但是单向的,用起来比较麻烦;
c、队列,申请共享内存空间,多个进程可以共享这个内存区域(速度快,但是数据量不能太大)。
pipe: 基于共享的内存空间
Queue: 管道+锁
from multiprocessing import Queue
队列:先进先出
相当于内存中的产生一个队列空间,可以存放多个数据,但数据的顺序是由先进去的排在前面
堆栈:先进后出
Queue()
调用队列类,实例化对象q
q = Queue(5) #若传参队列中可以放5个数据
q = Queue() #若不传参,队列中可以存放无限大的数据,前提是硬件跟得上
put()
添加数据,若队列中的数据满了放值会阻塞
from multiprocessing import Queue
q = Queue(2)
q.put('山')
q.put('水')
q.put([1,2,3]) # 阻塞住
========================================================
from multiprocessing import Queue
q = Queue(2)
# block=true 是默认会阻塞, timeout = 2 等待时间超时2s
q.put('风',block=True,timeout=2)
q.put('风',block=True,timeout=2)
# 如果满了会进行等待,但最多等待2s否则报错
q.put('风',block=True,timeout=2)
q.get()
遵循先进先出
from multiprocessing import Queue
q = Queue(2)
q.put('山')
q.put('水')
q.put([1,2,3])
print(q.get()) # 山
print(q.get()) # 水
print(q.get()) # [1, 2, 3]
print(q.get()) # 默认存值没有就会阻塞在这
empty()
判断队列是否为空
print(q.empty()) # False
q.get_nowait()
获取数据,队列中若没有则会报错
print(q.get_nowait())
q.put_nowait()
添加数据 若队列满了, 则会报错
q.put_nowait(6)
q.full()
判断队列是否满
print(q.full()) # True
from multiprocessing import Process,Queue
def test1(q):
data = f'你好啊,赛利亚'
# 将数据data传入队列中
q.put(data)
print('进程1开始添加数据到队列中...')
def test2(q):
data = q.get()
print(f'进程2从队列中获取数据[{data}]')
if __name__ == '__main__':
# 获得队列的对象
q = Queue()
# 获取进程对象,并将队列对象传入
p1 = Process(target=test1,args=(q,))
p2 = Process(target=test2,args=(q,))
# 启动进程1,2
p1.start()
p2.start()
'''
进程1开始添加数据到队列中...
进程2从队列中获取数据[你好啊,赛利亚]
'''
生产者:生产数据的
消费者:使用数据的
生产者 --- 队列(容器) --- 消费者
通过队列,生产者把数据添加进去,消费者从队列中获取(不适合传大文件)
生产者可以不停的生产,并可以存放在容器队列中,消费者也可以不停的取,没有因生产者的生产效率低下而等待(一种设计思想)
在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
简单版
from multiprocessing import Queue,Process
def producer(q,name,food):
'''生产者'''
for i in range(10):
print(f'{name}生产出{food}数据{i}')
# 传输的消息
res = f'{food}{i}'
# 放入队列中
q.put(res)
# 循环结束,传入q结束生产
q.put('q')
def consumer(q,name):
'''消费者'''
# 消费者不停的获取
while True:
# 从队列中获取
res = q.get()
if res == 'q':
break
print(f'{name}消费使用了{res}数据')
if __name__ == '__main__':
q = Queue() # 创建队列
p1 = Process(target=producer,args=(q,'小明','电影')) # 传入子进程中
p2 = Process(target=consumer,args=(q,'小红'))
p1.start()
p2.start()
加强版
from multiprocessing import Queue,Process
def producer(q,name,food):
'''生产者'''
for i in range(3):
print(f'{name}生产出{food}数据{i}')
# 传输的消息
res = food,i
# 放入队列中
q.put(res)
def consumer(q,name):
'''消费者'''
# 消费者不停的获取
while True:
# 从队列中获取
res = q.get()
# 判断是否有数据
if not res:
break
print(f'{name}消费使用了{res}数据')
if __name__ == '__main__':
q = Queue() # 创建队列
p1 = Process(target=producer,args=(q,'小明','电影')) # 传入子进程中
p2 = Process(target=producer,args=(q,'小黑','书籍')) # 传入子进程中
p3 = Process(target=producer,args=(q,'小亮','动漫')) # 传入子进程中
c1 = Process(target=consumer,args=(q,'小红'))
c2 = Process(target=consumer,args=(q,'小绿'))
p1.start()
p2.start()
p3.start()
# 将c1,2设置为守护者模式
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
# join 设置子程序结束后主程序再结束
p3.join()
print('主')
线程与进程都是虚拟单位,目的是为了更好的描述某种事物.
进程:资源单位
进程是程序的分配资源的最小单元;一个程序可以有多个进程,但只有一个主进程;进程由程序、数据集、控制器三部分组成。
线程:执行单位
线程是程序最小的执行单元;一个进程可以有多个线程,但是只有一个主线程;线程切换分为两种:一种是I/O切换,一种是时间切换
开启一个进程一定会有一个线程,线程才是真正执行单位,节省内存资源
多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。
进程中包含了运行该程序的所需要的所有资料,进程是一个资源单位,线程是CPU的最小执行单位,每一个进程一旦被创建,就默认开启了一条线程,称之为主线程。一个进程可以包含多个线程,进程包含线程,线程依赖进程。
进程对于操作系统的资源耗费非常高,而线程非常低(比进程低10-100倍); 2.在同一个进程中,多个线程之间资源是共享的。
开启进程
开辟一个名称空间,每开启一个进程都会占用一份内存资源
会自带一个线程
开启线程
注意: 线程不能实现并行,线程只能实现并发
from threading import Thread
调用线程
线程不需要在if __name__ == ‘__main__‘:
下启动也可以执行.
方式1: 实例化线程对象
from threading import Thread
import time
def task():
print('线程开启')
time.sleep(1)
print('线程结束')
# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
# 实例化线程t
t = Thread(target=task)
t.start()
方式2:创建类
from threading import Thread
import time
class MyTh(Thread):
def run(self):
print('线程开启')
time.sleep(1)
print('线程结束')
if __name__ == '__main__':
t = MyTh()
t.start()
# Thread实例对象的方法
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。
# threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
current_thread.name()
获得线程号
from threading import Thread
import time
from threading import current_thread
def task():
print(f'线程开启{current_thread().name}')
time.sleep(1)
print(f'线程结束{current_thread().name}')
# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
for i in range(3):
# 实例化线程t
t = Thread(target=task)
t.start()
isAlive()
判断线程是否存活
def task():
print(f'线程开启{current_thread().name}')
time.sleep(1)
print(f'线程结束{current_thread().name}')
# 不在__name__下也可以执行,名称空间使用的是进程的
if __name__ == '__main__':
# 实例化线程t
t = Thread(target=task)
print(t.isAlive()) # false
t.start()
print(t.isAlive()) # True
print(t.is_alive()) # True
.daemon = True
, 守护线程会等待主线程运行完毕后被销毁
#1.对主进程来说,运行完毕指的是主进程代码运行完毕
#2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
#1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
#2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread
import time
def task():
print('线程开启')
time.sleep(0.1)
print('线程结束')
if __name__ == '__main__':
# 实例化线程t
t = Thread(target=task)
t.daemon = True
t.start()
print('主')
'''# 主进程结束线程即结束
线程开启
主'''
queue.Queue()
线程队列,线程之间使用,与进程的差不多
import queue
q = queue.Queue() # 默认创建多个线程
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 1
FIFO
普通的使用,默认就是先进先出
LIFO
queue.LifoQueue()
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 3
根据参数内数字的大小进行分级,数字值越小,优先级越高
传参时将元组传入,以数字或ascll码进行排序
import queue
q = queue.PriorityQueue()
q.put((3,'abc'))
q.put((10,'js'))
print(q.get())
'''abc'''
当多个进程或线程同时修改同一份数据时,可能会造成数据的错乱,所以必须要加锁。线程之间的数据是共享的
锁的使用方法与进程中锁的使用方法完全相同。
# 每个线程都会执行task修改操作,但每个人都是并发的,所以使用加锁进来串行操作
from threading import Thread
import time
from threading import Thread, Lock
import time
mutex = Lock()
n = 100
def task(i):
print(f'线程{i}启动...')
global n
mutex.acquire()
temp = n
time.sleep(0.1) # 一共等待10秒
n = temp-1
print(n)
mutex.release()
if __name__ == '__main__':
t_l=[]
for i in range(100):
t = Thread(target=task, args=(i, ))
t_l.append(t)
t.start()
for t in t_l:
t.join()
# 100个线程都是在100-1
print(n)
a、定义:信号量也是一种锁,他的特点是可以设置一个数据可以被几个线程(进程)共享。
b、与普通锁的区别:
普通锁一旦加锁,则意味着这个数据在同一时间只能被一个线程使用;而信号量可以让这个数据在统一时间被限定个数的多个进程使用。
semaphore
是一个内置的计数器,用于控制进入指定数量的锁。
每当调用acquire()时,内置计数器-1
每当调用release()时,内置计数器+1
计数器不能小于0,当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
应用场景
比如说在读写文件的时候,一般只能只有一个线程在写,而读可以有多个线程同时进行,如果需要限制同时读文件的线程个数,这时候就可以用到信号量了(如果用互斥锁,就是限制同一时刻只能有一个线程读取文件)。
又比如在做爬虫的时候,有时候爬取速度太快了,会导致被网站禁止,所以这个时候就需要控制爬虫爬取网站的频率。
from threading import Semaphore
from threading import current_thread
from threading import Thread,Lock
import time
mutex = Lock()
sm = Semaphore(5) # 设置进入锁的数量
def task():
# mutex.acquire() # 一次进入1个(锁住)
sm.acquire() # 一次5个进入锁
print(f'{current_thread().name}抢到执行权限')
time.sleep(1) # 进行延时操作
sm.release()
# mutex.release() # 一次进入1个(解锁)
for l in range(20):
t = Thread(target=task)
t.start()
GIL的优点:
GIL的缺点:
官方解释:
'''
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)
'''
释义:
在CPython中,这个全局解释器锁,也称为GIL,是一个互斥锁,防止多个线程在同一时间执行Python字节码,这个锁是非常重要的,因为CPython的内存管理非线程安全的,很多其他的特性依赖于GIL,所以即使它影响了程序效率也无法将其直接去除
总结:
在CPython中,GIL会把线程的并行变成串行,导致效率降低
需要知道的是,解释器并不只有CPython,还有PyPy,JPython等等。GIL也仅存在与CPython中,这并不是Python这门语言的问题,而是CPython解释器的问题!
首先必须明确执行一个py文件,分为三个步骤
其次需要明确的是每当执行一个py文件,就会立即启动一个python解释器,
当执行test.py时其内存结构如下:
GIL,叫做全局解释器锁,加到了解释器上,并且是一把互斥锁,那么这把锁对应用程序到底有什么影响?
这就需要知道解释器的作用,以及解释器与应用程序代码之间的关系
py文件中的内容本质都是字符串,只有在被解释器解释时,才具备语法意义,解释器会将py代码翻译为当前系统支持的指令交给系统执行。
当进程中仅存在一条线程时,GIL锁的存在没有不会有任何影响,但是如果进程中有多个线程时,GIL锁就开始发挥作用了。如下图:
开启子线程时,给子线程指定了一个target表示该子线程要处理的任务即要执行的代码。代码要执行则必须交由解释器,即多个线程之间就需要共享解释器,为了避免共享带来的数据竞争问题,于是就给解释器加上了互斥锁!
由于互斥锁的特性,程序串行,保证数据安全,降低执行效率,GIL将使得程序整体效率降低!
在使用Python中进行编程时,程序员无需参与内存的管理工作,这是因为Python有自带的内存管理机制,简称GC。那么GC与GIL有什么关联?
要搞清楚这个问题,需先了解GC的工作原理,Python中内存管理使用的是引用计数,每个数会被加上一个整型的计数器,表示这个数据被引用的次数,当这个整数变为0时则表示该数据已经没有人使用,成了垃圾数据。
当内存占用达到某个阈值时,GC会将其他线程挂起,然后执行垃圾清理操作,垃圾清理也是一串代码,也就需要一条线程来执行。
实例代码
from threading import Thread
def task():
a = 10
print(a)
# 开启三个子线程执行task函数
Thread(target=task).start()
Thread(target=task).start()
Thread(target=task).start()
上述代码内存结构如下:
通过上图可以看出,GC与其他线程都在竞争解释器的执行权,而CPU何时切换,以及切换到哪个线程都是无法预知的,这样一来就造成了竞争问题,假设线程1正在定义变量a=10,而定义变量第一步会先到到内存中申请空间把10存进去,第二步将10的内存地址与变量名a进行绑定,如果在执行完第一步后,CPU切换到了GC线程,GC线程发现10的地址引用计数为0则将其当成垃圾进行了清理,等CPU再次切换到线程1时,刚刚保存的数据10已经被清理掉了,导致无法正常定义变量。
当然其他一些涉及到内存的操作同样可能产生问题问题,为了避免GC与其他线程竞争解释器带来的问题,CPython简单粗暴的给解释器加了互斥锁,如下图所示:
有了GIL后,多个线程将不可能在同一时间使用解释器,从而保证了解释器的数据安全。
加锁的时机:在调用解释器时立即加锁
解锁时机:
GIL保护的是解释器级别的数据安全,比如对象的引用计数,垃圾分代数据等等,具体参考垃圾回收机制详解。
对于程序中自己定义的数据则没有任何的保护效果,这一点在没有介绍GIL前我们就已经知道了,所以当程序中出现了共享自定义的数据时就要自己加锁,如下例:
from threading import Thread,Lock
import time
a = 0
def task():
global a
temp = a
time.sleep(0.01)
a = temp + 1
t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
t1.join()
t2.join()
print(a)
过程分析:
1.线程1获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL
2.线程2获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL
3.线程1睡醒后获得CPU执行权,并获取GIL执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU并释放GIL
4.线程2睡醒后获得CPU执行权,并获取GIL执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU并释放GIL,最后a的值也就是1
之所以出现问题是因为两个线程在并发的执行同一段代码,解决方案就是加锁!
from threading import Thread,Lock
import time
lock = Lock()
a = 0
def task():
global a
lock.acquire()
temp = a
time.sleep(0.01)
a = temp + 1
lock.release()
t1 = Thread(target=task)
t2 = Thread(target=task)
t1.start()
t2.start()
t1.join()
t2.join()
print(a)
过程分析:
1.线程1获得CPU执行权,并获取GIL锁执行代码 ,得到a的值为0后进入睡眠,释放CPU并释放GIL,不释放lock
2.线程2获得CPU执行权,并获取GIL锁,尝试获取lock失败,无法执行,释放CPU并释放GIL
3.线程1睡醒后获得CPU执行权,并获取GIL继续执行代码 ,将temp的值0+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为1
4.线程2获得CPU执行权,获取GIL锁,尝试获取lock成功,执行代码,得到a的值为1后进入睡眠,释放CPU并释放GIL,不释放lock
5.线程2睡醒后获得CPU执行权,获取GIL继续执行代码 ,将temp的值1+1后赋给a,执行完毕释放CPU释放GIL,释放lock,此时a的值为2
GIL的优点:
GIL的缺点:
但我们并不能因此就否认Python这门语言,其原因如下:
GIL仅仅在CPython解释器中存在,在其他的解释器中没有,并不是Python这门语言的缺点
在单核处理器下,多线程之间本来就无法真正的并行执行
在多核处理下,运算效率的确是比单核处理器高,但是要知道现代应用程序多数都是基于网络的(qq,微信,爬虫,浏览器等等),CPU的运行效率是无法决定网络速度的,而网络的速度是远远比不上处理器的运算速度,则意味着每次处理器在执行运算前都需要等待网络IO,这样一来多核优势也就没有那么明显了
举个例子:
任务1 从网络上下载一个网页,等待网络IO的时间为1分钟,解析网页数据花费,1秒钟
任务2 将用户输入数据并将其转换为大写,等待用户输入时间为1分钟,转换为大写花费,1秒钟
单核CPU下:1.开启第一个任务后进入等待。2.切换到第二个任务也进入了等待。一分钟后解析网页数据花费1秒解析完成切换到第二个任务,转换为大写花费1秒,那么总耗时为:1分+1秒+1秒 = 1分钟2秒
多核CPU下:1.CPU1处理第一个任务等待1分钟,解析花费1秒钟。1.CPU2处理第二个任务等待1分钟,转换大写花费1秒钟。由于两个任务是并行执行的所以总的执行时间为1分钟+1秒钟 = 1分钟1秒
可以发现,多核CPU对于总的执行时间提升只有1秒,但是这边的1秒实际上是夸张了,转换大写操作不可能需要1秒,时间非常短!
上面的两个任务都是需要大量IO时间的,这样的任务称之为IO密集型,与之对应的是计算密集型即没有IO操作全都是计算任务。
对于计算密集型任务,Python多线程的确比不上其他语言!为了解决这个弊端,Python推出了多进程技术,可以良好的利用多核处理器来完成计算密集任务。
总结:
1.单核下无论是IO密集还是计算密集GIL都不会产生任何影响
2.多核下对于IO密集任务,GIL会有细微的影响,基本可以忽略
3.Cpython中IO密集任务应该采用多线程,计算密集型应该采用多进程
另外:之所以广泛采用CPython解释器,就是因为大量的应用程序都是IO密集型的,还有另一个很重要的原因是CPython可以无缝对接各种C语言实现的库,这对于一些数学计算相关的应用程序而言非常的happy,直接就能使用各种现成的算法
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。
Event事件的作用: 用来控制线程
事件处理的机制:全局定义了一个内置标志Flag,如果Flag值为 False,那么当程序执行 event.wait方法时就会阻塞,如果Flag值为True,那么event.wait 方法时便不再阻塞。
set()
: 将标志设为True,使处于阻塞状态的线程恢复运行状态。wait()
: 如果标志为True将立即返回,否则线程阻塞(可传时间)isSet()
: 获取内置标志状态,返回True或False。from threading import Event
import time
from threading import Thread
# 调用Event类实例化一个对象
e = Event()
def light():
print('红灯亮...')
time.sleep(5)
# 发送信号true,其他所有进程准备执行
e.set() # flag变为true ,阻塞态变为运行
print('绿灯亮...')
def car(name):
print('正在等红灯...')
# 所有汽车??处于阻塞态
e.wait() # 为false是阻塞,直到为true
print(f'{name}出发...')
# 让一个线程任务控制多个car任务
t = Thread(target=light)
t.start()
for i in range(10):
s = Thread(target=car,args=(f'{i}号赛车',))
s.start()
当light线程执行时,打印'红灯亮',IO操作切换线程执行car线程,打印'正在等红灯',wait时flag是false所以阻塞,遇到IO切换,执行第二个car打印'正在等红灯',.....
直到5秒结束,set让flag变为true,打印'绿灯亮',IO操作
协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
需要强调的是:
#1. python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
#2. 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
对比操作系统控制线程的切换,用户在单线程内控制协程的切换
优点如下:
#1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
#2. 单线程内就可以实现并发的效果,最大限度地利用cpu
缺点如下:
#1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程来尽可能提高效率
#2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
上一节中我们知道GIL锁将导致CPython无法利用多核CPU的优势,只能使用单核并发的执行。很明显效率不高,那有什么办法能够提高效率呢?
效率要高只有一个方法就是让这个当前线程尽可能多的占用CPU时间,如何做到?
任务类型可以分为两种 IO密集型 和 计算密集型
对于计算密集型任务而言 ,无需任何操作就能一直占用CPU直到超时为止,没有任何办法能够提高计算密集任务的效率,除非把GIL锁拿掉,让多核CPU并行执行。
对于IO密集型任务任务,一旦线程遇到了IO操作CPU就会立马切换到其他线程,而至于切换到哪个线程,应用程序是无法控制的,这样就导致了效率降低。
如何能提升效率呢?想一想如果可以监测到线程的IO操作时,应用程序自发的切换到其他的计算任务,是不是就可以留住CPU?的确如此
单线程实现并发这句话乍一听好像在瞎说
首先需要明确并发的定义
并发:指的是多个任务同时发生,看起来好像是同时都在进行
并行:指的是多个任务真正的同时进行
早期的计算机只有一个CPU,既然CPU可以切换线程来实现并发,那么为何不能再线程中切换任务来并发呢?
上面的引子中提到,如果一个线程能够检测IO操作并且将其设置为非阻塞,并自动切换到其他任务就可以提高CPU的利用率,指的就是在单线程下实现并发。
并发 = 切换任务+保存状态,只要找到一种方案,能够在两个任务之间切换执行并且保存状态,那就可以实现单线程并发
python中的生成器就具备这样一个特点,每次调用next都会回到生成器函数中执行代码,这意味着任务之间可以切换,并且是基于上一次运行的结果,这意味着生成器会自动保存执行状态!
于是乎我们可以利用生成器来实现并发执行:
串行的情况下
import time
def func1():
for i in range(10000000):
i + 1
def func2():
for i in range(10000000):
i + 1
start = time.time()
func1()
func2()
end = time.time()
print(f'用时{end-start}') # 用时1.9524507522583008
==========================================================
切换+保存实现并发
import time
def func1():
while True:
10000000 + 1
yield
def func2():
# g 是生成器对象
g = func1()
for i in range(10000000):
i + 1
next(g) # 每次执行next相当于切换到func1下面
start = time.time()
func2()
stop = time.time()
print(stop-start) # 3.2243001461029053
IO密集型的情况下
如何检测到IO操作,引出gevent
第三方库实现并发同步或异步编程,遇到IO阻塞会自动切换任务
pip install gevent # 安装第三方库
from gevent import monkey # 进行导入monkey
monkey.patch_all() # 可以监听该程序下的所有IO操作
from gevent import spawn # 用于做切换+保存状态
from gevent import monkey
monkey.patch_all() # 可以监听该程序下的所有IO操作
from gevent import spawn # 用于做切换+保存状态
import time
def func1():
print('1')
# IO操作
time.sleep(3)
def func2():
print('2')
# IO操作
time.sleep(1)
def func3():
print('3')
# IO操作
time.sleep(5)
start = time.time()
# 调用spawn(任务函数) 让其单线程下实行并发
s1 = spawn(func1)
s2 = spawn(func2)
s3 = spawn(func3)
# join发送信号,相当于等待自己结束再全结束(单线程)
s1.join()
s2.join()
s3.join()
stop = time.time()
print(stop-start) # 5.014296770095825
# 实现并发执行,func123会并发一起执行
'''服务端'''
from gevent import monkey
monkey.patch_all() # 检测IO
import socket
from gevent import spawn
s = socket.socket()
s.bind(
('127.0.0.1',8848)
)
s.listen(5)
print('等待客户端连接')
def work(conn):
while True:
try:
data = conn.recv(1024)
print(data.decode('utf-8'))
conn.send(data.upper())
except Exception as e:
print(e)
break
conn.close()
def server():
while True:
conn,addr= s.accept()
print(f'用户{addr}已连接')
# 监听work函数,进行并发
spawn(work,conn)
if __name__ == '__main__':
# 监听server函数,进行并发
s1 = spawn(server)
s1.join()
==========================================================
'''客户端'''
import socket
from threading import Thread, current_thread
def client():
c = socket.socket()
c.connect(
('127.0.0.1',8848)
)
print('启动客户端...')
num = 0
while True:
data = f'{current_thread().name}{num}'
c.send(data.encode('utf-8'))
da = c.recv(1024)
print(da.decode('utf-8'))
num += 1
# 模拟30个用户并发去访问客户端
for i in range(30):
t = Thread(target=client)
t.start()
假设四个任务每个任务需要10s,在单核多核的情况下代码情况.
单核下
开启进程
4个进程共: 40s,消耗资源过大
开启线程
4个进程共: 40s,消耗资源远小于进程
多核下
开启进程
并行执行,效率比较高,4个进程需要10s
开启线程
并发执行,效率比较低,4个进程40s
计算密集型中,多核操作多进程比多线程效率高,因为线程不能实现并行,
而进程可以实现多进程操作大大节省时间.
单核下
开启进程
4个进程共: 40s,消耗资源过大
开启线程
4个进程共: 40s,消耗资源远小于进程
多核下
开启进程
并行执行,效率小于线程,因为遇到IO会立马切换CPU的执行权限
4个进程: 40s + 开启进程消耗的额外时间
开启线程
并发执行,4个进程 40s
在IO密集型情况下,因为碰到IO操作会使CPU进行切换+保存的操作,而线程又比进程占用的资源切换的速度小,再加上开启进程消耗的时间,多核上多线程反而比多进程效率要高(IO密集型的情况下)
计算密集型,(多进程明显比多线程要快)
from threading import Thread
from multiprocessing import Process
import time
# 计算密集型
def work1():
num = 0
for l in range(100000000):
num += 1
# IO密集型
def work2():
time.sleep(1)
if __name__ == '__main__':
# 开始时间
start_time = time.time()
list1 = []
for l in range(4):
# 测试计算密集型
# p = Process(target=work1) # 进程 程序执行时间为21.6076238155365
p = Thread(target=work1) # 线程 程序执行时间为39.6529426574707
list1.append(p)
p.start()
for p in list1:
p.join()
end_start = time.time()
print(f'程序执行时间为{end_start-start_time}')
# print(os.cpu_count()) # 查看cpu的个数 4
IO密集型 (多线程比多进程效率高)
from threading import Thread
from multiprocessing import Process
import time
# 计算密集型
def work1():
num = 0
for l in range(100000000):
num += 1
# IO密集型
def work2():
time.sleep(1)
if __name__ == '__main__':
# 开始时间
start_time = time.time()
list1 = []
for l in range(10):
# 测试计算密集型
p = Process(target=work2) # 进程 程序执行时间为6.449297666549683
# p = Thread(target=work2) # 线程 程序执行时间为1.00590920448303224707
list1.append(p)
p.start()
for p in list1:
p.join()
end_start = time.time()
print(f'程序执行时间为{end_start-start_time}')
在计算密集型的情况下,使用多进程
在IO密集型的情况下,使用多线程
如果要高效执行多个IO密集型的程序: 使用多进程 + 多线程
进程池与线程池是用来控制当前程序允许创建(进程/线程)的数量.
线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。
使用线程池可以有效地控制系统中并发线程的数量
进程池
from concurrent.futures import ProcessPoolExecutor来调用
ProcessPoolExecutor(5) # 代表只能开启5个进程
ProcessPoolExecutor() # 不写默认以cpu的个数限制进程数
线程池
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(5) # 代表只能开启5个进程
pool = ThreadPoolExecutor() # 不写默认以'cpu个数*5'限制进程数
池表示一个容器,本质上就是一个存储进程或线程的列表
如果是IO密集型任务使用线程池
如果是计算密集任务则使用进程池
在很多情况下需要控制进程或线程的数量在一个合理的范围,例如TCP程序中,一个客户端对应一个线程,虽然线程的开销小,但肯定不能无限的开,否则系统资源迟早被耗尽,解决的办法就是控制线程的数量。
线程/进程池不仅帮我们控制线程/进程的数量,还帮我们完成了线程/进程的创建,销毁,以及任务的分配
进程池的使用:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,os
# 创建进程池,指定最大进程数为3,此时不会创建进程,不指定数量时,默认为CPU和核数
pool = ProcessPoolExecutor(3)
def task():
time.sleep(1)
print(os.getpid(),"working..")
if __name__ == '__main__':
for i in range(10):
pool.submit(task) # 提交任务时立即创建进程
# 任务执行完成后也不会立即销毁进程
time.sleep(2)
for i in range(10):
pool.submit(task) #再有新任务是 直接使用之前已经创建好的进程来执行
线程池的使用:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import current_thread,active_count
import time,os
# 创建进程池,指定最大线程数为3,此时不会创建线程,不指定数量时,默认为CPU和核数*5
pool = ThreadPoolExecutor(3)
print(active_count()) # 只有一个主线
def task():
time.sleep(1)
print(current_thread().name,"working..")
if __name__ == '__main__':
for i in range(10):
pool.submit(task) # 第一次提交任务时立即创建线程
# 任务执行完成后也不会立即销毁
time.sleep(2)
for i in range(10):
pool.submit(task) #再有新任务是 直接使用之前已经创建好的线程来执行
案例:TCP中的应用
首先要明确,TCP是IO密集型,应该使用线程池
.submit()
.submit(‘传函数地址‘)
异步提交任务,与t = Thread() t.start()
一样
from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor(5) # 同时提交5个线程任务
def task():
print('线程任务开始...')
time.sleep(1)
print('线程任务结束...')
for i in range(5):
pool.submit(task)
.shutdown()
会让所有线程池的任务结束后,才往下执行代码
add_done_callback()
回调函数就是一个通过函数指针调用的函数。如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指针被用来调用其所指向的函数时,我们就说这是回调函数
异步回调指的是:在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数。
pool.submit(‘传函数地址‘).add_done_callback(‘添加回调函数‘)
'''submit(task)执行task函数拿到task返回值,通过add_done_callback将返回值传入回调函数,回调函数需传形参,得到的是对象通过`对象.result()`得到线程任务返回的结果'''
from concurrent.futures import ThreadPoolExecutor
import time
pool = ThreadPoolExecutor(5) # 同时提交5个线程任务
def task():
print('任务开始...')
time.sleep(1)
print('线程任务结束...')
return 123
# 回调函数
def call_b(res):
# 渎职操作不要与接收的res重名
data = res.result() # .result()去取出res对象中的值
print(data)
for i in range(5):
pool.submit(task).add_done_callback(call_b)
1、什么是回调:
异步回调指的是:在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数。
2、为什么需要回调函数
需要获取异步任务的执行结果,但是又不应该让其阻塞(降低效率),即想要高效的获取任务的执行结果。
之前在使用线程池或进程池提交任务时,如果想要处理任务的执行结果则必须调用result函数或是shutdown函数,而它们都是是阻塞的,会等到任务执行完毕后才能继续执行,这样一来在这个等待过程中就无法执行其他任务,降低了效率,所以需要一种方案,即保证解析结果的线程不用等待,又能保证数据能够及时被解析,该方案就是异步回调。
3、如何使用异步回调
通常情况下,异步都会和回调函数一起使用,使用方法即是add_done_callback(),给Future对象绑定一个回调函数。
注意:在多进程中回调函数 是交给主进程来执行 而在多线程中 回调函数是谁有空谁执行(不是主线程)
总结:异步回调使用方法就是在提交任务后得到一个Futures对象,调用对象的add_done_callback来指定一个回调函数。
如果把任务比喻为烧水,没有回调时就只能守着水壶等待水开,有了回调相当于换了一个会响的水壶,烧水期间可用作其他的事情,等待水开了水壶会自动发出声音,这时候再回来处理。水壶自动发出声音就是回调。
注意:
同步异步-阻塞非阻塞,经常会被程序员提及,并且概念非常容易混淆!
指的是程序的运行状态
阻塞:当程序执行过程中遇到了IO操作,在执行IO操作时,程序无法继续执行其他代码,称为阻塞!
非阻塞:程序在正常运行没有遇到IO操作,或者通过某种方式使程序即时遇到了也不会停在原地,还可以执行其他操作,以提高CPU的占用率
指的是提交任务的方式
同步调用:发起任务后必须在原地等待任务执行完成,才能继续执行
异步调用:发起任务后不用等待任务执行,可以立即开启执行其他操作
同步会有等待的效果但是这和阻塞是完全不同的,阻塞时程序会被剥夺CPU执行权,而同步调用则不会!
很明显异步调用效率更高,但是任务的执行结果如何获取呢?
程序中的异步调用并获取结果:
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
?
pool = ThreadPoolExecutor(3)
def task(i):
time.sleep(0.01)
print(current_thread().name,"working..")
return i ** i
?
if __name__ == '__main__':
objs = []
for i in range(3):
res_obj = pool.submit(task,i) # 异步方式提交任务# 会返回一个对象用于表示任务结果
objs.append(res_obj)
?
# 该函数默认是阻塞的 会等待池子中所有任务执行结束后执行
pool.shutdown(wait=True)
?
# 从结果对象中取出执行结果
for res_obj in objs:
print(res_obj.result())
print("over")
程序中的异步调用并获取结果:
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
?
pool = ThreadPoolExecutor(3)
def task(i):
time.sleep(0.01)
print(current_thread().name,"working..")
return i ** i
?
if __name__ == '__main__':
objs = []
for i in range(3):
res_obj = pool.submit(task,i) # 会返回一个对象用于表示任务结果
print(res_obj.result()) #result是同步的一旦调用就必须等待 任务执行完成拿到结果
print("over")
死锁是指锁无法打开了,二导致程序卡死。因为一把锁一般不会存在锁死的情况,所以死锁一般出现在多把锁共同作用的情况下。
进程锁的开锁与解锁进程因遇到IO操作切换进程,导致各自被占用,导致死锁.
可以通过给锁上锁来保证每次进入锁定状态的代码出去之后才能进行第二个进程,并发变串行.
###死锁示例
from multiprocessing import Process,Lock
import time
def task1(l1,l2,i):
l1.acquire()
print("盘子被%s抢走了" % i)
time.sleep(1)
?
l2.acquire()
print("筷子被%s抢走了" % i)
print("吃饭..")
l1.release()
l2.release()
?
def task2(l1,l2,i):
?
l2.acquire()
print("筷子被%s抢走了" % i)
?
l1.acquire()
print("盘子被%s抢走了" % i)
?
print("吃饭..")
l1.release()
l2.release()
?
if __name__ == '__main__':
l1 = Lock()
l2 = Lock()
Process(target=task1,args=(l1,l2,1)).start()
Process(target=task2,args=(l1,l2,2)).start()
from threading import Lock,Thread,current_thread
import time
mutex_a = Lock()
mutex_b = Lock()
class MyThread(Thread):
# 线程执行任务
def run(self):
self.func1()
self.func2()
def func1(self):
mutex_a.acquire() # a加锁
print(f'用户{current_thread().name}抢到了锁a')
# 与 print(f'用户{self.name}抢到了锁a') 一样
mutex_b.acquire() # b加锁
print(f'用户{current_thread().name}抢到了锁b')
mutex_b.release() # b 解锁
print(f'{current_thread().name}解放锁b')
mutex_a.release() # a 解锁
print(f'{current_thread().name}解放锁a')
def func2(self):
mutex_b.acquire() # b 加锁
print(f'用户{current_thread().name}抢到了锁b')
# IO操作
time.sleep(1)
mutex_a.acquire() # a 加锁
print(f'用户{current_thread().name}抢到了锁a')
mutex_a.release() # a 解锁
print(f'{current_thread().name}解放锁a')
mutex_b.release() # b 解锁
print(f'{current_thread().name}解放锁b')
for l in range(10):
t = MyThread()
t.start()
'''
用户Thread-1抢到了锁a
用户Thread-1抢到了锁b
Thread-1解放锁b
Thread-1解放锁a
用户Thread-1抢到了锁b
用户Thread-2抢到了锁a'''
用户Thread-1依次加锁解锁 a b,
运行func1结束进入func2:抢到了锁b,然后遇到IO暂停操作
用户Thread-2开始启动,func1 中抢到了锁a,
但锁b 被用户Thread-1所持有,
程序暂停
锁不能乱用
解决死锁问题# RLocK
可以提供给多人使用,但是第一个使用时,会对该锁做引用计数,只用引用计数为0时,才会让另一个人继续使用
'''用于解决死锁现象'''
# RLocK
from threading import RLock,Thread,current_thread
import time
mutex_a = mutex_b = RLock()
class MyThread(Thread):
# 线程执行任务
def run(self):
self.func1()
self.func2()
def func1(self):
mutex_a.acquire() # a加锁
print(f'用户{current_thread().name}抢到了锁a')
# 与 print(f'用户{self.name}抢到了锁a') 一样
mutex_b.acquire() # b加锁
print(f'用户{current_thread().name}抢到了锁b')
mutex_b.release() # b 解锁
print(f'{current_thread().name}解放锁b')
mutex_a.release() # a 解锁
print(f'{current_thread().name}解放锁a')
def func2(self):
mutex_b.acquire() # b 加锁
print(f'用户{current_thread().name}抢到了锁b')
# IO操作
time.sleep(1)
mutex_a.acquire() # a 加锁
print(f'用户{current_thread().name}抢到了锁a')
mutex_a.release() # a 解锁
print(f'{current_thread().name}解放锁a')
mutex_b.release() # b 解锁
print(f'{current_thread().name}解放锁b')
for l in range(10):
t = MyThread()
t.start()
Python中的io模块是用来处理各种类型的I/O操作流。主要有三种类型的I/O类型:文本I/O(Text I/O),二进制I/O(Binary I/O)和原始I/O(Raw I/O)。它们都是通用类别,每一种都有不同的后备存储。属于这些类别中的任何一个的具体对象称为文件对象,其他常用的术语为流或者类文件对象。
IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:
1)等待数据准备 (Waiting for the data to be ready)
2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)
常用四种IO模型
原文:https://www.cnblogs.com/fwzzz/p/11743223.html