Server-Sent Events
Server-Sent Events(简称 SSE)是服务器向浏览器推送信息的一种方式
话说在 ChatGPT 的 api 文档 中也看到 SSE 了
我使用 SSE 是为了在前端页面实时查看 K8S deployment 的部署日志
从阮一峰的网络日志中引用
严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。
也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。
SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。
[客户端] 在 js 中发起 sse
新建 EventSource
实例即可:new EventSource(url)
js
var url = '/api/v1/sse/logs/?type='+lt+'&id='+id+'&num='+num
var _this = this
this.source = new EventSource(url)
this.source.onmessage = function (event) {
if (event.data == '[DONE]') {
_this.log += '\n\n=== 日志结束 ==='
_this.autoScroll()
_this.source.close()
// _this.getData()
} else if (event.data == '[NONE]') {
// 不能什么都不写,但是可以是注释
} else {
// console.log(event.data)
_this.log += event.data + '\n'
_this.autoScroll()
}
}
[服务端] 在 django 中使用 sse
https://stackoverflow.com/questions/54326515/how-can-i-make-sse-with-python-django
使用配置都很简单,但是实际测试情况是没有实时的进行输出,而是卡住了,超时后一次性输出了;找了半天的原因,最终发现是 nginx 需要修改配置
python
class SSELogs(LoginRequiredMixin, View):
"""日志流,使用SSE"""
def get(self, request):
response = StreamingHttpResponse(
self.event_stream(request),
content_type='text/event-stream'
)
# 设置响应的 Cache-Control
response['Cache-Control'] = 'no-cache'
return response
def event_stream(self, request):
# 构造 SSE 数据
_tp = request.GET.get('type')
_id = request.GET.get('id')
_num = int(request.GET.get('num', 0))
if _tp is None or _id is None:
yield 'data: [DONE]\n\n'
while True:
res = client['popop']['sse_logs'].find({
'type': _tp,
'id': int(_id),
}).limit(1).skip(_num)
for r in res:
_log = r['log']
# 处理多行情况,每行必须是 data: xxx\n
_log = _log.replace('\n', '\ndata: ')
_num += 1
break
else:
_log = '[NONE]'
time.sleep(0.2)
yield f'data: {_log}\n\n'
if _log == '[DONE]':
break
- StreamingHttpResponse 异步介绍:https://github.com/django/django/pull/16384
data: [DONE]
为日志结束标识
nginx 支持 sse
https://stackoverflow.com/questions/13672743/eventsource-server-sent-events-through-nginx
uwsgi 的话,必须使用 uwsgi_buffering off;
nginx
location /api/v1/sse/ {
include uwsgi_params;
uwsgi_pass 127.0.0.1:8000;
uwsgi_buffering off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding off;
# proxy_buffering off;
proxy_cache off;
}
多客户端并发情况
- 客户端在调用 sse 之前先请求后端获取一次当前 log 位置(条数),以及初始的 log
- 初始 log:比如总共 100 条,
- 如果最后一条有 end 标识(
data: [DONE]
),初始 log 即为全部 log,则不用 sse 了 - 如果最后一条没有 end 标识,表示日志正在产生,返回当前 log 位置,初始 log 为当前所有的日志
- 如果最后一条有 end 标识(
这样后,多客户端对同一条日志的实时查看就不会产生混乱了;各自均能正常的查看实时日志
[服务端2] 在 sanic 中使用 sse
我的 sanic 并不使用 nginx
python
class SSELogsView(HTTPMethodView):
"""日志流,使用SSE"""
async def get(self, request):
print(request.args)
_tp = request.args.get('type')
_id = request.args.get('id')
_num = int(request.args.get('num', 0))
response = await request.respond(content_type='text/event-stream')
while True:
res = request.app.ctx.client['popop']['sse_logs'].find({
'type': _tp,
'id': _id,
}).limit(1).skip(_num)
async for r in res:
_log = r['log']
# 处理多行情况,每行必须是 data: xxx\n
_log = _log.replace('\n', '\ndata: ')
_num += 1
break
else:
# _log = '[NONE]'
await asyncio.sleep(0.2)
continue
if _log == '[DONE]':
break
await response.send(f'data: {_log}\n\n')
await response.eof()
- sanic 原生支持流式响应,所以使用起来比 django 更加简单方便
- 同时 sanic 还提供
response.eof()
来主动断开连接 - 前端就不再使用
data: [DONE]
来判断是否断开了;直接使用EventSource.onerror
方法就行 - sanic 官方相关文档:Sanic-流式传输-响应流
鉴权
- 和 WebSocket 一样,无法自定请求头;只能依靠浏览器的“隐式”身份验证
- 通过 cookie 方式最简单方便了,这就要求前后端不能跨域