Python进程、线程、协程

type
Post
status
Published
summary
在python中,由于大部分编译器中都默认设置了GIL机制,GIL锁的的存在就是为了防止多线程并发执行机器码的互斥锁(mutex)(为了确保线程运行安全)。
slug
python-process-thread-coroutine
date
Apr 16, 2020
tags
进程
python基础
线程
category
基础知识
password
icon
URL
Property
Feb 28, 2024 01:10 PM

一、概念理解

需求创造了功能,为了满足各种实际需求,就出现了各种功能。
并发:为了能一起执行多个程序,于是就出现了并发;并发现象表现在视觉上就是多个程序一起执行,但是实际上是多个cpu同时运行多个程序呢,还是一个cpu在多个程序之间切换呢,这就不得而知了,反正这就是并发。
进程:并发有一个问题就是程序之间的数据,变量这些程序独有的东西容易混乱;于是需求出现了,为了解决并发带来的问题,进程出现了,进程能够将每个程序的地址空间,内存,数据等进行独立管理,避免混乱。
并行:后来cpu核数多了,那不能让其他人闲着吧,大家一起跑吧,这就是并行。
线程:但是后来发现,进程一多,当正在占用cpu的进程卡住了(可能是I\O阻塞、时钟阻塞等),那不能一直等你啊,我得把cpu给下一个进程用啊,那就切换嘛;但是切换进程需要进入内核,置换掉一大堆状态,进程数一高,大部分系统资源就被进程切换给吃掉了。所以,线程就出现了。在一个进程里面搞多个线程,一个线程卡住了,那我另一个线程还可以跑啊。而线程之间的切换就比进程的切换要省事多了,因为进程内的所有线程共享同内存、变量、数据等,甚至连进程号都相同。
协程:原本的线程太过僵硬了,必须要等到有人操作或者规定的时间到了才会切换,这样可能就会导致有程序阻塞后其他没有执行的线程只能看着,而协程就是用户自己写的逻辑流调度程序,即可以利用到并发优势,又可以避免反复系统调用,还有进程切换造成的开销。
首先协程算是用户态的线程,优势主要是少了内核态用户态的切换和能自己来做调度。然后协程一般只在有IO操作的时候才能用到,对于一些会阻塞的IO操作,可以自己选择协程切换,等IO就绪了再切回来,可以更充分利用CPU。
 
他们之间的关系是:系统中可以运行多个进程,一个进程中可有多个线程,一个线程中可有多个协程
 
深入了解推荐:
 
在python中,由于大部分编译器中都默认设置了GIL机制,所以python在使用这些编译器的时候会受到GIL锁的限制,GIL锁的的存在就是为了防止多线程并发执行机器码的互斥锁(mutex)(为了确保线程运行安全)。GIL的运行机制是只有拿到GIL的线程才能调用CPU运行。一个线程有两种情况下会释放全局解释器锁,一种情况是在该线程进入IO操作之前,会主动释放GIL,另一种情况是解释器不间断运行了1000字节码(Py2)或运行15毫秒(Py3)后,该线程也会放弃GIL。所以python的多线程其实并不是同时运行,而是交替运行,只是交替的速度很快,看起来像是多线程。
 

二、多进程实现

python中想实现多进程需要依赖multiprocessing或jobli包,multiprocessing包中又有多种实现多进程的方法,详细如下:

2.1、Process(用于创建进程模块)

2.1.0、方法属性

