3.1 多进程:进程间通信
admin
2024-03-20 19:26:03
0

1. 进程间通信

为了进程安全起见,两个进程之间的数据是不能够互相访问的(默认情况下),进程与进程之间的数据是不可以互相访问的,而且每一个进程的内存是独立的。多进程的资源是独立的,不可以互相访问,如果想多个进程之间实现数据交互就必须通过中间件实现。进程间通信方法有Queue、Pipes、Mangers和Value,Array四种。

(1)进程队列(Queue)通信

Queue([maxsize]):建立一个共享的队列(其实并不是共享的,实际是克隆的,内部维护着数据的共享),多个进程可以向队列里存/取数据。其中,参数是队列最大项数,省略则无限制。Queue的常用方法如下:

  • qsize():返回当前队列包含的消息数量。
  • empty():如果队列为空,返回True;返之返回False。
  • full():如果队列满了,返回True;反之返回False。
  • get(block[,timeout]):获取队列中的一条信息,然后将其从队列中移除,block默认值为True。如果block使用默认值,且没有设置timeout(单位秒),消息队列为空,此时程序将被阻塞(停在读取状态),直到从消息队列读到消息为止。如果设置了timeout,则会等待timeout秒,若还没有读取任何消息,则抛出“Queue.Empty”异常。 如果block值为False,消息队列为空,则会立刻抛出“Queue.Empty”异常。
  • Queue.get_nowait():相当于Queue.get(False)。
  • Queue.put(item,[block[,timeout]]):将item消息写入队列,block默认值为True。如果block使用默认值,且没有设置timeout(单位秒),消息队列如果已经没有空间可以写入,此时程序将被阻塞(停在写入状态),直到从消息队列腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没有空间,则抛出“Queue.Full”异常。如果block值为False,消息队列没有空间可写入,则会立刻抛出“Queue.Full”异常
  • Queue.put_nowait(item):相当Queue.put(item,False)。
from multiprocessing import Queue, Processdef fun(q, i):q.put([1 * i, 2 * i, 3 * i])if __name__ == '__main__':Q = Queue(5)  # 设置进程队列长度for i in range(2):  # 启动两个进程,想队列里put数据process = Process(target=fun, args=(Q, i + 1))  # 创建一个进程,将Q传入,实际上是克隆了Qprocess.start()process.join()print(Q.qsize())  # 2print(Q.get())    # [1, 2, 3]print(Q.get())    # [2, 4, 6]print(Q.qsize())  # 0

(2)进程管道(Pipes)通信

多进程还有一种数据传递方式叫管道原理和 Queue相同。Pipe可以在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。

构造方法:Pipe([dumplex]) dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。

实例方法:

  • send(obj):通过连接发送对象。obj是与序列化兼容的任意对象。
  • recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
  • close():关闭连接。如果conn1被垃圾回收,将自动调用此方法。
  • fileno():返回连接使用的整数文件描述符。
  • poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
  • recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
  • send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收。
  • recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
from multiprocessing import Process, Pipeimport timedef f(child_conn):time.sleep(1)child_conn.send("吃了吗") # 给父进程发送消息print("来自父亲的问候:", child_conn.recv()) #接收父进程发送的消息child_conn.close()if __name__ == "__main__":parent_conn, child_conn = Pipe()  # 创建管道两端,必须在创建Process之前创建Pipep = Process(target=f, args=(child_conn,))  # 创建子进程p.start()print("来自儿子的问候:", parent_conn.recv())# 接收子进程的消息parent_conn.send("嗯") # 给子进程发送消息

(3)进程的Mangers通信

Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。

Manager实现了多个进程间的数据共享,支持的数据类型有 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value , Array。

Manager中 以下类型均不是进程安全的, Manager只是保证的数据能传递但是没有保证数据的安全所以很多时候需要我们自己通过进程锁来控制的。下面不安全的类型部分在进程里是有安全的可以通过from multiprocessing import Array,Lock,Queue 方式使用,没有的那么只能加锁了。

Array(self,*args,**kwds)
BoundedSemaphore(self,*args,**kwds)
Condition(self,*args,**kwds)
Event(self,*args,**kwds)
JoinableQueue(self,*args,**kwds)
Lock(self,*args,**kwds)
Namespace(self,*args,**kwds)
Pool(self,*args,**kwds)
Queue(self,*args,**kwds)
RLock(self,*args,**kwds)
Semaphore(self,*args,**kwds)
Value(self,*args,**kwds)
dict(self,*args,**kwds)
list(self,*args,**kwds)

例子:

import multiprocessingdef f(x, arr, l, d, n):x.value = 3.14arr[0] = 5l.append('Hello')d["name"] = "xxx"n.a = 10
if __name__ == '__main__':manager = multiprocessing.Manager()x = manager.Value('d', 0.0)arr = manager.Array('i', range(10))l = manager.list()d = manager.dict()n = manager.Namespace() # 以上代码需要在进程创建前使用proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))proc.start()proc.join()print(x.value)print(arr)print(l)print(d)print(n.a)

假如有一个IpConnectionPool对象需要多个进程共享,步骤如下:

  • 创建BaseManager;
  • 注册IpConnectionPool对象;
  • 开启一个IpConnectionPool对象进程;
  • 获取进程共享IpConnectionPool对象;
  • IpConnectionPool对象传进需要使用的进程里。
