Skip to content

Server-Sent Events

Server-Sent Events(简称 SSE)是服务器向浏览器推送信息的一种方式
话说在 ChatGPT 的 api 文档 中也看到 SSE 了
我使用 SSE 是为了在前端页面实时查看 K8S deployment 的部署日志

阮一峰的网络日志中引用

严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。

也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。

SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议,目前除了 IE/Edge,其他浏览器都支持。

[客户端] 在 js 中发起 sse

新建 EventSource 实例即可:new EventSource(url)

js
var url = '/api/v1/sse/logs/?type='+lt+'&id='+id+'&num='+num
var _this = this
this.source = new EventSource(url)
this.source.onmessage = function (event) {
  if (event.data == '[DONE]') {
    _this.log += '\n\n=== 日志结束 ==='
    _this.autoScroll()
    _this.source.close()
    // _this.getData()
  } else if (event.data == '[NONE]') {
    // 不能什么都不写,但是可以是注释
  } else {
    // console.log(event.data)
    _this.log += event.data + '\n'
    _this.autoScroll()
  }
}

[服务端] 在 django 中使用 sse

https://stackoverflow.com/questions/54326515/how-can-i-make-sse-with-python-django

使用配置都很简单,但是实际测试情况是没有实时的进行输出,而是卡住了,超时后一次性输出了;找了半天的原因,最终发现是 nginx 需要修改配置

python
class SSELogs(LoginRequiredMixin, View):
    """日志流,使用SSE"""

    def get(self, request):
        response = StreamingHttpResponse(
            self.event_stream(request),
            content_type='text/event-stream'
        )

        # 设置响应的 Cache-Control
        response['Cache-Control'] = 'no-cache'
        return response

    def event_stream(self, request):
        # 构造 SSE 数据
        _tp = request.GET.get('type')
        _id = request.GET.get('id')
        _num = int(request.GET.get('num', 0))
        if _tp is None or _id is None:
            yield 'data: [DONE]\n\n'

        while True:
            res = client['popop']['sse_logs'].find({
                'type': _tp,
                'id': int(_id),
            }).limit(1).skip(_num)
            for r in res:
                _log = r['log']
                # 处理多行情况,每行必须是 data: xxx\n
                _log = _log.replace('\n', '\ndata: ')
                _num += 1
                break
            else:
                _log = '[NONE]'
                time.sleep(0.2)
            yield f'data: {_log}\n\n'
            if _log == '[DONE]':
                break

nginx 支持 sse

https://stackoverflow.com/questions/13672743/eventsource-server-sent-events-through-nginx

uwsgi 的话,必须使用 uwsgi_buffering off;

nginx
location /api/v1/sse/ {
  include uwsgi_params;
  uwsgi_pass 127.0.0.1:8000;
  uwsgi_buffering off;
  proxy_set_header Connection '';
  proxy_http_version 1.1;
  chunked_transfer_encoding off;
  # proxy_buffering off;
  proxy_cache off;
}

多客户端并发情况

  • 客户端在调用 sse 之前先请求后端获取一次当前 log 位置(条数),以及初始的 log
  • 初始 log:比如总共 100 条,
    • 如果最后一条有 end 标识(data: [DONE]),初始 log 即为全部 log,则不用 sse 了
    • 如果最后一条没有 end 标识,表示日志正在产生,返回当前 log 位置,初始 log 为当前所有的日志

这样后,多客户端对同一条日志的实时查看就不会产生混乱了;各自均能正常的查看实时日志

[服务端2] 在 sanic 中使用 sse

我的 sanic 并不使用 nginx

python
class SSELogsView(HTTPMethodView):
    """日志流,使用SSE"""

    async def get(self, request):
        print(request.args)
        _tp = request.args.get('type')
        _id = request.args.get('id')
        _num = int(request.args.get('num', 0))
        response = await request.respond(content_type='text/event-stream')
        while True:
            res = request.app.ctx.client['popop']['sse_logs'].find({
                'type': _tp,
                'id': _id,
            }).limit(1).skip(_num)
            async for r in res:
                _log = r['log']
                # 处理多行情况,每行必须是 data: xxx\n
                _log = _log.replace('\n', '\ndata: ')
                _num += 1
                break
            else:
                # _log = '[NONE]'
                await asyncio.sleep(0.2)
                continue
            if _log == '[DONE]':
                break
            await response.send(f'data: {_log}\n\n')
        await response.eof()
  • sanic 原生支持流式响应,所以使用起来比 django 更加简单方便
  • 同时 sanic 还提供 response.eof() 来主动断开连接
  • 前端就不再使用 data: [DONE] 来判断是否断开了;直接使用 EventSource.onerror 方法就行
  • sanic 官方相关文档:Sanic-流式传输-响应流

鉴权

  • 和 WebSocket 一样,无法自定请求头;只能依靠浏览器的“隐式”身份验证
  • 通过 cookie 方式最简单方便了,这就要求前后端不能跨域

参考