3.1 多进程:进程间通信
admin
2024-03-20 19:26:03
0

1. 进程间通信

为了进程安全起见,两个进程之间的数据是不能够互相访问的(默认情况下),进程与进程之间的数据是不可以互相访问的,而且每一个进程的内存是独立的。多进程的资源是独立的,不可以互相访问,如果想多个进程之间实现数据交互就必须通过中间件实现。进程间通信方法有Queue、Pipes、Mangers和Value,Array四种。

(1)进程队列(Queue)通信

Queue([maxsize]):建立一个共享的队列(其实并不是共享的,实际是克隆的,内部维护着数据的共享),多个进程可以向队列里存/取数据。其中,参数是队列最大项数,省略则无限制。Queue的常用方法如下:

  • qsize():返回当前队列包含的消息数量。
  • empty():如果队列为空,返回True;返之返回False。
  • full():如果队列满了,返回True;反之返回False。
  • get(block[,timeout]):获取队列中的一条信息,然后将其从队列中移除,block默认值为True。如果block使用默认值,且没有设置timeout(单位秒),消息队列为空,此时程序将被阻塞(停在读取状态),直到从消息队列读到消息为止。如果设置了timeout,则会等待timeout秒,若还没有读取任何消息,则抛出“Queue.Empty”异常。 如果block值为False,消息队列为空,则会立刻抛出“Queue.Empty”异常。
  • Queue.get_nowait():相当于Queue.get(False)。
  • Queue.put(item,[block[,timeout]]):将item消息写入队列,block默认值为True。如果block使用默认值,且没有设置timeout(单位秒),消息队列如果已经没有空间可以写入,此时程序将被阻塞(停在写入状态),直到从消息队列腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没有空间,则抛出“Queue.Full”异常。如果block值为False,消息队列没有空间可写入,则会立刻抛出“Queue.Full”异常
  • Queue.put_nowait(item):相当Queue.put(item,False)。
from multiprocessing import Queue, Processdef fun(q, i):q.put([1 * i, 2 * i, 3 * i])if __name__ == '__main__':Q = Queue(5)  # 设置进程队列长度for i in range(2):  # 启动两个进程,想队列里put数据process = Process(target=fun, args=(Q, i + 1))  # 创建一个进程,将Q传入,实际上是克隆了Qprocess.start()process.join()print(Q.qsize())  # 2print(Q.get())    # [1, 2, 3]print(Q.get())    # [2, 4, 6]print(Q.qsize())  # 0

(2)进程管道(Pipes)通信

多进程还有一种数据传递方式叫管道原理和 Queue相同。Pipe可以在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。

构造方法:Pipe([dumplex]) dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

实例方法:

  • send(obj):通过连接发送对象。obj是与序列化兼容的任意对象。
  • recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
  • close():关闭连接。如果conn1被垃圾回收,将自动调用此方法。
  • fileno():返回连接使用的整数文件描述符。
  • poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
  • recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
  • send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收。
  • recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
from multiprocessing import Process, Pipeimport timedef f(child_conn):time.sleep(1)child_conn.send("吃了吗") # 给父进程发送消息print("来自父亲的问候:", child_conn.recv()) #接收父进程发送的消息child_conn.close()if __name__ == "__main__":parent_conn, child_conn = Pipe()  # 创建管道两端,必须在创建Process之前创建Pipep = Process(target=f, args=(child_conn,))  # 创建子进程p.start()print("来自儿子的问候:", parent_conn.recv())# 接收子进程的消息parent_conn.send("嗯") # 给子进程发送消息

(3)进程的Mangers通信

Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。

Manager实现了多个进程间的数据共享,支持的数据类型有 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value , Array。

Manager中 以下类型均不是进程安全的, Manager只是保证的数据能传递但是没有保证数据的安全所以很多时候需要我们自己通过进程锁来控制的。下面不安全的类型部分在进程里是有安全的可以通过from multiprocessing import Array,Lock,Queue 方式使用,没有的那么只能加锁了。

