Python的Web框架中Tornado以异步非阻塞而闻名。本篇将使用200行代码完成一个微型异步非阻塞Web框架:Snow。
一、源码
本文基于非阻塞的Socket以及IO多路复用来实现异步非阻塞的Web框架,这也正是众多异步非阻塞Web框架的内部原理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
|
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""snow.py - Snow, a miniature asynchronous, non-blocking web framework.

The framework is built on non-blocking sockets plus ``select`` based I/O
multiplexing.  A route handler either returns an ``HttpResponse`` (answered
immediately) or is a generator that yields a ``Future``; in the latter case
the connection is parked until the future reports ``ready``.
"""
import logging
import re
import select
import socket
import time

logger = logging.getLogger(__name__)

# Byte sequence separating the header section from the body.
HEADER_TERMINATOR = b'\r\n\r\n'


class HttpResponse(object):
    """Container for the data sent back to the client."""

    def __init__(self, content=''):
        self.content = content
        self.headers = {}
        self.cookies = {}

    def response(self):
        """Return the content encoded as UTF-8 bytes.

        Only the content is serialized; ``headers`` and ``cookies`` are not
        written to the wire by this method.
        """
        return bytes(self.content, encoding='utf-8')


class HttpNotFound(HttpResponse):
    """Response used when no route matches the requested URL."""

    def __init__(self):
        super().__init__('404 Not Found')


class HttpRequest(object):
    """Parsed view of the data a client sent on ``conn``.

    :param conn: object exposing ``recv(bufsize)``; it is read until it
        returns no data or raises ``OSError`` (which includes the
        ``BlockingIOError`` of a drained non-blocking socket).
    """

    def __init__(self, conn):
        self.conn = conn
        self.header_bytes = bytes()
        self.header_dict = {}
        self.body_bytes = bytes()
        self.method = ""
        self.url = ""
        self.protocol = ""
        self.initialize()
        self.initialize_headers()

    def initialize(self):
        """Read everything currently available and split header from body."""
        pending = b''
        header_done = False
        while True:
            try:
                received = self.conn.recv(8096)
            except OSError:
                # Nothing more to read right now (or the peer went away).
                received = None
            if not received:
                break
            if header_done:
                self.body_bytes += received
                continue
            # Search the accumulated bytes, not just this chunk, so that a
            # terminator straddling two recv() calls is still detected.
            pending += received
            head, separator, rest = pending.partition(HEADER_TERMINATOR)
            if separator:
                self.header_bytes = head
                self.body_bytes += rest
                header_done = True
        if not header_done:
            # No terminator seen: treat everything received as header data.
            self.header_bytes = pending

    @property
    def header_str(self):
        """Header section decoded as UTF-8; undecodable bytes are replaced."""
        return str(self.header_bytes, encoding='utf-8', errors='replace')

    def initialize_headers(self):
        """Fill ``method``/``url``/``protocol`` and ``header_dict``."""
        lines = self.header_str.split('\r\n')
        request_line = lines[0].split(' ')
        if len(request_line) == 3:
            self.method, self.url, self.protocol = request_line
        for line in lines[1:]:
            # Split on the first colon only: values such as
            # "localhost:8012" legitimately contain colons themselves.
            name, separator, value = line.partition(':')
            if separator:
                self.header_dict[name.strip()] = value.strip()


class Future(object):
    """Pairs a callback with a flag telling whether its result is ready.

    :param callback: callable invoked as ``callback(request, future)`` once
        the future is ready; it must return an ``HttpResponse``.  May be
        ``None``, in which case the connection is closed without a reply.
    """

    def __init__(self, callback):
        self.callback = callback
        self._ready = False
        self.value = None

    def set_result(self, value=None):
        """Store ``value`` and mark the future as ready."""
        self.value = value
        self._ready = True

    @property
    def ready(self):
        return self._ready


class TimeoutFuture(Future):
    """Future without a callback that becomes ready after ``timeout`` seconds."""

    def __init__(self, timeout):
        super().__init__(callback=None)
        self.timeout = timeout
        self.start_time = time.time()

    @property
    def ready(self):
        if time.time() > self.start_time + self.timeout:
            self._ready = True
        return self._ready


class Snow(object):
    """The miniature web framework.

    :param routes: sequence of ``(regex_pattern, handler)`` entries; the
        first pattern for which ``re.match(pattern, url)`` succeeds wins.
    """

    def __init__(self, routes):
        self.routes = routes
        self.inputs = set()
        self.request = None
        self.async_request_handler = {}
        # Request that produced each parked future, keyed by connection, so
        # the callback later receives its own request rather than whichever
        # request happened to be processed last.
        self._pending_requests = {}

    def run(self, host='localhost', port=9999):
        """Run the event loop.

        Errors raised while setting up the listening socket propagate to the
        caller.  An unexpected error inside the loop is logged and ends the
        loop; errors tied to a single connection only drop that connection.

        :param host: interface to bind to
        :param port: TCP port to listen on
        :return: None
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind((host, port))
            sock.listen(128)
            sock.setblocking(False)
            self.inputs.add(sock)
            try:
                while True:
                    readable_list, _, _ = select.select(
                        self.inputs, [], self.inputs, 0.005)
                    for conn in readable_list:
                        if conn is sock:
                            self._accept(sock)
                        else:
                            self._handle_request(conn)
                    self.polling_callback()
            except Exception:
                logger.exception('Snow event loop terminated unexpectedly')
        finally:
            self.inputs.discard(sock)
            sock.close()

    def _accept(self, sock):
        """Accept one client from ``sock`` and start watching it."""
        try:
            client, _address = sock.accept()
        except OSError:
            # The client vanished between select() and accept().
            return
        client.setblocking(False)
        self.inputs.add(client)

    def _handle_request(self, conn):
        """Serve ``conn`` immediately, or park it until its future is ready."""
        try:
            result = self.process(conn)
            request = self.request
            if isinstance(result, HttpResponse):
                conn.sendall(result.response())
                self._close(conn)
                return
            future = next(result)
        except Exception:
            logger.exception('request handling failed; dropping connection')
            self._close(conn)
            return
        # Stop select()-ing on a parked connection: it must not be parsed as
        # a fresh request while its future is still pending.
        self.inputs.discard(conn)
        self.async_request_handler[conn] = future
        self._pending_requests[conn] = request

    def _close(self, conn):
        """Forget ``conn`` everywhere and close it."""
        self.inputs.discard(conn)
        self.async_request_handler.pop(conn, None)
        self._pending_requests.pop(conn, None)
        try:
            conn.close()
        except OSError:
            # The connection is being discarded anyway.
            pass

    def polling_callback(self):
        """Finish every parked connection whose future has become ready.

        :return: None
        """
        for conn, future in list(self.async_request_handler.items()):
            if not future.ready:
                continue
            # Fall back to the latest request for entries that were placed
            # into async_request_handler without a recorded request.
            request = self._pending_requests.get(conn, self.request)
            try:
                if future.callback:
                    response = future.callback(request, future)
                    conn.sendall(response.response())
            except Exception:
                logger.exception('async callback failed; dropping connection')
            finally:
                self._close(conn)

    def process(self, conn):
        """Parse the request on ``conn`` and dispatch it through the routes.

        :param conn: readable client connection
        :return: whatever the matched handler returns, or ``HttpNotFound``
            when no route matches
        """
        self.request = HttpRequest(conn)
        for route in self.routes:
            if re.match(route[0], self.request.url):
                return route[1](self.request)
        return HttpNotFound()
