Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ will publish a message to the `reply_to` subject of the message:

nats_protocol = txnats.io.NatsProtocol(
verbose=False,
on_connect=lambda np: np.sub("get-response", "6", on_msg=respond_on_msg))
on_connect=lambda np: np.sub("getResponse", "6", on_msg=respond_on_msg))

For full context see [respond.py](example/respond.py)

Expand All @@ -88,7 +88,7 @@ and any message will go to one of the responders:

nats_protocol = txnats.io.NatsProtocol(
verbose=False,
on_connect=lambda np: np.sub("get-response", "6",
on_connect=lambda np: np.sub("getResponse", "6",
queue_group="excelsior",
on_msg=respond_on_msg))

Expand Down
102 changes: 102 additions & 0 deletions example/make_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from sys import stdout
import random
import string

import txnats

from twisted.logger import globalLogPublisher
from simple_log_observer import simpleObserver

from twisted.logger import Logger
log = Logger()

from twisted.internet import defer
from twisted.internet import task
from twisted.internet import reactor
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.internet.endpoints import connectProtocol

from txnats import actions

def event_subscriber(event):
if isinstance(event, actions.SendConnect):
log.info("got connect")
elif isinstance(event, actions.ReceivedInfo):
log.info("got info")
elif isinstance(event, actions.SendSub):
log.info("got sub sid: {}".format(event.sid))
elif isinstance(event, actions.UnsubMaxReached):
log.info("done subscription: {}".format(event.sid))
elif isinstance(event, actions.SendUnsub):
log.info("Unsub requested {}".format(event))
elif isinstance(event, actions.ReceivedPing):
log.info("got Ping")
elif isinstance(event, actions.ReceivedPong):
log.info("got Pong outstanding: {}".format(event.outstanding_pings))
elif isinstance(event, actions.SendPing):
log.info("Sending Ping outstanding: {}".format(event.outstanding_pings))
elif isinstance(event, actions.SendPong):
log.info("Sending Pong")
elif isinstance(event, actions.Disconnected):
log.info("disconnect")

@defer.inlineCallbacks
def someRequests(nats_protocol):
"""
The only point of this code is to show some basic subscribing
and publishing.
"""
log.info("about to request")
response_payload = yield nats_protocol.request("ssshh", "Boo")
log.info("response : {}".format(response_payload))
response_payload = yield nats_protocol.request("ssshh", "Woo")
log.info("response : {}".format(response_payload))
response_payload = yield nats_protocol.request("ssshh", "Choo")
log.info("response : {}".format(response_payload))

requests = []
for n in xrange(100):
requests.append(nats_protocol.request("ssshh", "Foo {}".format(n)))
response_payload = yield defer.gatherResults(requests)
#d.addErrback(lambda np: log.info("{p}", p=np))
# Log what is returned by the connectProtocol.
#d.addCallback(lambda np: log.info("{p}", p=np))
log.info("response : {}".format(response_payload))

# Lose the connection one second after the "and another thing" msg.
yield task.deferLater(nats_protocol.reactor,
10, nats_protocol.transport.loseConnection)

# stop the reactor(the event loop) one second after that.
yield task.deferLater(nats_protocol.reactor, 0, reactor.stop)


def main(reactor):

host = "demo.nats.io"
port = 4222

point = TCP4ClientEndpoint(reactor, host, port)
nats_protocol = txnats.io.NatsProtocol(
verbose=True,
event_subscribers=[
#event_subscriber
],
on_connect=someRequests)

connecting = connectProtocol(point, nats_protocol)
# Log if there is an error making the connection.
connecting.addErrback(lambda np: log.info("{p}", p=np))
# Log what is returned by the connectProtocol.
connecting.addCallback(lambda np: log.info("{p}", p=np))
return connecting


if __name__ == '__main__':
globalLogPublisher.addObserver(simpleObserver)
main(reactor)
reactor.run()

2 changes: 1 addition & 1 deletion example/respond.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def create_client(reactor, host, port):
"""
log.info("Start client.")
point = TCP4ClientEndpoint(reactor, host, port)
nats_protocol = txnats.io.NatsProtocol(verbose=False, on_connect=listen)
nats_protocol = txnats.io.NatsProtocol(verbose=True, on_connect=listen)

# Because NatsProtocol implements the Protocol interface, Twisted's
# connectProtocol knows how to connected to the endpoint.
Expand Down
9 changes: 9 additions & 0 deletions txnats/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ class SendPub(object):
reply_to = attr.ib(validator=attr.validators.optional(is_subject))


@attr.s(slots=True)
class Request(object):
sid = attr.ib(validator=is_subject_id)
subject = attr.ib(validator=is_subject)
payload = attr.ib(validator=attr.validators.instance_of(bytes))
reply_to = attr.ib(validator=attr.validators.optional(is_subject))
deferred = attr.ib()


@attr.s(slots=True)
class UnhandledCommand(object):
protocol = attr.ib(validator=is_instance_of_nats_protocol)
Expand Down
40 changes: 37 additions & 3 deletions txnats/protocol.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import division, absolute_import
from sys import stdout
import random
import string
import attr
import json
from io import BytesIO
Expand All @@ -22,7 +24,6 @@
from . import actions
from .errors import NatsError


LANG = "py.twisted"
CLIENT_NAME = "txnats"

Expand Down Expand Up @@ -90,6 +91,7 @@ def __init__(self, own_reactor=None, verbose=True, pedantic=False,
self.on_connect_d.addCallback(on_connect)
self.on_connect_d.addErrback(self._eb_trace_and_raise)
self.sids = {}
self.requests = {}
self.subscriptions = subscriptions if subscriptions is not None else {}
self.unsubs = unsubs if unsubs else {}
self.event_subscribers = event_subscribers if event_subscribers is not None else []
Expand Down Expand Up @@ -393,13 +395,45 @@ def pong(self):
self.transport.write(op)
self.dispatch(actions.SendPong(self))

def request(self, sid, subject):
def request(self, subject, payload):
"""
How to do this declaratively

Make a synchronous request for a subject.
declare publish with a timeout that if not replied to in time, will unregister

make random sid

Make a reply to.
Subscribe to the subject.
Make a Deferred and add it to the inbox under the reply to.
Do auto unsubscribe for one message.
"""
raise NotImplementedError()
request_id = None
deferred=defer.Deferred()
while True:
request_id = ''.join(
random.choice(
string.ascii_uppercase + string.digits) for _ in range(8))
if request_id not in self.requests:
break
while True:
sid = ''.join(
random.choice(
string.ascii_uppercase + string.digits) for _ in range(12))
if sid not in self.sids:
break

request = actions.Request(sid, subject, payload,
reply_to=request_id, deferred=deferred)

self.requests[request_id] = request

def sid_on_msg(nats_protocol, sid, subject, reply_to, payload):
del self.requests[request_id]
deferred.callback(payload)

self.sub(request_id, sid, on_msg=sid_on_msg)
self.unsub(sid, 1)
self.pub(subject, payload, request_id)
return deferred