Array(self,*args,**kwds)
BoundedSemaphore(self,*args,**kwds)
Condition(self,*args,**kwds)
Event(self,*args,**kwds)
JoinableQueue(self,*args,**kwds)
Lock(self,*args,**kwds)
Namespace(self,*args,**kwds)
Pool(self,*args,**kwds)
Queue(self,*args,**kwds)
RLock(self,*args,**kwds)
Semaphore(self,*args,**kwds)
Value(self,*args,**kwds)
dict(self,*args,**kwds)
list(self,*args,**kwds)

例子:

import multiprocessingdef f(x, arr, l, d, n):x.value = 3.14arr[0] = 5l.append('Hello')d["name"] = "xxx"n.a = 10
if __name__ == '__main__':manager = multiprocessing.Manager()x = manager.Value('d', 0.0)arr = manager.Array('i', range(10))l = manager.list()d = manager.dict()n = manager.Namespace() # 以上代码需要在进程创建前使用proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))proc.start()proc.join()print(x.value)print(arr)print(l)print(d)print(n.a)

假如有一个IpConnectionPool对象需要多个进程共享,步骤如下:

  • 创建BaseManager;
  • 注册IpConnectionPool对象;
  • 开启一个IpConnectionPool对象进程;
  • 获取进程共享IpConnectionPool对象;
  • IpConnectionPool对象传进需要使用的进程里。
from multiprocessing.managers import BaseManager# 多进程对象共享
manager = BaseManager()
# 一定要在start前注册,不然就注册无效
# 注册数据库线程池对象
manager.register('IpConnectionPool', IpConnectionPool)
# 启动manager服务(就是开了一个新进程来管理IpConnectionPool对象的)
manager.start()
# 获取进程对象
ipConnectionPool = manager.IpConnectionPool() 
# manager必须在主线程进行创建进程否则报错
pool = multiprocessing.Pool(int(multiprocessing.cpu_count()))  # 创建进程池
for i in range(2,3423):# 数据库线程池对象传入到进程中使用pool.apply_async(batchInsert,args=(i,ipConnectionPool),callback=call_back,error_callback=err_call_back)time.sleep(5)
pool.close()
pool.join()

(4)Value,Array资源共享

multiprocessing 中Value和Array的实现原理都是在共享内存中创建ctypes()对象来达到共享数据的目的,两者实现方法大同小异,只是选用不同的ctypes数据类型而已。

① Value

构造方法:Value((typecode_or_type, args[, lock])
  • typecode_or_type:定义ctypes()对象的类型,可以传Type code或 C Type。
  • args:传递给typecode_or_type构造函数的参数
  • lock:默认为True,创建一个互斥锁来限制对Value对象的访问,如果传入一个锁,如Lock或RLock的实例,将用于同步。如果传入False,Value的实例就不会被锁保护,它将不是进程安全的。

② Array

构造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])
  • typecode_or_type:同上。
  • size_or_initializer:如果它是一个整数,那么它确定数组的长度,并且数组将被初始化为零。否则,size_or_initializer是用于初始化数组的序列,其长度决定数组的长度。
  • kwds:传递给typecode_or_type构造函数的参数。
  • lock:同上。
import multiprocessingdef f(n, a):n.value = 3.14a[0] = 5if __name__ == '__main__':num = multiprocessing.Value('d', 0.0)arr = multiprocessing.Array('i', range(10))p = multiprocessing.Process(target=f, args=(num, arr))p.start()p.join()print(num.value)print(arr[:])

2. 进程锁

多进程之间不共享数据,但共享同一套文件系统,像访问同一个文件、同一终端打印,如果不进行同步操作,就会出现错乱的现象。
所有在 threading 存在的同步方式,multiprocessing 中都有类似的等价物,如:锁(lock,RLock)、信号量Semaphore,Event(事件)Condition,Barrier等。

(1)锁Lock

from multiprocessing import Process, Lock
import timedef fun(l, i):l.acquire()      # 获取锁print("正在运行进程: ", i)time.sleep(2)l.release()      # 使用完后释放锁if __name__ == '__main__':lock = Lock()                                # 生成锁的实例for i in range(5):p = Process(target=fun, args=(lock, i))  # 创建进程p.start()                                # 启动,这里没有join,进程可以并发

(2)可重入锁RLock

