python 并发编程

并发编程

并发编程,程序提速方法:

  • 单线程串行

    • 不加改造的程序
    • CPU-IO-CPU-IO… IO期间CPU是等待状态的
  • 多线程并发

    • threading
    • IO的同时让CPU进行其他任务,IO执行完CPU执行下一步任务
  • 多CPU并行

    • multiprocessing
    • 每个CPU都可以 CPU-IO…
  • 多机器并行

  • 多线程:threading, 利用CPU和IO可以同时执行的原理,让CPU在IO的同时执行其他任务

  • 多进程:multiprocessing, 利用多核CPU的能力,真正的并行执行任务

  • 异步IO: asyncio, 在单线程利用CPU和IO同时执行的原理,实现函数异步执行

  • 使用Lock对资源加锁,防止冲突访问

  • 使用Queue实现不同线程/进程之间的数据通信,实现生产者-消费者模式

    • 改造爬虫,边爬虫-边解析
  • 使用线程池Pool/进程池Pool,简化线程/进程的任务提交,等待结束、获取结果

  • 使用subprocess启动外部程序的京城,并进行输入输出交互

CPU密集型计算、IO密集型计算

  • CPU密集型(CPU-bound)
    • 也叫计算密集型,是指I/O在很短的时间就可以完成,CPU需要大量的计算和处理,特点CPU占用率很高;
    • 例如:压缩解压缩、加密解密、正则表达式搜索
  • IO密集型(I/O bound)
    • IO密集型指的是系统运作大部分的状况是CPU在等待I/O(硬盘/内存)的读写操作,CPU占用率仍然较低。
    • 例如:文件处理程序(文件读取),网络爬虫程序(大量的下载)、多谢数据库程序

多线程、多进程、多协程的对比

  • 多进程Process(multiprocessing)
    • 优点:可以利用多核CPU并行运算
    • 缺点:占用资源最多、可启动数目比线程少
    • 适用于:CPU密集型计算
  • 多线程Thread(threadin)
    • 优点:相比进程,更轻量级,占用资源少
    • 缺点:(Python多线程只能同时用一个CPU)
      • 相比进程:多线程只能并发执行,不能利用多CPU(GIL);
      • 相比协程:启动数目有限制,占用内存资源,有线程切换开销
    • 适用于:IO密集型计算、同时运行的任务数目要求不多
  • 多协程Coroutine(asyncio)
    • 优点:内存开销最少、启动协程数量最多
    • 缺点:支持的库有限制(aiohttp vs requests)、代码实现复杂
    • 适用于: IO密集型计算、需要超多任务运行、但有现成库支持的场景

一个进程中可以启动N个线程,一个线程中可以启动N个协程

根据任务选择对应技术

1
2
3
4
5
6
7
if CPU密集型:
使用多进程Multiprocessing
elif IP密集型:
if 任务多 and 有协程库支持 and 协程实现复杂度不高:
使用多协程asyncio
else:
使用多线程threading

Python被吐槽慢,头号嫌疑犯,全局解释器锁GIL

相比C/C++/JAVA,Python确实慢,在一些特殊场景下,Python比C++慢100~200倍

由于速度慢的原因,很多公司的基础架构代码依然用C/C++开发
比如各大公司阿里/腾讯/快手的推荐引擎、搜索引擎、存储引擎等底层对性能要求高的模块

Python速度慢两大原因:

  • 动态类型语言,边解释边执行,一个变量可以做数字、字符串或者列表,所以执行过程中都要检查变量类型,导致很慢
  • GIL无法利用多核CPU并发执行

GIL

