Skip to content

Commit 2c9e1fb

Browse files
authored
Merge pull request #167 from saksham-gera/dev
Add ZeroMQ as a First-Class Communication Backend in concore
2 parents a5f5610 + 9c1c4ca commit 2c9e1fb

File tree

88 files changed

+13781
-602
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

88 files changed

+13781
-602
lines changed

0mq/comm_node.dir/concore2.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
import time
2+
import os
3+
from ast import literal_eval
4+
import sys
5+
import re
6+
import zmq # Added for ZeroMQ
7+
8+
#if windows, create script to kill this process
9+
# because batch files don't provide easy way to know pid of last command
10+
# ignored for posix!=windows, because "concorepid" is handled by script
11+
# ignored for docker (linux!=windows), because handled by docker stop
12+
if hasattr(sys, 'getwindowsversion'):
13+
with open("concorekill.bat","w") as fpid:
14+
fpid.write("taskkill /F /PID "+str(os.getpid())+"\n")
15+
16+
# --- ZeroMQ Integration Start ---
17+
class ZeroMQPort:
18+
def __init__(self, port_type, address, zmq_socket_type):
19+
self.context = zmq.Context()
20+
self.socket = self.context.socket(zmq_socket_type)
21+
self.port_type = port_type # "bind" or "connect"
22+
self.address = address
23+
if self.port_type == "bind":
24+
self.socket.bind(address)
25+
print(f"ZMQ Port bound to {address}")
26+
else:
27+
self.socket.connect(address)
28+
print(f"ZMQ Port connected to {address}")
29+
30+
# Global ZeroMQ ports registry
31+
zmq_ports = {}
32+
33+
def init_zmq_port(port_name, port_type, address, socket_type_str):
34+
"""
35+
Initializes and registers a ZeroMQ port.
36+
port_name (str): A unique name for this ZMQ port.
37+
port_type (str): "bind" or "connect".
38+
address (str): The ZMQ address (e.g., "tcp://*:5555", "tcp://localhost:5555").
39+
socket_type_str (str): String representation of ZMQ socket type (e.g., "REQ", "REP", "PUB", "SUB").
40+
"""
41+
if port_name in zmq_ports:
42+
print(f"ZMQ Port {port_name} already initialized.")
43+
return # Avoid reinitialization
44+
45+
try:
46+
# Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP)
47+
zmq_socket_type = getattr(zmq, socket_type_str.upper())
48+
zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type)
49+
print(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}")
50+
except AttributeError:
51+
print(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.")
52+
except zmq.error.ZMQError as e:
53+
print(f"Error initializing ZMQ port {port_name} on {address}: {e}")
54+
except Exception as e:
55+
print(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")
56+
57+
def terminate_zmq():
58+
for port in zmq_ports.values():
59+
try:
60+
port.socket.close()
61+
port.context.term()
62+
except Exception as e:
63+
print(f"Error while terminating ZMQ port {port.address}: {e}")
64+
# --- ZeroMQ Integration End ---
65+
66+
def safe_literal_eval(filename, defaultValue):
67+
try:
68+
with open(filename, "r") as file:
69+
return literal_eval(file.read())
70+
except (FileNotFoundError, SyntaxError, ValueError, Exception) as e:
71+
# Keep print for debugging, but can be made quieter
72+
# print(f"Info: Error reading {filename} or file not found, using default: {e}")
73+
return defaultValue
74+
75+
iport = safe_literal_eval("concore.iport", {})
76+
oport = safe_literal_eval("concore.oport", {})
77+
78+
s = ''
79+
olds = ''
80+
delay = 1
81+
retrycount = 0
82+
inpath = "./in" #must be rel path for local
83+
outpath = "./out"
84+
simtime = 0
85+
86+
#9/21/22
87+
try:
88+
sparams_path = os.path.join(inpath + "1", "concore.params")
89+
if os.path.exists(sparams_path):
90+
with open(sparams_path, "r") as f:
91+
sparams = f.read()
92+
if sparams: # Ensure sparams is not empty
93+
if sparams[0] == '"' and sparams[-1] == '"': #windows keeps "" need to remove
94+
sparams = sparams[1:-1]
95+
if sparams != '{' and not (sparams.startswith('{') and sparams.endswith('}')): # Check if it needs conversion
96+
print("converting sparams: "+sparams)
97+
sparams = "{'"+re.sub(';',",'",re.sub('=',"':",re.sub(' ','',sparams)))+"}"
98+
print("converted sparams: " + sparams)
99+
try:
100+
params = literal_eval(sparams)
101+
except Exception as e:
102+
print(f"bad params content: {sparams}, error: {e}")
103+
params = dict()
104+
else:
105+
params = dict()
106+
else:
107+
params = dict()
108+
except Exception as e:
109+
# print(f"Info: concore.params not found or error reading, using empty dict: {e}")
110+
params = dict()
111+
112+
#9/30/22
113+
def tryparam(n, i):
114+
return params.get(n, i)
115+
116+
117+
#9/12/21
118+
def default_maxtime(default):
119+
global maxtime
120+
maxtime_path = os.path.join(inpath + "1", "concore.maxtime")
121+
maxtime = safe_literal_eval(maxtime_path, default)
122+
123+
default_maxtime(100)
124+
125+
def unchanged():
126+
global olds, s
127+
if olds == s:
128+
s = ''
129+
return True
130+
olds = s
131+
return False
132+
133+
def read(port_identifier, name, initstr_val):
134+
global s, simtime, retrycount
135+
136+
default_return_val = initstr_val
137+
if isinstance(initstr_val, str):
138+
try:
139+
default_return_val = literal_eval(initstr_val)
140+
except (SyntaxError, ValueError):
141+
pass
142+
143+
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
144+
zmq_p = zmq_ports[port_identifier]
145+
try:
146+
message = zmq_p.socket.recv_json()
147+
return message
148+
except zmq.error.ZMQError as e:
149+
print(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
150+
return default_return_val
151+
except Exception as e:
152+
print(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.")
153+
return default_return_val
154+
155+
try:
156+
file_port_num = int(port_identifier)
157+
except ValueError:
158+
print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
159+
return default_return_val
160+
161+
time.sleep(delay)
162+
file_path = os.path.join(inpath+str(file_port_num), name)
163+
ins = ""
164+
165+
try:
166+
with open(file_path, "r") as infile:
167+
ins = infile.read()
168+
except FileNotFoundError:
169+
ins = str(initstr_val)
170+
except Exception as e:
171+
print(f"Error reading {file_path}: {e}. Using default value.")
172+
return default_return_val
173+
174+
attempts = 0
175+
max_retries = 5
176+
while len(ins) == 0 and attempts < max_retries:
177+
time.sleep(delay)
178+
try:
179+
with open(file_path, "r") as infile:
180+
ins = infile.read()
181+
except Exception as e:
182+
print(f"Retry {attempts + 1}: Error reading {file_path} - {e}")
183+
attempts += 1
184+
retrycount += 1
185+
186+
if len(ins) == 0:
187+
print(f"Max retries reached for {file_path}, using default value.")
188+
return default_return_val
189+
190+
s += ins
191+
try:
192+
inval = literal_eval(ins)
193+
if isinstance(inval, list) and len(inval) > 0:
194+
current_simtime_from_file = inval[0]
195+
if isinstance(current_simtime_from_file, (int, float)):
196+
simtime = max(simtime, current_simtime_from_file)
197+
return inval[1:]
198+
else:
199+
print(f"Warning: Unexpected data format in {file_path}: {ins}. Returning raw content or default.")
200+
return inval
201+
except Exception as e:
202+
print(f"Error parsing content from {file_path} ('{ins}'): {e}. Returning default.")
203+
return default_return_val
204+
205+
206+
def write(port_identifier, name, val, delta=0):
207+
global simtime
208+
209+
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
210+
zmq_p = zmq_ports[port_identifier]
211+
try:
212+
zmq_p.socket.send_json(val)
213+
except zmq.error.ZMQError as e:
214+
print(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
215+
except Exception as e:
216+
print(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}")
217+
return
218+
try:
219+
if isinstance(port_identifier, str) and port_identifier in zmq_ports:
220+
file_path = os.path.join("../"+port_identifier, name)
221+
else:
222+
file_port_num = int(port_identifier)
223+
file_path = os.path.join(outpath+str(file_port_num), name)
224+
except ValueError:
225+
print(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
226+
return
227+
228+
if isinstance(val, str):
229+
time.sleep(2 * delay)
230+
elif not isinstance(val, list):
231+
print(f"File write to {file_path} must have list or str value, got {type(val)}")
232+
return
233+
234+
try:
235+
with open(file_path, "w") as outfile:
236+
if isinstance(val, list):
237+
data_to_write = [simtime + delta] + val
238+
outfile.write(str(data_to_write))
239+
simtime += delta
240+
else:
241+
outfile.write(val)
242+
except Exception as e:
243+
print(f"Error writing to {file_path}: {e}")
244+
245+
def initval(simtime_val_str):
246+
global simtime
247+
try:
248+
val = literal_eval(simtime_val_str)
249+
if isinstance(val, list) and len(val) > 0:
250+
first_element = val[0]
251+
if isinstance(first_element, (int, float)):
252+
simtime = first_element
253+
return val[1:]
254+
else:
255+
print(f"Error: First element in initval string '{simtime_val_str}' is not a number. Using data part as is or empty.")
256+
return val[1:] if len(val) > 1 else []
257+
else:
258+
print(f"Error: initval string '{simtime_val_str}' is not a list or is empty. Returning empty list.")
259+
return []
260+
261+
except Exception as e:
262+
print(f"Error parsing simtime_val_str '{simtime_val_str}': {e}. Returning empty list.")
263+
return []

0mq/comm_node.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import concore
2+
import concore2
3+
4+
concore.delay = 0.07
5+
concore2.delay = 0.07
6+
concore2.inpath = concore.inpath
7+
concore2.outpath = concore.outpath
8+
concore2.simtime = 0
9+
concore.default_maxtime(100)
10+
init_simtime_u = "[0.0, 0.0, 0.0]"
11+
init_simtime_ym = "[0.0, 0.0, 0.0]"
12+
13+
u = concore.initval(init_simtime_u)
14+
ym = concore2.initval(init_simtime_ym)
15+
while(concore2.simtime<concore.maxtime):
16+
while concore.unchanged():
17+
u = concore.read(concore.iport['U'],"u",init_simtime_u)
18+
concore.write(concore.oport['U1'],"u",u)
19+
print(u)
20+
old2 = concore2.simtime
21+
while concore2.unchanged() or concore2.simtime <= old2:
22+
ym = concore2.read(concore.iport['Y1'],"ym",init_simtime_ym)
23+
concore2.write(concore.oport['Y'],"ym",ym)
24+
print("funbody u="+str(u)+" ym="+str(ym)+" time="+str(concore2.simtime))
25+
print("retry="+str(concore.retrycount))

0 commit comments

Comments
 (0)