Skip to content

Commit e3efe47

Browse files
committed
mod: rem atomic flags, increased udp buffer size
feat: implemented remaining methods impr: threading
1 parent 9fde055 commit e3efe47

File tree

1 file changed

+136
-53
lines changed

1 file changed

+136
-53
lines changed

src/udp_bridge.h

Lines changed: 136 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -20,89 +20,84 @@ This file is part of the Arduino_RouterBridge library.
2020
#define UDP_WRITE_METHOD "udp/write"
2121
#define UDP_READ_METHOD "udp/read"
2222

23-
#include <zephyr/sys/atomic.h>
2423
#include <api/Udp.h>
2524

25+
#define DEFAULT_UDP_BUF_SIZE 65507
26+
27+
28+
template<size_t BufferSize=DEFAULT_UDP_BUF_SIZE>
2629
class BridgeUDP final: public UDP {
2730

2831
BridgeClass* bridge;
2932
uint32_t connection_id{};
3033
RingBufferN<BufferSize> temp_buffer;
3134
struct k_mutex udp_mutex{};
32-
atomic_t _connected;
35+
bool _connected = false;
3336

34-
uint16_t _port; // local port to listen on
37+
uint16_t _port{}; // local port to listen on
3538

36-
//IPAddress _remoteIP; // remote IP address for the incoming packet whilst it's being processed
37-
String _remoteHost;
38-
uint16_t _remotePort; // remote port for the incoming packet whilst it's being processed
39-
uint16_t _offset; // offset into the packet being sent
40-
uint16_t _remaining; // remaining bytes of incoming packet yet to be processed
39+
// Outbound packets target
40+
String _targetHost{};
41+
uint16_t _targetPort{};
42+
43+
// Inbound packet info
44+
IPAddress _remoteIP{}; // remote IP address for the incoming packet whilst it's being processed
45+
uint16_t _remotePort{}; // remote port for the incoming packet whilst it's being processed
46+
uint16_t _remaining{}; // remaining bytes of incoming packet yet to be processed
4147

4248
public:
4349

4450
explicit BridgeUDP(BridgeClass& bridge): bridge(&bridge) {}
4551

4652
uint8_t begin(uint16_t port) override {
4753

48-
if (connected()) return 1;
49-
5054
if (!init()) {
5155
return 0;
5256
}
5357

5458
k_mutex_lock(&udp_mutex, K_FOREVER);
5559

56-
String hostname = "0.0.0.0";
57-
const bool resp = bridge->call(UDP_CONNECT_METHOD, connection_id, hostname, port);
58-
59-
if (!resp) {
60-
atomic_set(&_connected, 0);
60+
if (_connected) {
6161
k_mutex_unlock(&udp_mutex);
62-
return 0;
62+
return 1;
6363
}
64-
atomic_set(&_connected, 1);
6564

66-
_port = port;
65+
String hostname = "0.0.0.0";
66+
const bool ok = bridge->call(UDP_CONNECT_METHOD, hostname, port).result(connection_id);
67+
_connected = ok;
68+
if (_connected) _port = port;
6769
k_mutex_unlock(&udp_mutex);
6870

69-
return 1;
71+
return ok? 1 : 0;
7072
}
7173

7274
uint8_t beginMulticast(IPAddress ip, uint16_t port) override {
7375

74-
if (connected()) return 1;
75-
7676
if (!init()) {
7777
return 0;
7878
}
7979

8080
k_mutex_lock(&udp_mutex, K_FOREVER);
8181

82-
String hostname = ip.toString();
83-
const bool resp = bridge->call(UDP_CONNECT_MULTI_METHOD, connection_id, hostname, port);
84-
85-
if (!resp) {
86-
atomic_set(&_connected, 0);
82+
if (_connected) {
8783
k_mutex_unlock(&udp_mutex);
88-
return 0;
84+
return 1;
8985
}
90-
atomic_set(&_connected, 1);
9186

92-
_port = port;
87+
String hostname = ip.toString();
88+
const bool ok = bridge->call(UDP_CONNECT_MULTI_METHOD, hostname, port).result(connection_id);
89+
_connected = ok;
90+
if (_connected) _port = port;
9391
k_mutex_unlock(&udp_mutex);
9492

95-
return 1;
93+
return ok? 1 : 0;
9694
}
9795

9896
void stop() override {
9997
k_mutex_lock(&udp_mutex, K_FOREVER);
10098

10199
String msg;
102-
const bool resp = bridge->call(UDP_CLOSE_METHOD, msg, connection_id);
103-
if (resp) {
104-
atomic_set(&_connected, 0);
105-
}
100+
_connected = !bridge->call(UDP_CLOSE_METHOD, connection_id).result(msg);
106101

107102
k_mutex_unlock(&udp_mutex);
108103
}
@@ -115,25 +110,68 @@ class BridgeUDP final: public UDP {
115110

116111
k_mutex_lock(&udp_mutex, K_FOREVER);
117112

118-
_remoteHost = host;
119-
_remotePort = port;
113+
_targetHost = host;
114+
_targetPort = port;
120115

121116
k_mutex_unlock(&udp_mutex);
122117

123118
return 1;
124119
}
125120

126-
int endPacket() override;
121+
int endPacket() override {
122+
k_mutex_lock(&udp_mutex, K_FOREVER);
123+
124+
_targetHost = "";
125+
_targetPort = 0;
126+
127+
k_mutex_unlock(&udp_mutex);
128+
return 1;
129+
}
127130

128131
size_t write(uint8_t c) override {
129132
return write(&c, 1);
130133
}
131134

132-
size_t write(const uint8_t *buffer, size_t size) override;
135+
size_t write(const uint8_t *buffer, size_t size) override {
136+
137+
if (!connected()) return 0;
138+
139+
MsgPack::arr_t<uint8_t> payload;
140+
141+
for (size_t i = 0; i < size; ++i) {
142+
payload.push_back(buffer[i]);
143+
}
144+
145+
size_t written;
146+
const bool ok = bridge->call(UDP_WRITE_METHOD, connection_id, _targetHost, _targetPort, payload).result(written);
147+
return ok? written : 0;
148+
}
133149

134150
using Print::write;
135151

136-
int parsePacket() override;
152+
int parsePacket() override {
153+
k_mutex_lock(&udp_mutex, K_FOREVER);
154+
155+
while (_remaining) read();
156+
157+
int out = 0;
158+
if (available() >= 8) {
159+
uint8_t tmpBuf[8];
160+
for (size_t i = 0; i < 8; ++i) {
161+
tmpBuf[i] = temp_buffer.read_char();
162+
}
163+
_remoteIP = tmpBuf;
164+
_remotePort = tmpBuf[4];
165+
_remotePort = (_remotePort << 8) + tmpBuf[5];
166+
_remaining = tmpBuf[6];
167+
_remaining = (_remaining << 8) + tmpBuf[7];
168+
out = _remaining;
169+
}
170+
171+
k_mutex_unlock(&udp_mutex);
172+
173+
return out;
174+
}
137175

138176
int available() override {
139177
k_mutex_lock(&udp_mutex, K_FOREVER);
@@ -144,22 +182,66 @@ class BridgeUDP final: public UDP {
144182
return _available;
145183
}
146184

147-
int read() override;
185+
int read() override {
186+
uint8_t c;
187+
read(&c, 1);
188+
return c;
189+
}
148190

149-
int read(unsigned char *buffer, size_t len) override;
191+
int read(unsigned char *buffer, size_t len) override {
192+
k_mutex_lock(&udp_mutex, K_FOREVER);
193+
int i = 0;
194+
while (temp_buffer.available() && i < len) {
195+
buffer[i++] = temp_buffer.read_char();
196+
_remaining--;
197+
}
198+
k_mutex_unlock(&udp_mutex);
199+
return i;
200+
}
150201

151-
int read(char *buffer, size_t len) override;
202+
int read(char *buffer, size_t len) override {
203+
k_mutex_lock(&udp_mutex, K_FOREVER);
204+
int i = 0;
205+
while (temp_buffer.available() && i < len) {
206+
buffer[i++] = static_cast<char>(temp_buffer.read_char());
207+
_remaining--;
208+
}
209+
k_mutex_unlock(&udp_mutex);
210+
return i;
211+
}
152212

153-
int peek() override;
213+
int peek() override {
214+
k_mutex_lock(&udp_mutex, K_FOREVER);
215+
int out = 0;
216+
if (!_connected || _remaining == 0) out = -1;
217+
out = temp_buffer.peek();
218+
k_mutex_unlock(&udp_mutex);
219+
return out;
220+
}
154221

155-
void flush() override;
222+
void flush() override {
223+
// Implemented only when there's a TX buffer
224+
}
156225

157-
IPAddress remoteIP() override;
226+
IPAddress remoteIP() override {
227+
k_mutex_lock(&udp_mutex, K_FOREVER);
228+
const IPAddress ip = _remoteIP;
229+
k_mutex_unlock(&udp_mutex);
230+
return ip;
231+
}
158232

159-
uint16_t remotePort() override;
233+
uint16_t remotePort() override {
234+
k_mutex_lock(&udp_mutex, K_FOREVER);
235+
const uint16_t port = _remotePort;
236+
k_mutex_unlock(&udp_mutex);
237+
return port;
238+
}
160239

161-
bool connected() const {
162-
return atomic_get(&_connected) > 0;
240+
bool connected() {
241+
k_mutex_lock(&udp_mutex, K_FOREVER);
242+
const bool ok = _connected;
243+
k_mutex_unlock(&udp_mutex);
244+
return ok;
163245
}
164246

165247
private:
@@ -174,21 +256,22 @@ class BridgeUDP final: public UDP {
174256

175257
void _read(size_t size) {
176258

177-
if (size == 0 || !connected()) return;
178-
179259
k_mutex_lock(&udp_mutex, K_FOREVER);
180260

261+
if (size == 0 || !_connected) return;
262+
181263
MsgPack::arr_t<uint8_t> message;
182-
const bool ret = bridge->call(TCP_READ_METHOD, message, connection_id, size);
264+
RpcResult async_res = bridge->call(TCP_READ_METHOD, connection_id, size);
265+
const bool ret = async_res.result(message);
183266

184267
if (ret) {
185268
for (size_t i = 0; i < message.size(); ++i) {
186269
temp_buffer.store_char(static_cast<char>(message[i]));
187270
}
188271
}
189272

190-
if (bridge->get_last_client_error().code > NO_ERR) {
191-
atomic_set(&_connected, 0);
273+
if (async_res.error.code > NO_ERR) {
274+
_connected = false;
192275
}
193276

194277
k_mutex_unlock(&udp_mutex);

0 commit comments

Comments
 (0)