from multiprocessing.managers import BaseManager# 多进程对象共享
manager = BaseManager()
# 一定要在start前注册,不然就注册无效
# 注册数据库线程池对象
manager.register('IpConnectionPool', IpConnectionPool)
# 启动manager服务(就是开了一个新进程来管理IpConnectionPool对象的)
manager.start()
# 获取进程对象
ipConnectionPool = manager.IpConnectionPool() 
# manager必须在主线程进行创建进程否则报错
pool = multiprocessing.Pool(int(multiprocessing.cpu_count()))  # 创建进程池
for i in range(2,3423):# 数据库线程池对象传入到进程中使用pool.apply_async(batchInsert,args=(i,ipConnectionPool),callback=call_back,error_callback=err_call_back)time.sleep(5)
pool.close()
pool.join()

(4)Value,Array资源共享

multiprocessing 中Value和Array的实现原理都是在共享内存中创建ctypes()对象来达到共享数据的目的,两者实现方法大同小异,只是选用不同的ctypes数据类型而已。

① Value

构造方法:Value((typecode_or_type, args[, lock])
  • typecode_or_type:定义ctypes()对象的类型,可以传Type code或 C Type。
  • args:传递给typecode_or_type构造函数的参数
  • lock:默认为True,创建一个互斥锁来限制对Value对象的访问,如果传入一个锁,如Lock或RLock的实例,将用于同步。如果传入False,Value的实例就不会被锁保护,它将不是进程安全的。

② Array

构造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])
  • typecode_or_type:同上。
  • size_or_initializer:如果它是一个整数,那么它确定数组的长度,并且数组将被初始化为零。否则,size_or_initializer是用于初始化数组的序列,其长度决定数组的长度。
  • kwds:传递给typecode_or_type构造函数的参数。
  • lock:同上。
import multiprocessingdef f(n, a):n.value = 3.14a[0] = 5if __name__ == '__main__':num = multiprocessing.Value('d', 0.0)arr = multiprocessing.Array('i', range(10))p = multiprocessing.Process(target=f, args=(num, arr))p.start()p.join()print(num.value)print(arr[:])

2. 进程锁

多进程之间不共享数据,但共享同一套文件系统,像访问同一个文件、同一终端打印,如果不进行同步操作,就会出现错乱的现象。
所有在 threading 存在的同步方式,multiprocessing 中都有类似的等价物,如:锁(lock,RLock)、信号量Semaphore,Event(事件)Condition,Barrier等。

(1)锁Lock

from multiprocessing import Process, Lock
import timedef fun(l, i):l.acquire()      # 获取锁print("正在运行进程: ", i)time.sleep(2)l.release()      # 使用完后释放锁if __name__ == '__main__':lock = Lock()                                # 生成锁的实例for i in range(5):p = Process(target=fun, args=(lock, i))  # 创建进程p.start()                                # 启动,这里没有join,进程可以并发

(2)可重入锁RLock

from multiprocessing import Process,RLockif __name__ == '__main__':# 创建一个rlock对象lock = RLock()# 初始化共享资源abce = 0# 本线程访问共享资源lock.acquire()  # 加锁abce = abce + 1# 这个线程尝试访问共享资源lock.acquire()  # 再次加锁abce = abce + 2lock.release()  # 释放里面的锁lock.release()  # 释放外面的锁print(abce)

上面就够用了。

(3)Condition状态

(4)Semaphore信号量

(5)Event事件

(6)Barrier屏障

相关内容

热门资讯

【MySQL】锁 锁 文章目录锁全局锁表级锁表锁元数据锁(MDL)意向锁AUTO-INC锁...
【内网安全】 隧道搭建穿透上线... 文章目录内网穿透-Ngrok-入门-上线1、服务端配置:2、客户端连接服务端ÿ...
GCN的几种模型复现笔记 引言 本篇笔记紧接上文,主要是上一篇看写了快2w字,再去接入代码感觉有点...
数据分页展示逻辑 import java.util.Arrays;import java.util.List;impo...
Redis为什么选择单线程?R... 目录专栏导读一、Redis版本迭代二、Redis4.0之前为什么一直采用单线程?三、R...
【已解决】ERROR: Cou... 正确指令: pip install pyyaml
关于测试,我发现了哪些新大陆 关于测试 平常也只是听说过一些关于测试的术语,但并没有使用过测试工具。偶然看到编程老师...
Lock 接口解读 前置知识点Synchronized synchronized 是 Java 中的关键字,...
Win7 专业版安装中文包、汉... 参考资料:http://www.metsky.com/archives/350.htm...
3 ROS1通讯编程提高(1) 3 ROS1通讯编程提高3.1 使用VS Code编译ROS13.1.1 VS Code的安装和配置...
大模型未来趋势 大模型是人工智能领域的重要发展趋势之一,未来有着广阔的应用前景和发展空间。以下是大模型未来的趋势和展...
python实战应用讲解-【n... 目录 如何在Python中计算残余的平方和 方法1:使用其Base公式 方法2:使用statsmod...
学习u-boot 需要了解的m... 一、常用函数 1. origin 函数 origin 函数的返回值就是变量来源。使用格式如下...
常用python爬虫库介绍与简... 通用 urllib -网络库(stdlib)。 requests -网络库。 grab – 网络库&...
药品批准文号查询|药融云-中国... 药品批文是国家食品药品监督管理局(NMPA)对药品的审评和批准的证明文件...
【2023-03-22】SRS... 【2023-03-22】SRS推流搭配FFmpeg实现目标检测 说明: 外侧测试使用SRS播放器测...
有限元三角形单元的等效节点力 文章目录前言一、重新复习一下有限元三角形单元的理论1、三角形单元的形函数(Nÿ...
初级算法-哈希表 主要记录算法和数据结构学习笔记,新的一年更上一层楼! 初级算法-哈希表...
进程间通信【Linux】 1. 进程间通信 1.1 什么是进程间通信 在 Linux 系统中,进程间通信...
【Docker】P3 Dock... Docker数据卷、宿主机与挂载数据卷的概念及作用挂载宿主机配置数据卷挂载操作示例一个容器挂载多个目...