点击查看
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) ## 参数说明 # 常用参数 target: 要要调用的函数或方法; args/kwargs: 要传入方法的参数。args=(a,);kwargs={'name'=a,} # 以下是不常用参数 group: 线程组,目前还没有实现,库引用中提示必须是None; # 每个进程都属于一个进程组(PG,Process Group),进程组可以包含多个进程。进程组有一个进程组长(Leader),进程组长的ID(PID, Process ID)就作为整个进程组的ID(PGID,Process Groupd ID)。 name: 进程名; ## 方法实例 需要注意的是start(),join(),is_alive(), terminate()和exitcode方法只能由创建进程对象的过程调用。 is_alive():返回进程是否存活。从start() 方法返回到子进程终止的那一刻,进程对象仍处于活动状态。 join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。 start():进程准备就绪,等待CPU调度 run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。 terminate():不管任务是否完成,立即停止工作进程  daemon:进程的守护进程标志,一个布尔值。必须在start()调用之前设置。 # 在创建线程实例的时候可以设置该参数(实例中默认为None),或者在start()前设置这个参数,如果该参数为True,则会将该进程设置为主进程的守护进程,只要是其他子进程结束且主进程执行完毕,不管被设置为守护进程的程序是否执行完,主线程都会关闭。 name:进程名字。 pid:进程号。 ## 代码示例 from multiprocessing import Process import time import os def info(): print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info() time.sleep(3) print('hello', name) if __name__ == '__main__': info() p = Process(target=f, args=('bob',)) # p.daemon = False print(p.daemon) p.start() p.join(1) print('name:', p.name) print('is_alive:', p.is_alive()) print('exitcode:', p.exitcode) ''' ------------------------------------------------------------ module name: __main__ parent process: 1188 process id: 13060 False module name: __mp_main__ parent process: 13060 process id: 13424 name: Process-1 is_alive: True exitcode: None hello bob ------------------------------------------------------------ ''' if __name__ == '__main__': info() p = Process(target=f, args=('bob',)) p.daemon = True print(p.daemon) p.start() # p.join(1) print('name:', p.name) print('is_alive:', p.is_alive()) print('exitcode:', p.exitcode) ''' ------------------------------------------------------------ module name: __main__ parent process: 1188 process id: 1668 True name: Process-1 is_alive: True exitcode: None ------------------------------------------------------------ '''

2.1.1、直接创建

点击查看
# 1.导入包和模块 import multiprocessing import time def sing(): for i in range(3): print("i am sing ooo~") time.sleep(0.5) if __name__ == '__main__': # 2.使用进程类创建进程对象 # target :指定进程执行的函数名,不加括号 sing_process = multiprocessing.Process(target=sing) # 3. 使用进程对象启动进程执行指定任务 sing_process.start()

2.1.2、循环创建(🉑️;重点关注获取返回值)

点击查看
# 1.导入包和模块 import multiprocessing import time def main_province2(data_prov, return_list): k = data_prov return_list.append(k) return return_list return_list = multiprocessing.Manager().list() # 使用multiprocessing的Manager下的list,dict等模块接收返回值 jobs = [] for data_prov in data_prov_list: p = multiprocessing.Process(target=main_province2, args=(data_prov,return_list)) # 将返回值送到函数去接收返回值 jobs.append(p) p.start() # 开始进程 for proc in jobs: proc.join() # 阻塞进程,等待所有进程执行完成 return_list # 最终的接收值

2.2、Pool(用于创建管理进程池)

Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

2.2.0、方法属性

点击查看
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) ## 参数详解 # processes: 是要使用的工作进程数。如果进程是None,那么使用返回的数字os.cpu_count()。也就是说根据本地的cpu个数决定,processes小于等于本地的cpu个数; # initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。 # maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。 # context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。 ## 方法实例 apply(func [,args [,kwds ] ]) # pool.apply(test, args=(i,)) # 使用堵塞方式调用函数,必须等待上一个进程退出才能执行下一个进程 apply_async(func [,args [,kwds [,callback [,error_callback ] ] ] ]) # pool.apply_async(test, args=(i,)) # 使用非阻塞方式调用函数,可以并行执行 map(func,iterable [,chunksize ]) # pool.map(test, lists) # 使用堵塞方式调用函数,此方法将iterable内的每一个对象作为单独的任务提交给进程池 map_async(func,iterable [,chunksize [,callback [,error_callback ] ] ]) # pool.map_async(test, range(500)) # 使用非阻塞方式调用函数,将iterable内的每一个对象作为单独的任务提交给进程池 apply_async 和 map_async 的区别 imap(func,iterable [,chunksize ]) # 使用和map相同,但是该方法将返回迭代器,使用next()获取迭代器内的内容 close():不在接受新的任务进入进程池,等待正在运行的所有进程结束后,关闭线程池 terminal() — 立即结束所有进程,即使在运行中的进程也会被停止 join() — 主进程阻塞等待子进程执行完之后再执行, join方法要在close或terminate之后使用。(守护进程) ## 异步方法返回的结果获取 get([timeout]) # result = pool.apply_async(time.sleep, (10,)) # print(result.get(timeout=1)) # 获取返回结果,timeout可以不设置。如果timeout不是None并且结果没有在timeout秒内到达,则 multiprocessing.TimeoutError 被引发。

