-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_ingest.py
More file actions
30 lines (25 loc) · 810 Bytes
/
test_ingest.py
File metadata and controls
30 lines (25 loc) · 810 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import threading
import queue
import time
from apex.plugins.MemoryPlus.plugin import Plugin
# Isolate the ingestion loop: build a Plugin and give it a bounded queue
# (capacity 50) and a stop event that this script owns.
plugin = Plugin()
plugin.ingest_queue = queue.Queue(maxsize=50)
plugin.ingest_stop_event = threading.Event()

# Run the plugin's ingestion loop on a background daemon thread, so it does
# not keep the interpreter alive once the main thread finishes.
ingest_thread = threading.Thread(target=plugin._ingest_loop, daemon=True)
ingest_thread.start()
# Stress test: 10 producer threads each run this function concurrently.
def worker(worker_id):
    """Attempt to enqueue 100 sample items tagged with ``worker_id``.

    Every item is a 3-tuple ``("Test-<id>-<n>", "Content-<id>-<n>", "Chatbot")``
    pushed onto ``plugin.ingest_queue`` without blocking. When the bounded
    queue is full the item is dropped and the loop moves on to the next one.

    Args:
        worker_id: Value interpolated into each item's strings to identify
            the producer that created it.
    """
    for seq in range(100):
        item = (
            f"Test-{worker_id}-{seq}",
            f"Content-{worker_id}-{seq}",
            "Chatbot",
        )
        try:
            plugin.ingest_queue.put_nowait(item)
        except queue.Full:
            # Queue is at capacity: deliberately drop this item (best effort).
            continue
# Create one producer thread per worker id, start them all, then wait for
# every producer to finish its enqueue attempts.
producer_count = 10
threads = [
    threading.Thread(target=worker, args=(worker_id,))
    for worker_id in range(producer_count)
]
for producer in threads:
    producer.start()
for producer in threads:
    producer.join()

# Stop the worker: set the stop event (presumably observed by
# `plugin._ingest_loop`; that method is not visible here -- confirm).
plugin.ingest_stop_event.set()

# NOTE(review): this message is printed unconditionally; nothing above asserts
# on the outcome of the run.
print("✅ Ingestion loop thread safety test passed.")