python模块-协程

协程

首先写一段简单的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio

async def test1():
await test2()
print('This is test1')

async def test2():
print("This is test2")


if __name__ =="__main__":
b = test1()
b.send(None)

很简单的代码,加上async以后,这个函数就成了一个异步函数。异步函数需要通过send() 方法才能执行。否则会报错。

之前的疑惑

之前最大的疑惑就是,test2都被挂起在那里了,就只有单线程,单线程都去执行test1了,怎么test2还会有值会返回?

后来陆续看了一些文章,涉及到了系统底层。
系统将这样的IO变化称之为事件变化,提供了专门的模块来进行处理,
所以,很明显test2触发了,比如发送请求了,但是没返回,所以先把test2挂起来以后,就由系统去检测其IO变化.
一旦有值返回了,系统回来告诉应用程序,
应用程序通过回调函数对值做处理。

简单的协程代码

在python里面,只有调用了 send方法,协程函数才会去调用一次,所以要整个协程放进去事件循环里面,才能完全的执行完协程函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import asyncio
from datetime import datetime

'''
test1 start 2018-07-25 14:35:56.808890
test1 start 2018-07-25 14:35:56.808890
test1 start 2018-07-25 14:35:56.808890
test1 done 2018-07-25 14:36:01.835863
test1 done 2018-07-25 14:36:01.835863
test1 done 2018-07-25 14:36:01.835863
'''

async def test1():
print("test1 start {}".format(datetime.now()))
await asyncio.sleep(5)
print("test1 done {}".format(datetime.now()))


if __name__ == "__main__":
a = test1()
b = test1()
c = test1()
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(a),
asyncio.ensure_future(b),
asyncio.ensure_future(c),
]
loop.run_until_complete(asyncio.wait(tasks)) # 如果是task,必须要wait一下。

在这一段代码里面,run_until_complete将协程包装成了一个task对象,也就是Future类的子类.
保存了协程运行后的状态,方便后面IO变化以后,用于获取协程的结果。

增加回调函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio

'''
I am test1 and the param is 2
the waitting x is 20
'''
async def test1(x):
print("I am test1 and the param is {}".format(x))
await asyncio.sleep(10)
return x*10

# param 是一个future对象。想象一下,这个类来自未来。
def callback(param):
print('the waitting x is {}'.format(param.result()))

a = test1(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(a)
task.add_done_callback(callback)
loop.run_until_complete(task)

调用了一个回调函数,用于处理异步函数test1的返回值。
如果回调函数需要增加参数,可以使用functools.partial。

使用回调函数需要十分小心,避免掉进回调的地狱。
这里我们使用同步的方式来处理异步函数返回来的值,而不一定使用回调函数,
其方式就是借用future对象,可以取到异步执行后的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio

'''
I am test1 and the param is 2
20
'''
async def test1(x):
print("I am test1 and the param is {}".format(x))
await asyncio.sleep(10) # 这里是异步操作,所以可以挂起
return x*10

a = test1(2)
loop = asyncio.get_event_loop()
# task 是来自未来的对象
task = asyncio.ensure_future(a)
loop.run_until_complete(task)
print(task.result())

线程阻塞问题

在使用requests的时候,会阻塞掉唯一的线程,就是说你的函数虽然是异步的,
但是在这个异步的函数里面,有阻塞线程的调用.
于是,你没办法挂起这个函数。这个时候,可以开一个线程让它去阻塞,而当前线程继续执行。
当然这样做,就成了多线程了就是。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio
from datetime import datetime
import time,requests


async def run(url):
loop = asyncio.get_event_loop()
print('start {} and time is {}'.format(url,datetime.now()))
try:
task = loop.run_in_executor(None,requests.get,url)
#task2 = loop.run_in_executor(None,time.sleep,10)
complete,pending = await asyncio.wait([task])
for t in complete:
print(t.result())

except Exception as e:
print(e)

print('done {} and time is {}'.format(url,datetime.now()))

url_list = ['http://www.baidu.com','http://www.google.com','http://www.sina.com.cn']
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
# 这里是并发的场景,每当有协程挂起,就需要wait,如果没加,就会报错。
loop.run_until_complete(asyncio.wait(tasks))

>>>>


start http://www.baidu.com and time is 2018-07-25 15:37:57.221670
start http://www.google.com and time is 2018-07-25 15:37:57.222670
start http://www.sina.com.cn and time is 2018-07-25 15:37:57.223670
done http://www.sina.com.cn and time is 2018-07-25 15:38:07.759670
done http://www.google.com and time is 2018-07-25 15:38:07.936670
done http://www.baidu.com and time is 2018-07-25 15:38:08.014670

总结

在协程中,因为很多请求是同步的,会阻塞掉当前的线程,你根本无法挂起这个请求,执行并发,除非使用一些异步框架来做,或者如上开一个另外的线程来 做这种阻塞的动作。这样其实跟多线程没啥区别了,有待研究。当然可以使用gevent这样的框架来做,可以用写同步函数的方式来做异步IO的事情,执行并发等等,但是也有其他的一些缺陷,但是对其底层不了解的话,很容易出现不知所谓的错误。
优点:
协程开销小

缺点:
复杂
一旦异步,全部都必须异步。

reference

  1. http://www.dongwm.com/archives/使用Python进行并发编程-asyncio篇/

  2. http://thief.one/2018/06/21/1/