-
Notifications
You must be signed in to change notification settings - Fork 2
Python Decorators for Node_Registry
Nandish Patel edited this page Jun 30, 2021
·
3 revisions
- subscribe
- approx_time_sync
- service
- client
- publisher
- connection_based
- parameter
- parameter_list
- timer
- inject
- timeit
Create a new subscription.
@rosnode.subscribe(_Msg_Type, _topic_name)
def func_for_subs(msg):
rosnode.logger.info('I heard: "%s"' % msg.data)Approximately synchronizes messages by their timestamps.
Call the wrapper function when the topics in the list are within some timestamp limit(slop).
@rosnode.approx_time_sync(
[
[JointState, _topic_1],
[JointState, _topic_2],
[JointState, _topic_3],
[JointState, _topic_4],
]
queue=5,
slop=0.1,
)
def callback(
topic_1_msg: JointState,
topic_2_msg: JointState,
topic_3_msg: JointState,
topic_4_msg: JointState,
):
rosnode.logger.info('inside approx time synced callback')Create the single parameter specified in the argument.
@rosnode.parameter(
'param1', 'SingleValue'
)
@rosnode
def node() -> str:
return 'Minimal_publisher'
# parameter value can be used using
rosnode.param1 # will return 'SingleValue1'Create a list of parameters.
@rosnode.parameter_list([
['param1', 'value1'],
['param2', 'value2'],
['param3', 'value3']
])
@rosnode
def node() -> str:
return 'Minimal_publisher'
# parameter value can be used using
rosnode.param1 # will return 'value1'Inject a function into the Xnode class.
@rosnode.inject
def func_call():
return 'function call'
rosnode.func_callCalculate the Processing time of the given function.
@rosnode.timeit
def func_call():
sleep(2)
return None
func_call() # this will return:-> "func_call Processing time: 2.002486467361450195"Create a new timer.
@rosnode.timer(1.0) # function will call every 1hz
def timer_cb():
rosnode.logger.info('function running')Create a new Publisher.
@rosnode.publisher(String, _pub_topic)Call a function only if there are active subscribers.
@rosnode.timer(1.0)
def timer_cb():
pub = rosnode.get_publisher(_pub_topic)
msg = String()
rosnode.node.i += 1
msg.data = 'Hello World %d' % rosnode.node.i
pub.publish(msg)
rosnode.logger.info('Publishing: "%s" ' % rosnode.param1)
msg_onSubs = String()
msg_onSubs.data = 'Subscriber is increased'
@rosnode.connection_based('topic')
def func_call2():
return {'second_topic': msg}