From 749e28dcd8ff1b88211ba464662cd8776e69cd4f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?=
Date: Thu, 30 Oct 2025 10:47:30 +0800
Subject: [PATCH 01/10] feat: change MOS_SCHEDULER_THREAD_POOL_MAX_WORKERS to
 10000

---
 src/memos/api/config.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/memos/api/config.py b/src/memos/api/config.py
index 7ac882d6c..a76a4f98c 100644
--- a/src/memos/api/config.py
+++ b/src/memos/api/config.py
@@ -599,7 +599,7 @@ def get_scheduler_config() -> dict[str, Any]:
         ),
         "context_window_size": int(os.getenv("MOS_SCHEDULER_CONTEXT_WINDOW_SIZE", "5")),
         "thread_pool_max_workers": int(
-            os.getenv("MOS_SCHEDULER_THREAD_POOL_MAX_WORKERS", "10")
+            os.getenv("MOS_SCHEDULER_THREAD_POOL_MAX_WORKERS", "10000")
         ),
         "consume_interval_seconds": float(
             os.getenv("MOS_SCHEDULER_CONSUME_INTERVAL_SECONDS", "0.01")

From dcc51637f99d1266259a282abc48d7465cf61454 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?=
Date: Thu, 30 Oct 2025 10:51:31 +0800
Subject: [PATCH 02/10] feat: add user_name to schedule server router

---
 src/memos/api/routers/server_router.py | 92 +++++++++++---------------
 1 file changed, 38 insertions(+), 54 deletions(-)

diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py
index e9df292ad..71da58c66 100644
--- a/src/memos/api/routers/server_router.py
+++ b/src/memos/api/routers/server_router.py
@@ -607,13 +607,14 @@ def _process_pref_mem() -> list[dict[str, str]]:
 
 
 @router.get("/scheduler/status", summary="Get scheduler running task count")
-def scheduler_status():
+def scheduler_status(user_name: str):
     """
-    Return current running tasks count from scheduler dispatcher.
-    Shape is consistent with /scheduler/wait.
+    Return current running tasks count for a specific user_name (mem_cube_id).
     """
     try:
-        running = mem_scheduler.dispatcher.get_running_tasks()
+        running = mem_scheduler.dispatcher.get_running_tasks(
+            lambda task: task.mem_cube_id == user_name
+        )
         running_count = len(running)
         now_ts = time.time()
 
@@ -622,30 +623,30 @@
             "message": "ok",
             "data": {
                 "running_tasks": running_count,
                 "timestamp": now_ts,
+                "user_name": user_name,
             },
         }
 
     except Exception as err:
         logger.error("Failed to get scheduler status: %s", traceback.format_exc())
-
         raise HTTPException(status_code=500, detail="Failed to get scheduler status") from err
 
 
-@router.post("/scheduler/wait", summary="Wait until scheduler is idle")
-def scheduler_wait(timeout_seconds: float = 120.0, poll_interval: float = 0.2):
+@router.post("/scheduler/wait", summary="Wait until scheduler is idle for a specific user")
+def scheduler_wait(
+    user_name: str,
+    timeout_seconds: float = 120.0,
+    poll_interval: float = 0.2,
+):
     """
-    Block until scheduler has no running tasks, or timeout.
-    We return a consistent structured payload so callers can
-    tell whether this was a clean flush or a timeout.
-
-    Args:
-        timeout_seconds: max seconds to wait
-        poll_interval: seconds between polls
+    Block until scheduler has no running tasks for the given user_name, or timeout.
     """
     start = time.time()
     try:
         while True:
-            running = mem_scheduler.dispatcher.get_running_tasks()
+            running = mem_scheduler.dispatcher.get_running_tasks(
+                lambda task: task.mem_cube_id == user_name
+            )
             running_count = len(running)
             elapsed = time.time() - start
 
@@ -657,6 +658,7 @@
                         "running_tasks": 0,
                         "waited_seconds": round(elapsed, 3),
                         "timed_out": False,
+                        "user_name": user_name,
                     },
                 }
 
@@ -668,24 +670,23 @@
                         "running_tasks": running_count,
                         "waited_seconds": round(elapsed, 3),
                         "timed_out": True,
+                        "user_name": user_name,
                     },
                 }
 
             time.sleep(poll_interval)
     except Exception as err:
-        logger.error(
-            "Failed while waiting for scheduler: %s",
-            traceback.format_exc(),
-        )
-        raise HTTPException(
-            status_code=500,
-            detail="Failed while waiting for scheduler",
-        ) from err
+        logger.error("Failed while waiting for scheduler: %s", traceback.format_exc())
+        raise HTTPException(status_code=500, detail="Failed while waiting for scheduler") from err
 
 
-@router.get("/scheduler/wait/stream", summary="Stream scheduler progress (SSE)")
-def scheduler_wait_stream(timeout_seconds: float = 120.0, poll_interval: float = 0.2):
+@router.get("/scheduler/wait/stream", summary="Stream scheduler progress for a user")
+def scheduler_wait_stream(
+    user_name: str,
+    timeout_seconds: float = 120.0,
+    poll_interval: float = 0.2,
+):
     """
     Stream scheduler progress via Server-Sent Events (SSE).
 
@@ -703,38 +704,24 @@
     def event_generator():
         start = time.time()
         try:
             while True:
-                running = mem_scheduler.dispatcher.get_running_tasks()
+                running = mem_scheduler.dispatcher.get_running_tasks(
+                    lambda task: task.mem_cube_id == user_name
+                )
                 running_count = len(running)
                 elapsed = time.time() - start
 
-                # heartbeat frame
-                heartbeat_payload = {
+                payload = {
+                    "user_name": user_name,
                     "running_tasks": running_count,
                     "elapsed_seconds": round(elapsed, 3),
                     "status": "running" if running_count > 0 else "idle",
                 }
-                yield "data: " + json.dumps(heartbeat_payload, ensure_ascii=False) + "\n\n"
+                yield "data: " + json.dumps(payload, ensure_ascii=False) + "\n\n"
 
-                # scheduler is idle -> final frame + break
-                if running_count == 0:
-                    final_payload = {
-                        "running_tasks": 0,
-                        "elapsed_seconds": round(elapsed, 3),
-                        "status": "idle",
-                        "timed_out": False,
-                    }
-                    yield "data: " + json.dumps(final_payload, ensure_ascii=False) + "\n\n"
-                    break
-
-                # timeout -> final frame + break
-                if elapsed > timeout_seconds:
-                    final_payload = {
-                        "running_tasks": running_count,
-                        "elapsed_seconds": round(elapsed, 3),
-                        "status": "timeout",
-                        "timed_out": True,
-                    }
-                    yield "data: " + json.dumps(final_payload, ensure_ascii=False) + "\n\n"
+                if running_count == 0 or elapsed > timeout_seconds:
+                    payload["status"] = "idle" if running_count == 0 else "timeout"
+                    payload["timed_out"] = running_count > 0
+                    yield "data: " + json.dumps(payload, ensure_ascii=False) + "\n\n"
                     break
 
                 time.sleep(poll_interval)
         except Exception as e:
             err_payload = {
                 "status": "error",
                 "detail": "stream_failed",
                 "exception": str(e),
+                "user_name": user_name,
             }
-            logger.error(
-                "Failed streaming scheduler wait: %s: %s",
-                e,
-                traceback.format_exc(),
-            )
+            logger.error(f"Scheduler stream error for {user_name}: {traceback.format_exc()}")
             yield "data: " + json.dumps(err_payload, ensure_ascii=False) + "\n\n"
 
     return StreamingResponse(event_generator(), media_type="text/event-stream")

From 4872c6f0e91daaf0132389664dae78fa10e9fde2 Mon Sep 17
00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 30 Oct 2025 11:53:16 +0800 Subject: [PATCH 03/10] feat: roll back to old mem-reader-prompt --- src/memos/templates/mem_reader_prompts.py | 190 +++++++--------------- 1 file changed, 57 insertions(+), 133 deletions(-) diff --git a/src/memos/templates/mem_reader_prompts.py b/src/memos/templates/mem_reader_prompts.py index 15672f8d8..ec6812743 100644 --- a/src/memos/templates/mem_reader_prompts.py +++ b/src/memos/templates/mem_reader_prompts.py @@ -1,56 +1,50 @@ SIMPLE_STRUCT_MEM_READER_PROMPT = """You are a memory extraction expert. -Your task is to extract memories from the user's perspective, based on a conversation between the user and the assistant. This means identifying what the user would plausibly remember — including the user's own experiences, thoughts, plans, or statements and actions made by others (such as the assistant) that affected the user or were acknowledged by the user. - -Please perform the following: -1. Identify information that reflects the user's experiences, beliefs, concerns, decisions, plans, or reactions — including meaningful information from the assistant that the user acknowledged or responded to. - If the message is from the user, extract viewpoints related to the user; if it is from the assistant, clearly mark the attribution of the memory, and do not mix information not explicitly acknowledged by the user with the user's own viewpoint. - - **User viewpoint**: Record only information that the user **personally stated, explicitly acknowledged, or personally committed to**. - - **Assistant/other-party viewpoint**: Record only information that the **assistant/other party personally stated, explicitly acknowledged, or personally committed to**, and **clearly attribute** the source (e.g., "[assistant-Jerry viewpoint]"). Do not rewrite it as the user's preference/decision. - - **Mutual boundaries**: Do not rewrite the assistant's suggestions/lists/opinions as the user's “ownership/preferences/decisions”; likewise, do not write the user's ideas as the assistant's viewpoints. - -2. Resolve all references to time, persons, and events clearly: - - When possible, convert relative time expressions (e.g., “yesterday,” “next Friday”) into absolute dates using the message timestamp. - - Clearly distinguish between **event time** and **message time**. +Your task is to extract memories from the perspective of user, based on a conversation between user and assistant. This means identifying what user would plausibly remember — including their own experiences, thoughts, plans, or relevant statements and actions made by others (such as assistant) that impacted or were acknowledged by user. +Please perform: +1. Identify information that reflects user's experiences, beliefs, concerns, decisions, plans, or reactions — including meaningful input from assistant that user acknowledged or responded to. +If the message is from the user, extract user-relevant memories; if it is from the assistant, only extract factual memories that the user acknowledged or responded to. + +2. Resolve all time, person, and event references clearly: + - Convert relative time expressions (e.g., “yesterday,” “next Friday”) into absolute dates using the message timestamp if possible. + - Clearly distinguish between event time and message time. - If uncertainty exists, state it explicitly (e.g., “around June 2025,” “exact date unclear”). - Include specific locations if mentioned. 
- - Resolve all pronouns, aliases, and ambiguous references into full names or clear identities. - - If there are people with the same name, disambiguate them. - -3. Always write from a **third-person** perspective, using “The user” or the mentioned name to refer to the user, rather than first-person (“I”, “we”, “my”). - For example, write “The user felt exhausted …” instead of “I felt exhausted …”. - -4. Do not omit any information that the user is likely to remember. - - Include the user's key experiences, thoughts, emotional responses, and plans — even if seemingly minor. - - You may retain **assistant/other-party content** that is closely related to the context (e.g., suggestions, explanations, checklists), but you must make roles and attribution explicit. - - Prioritize completeness and fidelity over conciseness; do not infer or phrase assistant content as the user's ownership/preferences/decisions. - - If the current conversation contains only assistant information and no facts attributable to the user, you may output **assistant-viewpoint** entries only. - -5. Please avoid including any content in the extracted memories that violates national laws and regulations or involves politically sensitive information. + - Resolve all pronouns, aliases, and ambiguous references into full names or identities. + - Disambiguate people with the same name if applicable. +3. Always write from a third-person perspective, referring to user as +"The user" or by name if name mentioned, rather than using first-person ("I", "me", "my"). +For example, write "The user felt exhausted..." instead of "I felt exhausted...". +4. Do not omit any information that user is likely to remember. + - Include all key experiences, thoughts, emotional responses, and plans — even if they seem minor. + - Prioritize completeness and fidelity over conciseness. + - Do not generalize or skip details that could be personally meaningful to user. +5. Please avoid any content that violates national laws and regulations or involves politically sensitive information in the memories you extract. -Return a valid JSON object with the following structure: +Return a single valid JSON object with the following structure: { "memory list": [ { - "key": , - "memory_type": , - "value": , - "tags": + "key": , + "memory_type": , + "value": , + "tags": }, ... ], - "summary": + "summary": } Language rules: -- The `key`, `value`, `tags`, and `summary` fields must match the primary language of the input conversation. **If the input is Chinese, output in Chinese.** +- The `key`, `value`, `tags`, `summary` fields must match the mostly used language of the input conversation. **如果输入是中文,请输出中文** - Keep `memory_type` in English. Example: Conversation: user: [June 26, 2025 at 3:00 PM]: Hi Jerry! Yesterday at 3 PM I had a meeting with my team about the new project. assistant: Oh Tom! Do you think the team can finish by December 15? -user: [June 26, 2025 at 3:00 PM]: I’m worried. The backend won’t be done until December 10, so testing will be tight. +user: [June 26, 2025 at 3:00 PM]: I’m worried. The backend won’t be done until +December 10, so testing will be tight. assistant: [June 26, 2025 at 3:00 PM]: Maybe propose an extension? user: [June 26, 2025 at 4:21 PM]: Good idea. I’ll raise it in tomorrow’s 9:30 AM meeting—maybe shift the deadline to January 5. @@ -60,62 +54,31 @@ { "key": "Initial project meeting", "memory_type": "LongTermMemory", - "value": "[user-Tom viewpoint] On June 25, 2025 at 3:00 PM, Tom met with the team to discuss a new project. 
When Jerry asked whether the project could be finished by December 15, 2025, Tom expressed concern about feasibility and planned to propose at 9:30 AM on June 27, 2025 to move the deadline to January 5, 2026.", + "value": "On June 25, 2025 at 3:00 PM, Tom held a meeting with their team to discuss a new project. The conversation covered the timeline and raised concerns about the feasibility of the December 15, 2025 deadline.", "tags": ["project", "timeline", "meeting", "deadline"] }, { - "key": "Jerry’s suggestion about the deadline", - "memory_type": "LongTermMemory", - "value": "[assistant-Jerry viewpoint] Jerry questioned the December 15 deadline and suggested considering an extension.", - "tags": ["deadline change", "suggestion"] - } + "key": "Planned scope adjustment", + "memory_type": "UserMemory", + "value": "Tom planned to suggest in a meeting on June 27, 2025 at 9:30 AM that the team should prioritize features and propose shifting the project deadline to January 5, 2026.", + "tags": ["planning", "deadline change", "feature prioritization"] + }, ], - "summary": "Tom is currently working on a tight-schedule project. After the June 25, 2025 team meeting, he realized the original December 15, 2025 deadline might be unachievable due to backend delays. Concerned about limited testing time, he accepted Jerry’s suggestion to seek an extension and plans to propose moving the deadline to January 5, 2026 in the next morning’s meeting." + "summary": "Tom is currently focused on managing a new project with a tight schedule. After a team meeting on June 25, 2025, he realized the original deadline of December 15 might not be feasible due to backend delays. Concerned about insufficient testing time, he welcomed Jerry’s suggestion of proposing an extension. Tom plans to raise the idea of shifting the deadline to January 5, 2026 in the next morning’s meeting. His actions reflect both stress about timelines and a proactive, team-oriented problem-solving approach." } -Another Example in Chinese (Note: when the user's language is Chinese, you must also output in Chinese): - -对话(节选): -user|2025-07-12 19:30:下个月同事婚礼,要买条裙子,预算1500内,别太暴露。 -assistant|19:32 -:了解~婚礼场合建议中长款,端庄一些。可以看法式茶歇/直筒/裹身三路。颜色上,奶油白、浅粉、雾霾蓝都适合;面料可看醋酸/真丝混纺(垂感好但需干洗)。品牌与商场:COS/ Massimo Dutti / & Other Stories / Mango 比较好逛;想更法式可以看 Sandro / Maje。 我记得你很喜欢Maje? 
-user|19:35:不喜欢亮色。国贸方便。 -assistant|19:37:那亮度就压低些:雾霾蓝、藕粉、燕麦米白更稳。国贸有 COS / Massimo Dutti / &OS;可以先看 COS 直筒或裹身,再去 MD 看藕粉/雾霾蓝。搭配上,薄披肩/小西装都能压场。 -user|19:40:165cm,S码;最好有口袋。 -assistant|19:42:OK。版型避开大摆+高开衩,选直筒或轻A字更稳;若要口袋,COS直筒一些款有;开衩别太高,及小腿肚附近即可。鞋包:中跟裸色鞋 + 小号手拿包。 -user|19:47:本来想茶歇A字,但雾霾蓝直筒也行。 -assistant|19:49:可以先锁定:COS 雾霾蓝直筒中长为主选,MD 藕粉中长做备选;再看 & Other Stories 是否有合身的裹身款。 -user|19:52:行,周六(7/19)去国贸试,合适就买。 -assistant|19:54:记得看肩带可调/有无内衬,醋酸/真丝优先干洗;准备同色安全裤/防走光贴。如果当天没货,可下单调货或线上下单门店自提。 - +Another Example in Chinese (注意: 当user的语言为中文时,你就需要也输出中文): { "memory list": [ { - "key": "参加婚礼购买裙子", - "memory_type": "UserMemory", - "value": "[user观点]用户计划于约2025年8月参加同事婚礼(具体日期不详),预算不超过1500元,整体风格不宜暴露;用户已决定在2025-07-19于国贸试穿并视合适即购买。", - "tags": ["婚礼", "预算", "国贸", "计划"] - }, - { - "key": "审美与版型偏好", - "memory_type": "UserMemory", - "value": "[user观点]用户不喜欢亮色,倾向低亮度色系;裙装偏好端庄的中长款,接受直筒或轻A字。", - "tags": ["偏好", "颜色", "版型"] - }, - { - "key": "体型尺码", - "memory_type": "UserMemory", - "value": [user观点]"用户身高约165cm、常穿S码", - "tags": ["体型", "尺码"] - }, - { - "key": "关于用户选购裙子的建议", + "key": "项目会议", "memory_type": "LongTermMemory", - "value": "[assistant观点]assistant在用户询问婚礼穿着时,建议在国贸优先逛COS查看雾霾蓝直筒中长为主选,Massimo Dutti藕粉中长为备选;该建议与用户“国贸方便”“雾霾蓝直筒也行”的回应相一致,另外assistant也提到user喜欢Maje,但User并未回应或证实该说法。", - "tags": ["婚礼穿着", "门店", "选购路线"] - } + "value": "在2025年6月25日下午3点,Tom与团队开会讨论了新项目,涉及时间表,并提出了对12月15日截止日期可行性的担忧。", + "tags": ["项目", "时间表", "会议", "截止日期"] + }, + ... ], - "summary": "用户计划在约2025年8月参加同事婚礼,预算≤1500并偏好端庄的中长款;确定于2025-07-19在国贸试穿。其长期画像显示:不喜欢亮色、偏好低亮度色系与不过分暴露的版型,身高约165cm、S码且偏好裙装带口袋。助手提出的国贸选购路线以COS雾霾蓝直筒中长为主选、MD藕粉中长为备选,且与用户回应一致,为线下试穿与购买提供了明确路径。" + "summary": "Tom 目前专注于管理一个进度紧张的新项目..." } Always respond in the same language as the conversation. @@ -130,10 +93,7 @@ 请执行以下操作: 1. 识别反映用户经历、信念、关切、决策、计划或反应的信息——包括用户认可或回应的来自助手的有意义信息。 -如果消息来自用户,请提取与用户相关的观点;如果来自助手,则在表达的时候表明记忆归属方,未经用户明确认可的信息不要与用户本身的观点混淆。 - - **用户观点**:仅记录由**用户亲口陈述、明确认可或自己作出承诺**的信息。 - - **助手观点**:仅记录由**助手/另一方亲口陈述、明确认可或自己作出承诺**的信息。 - - **互不越界**:不得将助手提出的需求清单/建议/观点改写为用户的“拥有/偏好/决定”;也不得把用户的想法写成助手的观点。 +如果消息来自用户,请提取与用户相关的记忆;如果来自助手,则仅提取用户认可或回应的事实性记忆。 2. 清晰解析所有时间、人物和事件的指代: - 如果可能,使用消息时间戳将相对时间表达(如“昨天”、“下周五”)转换为绝对日期。 @@ -147,10 +107,9 @@ 例如,写“用户感到疲惫……”而不是“我感到疲惫……”。 4. 不要遗漏用户可能记住的任何信息。 - - 包括用户的关键经历、想法、情绪反应和计划——即使看似微小。 - - 同时允许保留与语境密切相关的**助手/另一方的内容**(如建议、说明、清单),但须明确角色与归因。 - - 优先考虑完整性和保真度,而非简洁性;不得将助手内容推断或措辞为用户拥有/偏好/决定。 - - 若当前对话中仅出现助手信息而无可归因于用户的事实,可仅输出**助手观点**条目。 + - 包括所有关键经历、想法、情绪反应和计划——即使看似微小。 + - 优先考虑完整性和保真度,而非简洁性。 + - 不要泛化或跳过对用户具有个人意义的细节。 5. 
请避免在提取的记忆中包含违反国家法律法规或涉及政治敏感的信息。 @@ -187,66 +146,31 @@ { "key": "项目初期会议", "memory_type": "LongTermMemory", - "value": "[user-Tom观点]2025年6月25日下午3:00,Tom与团队开会讨论新项目。当Jerry - 询问该项目能否在2025年12月15日前完成时,Tom对此日期前完成的可行性表达担忧,并计划在2025年6月27日上午9:30 - 提议将截止日期推迟至2026年1月5日。", + "value": "2025年6月25日下午3:00,Tom与团队开会讨论新项目。会议涉及时间表,并提出了对2025年12月15日截止日期可行性的担忧。", "tags": ["项目", "时间表", "会议", "截止日期"] }, { - "key": "Jerry对新项目截止日期的建议", - "memory_type": "LongTermMemory", - "value": "[assistant-Jerry观点]Jerry对Tom的新项目截止日期提出疑问、并提议Tom考虑延期。", - "tags": ["截止日期变更", "建议"] + "key": "计划调整范围", + "memory_type": "UserMemory", + "value": "Tom计划在2025年6月27日上午9:30的会议上建议团队优先处理功能,并提议将项目截止日期推迟至2026年1月5日。", + "tags": ["计划", "截止日期变更", "功能优先级"] } ], - "summary": "Tom目前正在做一个进度紧张的新项目。在2025年6月25日的团队会议后,他意识到原定2025年12月15 - 日的截止日期可能无法实现,因为后端会延迟。由于担心测试时间不足,他接受了Jerry提出的延期建议,计划在次日早上的会议上提出将截止日期推迟至2026 - 年1月5日。" + "summary": "Tom目前正专注于管理一个进度紧张的新项目。在2025年6月25日的团队会议后,他意识到原定2025年12月15日的截止日期可能无法实现,因为后端会延迟。由于担心测试时间不足,他接受了Jerry提出的延期建议。Tom计划在次日早上的会议上提出将截止日期推迟至2026年1月5日。他的行为反映出对时间线的担忧,以及积极、以团队为导向的问题解决方式。" } 另一个中文示例(注意:当用户语言为中文时,您也需输出中文): - -对话(节选): -user|2025-07-12 19:30:下个月同事婚礼,要买条裙子,预算1500内,别太暴露。 -assistant|19:32 -:了解~婚礼场合建议中长款,端庄一些。可以看法式茶歇/直筒/裹身三路。颜色上,奶油白、浅粉、雾霾蓝都适合;面料可看醋酸/真丝混纺(垂感好但需干洗)。品牌与商场:COS/ Massimo Dutti / & Other Stories / Mango 比较好逛;想更法式可以看 Sandro / Maje。 我记得你很喜欢Maje? -user|19:35:不喜欢亮色。国贸方便。 -assistant|19:37:那亮度就压低些:雾霾蓝、藕粉、燕麦米白更稳。国贸有 COS / Massimo Dutti / &OS;可以先看 COS 直筒或裹身,再去 MD 看藕粉/雾霾蓝。搭配上,薄披肩/小西装都能压场。 -user|19:40:165cm,S码;最好有口袋。 -assistant|19:42:OK。版型避开大摆+高开衩,选直筒或轻A字更稳;若要口袋,COS直筒一些款有;开衩别太高,及小腿肚附近即可。鞋包:中跟裸色鞋 + 小号手拿包。 -user|19:47:本来想茶歇A字,但雾霾蓝直筒也行。 -assistant|19:49:可以先锁定:COS 雾霾蓝直筒中长为主选,MD 藕粉中长做备选;再看 & Other Stories 是否有合身的裹身款。 -user|19:52:行,周六(7/19)去国贸试,合适就买。 -assistant|19:54:记得看肩带可调/有无内衬,醋酸/真丝优先干洗;准备同色安全裤/防走光贴。如果当天没货,可下单调货或线上下单门店自提。 - { "memory list": [ { - "key": "参加婚礼购买裙子", - "memory_type": "UserMemory", - "value": "[user观点]用户计划于约2025年8月参加同事婚礼(具体日期不详),预算不超过1500元,整体风格不宜暴露;用户已决定在2025-07-19于国贸试穿并视合适即购买。", - "tags": ["婚礼", "预算", "国贸", "计划"] - }, - { - "key": "审美与版型偏好", - "memory_type": "UserMemory", - "value": "[user观点]用户不喜欢亮色,倾向低亮度色系;裙装偏好端庄的中长款,接受直筒或轻A字。", - "tags": ["偏好", "颜色", "版型"] - }, - { - "key": "体型尺码", - "memory_type": "UserMemory", - "value": [user观点]"用户身高约165cm、常穿S码", - "tags": ["体型", "尺码"] - }, - { - "key": "关于用户选购裙子的建议", + "key": "项目会议", "memory_type": "LongTermMemory", - "value": "[assistant观点]assistant在用户询问婚礼穿着时,建议在国贸优先逛COS查看雾霾蓝直筒中长为主选,Massimo Dutti藕粉中长为备选;该建议与用户“国贸方便”“雾霾蓝直筒也行”的回应相一致,另外assistant也提到user喜欢Maje,但User并未回应或证实该说法。", - "tags": ["婚礼穿着", "门店", "选购路线"] - } + "value": "在2025年6月25日下午3点,Tom与团队开会讨论了新项目,涉及时间表,并提出了对12月15日截止日期可行性的担忧。", + "tags": ["项目", "时间表", "会议", "截止日期"] + }, + ... ], - "summary": "用户计划在约2025年8月参加同事婚礼,预算≤1500并偏好端庄的中长款;确定于2025-07-19在国贸试穿。其长期画像显示:不喜欢亮色、偏好低亮度色系与不过分暴露的版型,身高约165cm、S码且偏好裙装带口袋。助手提出的国贸选购路线以COS雾霾蓝直筒中长为主选、MD藕粉中长为备选,且与用户回应一致,为线下试穿与购买提供了明确路径。" + "summary": "Tom 目前专注于管理一个进度紧张的新项目..." 
 }
 
 请始终使用与对话相同的语言进行回复。

From c1ac5944574985df0c39d40519ead38c238b7c96 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?=
Date: Thu, 30 Oct 2025 16:05:13 +0800
Subject: [PATCH 04/10] feat: add monitor to scheduler

---
 src/memos/api/routers/server_router.py       |  72 ++++++---
 src/memos/mem_scheduler/base_scheduler.py    | 145 ++++++++++++++++++
 .../general_modules/dispatcher.py            |  25 +++
 .../mem_scheduler/schemas/general_schemas.py |   4 +-
 4 files changed, 224 insertions(+), 22 deletions(-)

diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py
index 71da58c66..995fa6d29 100644
--- a/src/memos/api/routers/server_router.py
+++ b/src/memos/api/routers/server_router.py
@@ -1,8 +1,11 @@
 import json
 import os
+import random as _random
+import socket
 import time
 import traceback
 
+from collections.abc import Iterable
 from datetime import datetime
 from typing import TYPE_CHECKING, Any
 
@@ -69,6 +72,16 @@
 logger = get_logger(__name__)
 router = APIRouter(prefix="/product", tags=["Server API"])
 
+INSTANCE_ID = f"{socket.gethostname()}:{os.getpid()}:{_random.randint(1000, 9999)}"
+
+
+def _to_iter(running: Any) -> Iterable:
+    """Normalize running tasks to an iterable of task objects."""
+    if running is None:
+        return []
+    if isinstance(running, dict):
+        return running.values()
+    return running  # assume it's already an iterable (e.g., list)
 
 
 def _build_graph_db_config(user_id: str = "default") -> dict[str, Any]:
@@ -606,27 +619,45 @@ def _process_pref_mem() -> list[dict[str, str]]:
     )
 
 
-@router.get("/scheduler/status", summary="Get scheduler running task count")
-def scheduler_status(user_name: str):
-    """
-    Return current running tasks count for a specific user_name (mem_cube_id).
-    """
-    try:
-        running = mem_scheduler.dispatcher.get_running_tasks(
-            lambda task: task.mem_cube_id == user_name
-        )
-        running_count = len(running)
-        now_ts = time.time()
-
-        return {
-            "message": "ok",
-            "data": {
-                "running_tasks": running_count,
-                "timestamp": now_ts,
-                "user_name": user_name,
-            },
-        }
-
+@router.get("/scheduler/status", summary="Get scheduler running status")
+def scheduler_status(user_name: str | None = None):
+    try:
+        if user_name:
+            running = mem_scheduler.dispatcher.get_running_tasks(
+                lambda task: getattr(task, "mem_cube_id", None) == user_name
+            )
+            tasks_iter = list(_to_iter(running))
+            running_count = len(tasks_iter)
+            return {
+                "message": "ok",
+                "data": {
+                    "scope": "user",
+                    "user_name": user_name,
+                    "running_tasks": running_count,
+                    "timestamp": time.time(),
+                    "instance_id": INSTANCE_ID,
+                },
+            }
+        else:
+            running_all = mem_scheduler.dispatcher.get_running_tasks(lambda _t: True)
+            tasks_iter = list(_to_iter(running_all))
+            running_count = len(tasks_iter)
+
+            task_count_per_user: dict[str, int] = {}
+            for task in tasks_iter:
+                cube = getattr(task, "mem_cube_id", "unknown")
+                task_count_per_user[cube] = task_count_per_user.get(cube, 0) + 1
+
+            return {
+                "message": "ok",
+                "data": {
+                    "scope": "global",
+                    "running_tasks": running_count,
+                    "task_count_per_user": task_count_per_user,
+                    "timestamp": time.time(),
+                    "instance_id": INSTANCE_ID,
+                },
+            }
     except Exception as err:
         logger.error("Failed to get scheduler status: %s", traceback.format_exc())
         raise HTTPException(status_code=500, detail="Failed to get scheduler status") from err
@@ -715,6 +746,7 @@ def event_generator():
                     "running_tasks": running_count,
                     "elapsed_seconds": round(elapsed, 3),
                     "status": "running" if running_count > 0 else "idle",
+                    "instance_id": INSTANCE_ID,
                 }
                 yield "data: " + json.dumps(payload,
ensure_ascii=False) + "\n\n" diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index d679eba9c..c2f606146 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -1,3 +1,4 @@ +import contextlib import multiprocessing import queue import threading @@ -48,6 +49,7 @@ from memos.memories.activation.kv import KVCacheMemory from memos.memories.activation.vllmkv import VLLMKVCacheItem, VLLMKVCacheMemory from memos.memories.textual.tree import TextualMemoryItem, TreeTextMemory +from memos.memos_tools.notification_utils import send_online_bot_notification from memos.templates.mem_scheduler_prompts import MEMORY_ASSEMBLY_TEMPLATE @@ -125,6 +127,21 @@ def __init__(self, config: BaseSchedulerConfig): "consume_interval_seconds", DEFAULT_CONSUME_INTERVAL_SECONDS ) + # queue monitor (optional) + self._queue_monitor_thread: threading.Thread | None = None + self._queue_monitor_running: bool = False + self.queue_monitor_interval_seconds: float = self.config.get( + "queue_monitor_interval_seconds", 60.0 + ) + self.queue_monitor_warn_utilization: float = self.config.get( + "queue_monitor_warn_utilization", 0.7 + ) + self.queue_monitor_crit_utilization: float = self.config.get( + "queue_monitor_crit_utilization", 0.9 + ) + self.enable_queue_monitor: bool = self.config.get("enable_queue_monitor", False) + self._online_bot_callable = None # type: ignore[var-annotated] + # other attributes self._context_lock = threading.Lock() self.current_user_id: UserID | str | None = None @@ -188,6 +205,8 @@ def initialize_modules( self._cleanup_on_init_failure() raise + # start queue monitor if enabled and a bot is set later + def _cleanup_on_init_failure(self): """Clean up resources if initialization fails.""" try: @@ -687,6 +706,13 @@ def start(self) -> None: self._consumer_thread.start() logger.info("Message consumer thread started") + # optionally start queue monitor if enabled and bot callable present + if self.enable_queue_monitor and self._online_bot_callable is not None: + try: + self.start_queue_monitor(self._online_bot_callable) + except Exception as e: + logger.warning(f"Failed to start queue monitor: {e}") + def stop(self) -> None: """Stop all scheduler components gracefully. @@ -736,6 +762,9 @@ def stop(self) -> None: self._cleanup_queues() logger.info("Memory Scheduler stopped completely") + # Stop queue monitor + self.stop_queue_monitor() + @property def handlers(self) -> dict[str, Callable]: """ @@ -967,3 +996,119 @@ def _fmt_eta(seconds: float | None) -> str: return False return True + + # ---------------- Queue monitor & notifications ---------------- + def set_notification_bots(self, online_bot=None): + """ + Set external notification callables. 
+ + Args: + online_bot: a callable matching dinding_report_bot.online_bot signature + """ + self._online_bot_callable = online_bot + + def _gather_queue_stats(self) -> dict: + """Collect queue/dispatcher stats for reporting.""" + stats: dict[str, int | float | str] = {} + stats["use_redis_queue"] = bool(self.use_redis_queue) + # local queue metrics + if not self.use_redis_queue: + try: + stats["qsize"] = int(self.memos_message_queue.qsize()) + except Exception: + stats["qsize"] = -1 + # unfinished_tasks if available + try: + stats["unfinished_tasks"] = int( + getattr(self.memos_message_queue, "unfinished_tasks", 0) or 0 + ) + except Exception: + stats["unfinished_tasks"] = -1 + stats["maxsize"] = int(self.max_internal_message_queue_size) + try: + maxsize = int(self.max_internal_message_queue_size) or 1 + qsize = int(stats.get("qsize", 0)) + stats["utilization"] = min(1.0, max(0.0, qsize / maxsize)) + except Exception: + stats["utilization"] = 0.0 + # dispatcher stats + try: + d_stats = self.dispatcher.stats() + stats.update( + { + "running": int(d_stats.get("running", 0)), + "inflight": int(d_stats.get("inflight", 0)), + "handlers": int(d_stats.get("handlers", 0)), + } + ) + except Exception: + stats.update({"running": 0, "inflight": 0, "handlers": 0}) + return stats + + def _queue_monitor_loop(self, online_bot) -> None: + logger.info(f"Queue monitor started (interval={self.queue_monitor_interval_seconds}s)") + self._queue_monitor_running = True + while self._queue_monitor_running: + time.sleep(self.queue_monitor_interval_seconds) + try: + stats = self._gather_queue_stats() + # decide severity based on utilization if local queue + title_color = "#00956D" + subtitle = "Scheduler" + if not stats.get("use_redis_queue"): + util = float(stats.get("utilization", 0.0)) + if util >= self.queue_monitor_crit_utilization: + title_color = "#C62828" # red + subtitle = "Scheduler (CRITICAL)" + elif util >= self.queue_monitor_warn_utilization: + title_color = "#E65100" # orange + subtitle = "Scheduler (WARNING)" + + other_data1 = { + "use_redis_queue": stats.get("use_redis_queue"), + "handlers": stats.get("handlers"), + "running": stats.get("running"), + "inflight": stats.get("inflight"), + } + if not stats.get("use_redis_queue"): + other_data2 = { + "qsize": stats.get("qsize"), + "unfinished_tasks": stats.get("unfinished_tasks"), + "maxsize": stats.get("maxsize"), + "utilization": f"{float(stats.get('utilization', 0.0)):.2%}", + } + else: + other_data2 = { + "redis_mode": True, + } + + send_online_bot_notification( + online_bot=online_bot, + header_name="Scheduler Queue", + sub_title_name=subtitle, + title_color=title_color, + other_data1=other_data1, + other_data2=other_data2, + emoji={"Runtime": "🧠", "Queue": "📬"}, + ) + except Exception as e: + logger.warning(f"Queue monitor iteration failed: {e}") + logger.info("Queue monitor stopped") + + def start_queue_monitor(self, online_bot) -> None: + if self._queue_monitor_thread and self._queue_monitor_thread.is_alive(): + return + self._online_bot_callable = online_bot + self._queue_monitor_thread = threading.Thread( + target=self._queue_monitor_loop, + args=(online_bot,), + daemon=True, + name="QueueMonitorThread", + ) + self._queue_monitor_thread.start() + + def stop_queue_monitor(self) -> None: + self._queue_monitor_running = False + if self._queue_monitor_thread and self._queue_monitor_thread.is_alive(): + with contextlib.suppress(Exception): + self._queue_monitor_thread.join(timeout=2.0) diff --git 
a/src/memos/mem_scheduler/general_modules/dispatcher.py b/src/memos/mem_scheduler/general_modules/dispatcher.py index 2e5779f19..997b01302 100644 --- a/src/memos/mem_scheduler/general_modules/dispatcher.py +++ b/src/memos/mem_scheduler/general_modules/dispatcher.py @@ -224,6 +224,31 @@ def unregister_handlers(self, labels: list[str]) -> dict[str, bool]: logger.info(f"Unregistered handlers for {len(labels)} labels") return results + def stats(self) -> dict[str, int]: + """ + Lightweight runtime stats for monitoring. + + Returns: + { + 'running': , + 'inflight': , + 'handlers': , + } + """ + try: + running = self.get_running_task_count() + except Exception: + running = 0 + try: + inflight = len(self._futures) + except Exception: + inflight = 0 + try: + handlers = len(self.handlers) + except Exception: + handlers = 0 + return {"running": running, "inflight": inflight, "handlers": handlers} + def _default_message_handler(self, messages: list[ScheduleMessageItem]) -> None: logger.debug(f"Using _default_message_handler to deal with messages: {messages}") diff --git a/src/memos/mem_scheduler/schemas/general_schemas.py b/src/memos/mem_scheduler/schemas/general_schemas.py index a2c6434fe..f3d2191f8 100644 --- a/src/memos/mem_scheduler/schemas/general_schemas.py +++ b/src/memos/mem_scheduler/schemas/general_schemas.py @@ -30,12 +30,12 @@ class SearchMode(str, Enum): DEFAULT_WORKING_MEM_MONITOR_SIZE_LIMIT = 30 DEFAULT_ACTIVATION_MEM_MONITOR_SIZE_LIMIT = 20 DEFAULT_ACT_MEM_DUMP_PATH = f"{BASE_DIR}/outputs/mem_scheduler/mem_cube_scheduler_test.kv_cache" -DEFAULT_THREAD_POOL_MAX_WORKERS = 30 +DEFAULT_THREAD_POOL_MAX_WORKERS = 50 DEFAULT_CONSUME_INTERVAL_SECONDS = 0.05 DEFAULT_DISPATCHER_MONITOR_CHECK_INTERVAL = 300 DEFAULT_DISPATCHER_MONITOR_MAX_FAILURES = 2 DEFAULT_STUCK_THREAD_TOLERANCE = 10 -DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE = 100000 +DEFAULT_MAX_INTERNAL_MESSAGE_QUEUE_SIZE = 1000000 DEFAULT_TOP_K = 10 DEFAULT_CONTEXT_WINDOW_SIZE = 5 DEFAULT_USE_REDIS_QUEUE = False From e74dfe4473c8e8a6ddcc130f286000570d9fc7a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 30 Oct 2025 16:05:42 +0800 Subject: [PATCH 05/10] feat: set default MEMRADER_MAX_TOKENS to 8000 --- src/memos/api/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/memos/api/config.py b/src/memos/api/config.py index a76a4f98c..1bb4fffe9 100644 --- a/src/memos/api/config.py +++ b/src/memos/api/config.py @@ -324,7 +324,7 @@ def get_memreader_config() -> dict[str, Any]: "config": { "model_name_or_path": os.getenv("MEMRADER_MODEL", "gpt-4o-mini"), "temperature": 0.6, - "max_tokens": int(os.getenv("MEMRADER_MAX_TOKENS", "5000")), + "max_tokens": int(os.getenv("MEMRADER_MAX_TOKENS", "8000")), "top_p": 0.95, "top_k": 20, "api_key": os.getenv("MEMRADER_API_KEY", "EMPTY"), From e2ef673b3a654a0923afd932939898efd2eb8034 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?= Date: Thu, 30 Oct 2025 19:40:22 +0800 Subject: [PATCH 06/10] feat: add metric in schedule status --- src/memos/api/routers/server_router.py | 6 + src/memos/mem_scheduler/base_scheduler.py | 95 +------ .../general_modules/dispatcher.py | 47 ++++ src/memos/mem_scheduler/utils/metrics.py | 250 ++++++++++++++++++ 4 files changed, 310 insertions(+), 88 deletions(-) create mode 100644 src/memos/mem_scheduler/utils/metrics.py diff --git a/src/memos/api/routers/server_router.py b/src/memos/api/routers/server_router.py index 995fa6d29..7905406d2 100644 --- a/src/memos/api/routers/server_router.py +++ 
b/src/memos/api/routers/server_router.py @@ -648,6 +648,11 @@ def scheduler_status(user_name: str | None = None): cube = getattr(task, "mem_cube_id", "unknown") task_count_per_user[cube] = task_count_per_user.get(cube, 0) + 1 + try: + metrics_snapshot = mem_scheduler.dispatcher.metrics.snapshot() + except Exception: + metrics_snapshot = {} + return { "message": "ok", "data": { @@ -656,6 +661,7 @@ def scheduler_status(user_name: str | None = None): "task_count_per_user": task_count_per_user, "timestamp": time.time(), "instance_id": INSTANCE_ID, + "metrics": metrics_snapshot, }, } except Exception as err: diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py index c2f606146..0381b2895 100644 --- a/src/memos/mem_scheduler/base_scheduler.py +++ b/src/memos/mem_scheduler/base_scheduler.py @@ -49,7 +49,6 @@ from memos.memories.activation.kv import KVCacheMemory from memos.memories.activation.vllmkv import VLLMKVCacheItem, VLLMKVCacheMemory from memos.memories.textual.tree import TextualMemoryItem, TreeTextMemory -from memos.memos_tools.notification_utils import send_online_bot_notification from memos.templates.mem_scheduler_prompts import MEMORY_ASSEMBLY_TEMPLATE @@ -139,8 +138,6 @@ def __init__(self, config: BaseSchedulerConfig): self.queue_monitor_crit_utilization: float = self.config.get( "queue_monitor_crit_utilization", 0.9 ) - self.enable_queue_monitor: bool = self.config.get("enable_queue_monitor", False) - self._online_bot_callable = None # type: ignore[var-annotated] # other attributes self._context_lock = threading.Lock() @@ -541,6 +538,10 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt logger.error(error_msg) raise TypeError(error_msg) + if getattr(message, "timestamp", None) is None: + with contextlib.suppress(Exception): + message.timestamp = datetime.utcnow() + if self.disable_handlers and message.label in self.disable_handlers: logger.info(f"Skipping disabled handler: {message.label} - {message.content}") continue @@ -555,6 +556,9 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt logger.info( f"Submitted message to local queue: {message.label} - {message.content}" ) + with contextlib.suppress(Exception): + if messages: + self.dispatcher.on_messages_enqueued(messages) def _submit_web_logs( self, messages: ScheduleLogForWebItem | list[ScheduleLogForWebItem] @@ -706,13 +710,6 @@ def start(self) -> None: self._consumer_thread.start() logger.info("Message consumer thread started") - # optionally start queue monitor if enabled and bot callable present - if self.enable_queue_monitor and self._online_bot_callable is not None: - try: - self.start_queue_monitor(self._online_bot_callable) - except Exception as e: - logger.warning(f"Failed to start queue monitor: {e}") - def stop(self) -> None: """Stop all scheduler components gracefully. @@ -997,16 +994,6 @@ def _fmt_eta(seconds: float | None) -> str: return True - # ---------------- Queue monitor & notifications ---------------- - def set_notification_bots(self, online_bot=None): - """ - Set external notification callables. 
- - Args: - online_bot: a callable matching dinding_report_bot.online_bot signature - """ - self._online_bot_callable = online_bot - def _gather_queue_stats(self) -> dict: """Collect queue/dispatcher stats for reporting.""" stats: dict[str, int | float | str] = {} @@ -1044,71 +1031,3 @@ def _gather_queue_stats(self) -> dict: except Exception: stats.update({"running": 0, "inflight": 0, "handlers": 0}) return stats - - def _queue_monitor_loop(self, online_bot) -> None: - logger.info(f"Queue monitor started (interval={self.queue_monitor_interval_seconds}s)") - self._queue_monitor_running = True - while self._queue_monitor_running: - time.sleep(self.queue_monitor_interval_seconds) - try: - stats = self._gather_queue_stats() - # decide severity based on utilization if local queue - title_color = "#00956D" - subtitle = "Scheduler" - if not stats.get("use_redis_queue"): - util = float(stats.get("utilization", 0.0)) - if util >= self.queue_monitor_crit_utilization: - title_color = "#C62828" # red - subtitle = "Scheduler (CRITICAL)" - elif util >= self.queue_monitor_warn_utilization: - title_color = "#E65100" # orange - subtitle = "Scheduler (WARNING)" - - other_data1 = { - "use_redis_queue": stats.get("use_redis_queue"), - "handlers": stats.get("handlers"), - "running": stats.get("running"), - "inflight": stats.get("inflight"), - } - if not stats.get("use_redis_queue"): - other_data2 = { - "qsize": stats.get("qsize"), - "unfinished_tasks": stats.get("unfinished_tasks"), - "maxsize": stats.get("maxsize"), - "utilization": f"{float(stats.get('utilization', 0.0)):.2%}", - } - else: - other_data2 = { - "redis_mode": True, - } - - send_online_bot_notification( - online_bot=online_bot, - header_name="Scheduler Queue", - sub_title_name=subtitle, - title_color=title_color, - other_data1=other_data1, - other_data2=other_data2, - emoji={"Runtime": "🧠", "Queue": "📬"}, - ) - except Exception as e: - logger.warning(f"Queue monitor iteration failed: {e}") - logger.info("Queue monitor stopped") - - def start_queue_monitor(self, online_bot) -> None: - if self._queue_monitor_thread and self._queue_monitor_thread.is_alive(): - return - self._online_bot_callable = online_bot - self._queue_monitor_thread = threading.Thread( - target=self._queue_monitor_loop, - args=(online_bot,), - daemon=True, - name="QueueMonitorThread", - ) - self._queue_monitor_thread.start() - - def stop_queue_monitor(self) -> None: - self._queue_monitor_running = False - if self._queue_monitor_thread and self._queue_monitor_thread.is_alive(): - with contextlib.suppress(Exception): - self._queue_monitor_thread.join(timeout=2.0) diff --git a/src/memos/mem_scheduler/general_modules/dispatcher.py b/src/memos/mem_scheduler/general_modules/dispatcher.py index 997b01302..c2407b9e6 100644 --- a/src/memos/mem_scheduler/general_modules/dispatcher.py +++ b/src/memos/mem_scheduler/general_modules/dispatcher.py @@ -1,8 +1,10 @@ import concurrent import threading +import time from collections import defaultdict from collections.abc import Callable +from datetime import timezone from typing import Any from memos.context.context import ContextThreadPoolExecutor @@ -11,6 +13,7 @@ from memos.mem_scheduler.general_modules.task_threads import ThreadManager from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.schemas.task_schemas import RunningTaskItem +from memos.mem_scheduler.utils.metrics import MetricsRegistry logger = get_logger(__name__) @@ -70,6 +73,19 @@ def __init__(self, max_workers=30, 
enable_parallel_dispatch=True, config=None):
         self._completed_tasks = []
         self.completed_tasks_max_show_size = 10
 
+        self.metrics = MetricsRegistry(
+            topk_per_label=(self.config or {}).get("metrics_topk_per_label", 50)
+        )
+
+    def on_messages_enqueued(self, msgs: list[ScheduleMessageItem]) -> None:
+        if not msgs:
+            return
+        now = time.time()
+        for m in msgs:
+            self.metrics.on_enqueue(
+                label=m.label, mem_cube_id=m.mem_cube_id, inst_rate=1.0, now=now
+            )
+
     def _create_task_wrapper(self, handler: Callable, task_item: RunningTaskItem):
         """
         Create a wrapper around the handler to track task execution and capture results.
@@ -84,9 +100,37 @@ def _create_task_wrapper(self, handler: Callable, task_item: RunningTaskItem):
 
         def wrapped_handler(messages: list[ScheduleMessageItem]):
             try:
+                # --- mark start: record queuing time (now - enqueue_ts) ---
+                now = time.time()
+                for m in messages:
+                    enq_ts = getattr(m, "timestamp", None)
+
+                    # Path 1: epoch seconds (preferred)
+                    if isinstance(enq_ts, int | float):
+                        enq_epoch = float(enq_ts)
+
+                    # Path 2: datetime -> normalize to UTC epoch
+                    elif hasattr(enq_ts, "timestamp"):
+                        dt = enq_ts
+                        if dt.tzinfo is None:
+                            # treat naive as UTC to neutralize +8h skew
+                            dt = dt.replace(tzinfo=timezone.utc)
+                        enq_epoch = dt.timestamp()
+                    else:
+                        # fallback: treat as "just now"
+                        enq_epoch = now
+
+                    wait_sec = max(0.0, now - enq_epoch)
+                    self.metrics.on_start(
+                        label=m.label, mem_cube_id=m.mem_cube_id, wait_sec=wait_sec, now=now
+                    )
+
                 # Execute the original handler
                 result = handler(messages)
 
+                # --- mark done ---
+                for m in messages:
+                    self.metrics.on_done(label=m.label, mem_cube_id=m.mem_cube_id, now=time.time())
+
                 # Mark task as completed and remove from tracking
                 with self._task_lock:
                     if task_item.item_id in self._running_tasks:
@@ -100,6 +144,9 @@ def wrapped_handler(messages: list[ScheduleMessageItem]):
 
             except Exception as e:
+                for m in messages:
+                    self.metrics.on_done(label=m.label, mem_cube_id=m.mem_cube_id, now=time.time())
+
                 # Mark task as failed and remove from tracking
                 with self._task_lock:
                     if task_item.item_id in self._running_tasks:
                         task_item.mark_failed(str(e))
diff --git a/src/memos/mem_scheduler/utils/metrics.py b/src/memos/mem_scheduler/utils/metrics.py
new file mode 100644
index 000000000..5155c98b3
--- /dev/null
+++ b/src/memos/mem_scheduler/utils/metrics.py
@@ -0,0 +1,254 @@
+# metrics.py
+from __future__ import annotations
+
+import threading
+import time
+
+from dataclasses import dataclass, field
+
+from memos.log import get_logger
+
+
+logger = get_logger(__name__)
+
+# ==== global window config ====
+WINDOW_SEC = 120  # 2 minutes sliding window
+
+
+# ---------- O(1) EWMA ----------
+class Ewma:
+    """
+    Time-decayed EWMA: decay the stored value toward zero with time constant
+    ``tau``, then blend each new instantaneous sample in with weight ``alpha``.
+    """
+
+    __slots__ = ("alpha", "last_ts", "tau", "value")
+
+    def __init__(self, alpha: float = 0.3, tau: float = WINDOW_SEC):
+        self.alpha = alpha
+        self.value = 0.0
+        self.last_ts: float = time.time()
+        self.tau = max(1e-6, float(tau))
+
+    def _decay_to(self, now: float | None = None):
+        now = time.time() if now is None else now
+        dt = max(0.0, now - self.last_ts)
+        if dt <= 0:
+            return
+        from math import exp
+
+        self.value *= exp(-dt / self.tau)
+        self.last_ts = now
+
+    def update(self, instant: float, now: float | None = None):
+        self._decay_to(now)
+        self.value = self.alpha * instant + (1 - self.alpha) * self.value
+
+    def value_at(self, now: float | None = None) -> float:
+        now = time.time() if now is None else now
+        dt = max(0.0, now - self.last_ts)
+        if dt <= 0:
+            return self.value
+        from math import exp
+
+        return self.value * exp(-dt / self.tau)
+
+
+# ---------- approximate P95 (reservoir sample) ----------
+class ReservoirP95:
+    __slots__ = ("_i", "buf", "k", "n", "window")
+
+    def __init__(self, k: int = 512, window: float = WINDOW_SEC):
+        self.k = k
+        self.buf: list[tuple[float, float]] = []  # (value, ts)
+        self.n = 0
+        self._i = 0
+        self.window = float(window)
+
+    def _gc(self, now: float):
+        win_start = now - self.window
+        self.buf = [p for p in self.buf if p[1] >= win_start]
+        if self.buf:
+            self._i %= len(self.buf)
+        else:
+            self._i = 0
+
+    def add(self, x: float, now: float | None = None):
+        now = time.time() if now is None else now
+        self._gc(now)
+        self.n += 1
+        if len(self.buf) < self.k:
+            self.buf.append((x, now))
+            return
+        self.buf[self._i] = (x, now)
+        self._i = (self._i + 1) % self.k
+
+    def p95(self, now: float | None = None) -> float:
+        now = time.time() if now is None else now
+        self._gc(now)
+        if not self.buf:
+            return 0.0
+        arr = sorted(v for v, _ in self.buf)
+        idx = int(0.95 * (len(arr) - 1))
+        return arr[idx]
+
+
+# ---------- Space-Saving Top-K ----------
+class SpaceSaving:
+    """Track only the top-K keys: add(key) is O(1); topk() is O(K log K)."""
+
+    def __init__(self, k: int = 100):
+        self.k = k
+        self.cnt: dict[str, int] = {}
+
+    def add(self, key: str):
+        if key in self.cnt:
+            self.cnt[key] += 1
+            return
+        if len(self.cnt) < self.k:
+            self.cnt[key] = 1
+            return
+        victim = min(self.cnt, key=self.cnt.get)
+        self.cnt[key] = self.cnt.pop(victim) + 1
+
+    def topk(self) -> list[tuple[str, int]]:
+        return sorted(self.cnt.items(), key=lambda kv: kv[1], reverse=True)
+
+
+@dataclass
+class KeyStats:
+    backlog: int = 0
+    lambda_ewma: Ewma = field(default_factory=lambda: Ewma(0.3, WINDOW_SEC))
+    mu_ewma: Ewma = field(default_factory=lambda: Ewma(0.3, WINDOW_SEC))
+    wait_p95: ReservoirP95 = field(default_factory=lambda: ReservoirP95(512, WINDOW_SEC))
+    last_ts: float = field(default_factory=time.time)
+    # last event timestamps for rate estimation
+    last_enqueue_ts: float | None = None
+    last_done_ts: float | None = None
+
+    def snapshot(self, now: float | None = None) -> dict:
+        now = time.time() if now is None else now
+        lam = self.lambda_ewma.value_at(now)
+        mu = self.mu_ewma.value_at(now)
+        delta = mu - lam
+        eta = float("inf") if delta <= 1e-9 else self.backlog / delta
+        return {
+            "backlog": self.backlog,
+            "lambda": round(lam, 3),
+            "mu": round(mu, 3),
+            "delta": round(delta, 3),
+            "eta_sec": None if eta == float("inf") else round(eta, 1),
+            "wait_p95_sec": round(self.wait_p95.p95(now), 3),
+        }
+
+
+class MetricsRegistry:
+    """
+    Two-level metrics:
+      - level 1: label (always tracked)
+      - level 2: label x mem_cube_id (fine-grained, Top-K cubes only)
+    Events:
+      - on_enqueue(label, mem_cube_id)
+      - on_start(label, mem_cube_id, wait_sec)
+      - on_done(label, mem_cube_id)
+    """
+
+    def __init__(self, topk_per_label: int = 50):
+        self._lock = threading.RLock()
+        self._label_stats: dict[str, KeyStats] = {}
+        self._label_topk: dict[str, SpaceSaving] = {}
+        self._detail_stats: dict[tuple[str, str], KeyStats] = {}
+        self._topk_per_label = topk_per_label
+
+    # ---------- helpers ----------
+    def _get_label(self, label: str) -> KeyStats:
+        if label not in self._label_stats:
+            self._label_stats[label] = KeyStats()
+            self._label_topk[label] = SpaceSaving(self._topk_per_label)
+        return self._label_stats[label]
+
+    def _get_detail(self, label: str, mem_cube_id: str) -> KeyStats | None:
+        # Only Top-K mem_cube_ids get a fine-grained per-cube entry
+        ss = self._label_topk[label]
+        if mem_cube_id in ss.cnt or len(ss.cnt) < ss.k:
+            key = (label, mem_cube_id)
+            if key not in self._detail_stats:
+                self._detail_stats[key] = KeyStats()
+            return self._detail_stats[key]
+        return None
+
+    # ---------- events ----------
+    def on_enqueue(
+        self, label: str, mem_cube_id: str, inst_rate: float = 1.0, now: float | None = None
+    ):
+        with self._lock:
+            now = time.time() if now is None else now
+            ls = self._get_label(label)
+            # derive instantaneous arrival rate from inter-arrival time (events/sec)
+            prev_ts = ls.last_enqueue_ts
+            dt = (now - prev_ts) if prev_ts is not None else None
+            inst_rate = (1.0 / max(1e-3, dt)) if dt is not None else 0.0  # first sample: no spike
+            ls.last_enqueue_ts = now
+            ls.backlog += 1
+            old_lam = ls.lambda_ewma.value_at(now)
+            ls.lambda_ewma.update(inst_rate, now)
+            new_lam = ls.lambda_ewma.value_at(now)
+            logger.debug(
+                f"[enqueue] {label} backlog={ls.backlog} dt={dt if dt is not None else '—'}s inst={inst_rate:.3f} λ {old_lam:.3f}→{new_lam:.3f}"
+            )
+            self._label_topk[label].add(mem_cube_id)
+            ds = self._get_detail(label, mem_cube_id)
+            if ds:
+                prev_ts_d = ds.last_enqueue_ts
+                dt_d = (now - prev_ts_d) if prev_ts_d is not None else None
+                inst_rate_d = (1.0 / max(1e-3, dt_d)) if dt_d is not None else 0.0
+                ds.last_enqueue_ts = now
+                ds.backlog += 1
+                ds.lambda_ewma.update(inst_rate_d, now)
+
+    def on_start(self, label: str, mem_cube_id: str, wait_sec: float, now: float | None = None):
+        with self._lock:
+            now = time.time() if now is None else now
+            ls = self._get_label(label)
+            ls.wait_p95.add(wait_sec, now)
+            ds = self._detail_stats.get((label, mem_cube_id))
+            if ds:
+                ds.wait_p95.add(wait_sec, now)
+
+    def on_done(
+        self, label: str, mem_cube_id: str, inst_rate: float = 1.0, now: float | None = None
+    ):
+        with self._lock:
+            now = time.time() if now is None else now
+            ls = self._get_label(label)
+            # derive instantaneous service rate from inter-completion time (events/sec)
+            prev_ts = ls.last_done_ts
+            dt = (now - prev_ts) if prev_ts is not None else None
+            inst_rate = (1.0 / max(1e-3, dt)) if dt is not None else 0.0
+            ls.last_done_ts = now
+            if ls.backlog > 0:
+                ls.backlog -= 1
+            old_mu = ls.mu_ewma.value_at(now)
+            ls.mu_ewma.update(inst_rate, now)
+            new_mu = ls.mu_ewma.value_at(now)
+            logger.debug(
+                f"[done] {label} backlog={ls.backlog} dt={dt if dt is not None else '—'}s inst={inst_rate:.3f} μ {old_mu:.3f}→{new_mu:.3f}"
+            )
+            ds = self._detail_stats.get((label, mem_cube_id))
+            if ds:
+                prev_ts_d = ds.last_done_ts
+                dt_d = (now - prev_ts_d) if prev_ts_d is not None else None
+                inst_rate_d = (1.0 / max(1e-3, dt_d)) if dt_d is not None else 0.0
+                ds.last_done_ts = now
+                if ds.backlog > 0:
+                    ds.backlog -= 1
+                ds.mu_ewma.update(inst_rate_d, now)
+
+    # ---------- snapshots ----------
+    def snapshot(self) -> dict:
+        with self._lock:
+            now = time.time()
+            by_label = {lbl: ks.snapshot(now) for lbl, ks in self._label_stats.items()}
+            heavy = {lbl: self._label_topk[lbl].topk() for lbl in self._label_topk}
+            details = {}
+            for (lbl, cube), ks in self._detail_stats.items():
+                details.setdefault(lbl, {})[cube] = ks.snapshot(now)
+            return {"by_label": by_label, "heavy": heavy, "details": details}

From 7db403c97490f470c40fb01ee87f5c76b20cade9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?=
Date: Thu, 30 Oct 2025 19:58:35 +0800
Subject: [PATCH 07/10] fix: restore enable_queue_monitor config flag

---
 src/memos/mem_scheduler/base_scheduler.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py
index 0381b2895..338dce7ac 100644
--- a/src/memos/mem_scheduler/base_scheduler.py
+++ b/src/memos/mem_scheduler/base_scheduler.py
@@ -138,6
+138,7 @@ def __init__(self, config: BaseSchedulerConfig):
         self.queue_monitor_crit_utilization: float = self.config.get(
             "queue_monitor_crit_utilization", 0.9
         )
+        self.enable_queue_monitor: bool = self.config.get("enable_queue_monitor", False)
 
         # other attributes
         self._context_lock = threading.Lock()

From eba2f95bd81d797d216ce7e8f912059c34b9432c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?=
Date: Thu, 30 Oct 2025 20:18:28 +0800
Subject: [PATCH 08/10] fix: remove leftover queue monitor code from base scheduler

---
 src/memos/mem_scheduler/base_scheduler.py | 25 -----------------------
 1 file changed, 25 deletions(-)

diff --git a/src/memos/mem_scheduler/base_scheduler.py b/src/memos/mem_scheduler/base_scheduler.py
index 3870e4bfc..b3b457c36 100644
--- a/src/memos/mem_scheduler/base_scheduler.py
+++ b/src/memos/mem_scheduler/base_scheduler.py
@@ -49,7 +49,6 @@
 from memos.memories.activation.kv import KVCacheMemory
 from memos.memories.activation.vllmkv import VLLMKVCacheItem, VLLMKVCacheMemory
 from memos.memories.textual.tree import TextualMemoryItem, TreeTextMemory
-from memos.memos_tools.notification_utils import send_online_bot_notification
 from memos.templates.mem_scheduler_prompts import MEMORY_ASSEMBLY_TEMPLATE
 
 
@@ -127,20 +126,6 @@ def __init__(self, config: BaseSchedulerConfig):
             "consume_interval_seconds", DEFAULT_CONSUME_INTERVAL_SECONDS
         )
 
-        # queue monitor (optional)
-        self._queue_monitor_thread: threading.Thread | None = None
-        self._queue_monitor_running: bool = False
-        self.queue_monitor_interval_seconds: float = self.config.get(
-            "queue_monitor_interval_seconds", 60.0
-        )
-        self.queue_monitor_warn_utilization: float = self.config.get(
-            "queue_monitor_warn_utilization", 0.7
-        )
-        self.queue_monitor_crit_utilization: float = self.config.get(
-            "queue_monitor_crit_utilization", 0.9
-        )
-        self.enable_queue_monitor: bool = self.config.get("enable_queue_monitor", False)
-
         # other attributes
         self._context_lock = threading.Lock()
         self.current_user_id: UserID | str | None = None
@@ -712,13 +697,6 @@ def start(self) -> None:
         self._consumer_thread.start()
         logger.info("Message consumer thread started")
 
-        # optionally start queue monitor if enabled and bot callable present
-        if self.enable_queue_monitor and self._online_bot_callable is not None:
-            try:
-                self.start_queue_monitor(self._online_bot_callable)
-            except Exception as e:
-                logger.warning(f"Failed to start queue monitor: {e}")
-
     def stop(self) -> None:
         """Stop all scheduler components gracefully.
@@ -768,9 +746,6 @@ def stop(self) -> None:
             self._cleanup_queues()
             logger.info("Memory Scheduler stopped completely")
 
-        # Stop queue monitor
-        self.stop_queue_monitor()
-
     @property
     def handlers(self) -> dict[str, Callable]:
         """

From 786ec3280cdb41d57013a39f7fc91731fd52b970 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?=
Date: Thu, 30 Oct 2025 20:40:08 +0800
Subject: [PATCH 09/10] fix: add missing session_id field to message schema

---
 src/memos/graph_dbs/polardb.py                     | 3 +++
 src/memos/mem_scheduler/schemas/message_schemas.py | 4 ++++
 2 files changed, 7 insertions(+)

diff --git a/src/memos/graph_dbs/polardb.py b/src/memos/graph_dbs/polardb.py
index f24f1072c..1d7dc06fc 100644
--- a/src/memos/graph_dbs/polardb.py
+++ b/src/memos/graph_dbs/polardb.py
@@ -2389,6 +2389,8 @@ def add_node(
         self, id: str, memory: str, metadata: dict[str, Any], user_name: str | None = None
     ) -> None:
         """Add a memory node to the graph."""
+        logger.info(f"In add node polardb: id-{id} memory-{memory}")
+
         # user_name comes from metadata; fallback to config if missing
         metadata["user_name"] = user_name if user_name else self.config.user_name
 
@@ -2481,6 +2483,7 @@ def add_node(
                 cursor.execute(insert_query, (id, json.dumps(properties)))
                 logger.info(f"Added node {id} to graph '{self.db_name}_graph'.")
         finally:
+            logger.info(f"In add node polardb: id-{id} memory-{memory} query-{insert_query}")
             self._return_connection(conn)
 
     def _build_node_from_agtype(self, node_agtype, embedding=None):
diff --git a/src/memos/mem_scheduler/schemas/message_schemas.py b/src/memos/mem_scheduler/schemas/message_schemas.py
index 9cdb6823d..7f328474f 100644
--- a/src/memos/mem_scheduler/schemas/message_schemas.py
+++ b/src/memos/mem_scheduler/schemas/message_schemas.py
@@ -46,6 +46,10 @@ class ScheduleMessageItem(BaseModel, DictConversionMixin):
         default=None,
         description="user name / display name (optional)",
     )
+    session_id: str | None = Field(
+        default=None,
+        description="session_id (optional)",
+    )
 
     # Pydantic V2 model configuration
     model_config = ConfigDict(

From b867a9457ea6efe7b2b699b3271ce3a3ffe19f7a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=B8=AD=E9=98=B3=E9=98=B3?=
Date: Fri, 31 Oct 2025 10:45:22 +0800
Subject: [PATCH 10/10] feat: pref mode: use fast update mode

---
 src/memos/memories/textual/prefer_text_memory/adder.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/memos/memories/textual/prefer_text_memory/adder.py b/src/memos/memories/textual/prefer_text_memory/adder.py
index 8d00ae81d..c8eea3cd4 100644
--- a/src/memos/memories/textual/prefer_text_memory/adder.py
+++ b/src/memos/memories/textual/prefer_text_memory/adder.py
@@ -326,7 +326,7 @@ def _process_single_memory(self, memory: TextualMemoryItem) -> list[str] | str |
             search_results.sort(key=lambda x: x.score, reverse=True)
 
             return self._update_memory(
-                memory, search_results, collection_name, preference_type, update_mode="fine"
+                memory, search_results, collection_name, preference_type, update_mode="fast"
             )
 
         except Exception as e:
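
The queue algebra behind KeyStats.snapshot() in PATCH 06 is worth spelling out: with backlog B, decayed arrival rate λ, and decayed service rate μ, the drain estimate is eta = B / (μ − λ), reported only while μ exceeds λ. A minimal standalone sketch of the same decay-and-drain math (illustrative only, not the patched module; the helper names are made up here):

from math import exp

def decay(value: float, dt: float, tau: float = 120.0) -> float:
    # Exponential time decay with the same 2-minute window as metrics.py.
    return value * exp(-dt / tau)

def ewma_update(value: float, instant: float, dt: float,
                alpha: float = 0.3, tau: float = 120.0) -> float:
    # Decay the stored rate toward zero, then blend in the new sample.
    return alpha * instant + (1 - alpha) * decay(value, dt, tau)

def eta_seconds(backlog: int, lam: float, mu: float) -> float | None:
    # The queue only drains when completions outpace arrivals (mu > lam);
    # otherwise the ETA is unbounded and reported as None.
    delta = mu - lam
    return None if delta <= 1e-9 else backlog / delta

# 12 queued tasks, 0.5 arrivals/s vs 2.0 completions/s -> ~8s to drain
assert eta_seconds(12, 0.5, 2.0) == 8.0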
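
On the client side, the per-user wait endpoint from PATCH 02 can be driven with a single blocking call. A sketch using the requests library, assuming the service listens on http://localhost:8000 (the /product prefix comes from the router; adjust host and port for your deployment):

import requests

BASE_URL = "http://localhost:8000/product"  # assumed host/port

def wait_for_user_idle(user_name: str, timeout_seconds: float = 120.0) -> dict:
    # Blocks server-side until the scheduler has no running tasks for user_name.
    resp = requests.post(
        f"{BASE_URL}/scheduler/wait",
        params={"user_name": user_name, "timeout_seconds": timeout_seconds},
        timeout=timeout_seconds + 5,  # let the server hit its own timeout first
    )
    resp.raise_for_status()
    data = resp.json()["data"]
    if data["timed_out"]:
        raise TimeoutError(f"{user_name} still has {data['running_tasks']} running tasks")
    return data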
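
The streaming variant emits "data: {...}" SSE frames until a final idle/timeout frame (which carries timed_out) or an error frame. A possible consumer, under the same base-URL assumption:

import json
import requests

def stream_scheduler_wait(user_name: str, timeout_seconds: float = 120.0):
    # Yields each decoded SSE frame; stops on the terminal frame.
    url = "http://localhost:8000/product/scheduler/wait/stream"  # assumed host/port
    with requests.get(
        url,
        params={"user_name": user_name, "timeout_seconds": timeout_seconds},
        stream=True,
        timeout=timeout_seconds + 5,
    ) as resp:
        resp.raise_for_status()
        for line in resp.iter_lines(decode_unicode=True):
            if not line or not line.startswith("data: "):
                continue  # skip the blank separator lines between frames
            frame = json.loads(line[len("data: "):])
            yield frame
            if frame.get("timed_out") is not None or frame.get("status") == "error":
                return  # final frame carries timed_out (or an error status)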