前言

有一段时间没有更新文章了,最近决定梳理一下这几个月一直在忙的聊天记录问答项目,分享一下作者的心得。

实际上,整个项目不止是基于聊天记录的问答,这里以聊天记录问答为例,是为了各位读者便于理解。

其它信息在这里不便多说,希望各位读者能够理解。

话不多说,咱们直接进入正题。

整个项目打算分为两块来讲:

  • 数据分块篇
  • 问答篇

一、数据样例

在这里插入图片描述
在这篇文章中,我们只需要关注右侧的聊天记录部分。

二、分析

从上图的数据样例可以看出,我们有几个可选的方案:

  • NaiveRAG
    • 将每条消息embedding,直接做RAG
    • 将消息分块embedding,做RAG
  • Text2SQL
    • 根据Query,生成查询语句,基于查询结果问答
  • GraphRAG
    • 根据聊天记录,做实体、关系、三元组抽取,构建知识图谱,基于知识图谱做问答

具体的方案,读者可以自行思考优劣,咱们在这里不展开叙述(下一篇讲)。

很显然,我们首先可以排除 “将每条消息embedding,直接做RAG” 的方案,因为这样做,我们检索出来的消息,几乎都长得一样,并且非常碎片化,基于这些数据生成的回答,往往不够全面。

那么,剩下的方案,看起来都是有效的,事实也确实如此,我们最终的方案便是三者结合。

NaiveRAG和GraphRAG有一个共同点:由于LLM最大上下文限制,当两人的聊天记录非常多时,需要对数据分块。

实际上,我们的业务不仅需要做问答,还需要对聊天记录摘要,因此,合理的数据分块,是非常重要的。

这便引出了我们这篇博客主题。

三、如何做

我们的核心原则:不能把同一个话题划分到多个窗口,要尽量保持话题的完整性。

结合我们平时的聊天习惯很容易想到的是:当两条消息的发送时间间隔不长时,那他们大概属于同一个话题。

很显然,这是对的。

我们的做法:

  1. 基于消息发送时间间隔划分
    • 当两条消息的发送时间间隔小于某个阈值时,被认为属于同一个窗口
    • 这个窗口中,可能包括了多个话题,但这些话题,没有被拆分到多个窗口,因此一个窗口中有多个话题也是是没关系的
    • 通过大量数据分析发现,这个阈值在 120 分钟时,效果是比较好的

通过时间间隔划分好窗口后我们发现,有些话题,还是被划分到多个窗口去了,比如:有些消息没有得到及时回复,可能间隔了三五个小时,但他们确实属于同一个话题,应该被划分到同一个窗口。

  1. 合并相似窗口

    • 当前一个窗口的末尾部分的内容与当前窗口的开始部分的内容相似度大于某个阈值时,被认为属于同一个窗口
    • 这个阈值,我们实验下来,0.8是比较合适的
  2. 划分长窗口

    • 目前LLM的上下文主流的都在32768左右,但我们实验发现,当上下文超过8k时,LLM的效果下降得厉害
    • 因此,我们对于超长的窗口,将其划分为长度在8k token左右的多个窗口,并且每个窗口保持一定长度的首尾消息冗余
  3. 合并短窗口

很多同学可能会有疑问,前面不是把长窗口划分成短窗口了吗,为什么又要合并起来

这里的短窗口,是指只有三五条消息的窗口,比如:两人虽然聊了很多个话题,但每个话题的消息数量都很少,也就是加起来的token很少,并且信息含量较低,如果把这种信息含量较低的内容都丢给LLM处理一次的话,很容易出现很多没什么用的总结,并且增加了LLM处理时间,因此,我们将这些短的窗口合并起来,只要不超过某一长度阈值,就都合并到同一个窗口

至此,我们便得到了划分比较合理的多个聊天窗口了。

四、结合代码理解

代码不一定完全实现了以上逻辑,为了帮助大家理解,可以参考下

