Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions rr_asyncio_demo.py → demo/rr_asyncio_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@

"""

import rossros_asyncio as rr
# Add parent directory to Python path
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))


import logging
import time
import math
import time

import rossros_asyncio as rr

# logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger().setLevel(logging.INFO)
Expand Down
11 changes: 9 additions & 2 deletions rr_demo.py → demo/rr_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@

"""

import rossros as rr
# Add parent directory to Python path
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))


import logging
import time
import math
import time

import rossros as rr

# logging.getLogger().setLevel(logging.DEBUG)
logging.getLogger().setLevel(logging.INFO)
Expand Down
72 changes: 72 additions & 0 deletions demo/rr_networking_node1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/python3
"""
This file (in conjunction with rr_networking_node2.py) demonstrates basic communication
between two RossROS network nodes. This node (node1) generates and publishes a square
wave signal and a sawtooth signal to the network.
"""

# Add parent directory to Python path
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))


import asyncio
import logging
import math
import time

import rossros_asyncio as rr
import rossros_networking as rrn

logging.getLogger().setLevel(logging.INFO)

async def node1():

# Start first node
node = rrn.Server('0.0.0.0', 9800)
loop = asyncio.get_event_loop()
loop.create_task(node.run())


# Create two signal-generation functions

# This function outputs a square wave that steps between +/- 1
# every second
def square():
return (2 * math.floor(time.time() % 2)) - 1


# This function counts up from zero to 1 every second
def sawtooth():
return time.time() % 1 # "%" is the modulus operator


# Initiate data and termination busses
bSquare = rrn.NetBus(node, "/square_wave", square())
bSawtooth = rrn.NetBus(node, "/sawtooth_wave", sawtooth())


# Wrap the square wave signal generator into a producer
readSquare = rr.Producer(
square, # function that will generate data
bSquare, # output data bus
0.05, # delay between data generation cycles
name="Read square wave signal")

# Wrap the sawtooth wave signal generator into a producer
readSawtooth = rr.Producer(
sawtooth, # function that will generate data
bSawtooth, # output data bus
0.05, # delay between data generation cycles
name="Read sawtooth wave signal")


# Create a list of producer-consumers to execute concurrently
producer_consumer_list = [readSquare, readSawtooth]

# Execute the list of producer-consumers concurrently
await rr.gather(producer_consumer_list)


asyncio.run(node1())
78 changes: 78 additions & 0 deletions demo/rr_networking_node2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#!/usr/bin/python3
"""
This file (in conjunction with rr_networking_node1.py) demonstrates basic communication
between two RossROS network nodes. This node (node2) listens for signals published by
node1, multiplies them together, and publishes the result.
"""

# Add parent directory to Python path
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))


import asyncio
import logging
import math
import time

import rossros_asyncio as rr
import rossros_networking as rrn

logging.getLogger().setLevel(logging.INFO)

async def node2():

# Start second node, bootstrapping from first node
node = rrn.Client('127.0.0.1', 9800)


# This function multiplies two inputs together
def mult(a, b):
return (a if a is not None else 0) * (b if b is not None else 0)


# Initiate data and termination busses
bSquare = rrn.NetBus(node, "/square_wave") # Published by node1
bSawtooth = rrn.NetBus(node, "/sawtooth_wave") # Published by node1
bMultiplied = rr.Bus(node, "/combined_wave")
bTerminate = rr.Bus(0, "/termination")


# Wrap the multiplier function into a consumer-producer
multiplyWaves = rr.ConsumerProducer(
mult, # function that will process data
(bSquare, bSawtooth), # input data buses
bMultiplied, # output data bus
0.01, # delay between data control cycles
bTerminate, # bus to watch for termination signal
"Multiply Waves")