from multiprocessing import Process,RLockif __name__ == '__main__':# 创建一个rlock对象lock = RLock()# 初始化共享资源abce = 0# 本线程访问共享资源lock.acquire()  # 加锁abce = abce + 1# 这个线程尝试访问共享资源lock.acquire()  # 再次加锁abce = abce + 2lock.release()  # 释放里面的锁lock.release()  # 释放外面的锁print(abce)

上面就够用了。

(3)Condition状态

(4)Semaphore信号量

(5)Event事件

(6)Barrier屏障

相关内容

热门资讯

安卓系统的如何测试软件,从入门... 你有没有想过,你的安卓手机里那些神奇的软件是怎么诞生的呢?它们可不是凭空出现的,而是经过一系列严格的...
小米8安卓系统版本,安卓系统版... 你有没有发现,手机更新换代的速度简直就像坐上了火箭呢?这不,小米8这款手机自从上市以来,就凭借着出色...
华为手机安卓系统7以上,创新体... 你有没有发现,最近华为手机越来越受欢迎了呢?尤其是那些搭载了安卓系统7.0及以上版本的机型,简直让人...
儿童英语免费安卓系统,儿童英语... 哇,亲爱的家长朋友们,你是否在为孩子的英语学习发愁呢?别担心,今天我要给你带来一个超级好消息——儿童...
ios系统切换安卓系统还原,还... 你有没有想过,有一天你的手机从iOS系统切换到了安卓系统,然后再从安卓系统回到iOS系统呢?这听起来...
灵焕3装安卓系统,引领智能新体... 你知道吗?最近手机圈里可是掀起了一股热潮,那就是灵焕3这款神器的安卓系统升级。没错,就是那个曾经以独...
安卓系统指南针软件,探索未知世... 手机里的指南针功能是不是让你在户外探险时倍感神奇?但你知道吗,安卓系统中的指南针软件可是大有学问呢!...
华为是不用安卓系统了吗,迈向自... 最近有个大新闻在科技圈里炸开了锅,那就是华为是不是不再使用安卓系统了?这可不是一个简单的问题,它涉及...
安卓系统热点开启失败,排查与解... 最近是不是你也遇到了安卓系统热点开启失败的小麻烦?别急,让我来给你详细说说这个让人头疼的问题,说不定...
小米max2系统安卓,安卓系统... 你有没有听说过小米Max2这款手机?它那超大的屏幕,简直就像是个移动的电脑屏幕,看视频、玩游戏,那叫...
电池健康怎么保持安卓系统,优化... 手机可是我们生活中不可或缺的好伙伴,而电池健康度就是它的生命力。你有没有发现,随着使用时间的增长,你...
安卓手机怎么调系统颜色,安卓手... 你有没有发现,你的安卓手机屏幕颜色突然变得不那么顺眼了?是不是也想给它换换“脸色”,让它看起来更有个...
安卓系统清粉哪个好,哪款清粉工... 手机用久了,是不是觉得卡得要命?别急,今天就来聊聊安卓系统清理垃圾哪个软件好。市面上清理工具那么多,...
华为被限制用安卓系统,挑战安卓... 你知道吗?最近科技圈可是炸开了锅!华为,这个我们耳熟能详的名字,竟然因为一些“小插曲”被限制了使用安...
安卓系统是不是外国,源自外国的... 你有没有想过,我们每天离不开的安卓系统,它是不是外国货呢?这个问题听起来可能有点奇怪,但确实很多人都...
安卓系统缺少文件下载,全面解析... 你有没有发现,用安卓手机的时候,有时候下载个文件真是让人头疼呢?别急,今天就来聊聊这个让人烦恼的小问...
kktv系统刷安卓系统怎么样,... 你有没有听说最近KKTV系统刷安卓系统的事情?这可是个热门话题呢!咱们一起来聊聊,看看这个新玩意儿到...
安卓系统连接电脑蓝牙,操作指南... 你有没有遇到过这种情况:手机里堆满了各种好用的应用,可就是想找个方便快捷的方式,把手机里的音乐、照片...
安卓车机11.0系统包,智能驾... 你有没有发现,最近你的安卓车机系统好像悄悄升级了呢?没错,就是那个安卓车机11.0系统包!这可不是一...
安卓系统最高到多少,从初代到最... 你有没有想过,你的安卓手机系统升级到哪一步了呢?是不是好奇安卓系统最高能到多少呢?别急,今天就来带你...