全局解释器锁(Global Interpreter Lock, GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行,即便在多核心处理上,使用GIL的解释器也只允许同一时间执行一个线程。

http://www.dabeaz.com/python/UnderstandingGIL.pdf

GIL的存在,即使电脑有多核CPU,单个时刻也只能使用1个,相比并发加速的C/C++慢

为什么有GIL这个东西

python设计初期,为了规避并发问题引入了GIL,现在想去除去不掉了

为了解决多线程之间数据完整性和状态同步问题

python中对象的管理,是使用引用计数器进行的,引用数为0则释放对象

开始:线程A和线程B都引用了对象obj, obj.ref_num=2, 线程A和B都想撤销对obj的引用

whyGIL

GIL确实有好处:简化了Python对共享资源的管理

如何规避GIL带来的限制

  1. 多线程threading机制依然是有用的,用于IO密集型计算
    • 因为I/O(read,write,send,recv,etc.)期间,线程会释放GIL,实现CPU和IO的并行,因此多线程用于IO密集型计算依然可以大幅度提升速度
    • 但多线程用于CPU密集型计算时,只会更加拖慢速度
  2. 使用multiprocessing 的多进程机制实现并行计算、利用多核CPU优势
    • 为了应对GIL,python提供了multiprocessing

python多线程加速爬虫程序

python创建多线程的方法

1
2
3
4
5
6
7
8
9
10
# 准备一个函数
def my_func(a,b):
do_craw(a,b)
# 怎样创建一个线程
import threading
t = threading.Thread(target=my_func, args=(100,200))
# 启动线程
t.start()
# 等待结束
t.join()

multi-threads example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import requests
import threading
import time
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1,50+1)]
# urls = ["https://www.cnblogs.com/#p{}".format(page) for page in range(1,50+1)]

def craw(url):
r = requests.get(url)
print(url,len(r.text))

craw(urls[0])

## multi thread

def single_thread():
print("single thread begin")
for url in urls:
craw(url)
print("single thread end")

def multi_thread():
print("multi-thread begin")
threads = []
for url in urls:
threads.append(
threading.Thread(target=craw, args=(url,))
)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("multi-thread end")
if __name__ == '__main__':
start = time.time()
single_thread()
end = time.time()
print("single thread cost: ", end-start, "seconds")

start = time.time()
multi_thread()
end = time.time()
print("multi-thread cost: ", end - start, "seconds")

多组件的Pipeline技术架构

复杂的事情一般不会一下子做完,而是会分很多中间步骤一步一步完成

pipeline

多线程数据通信的quene.Queue

queue.Queue可以用于多线程之间的线程安全的数据通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 导入类库
import queue
# 创建Queue
q = queue.Queue()
# 添加元素
q.put(item)
# 获取元素
item.get()
# 查看元素的多少
q.qsize()
# 判断是否为空
q.empty()
# 判断是否已满
q.full()

代码编写实现生产者消费者爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import requests
import threading
import time
from bs4 import BeautifulSoup
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1,50+1)]
# urls = ["https://www.cnblogs.com/#p{}".format(page) for page in range(1,50+1)]

def craw(url):
r = requests.get(url)
return r.text
def txtparse(html):
soup = BeautifulSoup(html, 'html.parser')
links = soup.find_all("a", class_ = "post-item-title")
return [(link["href"], link.get_text()) for link in links]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import requests
import queue
import time
import random
from bs4 import BeautifulSoup
import threading
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1,50+1)]
# urls = ["https://www.cnblogs.com/#p{}".format(page) for page in range(1,50+1)]

def craw(url):
r = requests.get(url)
return r.text

def txtparse(html):
soup = BeautifulSoup(html, 'html.parser')
links = soup.find_all("a", class_ = "post-item-title")
return [(link["href"], link.get_text()) for link in links]
def do_craw(url_queue: queue.Queue, html_queue: queue.Queue):
while True:
url = url_queue.get()
html = craw(url)
html_queue.put(html)
print(threading.current_thread().name, f"craw {url}", "url_queue.size=", url_queue.qsize())
time.sleep(random.randint(1,2))
def do_parse(html_queue: queue.Queue, fout):
while True:
html = html_queue.get()
results = txtparse(html)
for result in results:
fout.write(str(result) +'\n')
print(threading.current_thread().name, f"results.size", len(results), "html_queue.size=" , html_queue.qsize())
time.sleep(random.randint(1, 2))
if __name__ == '__main__':
url_queue = queue.Queue()
html_queue= queue.Queue()
for url in urls:
url_queue.put(url)
for idx in range(3):
t = threading.Thread(target=do_craw, args=(url_queue,html_queue), name=f"craw{idx}")
t.start()
fout = open("02.data.txt",'w')
for idx in range(2):
t = threading.Thread(target=do_parse, args=(html_queue, fout), name=f"parse{idx}")
t.start()

线程安全

介绍

线程安全指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的共享变量,使程序功能正确完成。

由于现成的执行随时会发生切换,就造成了不可预料的结果,出现线程不安全。

尤其是在远程调用或sleep的时候会经常出现

例如:

1
2
3
def draw(account, amount):
if account.balance >= amount:
account.balance -=amount