2.2.1、apply_async循环启用进程池中的进程

点击查看
# 导入包和模块 import multiprocessing # 创建进程池 pool = multiprocessing.Pool(20) #异步调用 results = [] for data_4a in data_prov_system_acct4a_lists: result = pool.apply_async(main_province, args=(data_4a,)) results.append(result) pool.close() pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 #获取处理结果 data_acct4a_out_list_2 = [res.get() for res in results] data_acct4a_out_df_2 = pd.concat(data_acct4a_out_list_2, join='inner', axis=0)
使用apply_async方法时,子进程不执行的情况
  • 参数需要以元组的形式传递,并在最后一个参数后面加上 ,号,如果没有加,子进程不会执行
  • 关闭进程池之前使用get()函数会导致进程阻塞(可以使用callback = log_result参数来代替get调回函数结果)

2.2.2、map_async映射启用进程池中的进程(🉑️)

点击查看
# 导入包和模块 import multiproceing # 创建进程池 pool = multiprocessing.Pool(20) #异步调用 res = pool.map_async(main_province, data_prov_system_acct4a_lists) #获取处理结果 data_acct4a_out_list_1 = [res_img for res_img in res.get()] data_acct4a_out_df_1 = pd.concat(data_acct4a_out_list_1, join='inner', axis=0)

2.3、jobli包实现多线程

推荐阅读:
pip install joblib from math import sqrt from joblib import Parallel, delayed Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] # Parallel(n_jobs=2): 指定两个CPU(默认是分配给不同的CPU) # 后面的delayed(sqrt)表示要用的函数是sqrt,这里这种用法就非常类似C++里面的委托(delegate)。 # (i**2) for i in range(10): 这里注意(i**2)的括号和delayed(sqrt)是紧挨着的。这一小段表示要传递给delayed中指定的函数的参数是i^2。

2.4、队列

2.4.1、队列应用场景

有时候在启用多进程时, 多个进程可能同时访问和修改共享的数据,这可能导致数据不一致性和竞争条件。为了解决这个问题,可以考虑使用进程间通信机制(IPC)来同步和协调进程之间的操作。
1, 使用进程池(multiprocessing.Pool)中的锁机制。可以使用multiprocessing.Lock()创建锁对象,然后在进程内部的关键区域使用lock.acquire()获取锁,完成关键操作后使用lock.release()释放锁。这样可以确保同一时间只有一个进程在访问共享数据。使用锁机制确保了数据的正确性,但是同时也降低了并行的效率,因为多个进程需要竞争同一个锁。
2, 使用进程池(multiprocessing.Pool)中的队列(multiprocessing.Queue)机制。可以将数据放入队列中,然后让进程从队列中获取数据并进行处理。这种方式可以确保多个进程之间的数据互不干扰,避免了数据竞争的问题。数据进入不同进程是经过拷贝的

2.4.2、方法属性

