HTTP Stream 模式

客户端发起请求,服务端数据过大 或者 需要延时处理并且同步返回数据的场景

下面以 Flask 为例

1. 服务端

实现每隔 2s 返回一行数据 {task}-{time.now()}-{process}

import time
from datetime import datetime
from flask import Flask, Response,request

app = Flask(__name__)

def compute(a):
    return f'{(a/10) * 100}%'
    
def stream(task):
    try:
        print(f"{task} - start!")
        a = 1
        while True:
            yield f"{task}-{datetime.now().isoformat()}-{compute(a)}\n"
            a += 1
            time.sleep(2)
            # if a > 10:
            #     break
    except Exception as e:
        print(e)
        e.with_traceback()
    finally:
        print(f"{task} - finish!")
 
@app.route('/data')
def example():
    task = request.args.get("task")
    return Response(stream(task), mimetype='text/plain')

if __name__ == '__main__':
   app.run()

2.客户端

发起请求,并带不同的参数

import  requests

task = "121"
res = requests.get(f"http://127.0.0.1:5000/data?task={task}",stream=True)
results = []
for result in res.iter_content(chunk_size=1024):
    # 中间结果
    print("RESULT: ",result)
    results.append(result)
# 最终结果
print(results)

3.执行

  • 服务端启动
  • task = 121 运行 30s , 停止
  • task = 120 运行 8s , 停止

如图所示,服务端响应结果如下

  • 当 task 120 开始和结束后,服务端会收到请求停止和结束
  • 当 task 121 开始和结束后,服务端会收到请求停止和结束
    在这里插入图片描述

4.例子:任务引擎

如代码所示,实现的一个异步计算和同步返回的例子

任务引擎代码

from threading import  Thread

import time

class  Task():

    def __init__(self,task):
        # 任务
        self.task = task
        # 状态
        self.is_start = False
        # 进度
        self.proecess = ""

    def start(self):
        """
        异步执行:启动线程
        """
        if self.is_start:
            return
        self.is_start = True
        # 异步执行
        local = Thread(target=self.__execute)
        local.setDaemon(True)
        local.start()

    def __execute(self):
        # 启动
        a = 1
        print(f"{self.task} - start!")
        # 异步执行的操作在这里实现
        # 例子为,每 2s 输出进度
        while self.is_start:
            a+=1
            self.proecess = f"{self.task}-{(a/10)*100}%"
            time.sleep(2)
        # 结束
        print(f"{self.task} stop!")

    def get_stream(task_id):
        """
        同步返回:返回执行中间结果
        """
        try:
            t = Task(task_id)
            # 异步执行
            t.start()
            # 同步等待结果,返回执行中间结果
            while True:
                yield t.proecess
                time.sleep(2)
        except Exception as e:
            print(e)
        finally:
            t.stop()
            print("finish")
        
    def stop(self):
        self.is_start = False

服务端代码

调用,客户端代码与上面一致

import time
from datetime import datetime
from flask import Flask, Response,request
from task import Task

app = Flask(__name__)

@app.route('/data')
def example():
    task = request.args.get("task")
    return Response(Task.get_stream(task), mimetype='text/plain')

if __name__ == '__main__':
   app.run()
Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