如果是多线程,同时进行两笔取钱操作,线程切换过程中,系统依次先执行了两次判断语句,导致接下来都完成了取钱操作,就出现了问题。

互斥访问;判断和执行语句捆绑为原子性;mysql幻读

Lock用于解决线程安全问题

锁就是让任务排队;锁让判断和执行语句捆绑上锁同时执行;锁是数据共享的地方;

方法一: try-finally 模式

1
2
3
4
5
6
7
import threading
lock = threading.Lock()
lock.acquire()
try:
# do something
finally:
lock.release()

方法二: with模式

1
2
3
4
import threading
lock = threading.Lock()
with lock:
# do something

具体例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading
import time

lock = threading.Lock()
class Account:
def __init__(self, balance):
self.balance = balance

def draw(account, amount):
with lock:
if account.balance >= amount:
time.sleep(0.1)
print(threading.current_thread().name, "取钱成功\n")
account.balance -= amount
print(threading.current_thread().name, "余额", account.balance,"\n")
else:
print(threading.current_thread().name, "取钱失败,余额不足","\n")

if __name__ == '__main__':
account = Account(1000)
ta = threading.Thread(name="ta", target=draw, args=(account, 800))
tb = threading.Thread(name="tb", target=draw, args=(account, 800))
ta.start()
tb.start()

线程池

原理

新建线程系统需要分配资源、终止线程系统需要回收资源,如果可以重用线程,则可以减去新建/终止的开销。

threadperiod

使用线程池的好处

  1. 提升性能:因为减去大量新建、终止线程的开销,重用了线程资源
  2. 使用场景:适合处理突发性大量请求或需要大量线程完成任务、但实际处理时间较短
  3. 防御功能:能有效避免系统因创建线程过多,而导致系统负荷过大相应变慢等问题
  4. 代码优势:使用线程池的语法比自己新建线程执行更加简洁

使用线程池ThreadPoolExecutor改造爬虫程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import queue
import time
import random
from bs4 import BeautifulSoup
import threading

urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1,50+1)]

def craw(url):
r = requests.get(url)
return r.text

def txtparse(html):
soup = BeautifulSoup(html, 'html.parser')
links = soup.find_all("a", class_ = "post-item-title")
return [(link["href"], link.get_text()) for link in links]

## 用法一
with ThreadPoolExecutor() as pool:
results = pool.map(craw, urls)
results = list(zip(urls, results))
for result in results:
print(result)
## 用法二
with ThreadPoolExecutor() as pool:
futures = {}
for url, html in results:
future = pool.submit(txtparse, html)
futures[future] = url
## 方法一:顺序执行
# for future, url in futures.items():
# print(url, future.result())
## 方法二:这个好,先结束先返回
for future in as_completed(futures):
url = futures[future]
print(url, future.result())

note: 用法一:map函数,结果和入参顺序对应;用法二:future模式更强大,as_completed顺序不定。

在web服务中,使用线程池加速

web服务的架构以及特定

webframe

web后台服务的特点

  1. web 服务对响应时间要求非常高,比如200MS返回
  2. web 服务有大量的IO操作的调用,比如磁盘文件、数据库、远程API
  3. web 服务经常需要处理几万人、几百万人的同时请求

使用线程池ThreadPoolExecutor加速

使用ThreadPoolExecutor的好处

  1. 方便的将磁盘文件、数据库、远程API的IO调用并发执行
  2. 线程池的线程数目不会无限创建(导致系统挂掉),具有防御功能

代码用Flask实现web服务并实现加速

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import  flask
import json
import time
app = flask.Flask(__name__)
def read_db():
time.sleep(0.1)
return "db result"
def read_file():
time.sleep(0.2)
return "file result"
def read_api():
time.sleep(0.3)
return "api result"
@app.route("/")
def index():
result_file = read_file()
result_db =read_db()
result_api = read_api()
return json.dumps({
"result_file":result_file,
"result_db":result_db,
"result_api":result_api
})
if __name__ == '__main__':
app.run()

responetime

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import  flask
import json
import time
app = flask.Flask(__name__)
def read_db():
time.sleep(0.1)
return "db result"
def read_file():
time.sleep(0.2)
return "file result"
def read_api():
time.sleep(0.3)
return "api result"
@app.route("/")
def index():
result_file = read_file()
result_db =read_db()
result_api = read_api()
return json.dumps({
"result_file":result_file,
"result_db":result_db,
"result_api":result_api
})
if __name__ == '__main__':
app.run()

