Python进程、线程、协程
type
Post
status
Published
summary
在python中,由于大部分编译器中都默认设置了GIL机制,GIL锁的的存在就是为了防止多线程并发执行机器码的互斥锁(mutex)(为了确保线程运行安全)。
slug
python-process-thread-coroutine
date
Apr 16, 2020
tags
进程
python基础
线程
category
基础知识
password
icon
URL
Property
Feb 28, 2024 01:10 PM
一、概念理解
需求创造了功能,为了满足各种实际需求,就出现了各种功能。
并发:为了能一起执行多个程序,于是就出现了并发;并发现象表现在视觉上就是多个程序一起执行,但是实际上是多个cpu同时运行多个程序呢,还是一个cpu在多个程序之间切换呢,这就不得而知了,反正这就是并发。
进程:并发有一个问题就是程序之间的数据,变量这些程序独有的东西容易混乱;于是需求出现了,为了解决并发带来的问题,进程出现了,进程能够将每个程序的地址空间,内存,数据等进行独立管理,避免混乱。
并行:后来cpu核数多了,那不能让其他人闲着吧,大家一起跑吧,这就是并行。
线程:但是后来发现,进程一多,当正在占用cpu的进程卡住了(可能是I\O阻塞、时钟阻塞等),那不能一直等你啊,我得把cpu给下一个进程用啊,那就切换嘛;但是切换进程需要进入内核,置换掉一大堆状态,进程数一高,大部分系统资源就被进程切换给吃掉了。所以,线程就出现了。在一个进程里面搞多个线程,一个线程卡住了,那我另一个线程还可以跑啊。而线程之间的切换就比进程的切换要省事多了,因为进程内的所有线程共享同内存、变量、数据等,甚至连进程号都相同。
协程:原本的线程太过僵硬了,必须要等到有人操作或者规定的时间到了才会切换,这样可能就会导致有程序阻塞后其他没有执行的线程只能看着,而协程就是用户自己写的逻辑流调度程序,即可以利用到并发优势,又可以避免反复系统调用,还有进程切换造成的开销。
首先协程算是用户态的线程,优势主要是少了内核态用户态的切换和能自己来做调度。然后协程一般只在有IO操作的时候才能用到,对于一些会阻塞的IO操作,可以自己选择协程切换,等IO就绪了再切回来,可以更充分利用CPU。
他们之间的关系是:系统中可以运行多个进程,一个进程中可有多个线程,一个线程中可有多个协程
深入了解推荐:
在python中,由于大部分编译器中都默认设置了GIL机制,所以python在使用这些编译器的时候会受到GIL锁的限制,GIL锁的的存在就是为了防止多线程并发执行机器码的互斥锁(mutex)(为了确保线程运行安全)。GIL的运行机制是只有拿到GIL的线程才能调用CPU运行。一个线程有两种情况下会释放全局解释器锁,一种情况是在该线程进入IO操作之前,会主动释放GIL,另一种情况是解释器不间断运行了1000字节码(Py2)或运行15毫秒(Py3)后,该线程也会放弃GIL。所以python的多线程其实并不是同时运行,而是交替运行,只是交替的速度很快,看起来像是多线程。
二、多进程实现
python中想实现多进程需要依赖multiprocessing或jobli包,multiprocessing包中又有多种实现多进程的方法,详细如下:
2.1、Process(用于创建进程模块)
2.1.0、方法属性
点击查看
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) ## 参数说明 # 常用参数 target: 要要调用的函数或方法; args/kwargs: 要传入方法的参数。args=(a,);kwargs={'name'=a,} # 以下是不常用参数 group: 线程组,目前还没有实现,库引用中提示必须是None; # 每个进程都属于一个进程组(PG,Process Group),进程组可以包含多个进程。进程组有一个进程组长(Leader),进程组长的ID(PID, Process ID)就作为整个进程组的ID(PGID,Process Groupd ID)。 name: 进程名; ## 方法实例 需要注意的是start(),join(),is_alive(), terminate()和exitcode方法只能由创建进程对象的过程调用。 is_alive():返回进程是否存活。从start() 方法返回到子进程终止的那一刻,进程对象仍处于活动状态。 join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。 start():进程准备就绪,等待CPU调度 run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。 terminate():不管任务是否完成,立即停止工作进程 daemon:进程的守护进程标志,一个布尔值。必须在start()调用之前设置。 # 在创建线程实例的时候可以设置该参数(实例中默认为None),或者在start()前设置这个参数,如果该参数为True,则会将该进程设置为主进程的守护进程,只要是其他子进程结束且主进程执行完毕,不管被设置为守护进程的程序是否执行完,主线程都会关闭。 name:进程名字。 pid:进程号。 ## 代码示例 from multiprocessing import Process import time import os def info(): print('module name:', __name__) print('parent process:', os.getppid()) print('process id:', os.getpid()) def f(name): info() time.sleep(3) print('hello', name) if __name__ == '__main__': info() p = Process(target=f, args=('bob',)) # p.daemon = False print(p.daemon) p.start() p.join(1) print('name:', p.name) print('is_alive:', p.is_alive()) print('exitcode:', p.exitcode) ''' ------------------------------------------------------------ module name: __main__ parent process: 1188 process id: 13060 False module name: __mp_main__ parent process: 13060 process id: 13424 name: Process-1 is_alive: True exitcode: None hello bob ------------------------------------------------------------ ''' if __name__ == '__main__': info() p = Process(target=f, args=('bob',)) p.daemon = True print(p.daemon) p.start() # p.join(1) print('name:', p.name) print('is_alive:', p.is_alive()) print('exitcode:', p.exitcode) ''' ------------------------------------------------------------ module name: __main__ parent process: 1188 process id: 1668 True name: Process-1 is_alive: True exitcode: None ------------------------------------------------------------ '''
2.1.1、直接创建
点击查看
# 1.导入包和模块 import multiprocessing import time def sing(): for i in range(3): print("i am sing ooo~") time.sleep(0.5) if __name__ == '__main__': # 2.使用进程类创建进程对象 # target :指定进程执行的函数名,不加括号 sing_process = multiprocessing.Process(target=sing) # 3. 使用进程对象启动进程执行指定任务 sing_process.start()
2.1.2、循环创建(🉑️;重点关注获取返回值)
点击查看
# 1.导入包和模块 import multiprocessing import time def main_province2(data_prov, return_list): k = data_prov return_list.append(k) return return_list return_list = multiprocessing.Manager().list() # 使用multiprocessing的Manager下的list,dict等模块接收返回值 jobs = [] for data_prov in data_prov_list: p = multiprocessing.Process(target=main_province2, args=(data_prov,return_list)) # 将返回值送到函数去接收返回值 jobs.append(p) p.start() # 开始进程 for proc in jobs: proc.join() # 阻塞进程,等待所有进程执行完成 return_list # 最终的接收值
2.2、Pool(用于创建管理进程池)
Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
2.2.0、方法属性
点击查看
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) ## 参数详解 # processes: 是要使用的工作进程数。如果进程是None,那么使用返回的数字os.cpu_count()。也就是说根据本地的cpu个数决定,processes小于等于本地的cpu个数; # initializer: 如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。 # maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。 # context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。 ## 方法实例 apply(func [,args [,kwds ] ]) # pool.apply(test, args=(i,)) # 使用堵塞方式调用函数,必须等待上一个进程退出才能执行下一个进程 apply_async(func [,args [,kwds [,callback [,error_callback ] ] ] ]) # pool.apply_async(test, args=(i,)) # 使用非阻塞方式调用函数,可以并行执行 map(func,iterable [,chunksize ]) # pool.map(test, lists) # 使用堵塞方式调用函数,此方法将iterable内的每一个对象作为单独的任务提交给进程池 map_async(func,iterable [,chunksize [,callback [,error_callback ] ] ]) # pool.map_async(test, range(500)) # 使用非阻塞方式调用函数,将iterable内的每一个对象作为单独的任务提交给进程池 apply_async 和 map_async 的区别 imap(func,iterable [,chunksize ]) # 使用和map相同,但是该方法将返回迭代器,使用next()获取迭代器内的内容 close():不在接受新的任务进入进程池,等待正在运行的所有进程结束后,关闭线程池 terminal() — 立即结束所有进程,即使在运行中的进程也会被停止 join() — 主进程阻塞等待子进程执行完之后再执行, join方法要在close或terminate之后使用。(守护进程) ## 异步方法返回的结果获取 get([timeout]) # result = pool.apply_async(time.sleep, (10,)) # print(result.get(timeout=1)) # 获取返回结果,timeout可以不设置。如果timeout不是None并且结果没有在timeout秒内到达,则 multiprocessing.TimeoutError 被引发。
2.2.1、apply_async循环启用进程池中的进程
点击查看
# 导入包和模块 import multiprocessing # 创建进程池 pool = multiprocessing.Pool(20) #异步调用 results = [] for data_4a in data_prov_system_acct4a_lists: result = pool.apply_async(main_province, args=(data_4a,)) results.append(result) pool.close() pool.join() #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束 #获取处理结果 data_acct4a_out_list_2 = [res.get() for res in results] data_acct4a_out_df_2 = pd.concat(data_acct4a_out_list_2, join='inner', axis=0)
使用apply_async方法时,子进程不执行的情况
- 参数需要以元组的形式传递,并在最后一个参数后面加上 ,号,如果没有加,子进程不会执行
- 关闭进程池之前使用get()函数会导致进程阻塞(可以使用
callback = log_result
参数来代替get调回函数结果)
2.2.2、map_async映射启用进程池中的进程(🉑️)
点击查看
# 导入包和模块 import multiproceing # 创建进程池 pool = multiprocessing.Pool(20) #异步调用 res = pool.map_async(main_province, data_prov_system_acct4a_lists) #获取处理结果 data_acct4a_out_list_1 = [res_img for res_img in res.get()] data_acct4a_out_df_1 = pd.concat(data_acct4a_out_list_1, join='inner', axis=0)
它们的主要区别在于它们处理任务和返回结果的方式。
apply_async
:一次只提交一个任务。返回一个
AsyncResult
对象,你可以调用它的 get()
方法来获取结果。允许你为每个任务提供不同的参数。map_async
:类似于内置的
map
函数,它接受一个函数和一个可迭代的参数列表,然后将函数并行地应用于所有的参数。返回一个 AsyncResult
对象,你可以调用它的 get()
方法来获取一个包含所有结果的列表。所有任务使用相同的函数,但是参数从提供的可迭代对象中获取。2.3、jobli包实现多线程
推荐阅读:
pip install joblib from math import sqrt from joblib import Parallel, delayed Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10)) [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] # Parallel(n_jobs=2): 指定两个CPU(默认是分配给不同的CPU) # 后面的delayed(sqrt)表示要用的函数是sqrt,这里这种用法就非常类似C++里面的委托(delegate)。 # (i**2) for i in range(10): 这里注意(i**2)的括号和delayed(sqrt)是紧挨着的。这一小段表示要传递给delayed中指定的函数的参数是i^2。
2.4、队列
2.4.1、队列应用场景
有时候在启用多进程时, 多个进程可能同时访问和修改共享的数据,这可能导致数据不一致性和竞争条件。为了解决这个问题,可以考虑使用进程间通信机制(IPC)来同步和协调进程之间的操作。
1, 使用进程池(
multiprocessing.Pool
)中的锁机制。可以使用multiprocessing.Lock()
创建锁对象,然后在进程内部的关键区域使用lock.acquire()
获取锁,完成关键操作后使用lock.release()
释放锁。这样可以确保同一时间只有一个进程在访问共享数据。使用锁机制确保了数据的正确性,但是同时也降低了并行的效率,因为多个进程需要竞争同一个锁。2, 使用进程池(
multiprocessing.Pool
)中的队列(multiprocessing.Queue
)机制。可以将数据放入队列中,然后让进程从队列中获取数据并进行处理。这种方式可以确保多个进程之间的数据互不干扰,避免了数据竞争的问题。数据进入不同进程是经过拷贝的2.4.2、方法属性
import multiprocessing queue = multiprocessing.Queue(队列长度) put # queue.put(数据),放入数据(如队列已满,则程序进入阻塞状态,等待队列取出后再放入) put_nowait # queue.put_nowati(数据),放入数据(如队列已满,则不等待队列信息取出后再放入,直接报错) get # queue.get(数据),取出数据(如队列为空,则程序进入阻塞状态,等待队列防如数据后再取出) get_nowait # queue.get_nowait(数据),取出数据(如队列为空,则不等待队列放入信息后取出数据,直接报错),放入数据后立马判断是否为空有时为True,原因是放入值和判断同时进行 qsize # queue.qsize(),消息数量 empty # queue.empty()(返回值为True或False),判断是否为空 full # queue.full()(返回值为True或False),判断是否为满
2.4.3、实例
import multiprocessing def square(n): return n * n if __name__ == '__main__': data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 创建进程池和队列 pool = multiprocessing.Pool() queue = multiprocessing.Manager().Queue() # 将要处理的数据放入队列 for n in data: queue.put(n) # 并发处理数据 results = [] while not queue.empty(): n = queue.get() result = pool.apply_async(square, args=(n,)) results.append(result) # 关闭进程池 pool.close() pool.join() # 获取处理结果 final_results = [r.get() for r in results] print(final_results)
2.5、资源查看
os.cpu_count() # 查看cpu核数 os.getpid() # 当前进程id os.getppid() # 当前父进程id
2.6、推荐阅读
三、多线程实现
多线程主要是通过threading包实现
python多线程详解
什么是线程?
线程也叫轻量级进程,是操作系统能够进行运算调度的最小单位,它被包涵在进程之中,是进程中的实际运作单位。
线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其他线程共享进程所
拥有的全部资源。一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发执行
为什么要使用多线程?
线程在程序中是独立的、并发的执行流。与分隔的进程相比,进程中线程之间的隔离程度要小,它们共享内存、文件句柄
和其他进程应有的状态。
因为线程的划分尺度小于进程,使得多线程程序的并发性高。进程在执行过程之中拥有独立的内存单元,而多个线程共享
内存,从而极大的提升了程序的运行效率。
线程比进程具有更高的性能,这是由于同一个进程中的线程都有共性,多个线程共享一个进程的虚拟空间。线程的共享环境
包括进程代码段、进程的共有数据等,利用这些共享的数据,线程之间很容易实现通信。
操作系统在创建进程时,必须为改进程分配独立的内存空间,并分配大量的相关资源,但创建线程则简单得多。因此,使用多线程
来实现并发比使用多进程的性能高得要多。
总结起来,使用多线程编程具有如下几个优点:
进程之间不能共享内存,但线程之间共享内存非常容易。
操作系统在创建进程时,需要为该进程重新分配系统资源,但创建线程的代价则小得多。因此使用多线程来实现多任务并发执行比使用多进程的效率高
python语言内置了多线程功能支持,而不是单纯地作为底层操作系统的调度方式,从而简化了python的多线程编程。
实现多线程
点击查看
import threading from threading import Lock,Thread import time,os # 普通创建方式 def run(n): print('task',n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if __name__ == '__main__': t1 = threading.Thread(target=run,args=('t1',)) # target是要执行的函数名(不是函数),args是函数对应的参数,以元组的形式存在 t2 = threading.Thread(target=run,args=('t2',)) t1.start() t2.start() # 自定义线程:继承threading.Thread来定义线程类,其本质是重构Thread类中的run方法 class MyThread(threading.Thread): def __init__(self,n): super(MyThread,self).__init__() #重构run函数必须写 self.n = n def run(self): print('task',self.n) time.sleep(1) print('2s') time.sleep(1) print('1s') time.sleep(1) print('0s') time.sleep(1) if __name__ == '__main__': t1 = MyThread('t1') t2 = MyThread('t2') t1.start() t2.start()
守护线程
下面这个例子,这里使用setDaemon(True)把所有的子线程都变成了主线程的守护线程,
因此当主线程结束后,子线程也会随之结束,所以当主线程结束后,整个程序就退出了。
所谓’线程守护’,就是主线程不管该线程的执行情况,只要是其他子线程结束且主线程执行完毕,主线程都会关闭。也就是说:主线程不等待该守护线程的执行完再去关闭。
点击查看
def run(n): print('task',n) time.sleep(1) print('3s') time.sleep(1) print('2s') time.sleep(1) print('1s') if __name__ == '__main__': t=threading.Thread(target=run,args=('t1',)) t.setDaemon(True) t.start() print('end') # 通过执行结果可以看出,设置守护线程之后,当主线程结束时,子线程也将立即结束,不再执行 # 主线程等待子线程结束 # 为了让守护线程执行结束之后,主线程再结束,我们可以使用join方法,让主线程等待子线程执行 def run(n): print('task',n) time.sleep(2) print('5s') time.sleep(2) print('3s') time.sleep(2) print('1s') if __name__ == '__main__': t=threading.Thread(target=run,args=('t1',)) t.setDaemon(True) #把子线程设置为守护线程,必须在start()之前设置 t.start() t.join() #设置主线程等待子线程结束 print('end') # 多线程共享全局变量 # 线程时进程的执行单元,进程时系统分配资源的最小执行单位,所以在同一个进程中的多线程是共享资源的 g_num = 100 def work1(): global g_num for i in range(3): g_num+=1 print('in work1 g_num is : %d' % g_num) def work2(): global g_num print('in work2 g_num is : %d' % g_num) if __name__ == '__main__': t1 = threading.Thread(target=work1) t1.start() time.sleep(1) t2=threading.Thread(target=work2) t2.start()
GIL和其他
点击查看
''' 由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据, 所以出现了线程锁,即同一时刻允许一个线程执行操作。线程锁用于锁定资源,可以定义多个锁,像下面的代码,当需要独占 某一个资源时,任何一个锁都可以锁定这个资源,就好比你用不同的锁都可以把这个相同的门锁住一样。 由于线程之间是进行随机调度的,如果有多个线程同时操作一个对象,如果没有很好地保护该对象,会造成程序结果的不可预期, 我们因此也称为“线程不安全”。 为了防止上面情况的发生,就出现了互斥锁(Lock) ''' # def work(): # global n # lock.acquire() # temp = n # time.sleep(0.1) # n = temp-1 # lock.release() # # # if __name__ == '__main__': # lock = Lock() # n = 100 # l = [] # for i in range(100): # p = Thread(target=work) # l.append(p) # p.start() # for p in l: # p.join() ''' 递归锁:RLcok类的用法和Lock类一模一样,但它支持嵌套,在多个锁没有释放的时候一般会使用RLock类 ''' # def func(lock): # global gl_num # lock.acquire() # gl_num += 1 # time.sleep(1) # print(gl_num) # lock.release() # # # if __name__ == '__main__': # gl_num = 0 # lock = threading.RLock() # for i in range(10): # t = threading.Thread(target=func,args=(lock,)) # t.start() ''' 信号量(BoundedSemaphore类) 互斥锁同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据,比如厕所有3个坑, 那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去 ''' # def run(n,semaphore): # semaphore.acquire() #加锁 # time.sleep(3) # print('run the thread:%s\n' % n) # semaphore.release() #释放 # # # if __name__== '__main__': # num=0 # semaphore = threading.BoundedSemaphore(5) #最多允许5个线程同时运行 # for i in range(22): # t = threading.Thread(target=run,args=('t-%s' % i,semaphore)) # t.start() # while threading.active_count() !=1: # pass # else: # print('----------all threads done-----------') ''' python线程的事件用于主线程控制其他线程的执行,事件是一个简单的线程同步对象,其主要提供以下的几个方法: clear将flag设置为 False set将flag设置为 True is_set判断是否设置了flag wait会一直监听flag,如果没有检测到flag就一直处于阻塞状态 事件处理的机制:全局定义了一个Flag,当Flag的值为False,那么event.wait()就会阻塞,当flag值为True, 那么event.wait()便不再阻塞 ''' event = threading.Event() def lighter(): count = 0 event.set() #初始者为绿灯 while True: if 5 < count <=10: event.clear() #红灯,清除标志位 print("\33[41;lmred light is on...\033[0m]") elif count > 10: event.set() #绿灯,设置标志位 count = 0 else: print('\33[42;lmgreen light is on...\033[0m') time.sleep(1) count += 1 def car(name): while True: if event.is_set(): #判断是否设置了标志位 print('[%s] running.....'%name) time.sleep(1) else: print('[%s] sees red light,waiting...'%name) event.wait() print('[%s] green light is on,start going...'%name) # startTime = time.time() light = threading.Thread(target=lighter,) light.start() car = threading.Thread(target=car,args=('MINT',)) car.start() endTime = time.time() # print('用时:',endTime-startTime)
GIL 全局解释器
在非python环境中,单核情况下,同时只能有一个任务执行。多核时可以支持多个线程同时执行。但是在python中,无论有多少个核同时只能执行一个线程。究其原因,这就是由于GIL的存在导致的。
GIL的全程是全局解释器,来源是python设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到GIL,我们可以把GIL看做是“通行证”,并且在一个python进程之中,GIL只有一个。拿不到线程的通行证,并且在一个python进程中,GIL只有一个,拿不到通行证的线程,就不允许进入CPU执行。GIL只在cpython中才有,因为cpython调用的是c语言的原生线程,所以他不能直接操作cpu,而只能利用GIL保证同一时间只能有一个线程拿到数据。而在pypy和jpython中是没有GIL的python在使用多线程的时候,调用的是c语言的原生过程。
python针对不同类型的代码执行效率也是不同的
1、CPU密集型代码(各种循环处理、计算等),在这种情况下,由于计算工作多,ticks技术很快就会达到阀值,然后触发GIL的释放与再竞争(多个线程来回切换当然是需要消耗资源的),所以python下的多线程对CPU密集型代码并不友好。
2、IO密集型代码(文件处理、网络爬虫等设计文件读写操作),多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序的执行效率)。所以python的多线程对IO密集型代码比较友好。
主要要看任务的类型,我们把任务分为I/O密集型和计算密集型,而多线程在切换中又分为I/O切换和时间切换。如果任务属于是I/O密集型,若不采用多线程,我们在进行I/O操作时,势必要等待前面一个I/O任务完成后面的I/O任务才能进行,在这个等待的过程中,CPU处于等待状态,这时如果采用多线程的话,刚好可以切换到进行另一个I/O任务。这样就刚好可以充分利用CPU避免CPU处于闲置状态,提高效率。但是如果多线程任务都是计算型,CPU会一直在进行工作,直到一定的时间后采取多线程时间切换的方式进行切换线程,此时CPU一直处于工作状态,此种情况下并不能提高性能,相反在切换多线程任务时,可能还会造成时间和资源的浪费,导致效能下降。这就是造成上面两种多线程结果不能的解释。
结论:
I/O密集型任务,建议采取多线程,还可以采用多进程+协程的方式(例如:爬虫多采用多线程处理爬取的数据);
对于计算密集型任务,python此时就不适用了。
四、异步编程(协程)
异步编程(Asynchronous Programming)是一种并发编程的形式,允许程序在等待某个耗时操作完成(如 I/O 操作)时,继续执行其他任务,而不是阻塞在当前操作上。这种方式可以大幅提高程序的执行效率,特别是在处理 I/O 密集型任务(如文件读写、网络请求、数据库操作)时。
异步编程可以通过多种方式实现,协程是其中一种常用的实现方式。在 Python 中,
asyncio
模块提供了一个完整的框架来实现异步编程,协程是这个框架的核心组件。此外,异步编程还可以通过回调函数、事件循环、生成器等方式实现。异步编程的核心概念
- 同步 vs 异步
- 同步编程:代码按顺序执行,一个任务未完成,后续任务必须等待。例如,传统的函数调用是同步的,函数未返回时,调用者会一直等待。
- 异步编程:代码可以在某些操作未完成时继续执行其他任务。等待中的操作完成时会通过回调或其他方式通知主程序。
- 并发 vs 并行
- 并发:程序在单个处理器上通过分时机制执行多个任务,任务间交替执行,但在某一时刻只有一个任务在运行。异步编程是实现并发的一种方式。
- 并行:程序在多个处理器或多核处理器上同时执行多个任务。
- 事件循环 (Event Loop)
- 事件循环是异步编程的核心机制,它负责管理并调度异步任务。事件循环会不断检查是否有准备好执行的任务,并将它们调度执行。确保在一个协程暂停时,其他协程可以利用 CPU 资源继续执行。
- 协程 (Coroutine)
- 协程是 Python 中异步编程的基础,它是一种比线程更加轻量级的并发方式。通过
async
关键字定义的函数就是协程。协程可以在等待 I/O 操作时挂起,允许其他协程继续执行。
Python 中的异步编程
Python 通过
asyncio
模块来支持异步编程。自 Python 3.5 起,async
和 await
关键字被引入,使得异步编程更加直观和易于使用。关键字和函数
async def
:定义一个异步函数(协程)。
await
:等待一个异步操作完成。只能在协程中使用。(暂停当前协程,释放控制权给事件循环,等待异步操作完成。)
asyncio.run()
:启动一个异步任务。
asyncio.create_task()
:并发运行多个协程。
asyncio.gather()
:将多个协程包装成一个,等待所有协程完成。
异步编程示例
示例 1:简单的异步函数
import asyncio async def say_hello(): print("Hello...") await asyncio.sleep(1) # 模拟一个异步 I/O 操作 print("...World!") asyncio.run(say_hello())
- 在这个例子中,
say_hello
是一个异步函数,await asyncio.sleep(1)
会让出控制权,使事件循环可以调度其他任务。
示例 2:并发执行多个异步任务
import asyncio async def task(name, duration): print(f"Task {name} started...") await asyncio.sleep(duration) print(f"Task {name} finished after {duration} seconds") async def main(): task1 = asyncio.create_task(task("A", 2)) task2 = asyncio.create_task(task("B", 1)) await task1 await task2 asyncio.run(main())
- 这里的
main()
函数通过asyncio.create_task()
创建了两个异步任务,它们会并发执行,最终的输出顺序不会因为任务的启动顺序而改变,而是由任务的持续时间决定。
示例 3:等待多个任务的完成
import asyncio async def task(name, duration): print(f"Task {name} started...") await asyncio.sleep(duration) print(f"Task {name} finished after {duration} seconds") return name, duration async def main(): results = await asyncio.gather( task("A", 2), task("B", 1), task("C", 3) ) print("All tasks finished:", results) asyncio.run(main())
asyncio.gather()
同时运行多个任务,并等待它们全部完成后再返回结果。这个示例会并发执行任务A
、B
和C
,并在所有任务完成后输出结果。
示例4:执行顺序
import asyncio async def task1(): print("Task 1 started") await asyncio.sleep(2) # 假设这个操作需要 2 秒 print("Task 1 finished") async def task2(): print("Task 2 started") await asyncio.sleep(1) # 假设这个操作需要 1 秒 print("Task 2 finished") async def main(): await asyncio.gather(task1(), task2()) asyncio.run(main())
- 当
task1()
遇到await asyncio.sleep(2)
时,任务会暂停,事件循环会切换去执行task2()
。
task2()
遇到await asyncio.sleep(1)
时也会暂停,事件循环又会检查是否有其他任务可以运行。
- 由于所有任务都在等待,这时事件循环会空闲等待,直到第一个
await
操作完成。
- 1 秒后,
task2
的await
操作完成,事件循环会恢复执行task2
的后续代码,打印Task 2 finished
。
- 再过 1 秒后,
task1
的await
操作完成,事件循环会恢复执行task1
的后续代码,打印Task 1 finished
。
异步编程的优点和注意事项
优点
- 高效的 I/O 操作:在网络请求、文件读写等 I/O 操作中,异步编程可以显著提高效率。
- 减少阻塞:避免程序在等待 I/O 操作时阻塞,从而更好地利用 CPU 资源。
- 更好的响应性:在 GUI 编程或 Web 服务中,异步编程可以使程序对用户操作更加响应及时。
注意事项
- 学习曲线:异步编程可能比同步编程更难理解,特别是在处理复杂的逻辑时。
- 调试困难:异步代码的调试和错误处理相对复杂,需要更多的注意力。
- 非 I/O 密集型任务:异步编程主要在 I/O 密集型任务中表现出色,而对于 CPU 密集型任务,可能并没有显著优势。
总结
异步编程是一种强大的工具,可以显著提高程序在处理 I/O 密集型任务时的性能。通过理解事件循环、协程和
asyncio
库,你可以编写出更高效、响应更快的 Python 程序。随着 Python 对异步编程支持的逐步增强,掌握这些概念和技术对现代 Python 开发者来说至关重要。五、进程、线程、协程对比
对比项 | 进程 (Process) | 线程 (Thread) | 协程 (Coroutine) |
定义 | 独立的执行单元,有自己的内存空间和资源 | 进程内的执行单元,线程间共享内存和资源 | 程序内轻量级的协作任务,通过让出控制权实现并发。协程的并发运行实际上是通过迅速切换任务来实现的 |
构建方式 | multiprocessing 模块 | threading 模块 | asyncio 模块和 async /await 关键字 |
优点 | 独立性强,能充分利用多核 CPU | 轻量级,共享内存,切换速度快 | 最轻量级,创建和切换开销小,适合大量并发任务 |
缺点 | 创建和销毁开销大,进程间通信复杂 | 受 GIL 限制,CPU 密集型任务中无法充分利用多核 CPU | 需要显式控制让出点,调试和错误处理较复杂 |
适用场景 | CPU 密集型任务,如数据处理、科学计算 | I/O 密集型任务,如网络请求、文件读写 | I/O 密集型任务,如网络服务器、高并发应用 |
CPU 使用情况 | 能够充分利用多核 CPU,真正的并行 | 受 GIL 限制,主要在 I/O 操作时有效 | 所有协程都在同一个线程中执行,因此是共享同一个 CPU 核心的资源。 |
内存共享 | 否,每个进程有独立的内存空间 | 是,线程间共享同一进程的内存 | 是,协程共享同一线程的内存 |
调度方式 | 由操作系统调度 | 由操作系统调度 | 由程序自身调度,非抢占式 |
构建代码示例 | multiprocessing 示例见下 | threading 示例见下 | asyncio 示例见下 |
multiprocessing
示例
import multiprocessing import os def worker(num): print(f"Worker {num} is running on process {os.getpid()}") if __name__ == "__main__": processes = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) processes.append(p) p.start() for p in processes: p.join()
threading
示例
import threading def worker(num): print(f"Worker {num} is running on thread {threading.current_thread().name}") threads = [] for i in range(5): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
asyncio
示例
import asyncio async def worker(num): print(f"Worker {num} is running") await asyncio.sleep(1) print(f"Worker {num} finished") async def main(): tasks = [worker(i) for i in range(5)] await asyncio.gather(*tasks) asyncio.run(main())
使用过程中遇到的问题
1、python版本限制进程池处理的数据量
报错代码:'i' format requires -2147483648 <= number <= 2147483647
问题原因:多进程时候,进程间数据交换是通过pickling,因为处理的文本都比较大,当数据pickled时候超过了i struct的限制,-2147483648 <= number <= 2147483647;Python3.8中在非windows平台修复了这个问题,可以支持最大4EB的数据。
解决办法:
- 将每个进程结果写入文件,最后汇总处理,这样避免进程间大量数据传递
- 代码中检查Python版本,低于3.8时候不使用多进程。
- 使用dask dataframe等Python并行处理库
具体思路:主要路线是上面👆的第一条
尝试:
1、换成process方式实现(缺点是进程数会跟着省份数增加,不固定)
2、由于使用pool方式的循环创建并不能缩短时间,所以还是使用pool直接创建,但是可以把process中的数据接收参数使用进来
2、进程池中进程的数量增加,而运行效率并没有提升
进程还需要处理I/O请求,在一定数据量内,进程数量和处理速度会有一个平衡点。
3、OSError: [Errno 28] No space left on device
严格来讲这并不是一个多进程引发的报错。
背景:
Linux 中的tmpfs是一种基于内存的文件系统,它使用系统的RAM或交换空间来存储文件和目录。tmpfs的特点是速度快,因为数据存储在内存中,而不是硬盘上。这使得对文件的读写操作比传统的基于磁盘的文件系统要快得多。当系统重启或关闭时,存储在tmpfs上的数据会丢失,因为内存的内容在断电后不会保留。
/dev/shm
是tmpfs文件系统的一个常见挂载点,全称为"shared memory",即共享内存。它允许不同的进程共享内存中的数据,从而提高程序之间通信的效率。在很多Linux发行版中,/dev/shm
默认就是以tmpfs的形式挂载的,这意味着/dev/shm
实际上就是一个内存中的文件系统,用于存储临时文件,这些文件可以被系统上运行的任何进程访问。一般共享内存对应的大小时 64M(不知道是不是因为有Docker的原因)原因:
一般来说,如果一个程序进程正常结束,那么这个程序在共享内存中对应的临时文件就会自动删除;但是当程训报错或者非正常结束,则临时文件就会堆积,而一般服务器也不会频繁关机重启,从而就导致
/dev/shm
对应的空间越来越小。当共享内存被占满时再运行程序就会导致报错:OSError: [Errno 28] No space left on device