import multiprocessing queue = multiprocessing.Queue(队列长度) put # queue.put(数据),放入数据(如队列已满,则程序进入阻塞状态,等待队列取出后再放入) put_nowait # queue.put_nowati(数据),放入数据(如队列已满,则不等待队列信息取出后再放入,直接报错) get # queue.get(数据),取出数据(如队列为空,则程序进入阻塞状态,等待队列防如数据后再取出) get_nowait # queue.get_nowait(数据),取出数据(如队列为空,则不等待队列放入信息后取出数据,直接报错),放入数据后立马判断是否为空有时为True,原因是放入值和判断同时进行 qsize # queue.qsize(),消息数量 empty # queue.empty()(返回值为True或False),判断是否为空 full # queue.full()(返回值为True或False),判断是否为满

2.4.3、实例

import multiprocessing def square(n): return n * n if __name__ == '__main__': data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 创建进程池和队列 pool = multiprocessing.Pool() queue = multiprocessing.Manager().Queue() # 将要处理的数据放入队列 for n in data: queue.put(n) # 并发处理数据 results = [] while not queue.empty(): n = queue.get() result = pool.apply_async(square, args=(n,)) results.append(result) # 关闭进程池 pool.close() pool.join() # 获取处理结果 final_results = [r.get() for r in results] print(final_results)

2.5、资源查看

os.cpu_count() # 查看cpu核数 os.getpid() # 当前进程id os.getppid() # 当前父进程id

2.6、推荐阅读

三、多线程实现

多线程主要是通过threading包实现

python多线程详解

什么是线程? 线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。 线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其他线程共享进程所 拥有的全部资源。一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发执行

为什么要使用多线程?

线程在程序中是独立的、并发的执行流。与分隔的进程相比,进程中线程之间的隔离程度要小,它们共享内存、文件句柄 和其他进程应有的状态。 因为线程的划分尺度小于进程,使得多线程程序的并发性高。进程在执行过程之中拥有独立的内存单元,而多个线程共享 内存,从而极大的提升了程序的运行效率。 线程比进程具有更高的性能,这是由于同一个进程中的线程都有共性,多个线程共享一个进程的虚拟空间。线程的共享环境 包括进程代码段、进程的共有数据等,利用这些共享的数据,线程之间很容易实现通信。 操作系统在创建进程时,必须为改进程分配独立的内存空间,并分配大量的相关资源,但创建线程则简单得多。因此,使用多线程 来实现并发比使用多进程的性能高得要多。
总结起来,使用多线程编程具有如下几个优点:
进程之间不能共享内存,但线程之间共享内存非常容易。 操作系统在创建进程时,需要为该进程重新分配系统资源,但创建线程的代价则小得多。因此使用多线程来实现多任务并发执行比使用多进程的效率高 python语言内置了多线程功能支持,而不是单纯地作为底层操作系统的调度方式,从而简化了python的多线程编程。

实现多线程

点击查看
import threading from threading import Lock,Thread import time,os # 普通创建方式 def run(n): print('task',n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if __name__ == '__main__': t1 = threading.Thread(target=run,args=('t1',)) # target是要执行的函数名(不是函数),args是函数对应的参数,以元组的形式存在 t2 = threading.Thread(target=run,args=('t2',)) t1.start() t2.start() # 自定义线程:继承threading.Thread来定义线程类,其本质是重构Thread类中的run方法 class MyThread(threading.Thread): def __init__(self,n): super(MyThread,self).__init__() #重构run函数必须写 self.n = n def run(self): print('task',self.n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if __name__ == '__main__': t1 = MyThread('t1') t2 = MyThread('t2') t1.start() t2.start()

守护线程