二、使用
1. 基本使用
1
2
3
4
5
6
7
8
9
|
"""Basic usage: a single synchronous view served by Snow."""
from snow import HttpResponse, Snow


def index(request):
    """Answer every request to /index/ with the plain text 'OK'."""
    reply = HttpResponse('OK')
    return reply


# Each entry maps a URL regular expression to the view that serves it.
routes = [
    (r'/index/', index),
]

app = Snow(routes)
app.run(port=8012)
2.异步非阻塞:超时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
"""Asynchronous usage: a view whose connection is released after a timeout."""
from snow import Snow
from snow import HttpResponse
from snow import TimeoutFuture


def async_view(request):
    """Park the connection for five seconds using a TimeoutFuture.

    The view was originally named ``async``; that is a reserved keyword
    since Python 3.7 and can no longer be used as a function name.
    The TimeoutFuture carries no callback, so the framework closes the
    connection without a reply once the timeout has elapsed.
    """
    obj = TimeoutFuture(5)
    yield obj


def home(request):
    """Synchronous view, handy for checking that the server stays responsive."""
    return HttpResponse('home')


routes = [
    (r'/home/', home),
    (r'/async/', async_view),
]

app = Snow(routes)
app.run(port=8012)
3.异步非阻塞:等待
基于等待模式可以完成自定制操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
"""Asynchronous usage: a request that waits until another request releases it."""
from snow import Snow
from snow import HttpResponse
from snow import Future

# Futures of the /req/ requests that are still waiting, oldest first.
request_list = []


def callback(request, future):
    """Build the reply for a released request from the future's value."""
    return HttpResponse(future.value)


def req(request):
    """Park the connection until some /stop/ request sets the future's result."""
    obj = Future(callback=callback)
    request_list.append(obj)
    yield obj


def stop(request):
    """Release the oldest waiting /req/ request, if there is one."""
    if not request_list:
        # Without this guard, indexing the empty list raises IndexError when
        # /stop/ is called before any /req/.
        return HttpResponse('no pending request')
    obj = request_list.pop(0)
    obj.set_result('done')
    return HttpResponse('stop')


routes = [
    (r'/req/', req),
    (r'/stop/', stop),
]

app = Snow(routes)
app.run(port=8012)
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持服务器之家!
原文链接:http://www.cnblogs.com/wupeiqi/p/6536518.html