responetime1

多进程multiprocessing 加速程序的运行

有了多线程threading,为什么还要使用多进程multiprocessing?
如果遇到了CPU密集型计算,多线程反而会降低执行速度!

multiprocessing模块就是python为解决GIL缺陷引入的一个模块,原理是用多进程在多CPU并行执行
whymultiprocess

多进程mutliprocessing知识梳理(语法上二者十分相似)

threadingandmultiprocessing

代码对比单线程、多线程、多进程在CPU密集计算速度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import math
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
PRIMES = [112272535095293]*100 # 这个数字很关键,不然很快就判断结束了

def is_prime(n): # 判单是否是质数
if n<2:
return False
if n ==2:
return True
if n %2 ==0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n+1,2):
if n % i ==0:
return False
return True

def single_thread():
for num in PRIMES:
is_prime(num)
def multi_thread():
with ThreadPoolExecutor() as pool:
pool.map(is_prime, PRIMES)
def multi_process():
with ProcessPoolExecutor() as pool:
pool.map(is_prime, PRIMES)

if __name__ == '__main__':
start = time.time()
single_thread()
end = time.time()
print("single thread cost:", end-start, "seconds")
start = time.time()
multi_thread()
end = time.time()
print("multi thread cost:", end - start, "seconds")
start = time.time()
multi_process()
end = time.time()
print("multi process cost:", end-start, "seconds")
### print结果
# single thread cost: 83.22900080680847 seconds
# multi thread cost: 64.15043830871582 seconds
# multi process cost: 24.825831413269043 seconds

Flask服务中使用进程池加速

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import json

import flask
from concurrent.futures import ProcessPoolExecutor
import math

app = flask.Flask(__name__)
def is_prime(n): # 判单是否是质数
if n<2:
return False
if n ==2:
return True
if n %2 ==0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n+1,2):
if n % i ==0:
return False
return True
@app.route("/is_prime/<numbers>")
def api_is_prime(numbers):
numbers_list = [int(x) for x in numbers.split(",")]
results = process_pool.map(is_prime, numbers_list)
return json.dumps(dict(zip(numbers_list, results)))
if __name__ == '__main__':
process_pool = ProcessPoolExecutor()
app.run()

python异步IO实现并发爬虫

单线程爬虫的执行路径

single_thread_spider

协程:在单线程内实现并发

asyncio_spider

Python异步IO库介绍asyncio

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import aiohttp
import time
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1,50+1)]
# 获取事件循环
loop = asyncio.get_event_loop()
# 定义协程
async def async_craw(url):
print("craw url: ", url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.text()
print(f"craw url: {url}, {len(result)}")
# 创建task列表
tasks = [
loop.create_task(async_craw(url)) for url in urls
]
# 执行爬虫事件列表
start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds: ", end-start)
## print
# single thread cost: 11.99697756767273 seconds
# multi-thread cost: 1.426210880279541 seconds
# asyncio cost: 1.3427538871765137

async 表示这是一个协程,await表示这是一个IO

note: 要用在异步IO编程中,依赖的库必须支持异步IO特性;爬虫引用中,requests不支持异步,需要用aiohttp

在异步IO中使用信号量控制爬虫并发度

信号量(Semaphore),又称为心好累、旗语,是一个同步对象,用于保持在0至指定最大值之间的一个技术之。

  • 当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;
  • 当线程完成一次对semaphore对象的释放(release)时,计数值加一;
  • 当计数值为0,则线程等待该semaphore对象不再能成功直至该semaphore对象变成signaled状态
  • semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
########## 使用方式1
sem = asyncio.Semaphore(10)
# ...later
asyncio with sem:
# work with shared resource
########## 使用方式2
sem = asyncio.Semaphore(10)
# ...later
await sem.acquire()
try:
# work with shared resource
finally:
sem.release()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import aiohttp
import time
urls = [f"https://www.cnblogs.com/#p{page}" for page in range(1,50+1)]

semaphore = asyncio.Semaphore(10)
# 获取事件循环
loop = asyncio.get_event_loop()
# 定义协程
async def async_craw(url):
async with semaphore:
print("craw url: ", url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
result = await resp.text()
await asyncio.sleep(5)
print(f"craw url: {url}, {len(result)}")


# 创建task列表
tasks = [
loop.create_task(async_craw(url)) for url in urls
]
# 执行爬虫事件列表
start = time.time()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print("use time seconds: ", end-start)