下面这个例子,这里使用setDaemon(True)把所有的子线程都变成了主线程的守护线程, 因此当主线程结束后,子线程也会随之结束,所以当主线程结束后,整个程序就退出了。 所谓’线程守护’,就是主线程不管该线程的执行情况,只要是其他子线程结束且主线程执行完毕,主线程都会关闭。也就是说:主线程不等待该守护线程的执行完再去关闭。
点击查看
def run(n): print('task',n) time.sleep(1) print('3s') time.sleep(1) print('2s') time.sleep(1) print('1s') if __name__ == '__main__': t=threading.Thread(target=run,args=('t1',)) t.setDaemon(True) t.start() print('end') # 通过执行结果可以看出,设置守护线程之后,当主线程结束时,子线程也将立即结束,不再执行 # 主线程等待子线程结束 # 为了让守护线程执行结束之后,主线程再结束,我们可以使用join方法,让主线程等待子线程执行 def run(n): print('task',n) time.sleep(2) print('5s') time.sleep(2) print('3s') time.sleep(2) print('1s') if __name__ == '__main__': t=threading.Thread(target=run,args=('t1',)) t.setDaemon(True) #把子线程设置为守护线程,必须在start()之前设置 t.start() t.join() #设置主线程等待子线程结束 print('end') # 多线程共享全局变量 # 线程时进程的执行单元,进程时系统分配资源的最小执行单位,所以在同一个进程中的多线程是共享资源的 g_num = 100 def work1(): global g_num for i in range(3): g_num+=1 print('in work1 g_num is : %d' % g_num) def work2(): global g_num print('in work2 g_num is : %d' % g_num) if __name__ == '__main__': t1 = threading.Thread(target=work1) t1.start() time.sleep(1) t2=threading.Thread(target=work2) t2.start()

GIL和其他

点击查看
''' 由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据, 所以出现了线程锁,即同一时刻允许一个线程执行操作。线程锁用于锁定资源,可以定义多个锁,像下面的代码,当需要独占 某一个资源时,任何一个锁都可以锁定这个资源,就好比你用不同的锁都可以把这个相同的门锁住一样。 由于线程之间是进行随机调度的,如果有多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期, 我们因此也称为“线程不安全”。 为了防止上面情况的发生,就出现了互斥锁(Lock) ''' # def work(): # global n # lock.acquire() # temp = n # time.sleep(0.1) # n = temp-1 # lock.release() # # # if __name__ == '__main__': # lock = Lock() # n = 100 # l = [] # for i in range(100): # p = Thread(target=work) # l.append(p) # p.start() # for p in l: # p.join() ''' 递归锁:RLcok类的用法和Lock类一模一样,但它支持嵌套,在多个锁没有释放的时候一般会使用RLock类 ''' # def func(lock): # global gl_num # lock.acquire() # gl_num += 1 # time.sleep(1) # print(gl_num) # lock.release() # # # if __name__ == '__main__': # gl_num = 0 # lock = threading.RLock() # for i in range(10): # t = threading.Thread(target=func,args=(lock,)) # t.start() ''' 信号量(BoundedSemaphore类) 互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,比如厕所有3个坑, 那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去 ''' # def run(n,semaphore): # semaphore.acquire() #加锁 # time.sleep(3) # print('run the thread:%s\n' % n) # semaphore.release() #释放 # # # if __name__== '__main__': # num=0 # semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行 # for i in range(22): # t = threading.Thread(target=run,args=('t-%s' % i,semaphore)) # t.start() # while threading.active_count() !=1: # pass # else: # print('----------all threads done-----------') ''' python线程的事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象,其主要提供以下的几个方法: clear将flag设置为 False set将flag设置为 True is_set判断是否设置了flag wait会一直监听flag,如果没有检测到flag就一直处于阻塞状态 事件处理的机制:全局定义了一个Flag,当Flag的值为False,那么event.wait()就会阻塞,当flag值为True, 那么event.wait()便不再阻塞 ''' event = threading.Event() def lighter(): count = 0 event.set() #初始者为绿灯 while True: if 5 < count <=10: event.clear() #红灯,清除标志位 print("\33[41;lmred light is on...\033[0m]") elif count > 10: event.set() #绿灯,设置标志位 count = 0 else: print('\33[42;lmgreen light is on...\033[0m') time.sleep(1) count += 1 def car(name): while True: if event.is_set(): #判断是否设置了标志位 print('[%s] running.....'%name) time.sleep(1) else: print('[%s] sees red light,waiting...'%name) event.wait() print('[%s] green light is on,start going...'%name) # startTime = time.time() light = threading.Thread(target=lighter,) light.start() car = threading.Thread(target=car,args=('MINT',)) car.start() endTime = time.time() # print('用时:',endTime-startTime)
 

