diff --git a/generate.py b/generate.py index bce6b5e..53f1687 100644 --- a/generate.py +++ b/generate.py @@ -18,6 +18,25 @@ int fd; short events; short revents; + ...; +}; +struct nn_cmsghdr { + size_t cmsg_len; + int cmsg_level; + int cmsg_type; + ...; +}; +struct nn_iovec { + void * iov_base; + size_t iov_len; + ...; +}; +struct nn_msghdr { + struct nn_iovec *msg_iov; + int msg_iovlen; + void * msg_control; + size_t msg_controllen; + ...; }; ''' @@ -50,6 +69,13 @@ def functions(hfiles): lines.append(ln) cont = ln.strip()[-1] + lines.extend([ + 'struct nn_cmsghdr *NN_CMSG_FIRSTHDR(struct nn_msghdr *hdr);', + 'struct nn_cmsghdr *NN_CMSG_NXTHDR(struct nn_msghdr * hdr, struct nn_cmsghdr *cmsg);', + 'unsigned char * NN_CMSG_DATA(struct nn_cmsghdr * cmsg);', + 'size_t NN_CMSG_SPACE(size_t len);', + 'size_t NN_CMSG_LEN(size_t len);' + ]) return ''.join(ln[10:] if ln.startswith('NN_') else ln for ln in lines) def symbols(ffi, host_library): diff --git a/nnpy/__init__.py b/nnpy/__init__.py index 3db54bf..a3b506c 100644 --- a/nnpy/__init__.py +++ b/nnpy/__init__.py @@ -4,7 +4,7 @@ import os from .errors import NNError -from .socket import Socket +from .socket import Socket, MessageControl class PollSet(object): diff --git a/nnpy/socket.py b/nnpy/socket.py index 9544cf1..2f9b63f 100644 --- a/nnpy/socket.py +++ b/nnpy/socket.py @@ -1,10 +1,13 @@ from . import errors, ffi, nanomsg import sys +import collections NN_MSG = int(ffi.cast("size_t", -1)) ustr = str if sys.version_info[0] > 2 else unicode +MessageControl = collections.namedtuple('MessageControl', ['level', 'type', 'data']) + class Socket(object): """ Nanomsg scalability protocols (SP) socket. @@ -86,6 +89,71 @@ def recv(self, flags=0): s = ffi.buffer(buf[0], rc)[:] nanomsg.nn_freemsg(buf[0]) return s + + def sendmsg(self, data, control, flags=0): + # Some data types can use a zero-copy buffer creation strategy when + # paired with new versions of CFFI. Namely, CFFI 1.8 supports `bytes` + # types with `from_buffer`, which is about 18% faster. We try the fast + # way first and degrade as needed for the platform. + hdr = ffi.new('struct nn_msghdr *') + + def gen(control_): + chdr = ffi.new('struct nn_cmsghdr *') + for level, tp, data in control_: + chdr.cmsg_level = level + chdr.cmsg_type = tp + chdr.cmsg_len = nanomsg.NN_CMSG_SPACE(len(data)) + payload = ffi.buffer(chdr)[:] + data + padding = b'\0' * (chdr.cmsg_len - len(payload)) + yield payload + padding + + control = b''.join(gen(control)) + + try: + control = ffi.from_buffer(control) + data = ffi.from_buffer(data) + except TypeError: + control = ffi.new('char[%i]' % len(control), control) + data = data.encode() if isinstance(data, ustr) else data + data = ffi,new('char[%i]' % len(data), data) + iov = ffi.new('struct nn_iovec *') + iov.iov_base = data + iov.iov_len = len(data) + hdr.msg_iov = iov + hdr.msg_iovlen = 1 + hdr.msg_control = control + hdr.msg_controllen = len(control) + + rc = nanomsg.nn_sendmsg(self.sock, hdr, flags) + return errors.convert(rc, rc) + + def recvmsg(self, flags=0): + hdr = ffi.new('struct nn_msghdr *') + iov = ffi.new('struct nn_iovec *') + buf = ffi.new('char**') + control = ffi.new('char **') + iov.iov_base = buf + iov.iov_len = NN_MSG + hdr.msg_iov = iov + hdr.msg_iovlen = 1 + hdr.msg_control = control + hdr.msg_controllen = NN_MSG + rc = nanomsg.nn_recvmsg(self.sock, hdr, flags) + errors.convert(rc) + + def gen(hdr_): + chdr = nanomsg.NN_CMSG_FIRSTHDR(hdr_) + while chdr: + yield MessageControl( + chdr.cmsg_level, chdr.cmsg_type, + ffi.buffer(nanomsg.NN_CMSG_DATA(chdr), chdr.cmsg_len - ffi.sizeof(chdr[0]))[:]) + chdr = nanomsg.NN_CMSG_NXTHDR(hdr_, chdr) + + s = ffi.buffer(buf[0], rc)[:] + c = list(gen(hdr)) + nanomsg.nn_freemsg(buf[0]) + nanomsg.nn_freemsg(control[0]) + return s, c def get_statistic(self, statistic): rc = nanomsg.nn_get_statistic(self.sock, statistic)