本文实例讲述了Sanic框架流式传输操作。分享给大家供大家参考,具体如下:
简介
Sanic是一个类似Flask的Python 3.5+ Web服务器,它的写入速度非常快。除了Flask之外,Sanic还支持异步请求处理程序。这意味着你可以使用Python 3.5中新的闪亮的异步/等待语法,使你的代码非阻塞和快速。
在前面一篇《Sanic框架Cookies操作》中已经讲到,如何在Sanic中使用Cookie,接下来将介绍一下Sanic的流的使用:
请求流式传输
Sanic允许通过流获取请求数据,如下所示,当请求结束时,request.stream.get()
返回为None
,只有post
、put
和patch decorator
拥有流参数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from sanic.response import stream @app .post( "/post_stream" ,stream = True ) async def post_stream(request): async def streaming(response): while True : body = await request.stream.get() if body is None : break body = body.decode( "utf-8" ) reponse.write(body) return stream(streaming) @app .put( "/put_stream" ,stream = True ) async def put_stream(request): async def streaming(response): while True : body = await request.stream.get() if body is None : break body = body.decode( "utf-8" ) response.write( "utf-8" ) return stream(streaming) |
除了上述例子的方法之外,我们之前还讲过用add_route
方法动态添加路由:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from sanic.response import text from sanic.views import HTTPMethodView from sanic.views import stream as stream_decorator class StreamView(HTTPMethodView) @stream_decorator async def post( self ,request) result = '' while True : body = await request.stream.get() if body is None : break body = body.decode( 'utf-8' ) result + = body return text(result) app.add_route(StreamView.as_view(), "/method_view" ) |
值得注意的是,stream_decorator
装饰器中处理函数的函数名称,若为post则为post请求,若为put则为put请求。在之前讲述路由的文章《Sanic框架路由用法》中讲到一个CompositionView
类来自定义一个路由,CompositionView
在流式请求中同样适用:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from sanic.views import CompositionView async def post_stream_view(request): result = '' while True : body = await request.stream.get() if body is None : break body = body.decode( 'utf-8' ) result + = body return text(result) view = CompositionView() view.add([ 'POST' ],post_stream_view,stream = True ) app.add_route(view, "/post_stream_view" ) |
响应流式传输
Sanic允许你使用stream
方法将内容传输到客户端,该方法接受一个通过StreamingHTTPResponse
传入的对象的协程回调,举个栗子:
1
2
3
4
5
6
7
|
from sanic.response import stream @app .route( "/post_stream_info" ,methods = [ "POST" ]) async def post_stream_info(request): async def streaming(response): response.write( "no" ) response.write( "bug" ) return stream(streaming) |
希望本文所述对大家Python程序设计有所帮助。
原文链接:https://blog.csdn.net/y472360651/article/details/80212483