Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rcljava/include/org_ros2_rcljava_executors_BaseExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetInit(
JNIEXPORT void
JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeDisposeWaitSet(JNIEnv *, jclass, jlong);

/*
* Class: org_ros2_rcljava_executors_BaseExecutor
* Method: nativeWaitSetResize
* Signature: (JIIIIII)V
*/
JNIEXPORT void
JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetResize(
JNIEnv *, jclass, jlong, jint, jint, jint, jint, jint, jint);

/*
* Class: org_ros2_rcljava_executors_BaseExecutor
* Method: nativeWaitSetClear
Expand Down
18 changes: 18 additions & 0 deletions rcljava/src/main/cpp/org_ros2_rcljava_executors_BaseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,24 @@ Java_org_ros2_rcljava_executors_BaseExecutor_nativeDisposeWaitSet(
}
}

JNIEXPORT void
JNICALL Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetResize(
JNIEnv * env, jclass, jlong wait_set_handle, jint number_of_subscriptions,
jint number_of_guard_conditions, jint number_of_timers, jint number_of_clients,
jint number_of_services, jint number_of_events)
{
rcl_wait_set_t * wait_set = reinterpret_cast<rcl_wait_set_t *>(wait_set_handle);

rcl_ret_t ret = rcl_wait_set_resize(
wait_set, number_of_subscriptions, number_of_guard_conditions, number_of_timers,
number_of_clients, number_of_services, number_of_events);
if (ret != RCL_RET_OK) {
std::string msg = "Failed to resize wait set: " + std::string(rcl_get_error_string().str);
rcl_reset_error();
rcljava_throw_rclexception(env, ret, msg);
}
}

JNIEXPORT void JNICALL
Java_org_ros2_rcljava_executors_BaseExecutor_nativeWaitSetClear(
JNIEnv * env, jclass, jlong wait_set_handle)
Expand Down
189 changes: 91 additions & 98 deletions rcljava/src/main/java/org/ros2/rcljava/executors/BaseExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

import javax.swing.Action;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -76,6 +77,20 @@ public class BaseExecutor {

private List<Map.Entry<Long, ActionServer>> actionServerHandles = new ArrayList<Map.Entry<Long, ActionServer>>();

private long waitSetHandle = 0;

public BaseExecutor() {
this.waitSetHandle = nativeGetZeroInitializedWaitSet();
long contextHandle = RCLJava.getDefaultContext().getHandle();
nativeWaitSetInit(
this.waitSetHandle, contextHandle, 0, 0,
0, 0, 0, 0);
}

public void dispose() {
nativeDisposeWaitSet(this.waitSetHandle);
}

protected void addNode(ComposableNode node) {
this.nodes.add(node);
}
Expand Down Expand Up @@ -225,38 +240,28 @@ protected void waitForWork(long timeout) {
}
}

int subscriptionsSize = 0;
int timersSize = 0;
int clientsSize = 0;
int servicesSize = 0;
int subscriptionsSize = this.subscriptionHandles.size();
int timersSize = this.timerHandles.size();
int clientsSize = this.clientHandles.size();
int servicesSize = this.serviceHandles.size();
int eventsSize = this.eventHandles.size();

for (ComposableNode node : this.nodes) {
subscriptionsSize += node.getNode().getSubscriptions().size();
timersSize += node.getNode().getTimers().size();
clientsSize += node.getNode().getClients().size();
servicesSize += node.getNode().getServices().size();

for (ActionServer actionServer : node.getNode().getActionServers()) {
subscriptionsSize += actionServer.getNumberOfSubscriptions();
timersSize += actionServer.getNumberOfTimers();
clientsSize += actionServer.getNumberOfClients();
servicesSize += actionServer.getNumberOfServices();
}
for (Map.Entry<Long, ActionServer> entry : this.actionServerHandles) {
ActionServer actionServer = entry.getValue();
subscriptionsSize += actionServer.getNumberOfSubscriptions();
timersSize += actionServer.getNumberOfTimers();
clientsSize += actionServer.getNumberOfClients();
servicesSize += actionServer.getNumberOfServices();
}

if (subscriptionsSize == 0 && timersSize == 0 && clientsSize == 0 && servicesSize == 0) {
return;
}

long waitSetHandle = nativeGetZeroInitializedWaitSet();
long contextHandle = RCLJava.getDefaultContext().getHandle();
nativeWaitSetInit(
waitSetHandle, contextHandle, subscriptionsSize, 0,
long waitSetHandle = this.waitSetHandle;
nativeWaitSetResize(
waitSetHandle, subscriptionsSize, 0,
timersSize, clientsSize, servicesSize, eventsSize);

nativeWaitSetClear(waitSetHandle);

for (Map.Entry<Long, Subscription> entry : this.subscriptionHandles) {
nativeWaitSetAddSubscription(waitSetHandle, entry.getKey());
}
Expand All @@ -282,93 +287,70 @@ protected void waitForWork(long timeout) {
}

nativeWait(waitSetHandle, timeout);

for (int i = 0; i < this.subscriptionHandles.size(); ++i) {
if (!nativeWaitSetSubscriptionIsReady(waitSetHandle, i)) {
this.subscriptionHandles.get(i).setValue(null);
}
}

for (int i = 0; i < this.timerHandles.size(); ++i) {
if (!nativeWaitSetTimerIsReady(waitSetHandle, i)) {
this.timerHandles.get(i).setValue(null);
}
}

for (int i = 0; i < this.serviceHandles.size(); ++i) {
if (!nativeWaitSetServiceIsReady(waitSetHandle, i)) {
this.serviceHandles.get(i).setValue(null);
}
}

for (int i = 0; i < this.clientHandles.size(); ++i) {
if (!nativeWaitSetClientIsReady(waitSetHandle, i)) {
this.clientHandles.get(i).setValue(null);
}
}

for (int i = 0; i < this.eventHandles.size(); ++i) {
if (!nativeWaitSetEventIsReady(waitSetHandle, i)) {
this.eventHandles.get(i).setValue(null);
}
}

for (Map.Entry<Long, ActionServer> entry : this.actionServerHandles) {
if (!entry.getValue().isReady(waitSetHandle)) {
entry.setValue(null);
}
}

Iterator<Map.Entry<Long, Subscription>> subscriptionIterator =
this.subscriptionHandles.iterator();
while (subscriptionIterator.hasNext()) {
Map.Entry<Long, Subscription> entry = subscriptionIterator.next();
if (entry.getValue() == null) {
subscriptionIterator.remove();
{
int waitSetIndex = 0;
Iterator<Map.Entry<Long, Subscription>> it = this.subscriptionHandles.iterator();
while (it.hasNext()) {
it.next();
if (!nativeWaitSetSubscriptionIsReady(waitSetHandle, waitSetIndex)) {
it.remove();
}
++waitSetIndex;
}
}

Iterator<Map.Entry<Long, Timer>> timerIterator = this.timerHandles.iterator();
while (timerIterator.hasNext()) {
Map.Entry<Long, Timer> entry = timerIterator.next();
if (entry.getValue() == null) {
timerIterator.remove();
{
int waitSetIndex = 0;
Iterator<Map.Entry<Long, Timer>> it = this.timerHandles.iterator();
while (it.hasNext()) {
it.next();
if (!nativeWaitSetTimerIsReady(waitSetHandle, waitSetIndex)) {
it.remove();
}
++waitSetIndex;
}
}

Iterator<Map.Entry<Long, Service>> serviceIterator = this.serviceHandles.iterator();
while (serviceIterator.hasNext()) {
Map.Entry<Long, Service> entry = serviceIterator.next();
if (entry.getValue() == null) {
serviceIterator.remove();
{
int waitSetIndex = 0;
Iterator<Map.Entry<Long, Service>> it = this.serviceHandles.iterator();
while (it.hasNext()) {
it.next();
if (!nativeWaitSetServiceIsReady(waitSetHandle, waitSetIndex)) {
it.remove();
}
++waitSetIndex;
}
}

Iterator<Map.Entry<Long, Client>> clientIterator = this.clientHandles.iterator();
while (clientIterator.hasNext()) {
Map.Entry<Long, Client> entry = clientIterator.next();
if (entry.getValue() == null) {
clientIterator.remove();
{
int waitSetIndex = 0;
Iterator<Map.Entry<Long, Client>> it = this.clientHandles.iterator();
while (it.hasNext()) {
it.next();
if (!nativeWaitSetClientIsReady(waitSetHandle, waitSetIndex)) {
it.remove();
}
++waitSetIndex;
}
}

Iterator<Map.Entry<Long, EventHandler>> eventIterator = this.eventHandles.iterator();
while (eventIterator.hasNext()) {
Map.Entry<Long, EventHandler> entry = eventIterator.next();
if (entry.getValue() == null) {
eventIterator.remove();
{
int waitSetIndex = 0;
Iterator<Map.Entry<Long, EventHandler>> it = this.eventHandles.iterator();
while (it.hasNext()) {
it.next();
if (!nativeWaitSetEventIsReady(waitSetHandle, waitSetIndex)) {
it.remove();
}
++waitSetIndex;
}
}

Iterator<Map.Entry<Long, ActionServer>> actionServerIterator = this.actionServerHandles.iterator();
while (actionServerIterator.hasNext()) {
Map.Entry<Long, ActionServer> entry = actionServerIterator.next();
if (entry.getValue() == null) {
actionServerIterator.remove();
{
Iterator<Map.Entry<Long, ActionServer>> it = this.actionServerHandles.iterator();
while (it.hasNext()) {
Map.Entry<Long, ActionServer> entry = it.next();
if (!entry.getValue().isReady(waitSetHandle)) {
it.remove();
}
}
}

nativeDisposeWaitSet(waitSetHandle);
}

protected AnyExecutable getNextExecutable() {
Expand Down Expand Up @@ -463,6 +445,9 @@ public void spinUntilComplete(Future future, long maxDurationNs) {
anyExecutable = getNextExecutable();
}
}
if (!RCLJava.ok()) {
this.dispose();
}
}

private void spinSomeImpl(long maxDurationNs, boolean exhaustive) {
Expand All @@ -483,6 +468,9 @@ private void spinSomeImpl(long maxDurationNs, boolean exhaustive) {
workAvailable = false;
}
}
if (!RCLJava.ok()) {
this.dispose();
}
}

protected void spinSome(long maxDurationNs) {
Expand Down Expand Up @@ -513,6 +501,11 @@ private static native void nativeWaitSetInit(
long waitSetHandle, long contextHandle, int numberOfSubscriptions,
int numberOfGuardConditions, int numberOfTimers, int numberOfClients,
int numberOfServices, int numberOfEvents);

private static native void nativeWaitSetResize(
long waitSetHandle, int numberOfSubscriptions,
int numberOfGuardConditions, int numberOfTimers, int numberOfClients,
int numberOfServices, int numberOfEvents);

private static native void nativeWaitSetClear(long waitSetHandle);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,8 @@ private void run() {
this.spinOnce();
}
}
if (!RCLJava.ok()) {
this.baseExecutor.dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,8 @@ public void spin() {
while (RCLJava.ok()) {
this.spinOnce();
}
if (!RCLJava.ok()) {
this.baseExecutor.dispose();
}
}
}