并发编程 并发编程,程序提速方法:
单线程串行
不加改造的程序
CPU-IO-CPU-IO… IO期间CPU是等待状态的
多线程并发
threading
IO的同时让CPU进行其他任务,IO执行完CPU执行下一步任务
多CPU并行
multiprocessing
每个CPU都可以 CPU-IO…
多机器并行
多线程:threading, 利用CPU和IO可以同时执行的原理,让CPU在IO的同时执行其他任务
多进程:multiprocessing, 利用多核CPU的能力,真正的并行执行任务
异步IO: asyncio, 在单线程利用CPU和IO同时执行的原理,实现函数异步执行
使用Lock对资源加锁,防止冲突访问
使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式
使用线程池Pool/进程池Pool,简化线程/进程的任务提交,等待结束、获取结果
使用subprocess启动外部程序的京城,并进行输入输出交互
CPU密集型计算、IO密集型计算
CPU密集型(CPU-bound)
也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点CPU占用率很高;
例如:压缩解压缩、加密解密、正则表达式搜索
IO密集型(I/O bound)
IO密集型指的是系统运作大部分的状况是CPU在等待I/O(硬盘/内存)的读写操作,CPU占用率仍然较低。
例如:文件处理程序(文件读取),网络爬虫程序(大量的下载)、多谢数据库程序
多线程、多进程、多协程的对比
多进程Process(multiprocessing)
优点:可以利用多核CPU并行运算
缺点:占用资源最多、可启动数目比线程少
适用于:CPU密集型计算
多线程Thread(threadin)
优点:相比进程,更轻量级,占用资源少
缺点:(Python多线程只能同时用一个CPU)
相比进程:多线程只能并发执行,不能利用多CPU(GIL);
相比协程:启动数目有限制,占用内存资源,有线程切换开销
适用于:IO密集型计算、同时运行的任务数目要求不多
多协程Coroutine(asyncio)
优点:内存开销最少、启动协程数量最多
缺点:支持的库有限制(aiohttp vs requests)、代码实现复杂
适用于: IO密集型计算、需要超多任务运行、但有现成库支持的场景
一个进程 中可以启动N个线程 ,一个线程中可以启动N个协程
根据任务选择对应技术 1 2 3 4 5 6 7 if CPU密集型: 使用多进程Multiprocessing elif IP密集型: if 任务多 and 有协程库支持 and 协程实现复杂度不高: 使用多协程asyncio else : 使用多线程threading
Python被吐槽慢,头号嫌疑犯,全局解释器锁GIL 相比C/C++/JAVA,Python确实慢,在一些特殊场景下,Python比C++慢100~200倍
由于速度慢的原因,很多公司的基础架构代码依然用C/C++开发 比如各大公司阿里/腾讯/快手的推荐引擎、搜索引擎、存储引擎等底层对性能要求高的模块
Python速度慢两大原因:
动态类型语言,边解释边执行,一个变量可以做数字、字符串或者列表,所以执行过程中都要检查变量类型,导致很慢
GIL无法利用多核CPU并发执行
GIL 全局解释器锁(Global Interpreter Lock, GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行,即便在多核心处理上,使用GIL的解释器也只允许同一时间执行一个线程。
GIL的存在,即使电脑有多核CPU,单个时刻也只能使用1个,相比并发加速的C/C++慢
为什么有GIL这个东西 python设计初期,为了规避并发问题引入了GIL,现在想去除去不掉了
为了解决多线程之间数据完整性和状态同步问题
python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象
开始:线程A和线程B都引用了对象obj, obj.ref_num=2, 线程A和B都想撤销对obj的引用
GIL确实有好处:简化了Python对共享资源的管理
如何规避GIL带来的限制
多线程threading机制依然是有用的,用于IO密集型计算
因为I/O(read,write,send,recv,etc.)期间,线程会释放GIL,实现CPU和IO的并行,因此多线程用于IO密集型计算依然可以大幅度提升速度
但多线程用于CPU密集型计算时,只会更加拖慢速度
使用multiprocessing 的多进程机制实现并行计算、利用多核CPU优势
为了应对GIL,python提供了multiprocessing
python多线程加速爬虫程序 python创建多线程的方法 1 2 3 4 5 6 7 8 9 10 def my_func (a,b ): do_craw(a,b) import threadingt = threading.Thread(target=my_func, args=(100 ,200 )) t.start() t.join()
multi-threads example 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 import requestsimport threadingimport timeurls = [f"https://www.cnblogs.com/#p{page} " for page in range (1 ,50 +1 )] def craw (url ): r = requests.get(url) print (url,len (r.text)) craw(urls[0 ]) def single_thread (): print ("single thread begin" ) for url in urls: craw(url) print ("single thread end" ) def multi_thread (): print ("multi-thread begin" ) threads = [] for url in urls: threads.append( threading.Thread(target=craw, args=(url,)) ) for thread in threads: thread.start() for thread in threads: thread.join() print ("multi-thread end" ) if __name__ == '__main__' : start = time.time() single_thread() end = time.time() print ("single thread cost: " , end-start, "seconds" ) start = time.time() multi_thread() end = time.time() print ("multi-thread cost: " , end - start, "seconds" )
多组件的Pipeline技术架构 复杂的事情一般不会一下子做完,而是会分很多中间步骤一步一步完成
多线程数据通信的quene.Queue queue.Queue可以用于多线程之间的线程安全的数据通信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import queueq = queue.Queue() q.put(item) item.get() q.qsize() q.empty() q.full()
代码编写实现生产者消费者爬虫 1 2 3 4 5 6 7 8 9 10 11 12 13 14 import requestsimport threadingimport timefrom bs4 import BeautifulSoupurls = [f"https://www.cnblogs.com/#p{page} " for page in range (1 ,50 +1 )] def craw (url ): r = requests.get(url) return r.text def txtparse (html ): soup = BeautifulSoup(html, 'html.parser' ) links = soup.find_all("a" , class_ = "post-item-title" ) return [(link["href" ], link.get_text()) for link in links]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 import requestsimport queueimport timeimport randomfrom bs4 import BeautifulSoupimport threadingurls = [f"https://www.cnblogs.com/#p{page} " for page in range (1 ,50 +1 )] def craw (url ): r = requests.get(url) return r.text def txtparse (html ): soup = BeautifulSoup(html, 'html.parser' ) links = soup.find_all("a" , class_ = "post-item-title" ) return [(link["href" ], link.get_text()) for link in links] def do_craw (url_queue: queue.Queue, html_queue: queue.Queue ): while True : url = url_queue.get() html = craw(url) html_queue.put(html) print (threading.current_thread().name, f"craw {url} " , "url_queue.size=" , url_queue.qsize()) time.sleep(random.randint(1 ,2 )) def do_parse (html_queue: queue.Queue, fout ): while True : html = html_queue.get() results = txtparse(html) for result in results: fout.write(str (result) +'\n' ) print (threading.current_thread().name, f"results.size" , len (results), "html_queue.size=" , html_queue.qsize()) time.sleep(random.randint(1 , 2 )) if __name__ == '__main__' : url_queue = queue.Queue() html_queue= queue.Queue() for url in urls: url_queue.put(url) for idx in range (3 ): t = threading.Thread(target=do_craw, args=(url_queue,html_queue), name=f"craw{idx} " ) t.start() fout = open ("02.data.txt" ,'w' ) for idx in range (2 ): t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx} " ) t.start()
线程安全 介绍 线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。
由于现成的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全。
尤其是在远程调用或sleep的时候会经常出现
例如:
1 2 3 def draw (account, amount ): if account.balance >= amount: account.balance -=amount
如果是多线程,同时进行两笔取钱操作,线程切换过程中,系统依次先执行了两次判断语句,导致接下来都完成了取钱操作,就出现了问题。
互斥访问;判断和执行语句捆绑为原子性;mysql幻读
Lock用于解决线程安全问题
锁就是让任务排队;锁让判断和执行语句捆绑上锁同时执行;锁是数据共享的地方;
方法一: try-finally 模式
1 2 3 4 5 6 7 import threadinglock = threading.Lock() lock.acquire() try : finally : lock.release()
方法二: with模式
1 2 3 4 import threadinglock = threading.Lock() with lock:
具体例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import threadingimport timelock = threading.Lock() class Account : def __init__ (self, balance ): self.balance = balance def draw (account, amount ): with lock: if account.balance >= amount: time.sleep(0.1 ) print (threading.current_thread().name, "取钱成功\n" ) account.balance -= amount print (threading.current_thread().name, "余额" , account.balance,"\n" ) else : print (threading.current_thread().name, "取钱失败,余额不足" ,"\n" ) if __name__ == '__main__' : account = Account(1000 ) ta = threading.Thread(name="ta" , target=draw, args=(account, 800 )) tb = threading.Thread(name="tb" , target=draw, args=(account, 800 )) ta.start() tb.start()
线程池 原理 新建线程系统需要分配资源、终止线程系统需要回收资源,如果可以重用线程,则可以减去新建/终止的开销。
使用线程池的好处
提升性能:因为减去大量新建、终止线程的开销,重用了线程资源
使用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际处理时间较短
防御功能:能有效避免系统因创建线程过多,而导致系统负荷过大相应变慢等问题
代码优势:使用线程池的语法比自己新建线程执行更加简洁
使用线程池ThreadPoolExecutor改造爬虫程序 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 from concurrent.futures import ThreadPoolExecutor, as_completedimport requestsimport queueimport timeimport randomfrom bs4 import BeautifulSoupimport threadingurls = [f"https://www.cnblogs.com/#p{page} " for page in range (1 ,50 +1 )] def craw (url ): r = requests.get(url) return r.text def txtparse (html ): soup = BeautifulSoup(html, 'html.parser' ) links = soup.find_all("a" , class_ = "post-item-title" ) return [(link["href" ], link.get_text()) for link in links] with ThreadPoolExecutor() as pool: results = pool.map (craw, urls) results = list (zip (urls, results)) for result in results: print (result) with ThreadPoolExecutor() as pool: futures = {} for url, html in results: future = pool.submit(txtparse, html) futures[future] = url for future in as_completed(futures): url = futures[future] print (url, future.result())
note: 用法一:map函数,结果和入参顺序对应;用法二:future模式更强大,as_completed顺序不定。
在web服务中,使用线程池加速 web服务的架构以及特定
web后台服务的特点
web 服务对响应时间要求非常高,比如200MS返回
web 服务有大量的IO操作的调用,比如磁盘文件、数据库、远程API
web 服务经常需要处理几万人、几百万人的同时请求
使用线程池ThreadPoolExecutor加速 使用ThreadPoolExecutor的好处
方便的将磁盘文件、数据库、远程API的IO调用并发执行
线程池的线程数目不会无限创建(导致系统挂掉),具有防御功能
代码用Flask实现web服务并实现加速 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import flaskimport jsonimport timeapp = flask.Flask(__name__) def read_db (): time.sleep(0.1 ) return "db result" def read_file (): time.sleep(0.2 ) return "file result" def read_api (): time.sleep(0.3 ) return "api result" @app.route("/" ) def index (): result_file = read_file() result_db =read_db() result_api = read_api() return json.dumps({ "result_file" :result_file, "result_db" :result_db, "result_api" :result_api }) if __name__ == '__main__' : app.run()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import flaskimport jsonimport timeapp = flask.Flask(__name__) def read_db (): time.sleep(0.1 ) return "db result" def read_file (): time.sleep(0.2 ) return "file result" def read_api (): time.sleep(0.3 ) return "api result" @app.route("/" ) def index (): result_file = read_file() result_db =read_db() result_api = read_api() return json.dumps({ "result_file" :result_file, "result_db" :result_db, "result_api" :result_api }) if __name__ == '__main__' : app.run()
多进程multiprocessing 加速程序的运行 有了多线程threading,为什么还要使用多进程multiprocessing? 如果遇到了CPU密集型计算,多线程反而会降低执行速度!
multiprocessing模块就是python为解决GIL缺陷引入的一个模块,原理是用多进程在多CPU并行执行
多进程mutliprocessing知识梳理(语法上二者十分相似)
代码对比单线程、多线程、多进程在CPU密集计算速度 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import mathimport timefrom concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutorPRIMES = [112272535095293 ]*100 def is_prime (n ): if n<2 : return False if n ==2 : return True if n %2 ==0 : return False sqrt_n = int (math.floor(math.sqrt(n))) for i in range (3 , sqrt_n+1 ,2 ): if n % i ==0 : return False return True def single_thread (): for num in PRIMES: is_prime(num) def multi_thread (): with ThreadPoolExecutor() as pool: pool.map (is_prime, PRIMES) def multi_process (): with ProcessPoolExecutor() as pool: pool.map (is_prime, PRIMES) if __name__ == '__main__' : start = time.time() single_thread() end = time.time() print ("single thread cost:" , end-start, "seconds" ) start = time.time() multi_thread() end = time.time() print ("multi thread cost:" , end - start, "seconds" ) start = time.time() multi_process() end = time.time() print ("multi process cost:" , end-start, "seconds" )
Flask服务中使用进程池加速 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import jsonimport flaskfrom concurrent.futures import ProcessPoolExecutorimport mathapp = flask.Flask(__name__) def is_prime (n ): if n<2 : return False if n ==2 : return True if n %2 ==0 : return False sqrt_n = int (math.floor(math.sqrt(n))) for i in range (3 , sqrt_n+1 ,2 ): if n % i ==0 : return False return True @app.route("/is_prime/<numbers>" ) def api_is_prime (numbers ): numbers_list = [int (x) for x in numbers.split("," )] results = process_pool.map (is_prime, numbers_list) return json.dumps(dict (zip (numbers_list, results))) if __name__ == '__main__' : process_pool = ProcessPoolExecutor() app.run()
python异步IO实现并发爬虫 单线程爬虫的执行路径
协程:在单线程内实现并发
Python异步IO库介绍asyncio 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import asyncioimport aiohttpimport timeurls = [f"https://www.cnblogs.com/#p{page} " for page in range (1 ,50 +1 )] loop = asyncio.get_event_loop() async def async_craw (url ): print ("craw url: " , url) async with aiohttp.ClientSession() as session: async with session.get(url) as resp: result = await resp.text() print (f"craw url: {url} , {len (result)} " ) tasks = [ loop.create_task(async_craw(url)) for url in urls ] start = time.time() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print ("use time seconds: " , end-start)
async 表示这是一个协程,await表示这是一个IO
note: 要用在异步IO编程中,依赖的库必须支持异步IO特性;爬虫引用中,requests不支持异步,需要用aiohttp
在异步IO中使用信号量控制爬虫并发度 信号量(Semaphore),又称为心好累、旗语,是一个同步对象,用于保持在0至指定最大值之间的一个技术之。
当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;
当线程完成一次对semaphore对象的释放(release)时,计数值加一;
当计数值为0,则线程等待该semaphore对象不再能成功直至该semaphore对象变成signaled状态
semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 sem = asyncio.Semaphore(10 ) asyncio with sem: sem = asyncio.Semaphore(10 ) await sem.acquire() try : finally : sem.release()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import asyncioimport aiohttpimport timeurls = [f"https://www.cnblogs.com/#p{page} " for page in range (1 ,50 +1 )] semaphore = asyncio.Semaphore(10 ) loop = asyncio.get_event_loop() async def async_craw (url ): async with semaphore: print ("craw url: " , url) async with aiohttp.ClientSession() as session: async with session.get(url) as resp: result = await resp.text() await asyncio.sleep(5 ) print (f"craw url: {url} , {len (result)} " ) tasks = [ loop.create_task(async_craw(url)) for url in urls ] start = time.time() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print ("use time seconds: " , end-start)