# Make a printer that returns the most recent wave and product values
printBuses = rr.Printer(
(bSquare, bSawtooth, bMultiplied, bTerminate), # input data buses
# bMultiplied, # input data buses
0.25, # delay between printing cycles
bTerminate, # bus to watch for termination signal
"Print raw and derived data", # Name of printer
"Data bus readings are: ") # Prefix for output

# Make a timer (a special kind of producer) that turns on the termination
# bus when it triggers
terminationTimer = rr.Timer(
bTerminate, # Output data bus
10, # Duration
0.01, # Delay between checking for termination time
bTerminate, # Bus to check for termination signal
"Termination timer") # Name of this timer


# Create a list of producer-consumers to execute concurrently
producer_consumer_list = [multiplyWaves, printBuses, terminationTimer]

# Execute the list of producer-consumers concurrently
await rr.gather(producer_consumer_list)


asyncio.run(node2())
21 changes: 11 additions & 10 deletions rossros.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#! /usr/bin/python3
import concurrent.futures
import time
import logging
import time

from logdecorator import log_on_end, log_on_error, log_on_start
from readerwriterlock import rwlock
from logdecorator import log_on_start, log_on_end, log_on_error

DEBUG = logging.DEBUG
logging_format = "%(asctime)s: %(message)s"
Expand Down Expand Up @@ -49,7 +50,7 @@ def ensureTuple(value):
"""
Function that wraps an input value in a tuple if it is not already a tuple
"""

if isinstance(value, tuple):
value_tuple = value
else:
Expand Down Expand Up @@ -93,17 +94,17 @@ def __call__(self):

# Check if the loop should terminate
# termination_value = self.termination_buses[0].get_message(self.name)
if self.checkTerminationbuses():
if self.checkTerminationBuses():
break

# Collect all of the values from the input buses into a list
input_values = self.collectbusesToValues(self.input_buses)
input_values = self.collectBusesToValues(self.input_buses)

# Get the output value or tuple of values corresponding to the inputs
output_values = self.consumer_producer_function(*input_values)

# Deal the values into the output buses
self.dealValuesTobuses(output_values, self.output_buses)
self.dealValuesToBuses(output_values, self.output_buses)

# Pause for set amount of time
time.sleep(self.delay)
Expand All @@ -113,7 +114,7 @@ def __call__(self):
@log_on_start(DEBUG, "{self.name:s}: Starting collecting bus values into list")
@log_on_error(DEBUG, "{self.name:s}: Encountered an error while collecting bus values")
@log_on_end(DEBUG, "{self.name:s}: Finished collecting bus values")
def collectbusesToValues(self, buses):
def collectBusesToValues(self, buses):

# Wrap buses in a tuple if it isn't one already
buses = ensureTuple(buses)
Expand All @@ -132,7 +133,7 @@ def collectbusesToValues(self, buses):
@log_on_start(DEBUG, "{self.name:s}: Starting dealing values into buses")
@log_on_error(DEBUG, "{self.name:s}: Encountered an error while dealing values into buses")
@log_on_end(DEBUG, "{self.name:s}: Finished dealing values into buses")
def dealValuesTobuses(self, values, buses):
def dealValuesToBuses(self, values, buses):

# Wrap buses in a tuple if it isn't one already
buses = ensureTuple(buses)
Expand Down Expand Up @@ -160,10 +161,10 @@ def dealValuesTobuses(self, values, buses):
@log_on_start(DEBUG, "{self.name:s}: Starting to check termination buses")
@log_on_error(DEBUG, "{self.name:s}: Encountered an error while checking termination buses")
@log_on_end(DEBUG, "{self.name:s}: Finished checking termination buses")
def checkTerminationbuses(self):
def checkTerminationBuses(self):

# Look at all of the termination buses
termination_bus_values = self.collectbusesToValues(self.termination_buses)
termination_bus_values = self.collectBusesToValues(self.termination_buses)

# If any of the termination buses have triggered (gone true or non-negative), signal the loop to end
for tbv in termination_bus_values:
Expand Down
Loading