GIL 全局解释器

在非python环境中,单核情况下,同时只能有一个任务执行。多核时可以支持多个线程同时执行。但是在python中,无论有多少个核同时只能执行一个线程。究其原因,这就是由于GIL的存在导致的。 GIL的全程是全局解释器,来源是python设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到GIL,我们可以把GIL看做是“通行证”,并且在一个python进程之中,GIL只有一个。拿不到线程的通行证,并且在一个python进程中,GIL只有一个,拿不到通行证的线程,就不允许进入CPU执行。GIL只在cpython中才有,因为cpython调用的是c语言的原生线程,所以他不能直接操作cpu,而只能利用GIL保证同一时间只能有一个线程拿到数据。而在pypy和jpython中是没有GIL的python在使用多线程的时候,调用的是c语言的原生过程。

python针对不同类型的代码执行效率也是不同的

1、CPU密集型代码(各种循环处理、计算等),在这种情况下,由于计算工作多,ticks技术很快就会达到阀值,然后出发GIL的释放与再竞争(多个线程来回切换当然是需要消耗资源的),所以python下的多线程对CPU密集型代码并不友好。
2、IO密集型代码(文件处理、网络爬虫等设计文件读写操作),多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序的执行效率)。所以python的多线程对IO密集型代码比较友好。
主要要看任务的类型,我们把任务分为I/O密集型和计算密集型,而多线程在切换中又分为I/O切换和时间切换。如果任务属于是I/O密集型,若不采用多线程,我们在进行I/O操作时,势必要等待前面一个I/O任务完成后面的I/O任务才能进行,在这个等待的过程中,CPU处于等待状态,这时如果采用多线程的话,刚好可以切换到进行另一个I/O任务。这样就刚好可以充分利用CPU避免CPU处于闲置状态,提高效率。但是如果多线程任务都是计算型,CPU会一直在进行工作,直到一定的时间后采取多线程时间切换的方式进行切换线程,此时CPU一直处于工作状态,此种情况下并不能提高性能,相反在切换多线程任务时,可能还会造成时间和资源的浪费,导致效能下降。这就是造成上面两种多线程结果不能的解释。 结论: I/O密集型任务,建议采取多线程,还可以采用多进程+协程的方式(例如:爬虫多采用多线程处理爬取的数据); 对于计算密集型任务,python此时就不适用了。
 

使用过程中遇到的问题

1、python版本限制进程池处理的数据量

报错代码:'i' format requires -2147483648 <= number <= 2147483647
问题原因:多进程时候,进程间数据交换是通过pickling,因为处理的文本都比较大,当数据pickled时候超过了i struct的限制,-2147483648 <= number <= 2147483647;Python3.8中在非windows平台修复了这个问题,可以支持最大4EB的数据。
解决办法:
  1. 将每个进程结果写入文件,最后汇总处理,这样避免进程间大量数据传递
  1. 代码中检查Python版本,低于3.8时候不使用多进程。
  1. 使用dask dataframe等Python并行处理库
具体思路:主要路线是上面👆的第一条
尝试:
1、换成process方式实现(缺点是进程数会跟着省份数增加,不固定)
2、由于使用pool方式的循环创建并不能缩短时间,所以还是使用pool直接创建,但是可以把process中的数据接收参数使用进来
 

2、进程池中进程的数量增加,而运行效率并没有提升

notion imagenotion image
进程还需要处理I/O请求,在一定数据量内,进程数量和处理速度会有一个平衡点。
If you have any questions, please contact me.