class SplitChatWindow:
    def __init__(self):
        self.simalarity_endpoint = os.getenv(
            "SIMALARITY_ENDPOINT")

    def get_simalarity(self, a, b):
        payload = json.dumps(
            {"inputs": {"sentences": [b], "source_sentence": a}})
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + os.getenv("OPENAI_API_KEY", "empty")
        }

        response = requests.request(
            "POST", self.simalarity_endpoint, headers=headers, data=payload
        )
        simalarity = response.json()["data"][0]
        return simalarity


    def split_chat_by_time(self, chat_records, time_threshold_minutes):
        chat_records.sort(
            key=lambda x: parse(x["Time"].strip())
        )

        topic_windows = []
        current_window = [chat_records[0]]

        for i in range(1, len(chat_records)):
            current_time = parse(
                chat_records[i]["Time"].strip()
            )
            prev_time = parse(
                chat_records[i - 1]["Time"].strip()
            )
            time_diff = (current_time - prev_time).total_seconds() / \
                60  # in minutes

            if time_diff <= time_threshold_minutes:
                current_window.append(chat_records[i])
            else:
                topic_windows.append(current_window)
                current_window = [chat_records[i]]

        topic_windows.append(current_window)
        return topic_windows

    def merge_similar_window_batch_with_sbert(self, windows, similarity_threshold):

        merged_windows = [windows[0]]
        window_content_list = [
            "\n".join([msg["Content"].strip() for msg in merged_windows[-1]])
        ]

        for window in windows[1:]:
            new_window_content = "\n".join(
                [msg["Content"].strip() for msg in window])
            window_content_list.append(new_window_content)

        for i in range(1, len(windows)):
            if window_content_list[i - 1].strip() != "" and window_content_list[i].strip() != "":
                similarity = self.get_simalarity(
                    window_content_list[i -
                                        1][-256:], window_content_list[i][:256]
                )
                if similarity > similarity_threshold:
                    merged_windows[-1].extend(windows[i])
                else:
                    merged_windows.append(windows[i])
            else:
                merged_windows.append(windows[i])

        return merged_windows

    def merge_short_window(self, windows, max_length=8192):
        merged_windows = [windows[0]]

        for idx, window in enumerate(windows[1:]):
            current_window_content = "\n".join(
                [
                    msg["SendNickName"]
                    + " -> "
                    + msg["ReceiveNickName"]
                    + ": "
                    + msg["Content"].strip()
                    for msg in merged_windows[-1]
                ]
            )
            new_window_content = "\n".join(
                [
                    msg["SendNickName"]
                    + " -> "
                    + msg["ReceiveNickName"]
                    + ": "
                    + msg["Content"].strip()
                    for msg in window
                ]
            )

            total_length = len(current_window_content) + \
                len(new_window_content)

            if total_length < max_length:
                merged_windows[-1] += window
            else:
                merged_windows.append(window)

        return merged_windows

    def split_long_window(self, windows, max_length=32000):
        new_windows = []

        for window in windows:
            current_window_content = "\n".join(
                [
                    msg["SendNickName"]
                    + " -> "
                    + msg["ReceiveNickName"]
                    + ": "
                    + msg["Content"].strip()
                    for msg in window
                ]
            )

            window_content_length = len(current_window_content)
            if window_content_length > max_length:
                # 计算需要拆分成多少个子窗口
                num_splits = (window_content_length // max_length) + 1
                messages_per_window = len(window) // num_splits

                # 按照消息数量平均拆分
                for i in range(0, len(window), messages_per_window):
                    new_windows.append(window[i:i + messages_per_window])

                logger.info(
                    f"window 长度为:{window_content_length}, 拆分成: {num_splits} 个window"
                )
            else:
                new_windows.append(window)

        return new_windows

    def split_window_by_length(self, message_list, max_length=512):
        merged_windows = [[message_list[0]]]
        for msg in message_list[1:]:
            current_window_content = "\n".join(
                [
                    msg["SendNickName"]
                    + " -> "
                    + msg["ReceiveNickName"]
                    + ": "
                    + msg["Content"].strip()
                    for msg in merged_windows[-1]
                ]
            )
            new_window_content = (
                msg["SendNickName"]
                + " -> "
                + msg["ReceiveNickName"]
                + ": "
                + msg["Content"].strip()
            )
            length = len(current_window_content) + len(new_window_content)
            if length < max_length:
                merged_windows[-1].append(msg)
            else:
                merged_windows.append([msg])

        return merged_windows

总结

有不同的见解欢迎在评论区留言讨论

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