From bf33b98043a056dd8b6c2ff3480d98a881c965b9 Mon Sep 17 00:00:00 2001 From: Dimitrios Amaxilatis Date: Thu, 20 Oct 2011 12:57:36 +0300 Subject: [PATCH 1/3] call fireChannelConnected upon a new connection --- .../itm/nettyrxtx/RXTXChannelSink.java | 453 +++++++++--------- 1 file changed, 232 insertions(+), 221 deletions(-) diff --git a/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java b/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java index 058ae3f..f6858a9 100644 --- a/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java +++ b/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java @@ -33,315 +33,326 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.IOException; +import java.net.SocketAddress; import java.util.TooManyListenersException; import java.util.concurrent.Executor; public class RXTXChannelSink extends AbstractChannelSink { - private static class WriteRunnable implements Runnable { +private static class WriteRunnable implements Runnable { - private final DefaultChannelFuture future; + private final DefaultChannelFuture future; - private final RXTXChannelSink channelSink; + private final RXTXChannelSink channelSink; - private final ChannelBuffer message; + private final ChannelBuffer message; - public WriteRunnable(final DefaultChannelFuture future, final RXTXChannelSink channelSink, - final ChannelBuffer message) { - this.future = future; - this.channelSink = channelSink; - this.message = message; - } - - public void run() { - try { + public WriteRunnable(final DefaultChannelFuture future, final RXTXChannelSink channelSink, + final ChannelBuffer message) { + this.future = future; + this.channelSink = channelSink; + this.message = message; + } - channelSink.outputStream.write(message.array(), message.readerIndex(), message.readableBytes()); - channelSink.outputStream.flush(); - if (log.isTraceEnabled()) { - log.trace("Wrote message to outputStream: {}", StringUtils.toHexString(message)); - } - future.setSuccess(); + public void run() { + try { - } catch (Exception e) { - future.setFailure(e); + channelSink.outputStream.write(message.array(), message.readerIndex(), message.readableBytes()); + channelSink.outputStream.flush(); + if (log.isTraceEnabled()) { + log.trace("Wrote message to outputStream: {}", StringUtils.toHexString(message)); } + future.setSuccess(); + + } catch (Exception e) { + future.setFailure(e); } } +} - private static class ConnectRunnable implements Runnable { +private static class ConnectRunnable implements Runnable { - private DefaultChannelFuture channelFuture; + private DefaultChannelFuture channelFuture; - private RXTXChannelSink channelSink; + private RXTXChannelSink channelSink; + private Channel channel; - private ConnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink) { - this.channelFuture = channelFuture; - this.channelSink = channelSink; - } + private SocketAddress remoteAddress; - public void run() { - - if (channelSink.closed) { - channelFuture.setFailure(new Exception("Channel is already closed.")); - } else { - try { - connectInternal(); - log.debug("Successfully connected."); - channelFuture.setSuccess(); - } catch (Exception e) { - log.warn("" + e, e); - channelFuture.setFailure(e); - } - } - - } + private ConnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink, Channel channel, SocketAddress remoteAddress) { + this.channel = channel; + this.remoteAddress = remoteAddress; + log.debug("RXTXChannelSink ConnectRunnable"); + this.channelFuture = channelFuture; + this.channelSink = channelSink; + } - private void connectInternal() - throws NoSuchPortException, PortInUseException, UnsupportedCommOperationException, IOException, - TooManyListenersException { + public void run() { - final CommPort commPort; + if (channelSink.closed) { + channelFuture.setFailure(new Exception("Channel is already closed.")); + } else { try { + connectInternal(); + log.debug("Successfully connected."); + channelFuture.setSuccess(); + Channels.fireChannelConnected(channel, remoteAddress); + log.debug("Fired Channel Connected."); - final CommPortIdentifier cpi = - CommPortIdentifier.getPortIdentifier(channelSink.remoteAddress.getDeviceAddress()); - commPort = cpi.open(this.getClass().getName(), 1000); - - } catch (NoSuchPortException e) { - log.warn("No such port {}: {}", channelSink.remoteAddress.getDeviceAddress(), e.getMessage()); - throw e; - } catch (PortInUseException e) { - log.warn("Port {} in use: {}", channelSink.remoteAddress.getDeviceAddress(), e.getMessage()); - throw e; - } - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - commPort.close(); - } catch (Exception e) { - log.warn("" + e, e); - } - } + } catch (Exception e) { + log.warn("" + e, e); + channelFuture.setFailure(e); } - ); - - channelSink.serialPort = ((SerialPort) commPort); - log.debug("Adding SerialPortEventListener"); - channelSink.serialPort.addEventListener(new RXTXSerialPortEventListener(channelSink)); - channelSink.serialPort.notifyOnDataAvailable(true); - channelSink.serialPort.setSerialPortParams( - channelSink.config.getBaudrate(), - channelSink.config.getDatabits().getValue(), - channelSink.config.getStopbits().getValue(), - channelSink.config.getParitybit().getValue() - ); - - // TODO do this more generic - channelSink.serialPort.setDTR(false); - channelSink.serialPort.setRTS(false); - - channelSink.outputStream = new BufferedOutputStream(channelSink.serialPort.getOutputStream()); - channelSink.inputStream = new BufferedInputStream(channelSink.serialPort.getInputStream()); } + } - private static class DisconnectRunnable implements Runnable { + private void connectInternal() + throws NoSuchPortException, PortInUseException, UnsupportedCommOperationException, IOException, + TooManyListenersException { - private DefaultChannelFuture channelFuture; + final CommPort commPort; + try { - private RXTXChannelSink channelSink; + final CommPortIdentifier cpi = + CommPortIdentifier.getPortIdentifier(channelSink.remoteAddress.getDeviceAddress()); + commPort = cpi.open(this.getClass().getName(), 1000); - public DisconnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink) { - this.channelFuture = channelFuture; - this.channelSink = channelSink; + } catch (NoSuchPortException e) { + log.warn("No such port {}: {}", channelSink.remoteAddress.getDeviceAddress(), e.getMessage()); + throw e; + } catch (PortInUseException e) { + log.warn("Port {} in use: {}", channelSink.remoteAddress.getDeviceAddress(), e.getMessage()); + throw e; } - public void run() { - if (channelSink.closed) { - channelFuture.setFailure(new Exception("Channel is already closed.")); - } else { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { try { - disconnectInternal(); - log.debug("Successfully disconnected"); - channelSink.channel.doSetClosed(); + commPort.close(); } catch (Exception e) { log.warn("" + e, e); - channelFuture.setFailure(e); } } } + ); + + channelSink.serialPort = ((SerialPort) commPort); + log.debug("Adding SerialPortEventListener"); + channelSink.serialPort.addEventListener(new RXTXSerialPortEventListener(channelSink)); + channelSink.serialPort.notifyOnDataAvailable(true); + channelSink.serialPort.setSerialPortParams( + channelSink.config.getBaudrate(), + channelSink.config.getDatabits().getValue(), + channelSink.config.getStopbits().getValue(), + channelSink.config.getParitybit().getValue() + ); + + // TODO do this more generic + channelSink.serialPort.setDTR(false); + channelSink.serialPort.setRTS(false); + + channelSink.outputStream = new BufferedOutputStream(channelSink.serialPort.getOutputStream()); + channelSink.inputStream = new BufferedInputStream(channelSink.serialPort.getInputStream()); + } +} + +private static class DisconnectRunnable implements Runnable { - private void disconnectInternal() throws Exception { + private DefaultChannelFuture channelFuture; - Exception exception = null; + private RXTXChannelSink channelSink; + + public DisconnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink) { + this.channelFuture = channelFuture; + this.channelSink = channelSink; + } + public void run() { + if (channelSink.closed) { + channelFuture.setFailure(new Exception("Channel is already closed.")); + } else { try { - if (channelSink.inputStream != null) { - channelSink.inputStream.close(); - } - } catch (IOException e) { - log.debug("Failed to close in-stream :" + e, e); - exception = e; + disconnectInternal(); + log.debug("Successfully disconnected"); + channelSink.channel.doSetClosed(); + } catch (Exception e) { + log.warn("" + e, e); + channelFuture.setFailure(e); } + } + } - try { - if (channelSink.outputStream != null) { - channelSink.outputStream.close(); - } - } catch (IOException e) { - log.debug("Failed to close out-stream :" + e, e); - exception = e; + private void disconnectInternal() throws Exception { + + Exception exception = null; + + try { + if (channelSink.inputStream != null) { + channelSink.inputStream.close(); } + } catch (IOException e) { + log.debug("Failed to close in-stream :" + e, e); + exception = e; + } - if (channelSink.serialPort != null) { - channelSink.serialPort.removeEventListener(); - channelSink.serialPort.close(); + try { + if (channelSink.outputStream != null) { + channelSink.outputStream.close(); } + } catch (IOException e) { + log.debug("Failed to close out-stream :" + e, e); + exception = e; + } - channelSink.inputStream = null; - channelSink.outputStream = null; - channelSink.serialPort = null; + if (channelSink.serialPort != null) { + channelSink.serialPort.removeEventListener(); + channelSink.serialPort.close(); + } - if (exception != null) { - throw exception; - } + channelSink.inputStream = null; + channelSink.outputStream = null; + channelSink.serialPort = null; + if (exception != null) { + throw exception; } + } +} - private static final Logger log = LoggerFactory.getLogger(RXTXChannelSink.class); +private static final Logger log = LoggerFactory.getLogger(RXTXChannelSink.class); - private final Executor executor; +private final Executor executor; - private RXTXChannelConfig config; +private RXTXChannelConfig config; - private RXTXChannel channel; +private RXTXChannel channel; - public RXTXChannelSink(final Executor executor) { - this.executor = executor; - config = new RXTXChannelConfig(); - } +public RXTXChannelSink(final Executor executor) { + this.executor = executor; + config = new RXTXChannelConfig(); +} - public boolean isConnected() { - return inputStream != null && outputStream != null; - } +public boolean isConnected() { + return inputStream != null && outputStream != null; +} - public RXTXDeviceAddress getRemoteAddress() { - return remoteAddress; - } +public RXTXDeviceAddress getRemoteAddress() { + return remoteAddress; +} - public boolean isBound() { - return false; - } +public boolean isBound() { + return false; +} - public ChannelConfig getConfig() { - return config; - } +public ChannelConfig getConfig() { + return config; +} - public void setChannel(final RXTXChannel channel) { - this.channel = channel; - } +public void setChannel(final RXTXChannel channel) { + this.channel = channel; +} - private static class RXTXSerialPortEventListener implements SerialPortEventListener { +private static class RXTXSerialPortEventListener implements SerialPortEventListener { - private RXTXChannelSink channelSink; + private RXTXChannelSink channelSink; - public RXTXSerialPortEventListener(final RXTXChannelSink channelSink) { - this.channelSink = channelSink; - } + public RXTXSerialPortEventListener(final RXTXChannelSink channelSink) { + this.channelSink = channelSink; + } + + public void serialEvent(final SerialPortEvent event) { + log.trace("{}", event); + switch (event.getEventType()) { + case SerialPortEvent.DATA_AVAILABLE: + try { + if (channelSink.inputStream != null && channelSink.inputStream.available() > 0) { + int available = channelSink.inputStream.available(); + byte[] buffer = new byte[available]; + int read = channelSink.inputStream.read(buffer); + if (read > 0) { + ChannelBuffer channelBuffer = ChannelBuffers.wrappedBuffer(buffer, 0, read); + UpstreamMessageEvent upstreamMessageEvent = new UpstreamMessageEvent( + channelSink.channel, + channelBuffer, + channelSink.getRemoteAddress() + ); + log.trace("read from stream: {}", StringUtils.toHexString(channelBuffer)); + channelSink.channel.getPipeline().sendUpstream(upstreamMessageEvent); - public void serialEvent(final SerialPortEvent event) { - log.trace("{}", event); - switch (event.getEventType()) { - case SerialPortEvent.DATA_AVAILABLE: - try { - if (channelSink.inputStream != null && channelSink.inputStream.available() > 0) { - int available = channelSink.inputStream.available(); - byte[] buffer = new byte[available]; - int read = channelSink.inputStream.read(buffer); - if (read > 0) { - ChannelBuffer channelBuffer = ChannelBuffers.wrappedBuffer(buffer, 0, read); - UpstreamMessageEvent upstreamMessageEvent = new UpstreamMessageEvent( - channelSink.channel, - channelBuffer, - channelSink.getRemoteAddress() - ); - log.trace("read from stream: {}", StringUtils.toHexString(channelBuffer)); - channelSink.channel.getPipeline().sendUpstream(upstreamMessageEvent); - } } - } catch (IOException e) { - log.error("" + e, e); - channelSink.channel.close(); } - break; - } + } catch (IOException e) { + log.error("" + e, e); + channelSink.channel.close(); + } + break; } } +} + +private RXTXDeviceAddress remoteAddress; - private RXTXDeviceAddress remoteAddress; +private BufferedOutputStream outputStream; - private BufferedOutputStream outputStream; +private BufferedInputStream inputStream; - private BufferedInputStream inputStream; +private SerialPort serialPort; - private SerialPort serialPort; +private volatile boolean closed = false; +public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { - private volatile boolean closed = false; + final ChannelFuture future = e.getFuture(); - public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + if (e instanceof ChannelStateEvent) { - final ChannelFuture future = e.getFuture(); + final ChannelStateEvent stateEvent = (ChannelStateEvent) e; + final ChannelState state = stateEvent.getState(); + final Object value = stateEvent.getValue(); - if (e instanceof ChannelStateEvent) { + switch (state) { - final ChannelStateEvent stateEvent = (ChannelStateEvent) e; - final ChannelState state = stateEvent.getState(); - final Object value = stateEvent.getValue(); + case OPEN: + if (Boolean.FALSE.equals(value)) { + executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this)); + } + break; - switch (state) { + case BOUND: + throw new UnsupportedOperationException(); - case OPEN: - if (Boolean.FALSE.equals(value)) { - executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this)); - } - break; + case CONNECTED: + if (value != null) { + remoteAddress = (RXTXDeviceAddress) value; + executor.execute(new ConnectRunnable((DefaultChannelFuture) future, this, pipeline.getChannel(), remoteAddress)); + } else { + executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this)); + } + break; - case BOUND: - throw new UnsupportedOperationException(); + case INTEREST_OPS: + throw new UnsupportedOperationException(); - case CONNECTED: - if (value != null) { - remoteAddress = (RXTXDeviceAddress) value; - executor.execute(new ConnectRunnable((DefaultChannelFuture) future, this)); - } else { - executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this)); - } - break; + } - case INTEREST_OPS: - throw new UnsupportedOperationException(); - } + } else if (e instanceof MessageEvent) { - } else if (e instanceof MessageEvent) { - - final MessageEvent event = (MessageEvent) e; - if (event.getMessage() instanceof ChannelBuffer) { - executor.execute( - new WriteRunnable((DefaultChannelFuture) future, this, (ChannelBuffer) event.getMessage()) - ); - } else { - throw new IllegalArgumentException( - "Only ChannelBuffer objects are supported to be written onto the RXTXChannelSink! " - + "Please check if the encoder pipeline is configured correctly." - ); - } + final MessageEvent event = (MessageEvent) e; + if (event.getMessage() instanceof ChannelBuffer) { + executor.execute( + new WriteRunnable((DefaultChannelFuture) future, this, (ChannelBuffer) event.getMessage()) + ); + } else { + throw new IllegalArgumentException( + "Only ChannelBuffer objects are supported to be written onto the RXTXChannelSink! " + + "Please check if the encoder pipeline is configured correctly." + ); } } +} } From 51def3186043b227ada0c15dc44e8e136aa6cfa1 Mon Sep 17 00:00:00 2001 From: Dimitrios Amaxilatis Date: Tue, 25 Oct 2011 11:55:44 +0300 Subject: [PATCH 2/3] revert formating --- .../itm/nettyrxtx/RXTXChannelSink.java | 455 +++++++++--------- 1 file changed, 222 insertions(+), 233 deletions(-) diff --git a/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java b/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java index f6858a9..066fc11 100644 --- a/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java +++ b/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java @@ -33,326 +33,315 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.IOException; -import java.net.SocketAddress; import java.util.TooManyListenersException; import java.util.concurrent.Executor; public class RXTXChannelSink extends AbstractChannelSink { -private static class WriteRunnable implements Runnable { + private static class WriteRunnable implements Runnable { - private final DefaultChannelFuture future; + private final DefaultChannelFuture future; - private final RXTXChannelSink channelSink; + private final RXTXChannelSink channelSink; - private final ChannelBuffer message; + private final ChannelBuffer message; - public WriteRunnable(final DefaultChannelFuture future, final RXTXChannelSink channelSink, - final ChannelBuffer message) { - this.future = future; - this.channelSink = channelSink; - this.message = message; - } + public WriteRunnable(final DefaultChannelFuture future, final RXTXChannelSink channelSink, + final ChannelBuffer message) { + this.future = future; + this.channelSink = channelSink; + this.message = message; + } - public void run() { - try { + public void run() { + try { - channelSink.outputStream.write(message.array(), message.readerIndex(), message.readableBytes()); - channelSink.outputStream.flush(); - if (log.isTraceEnabled()) { - log.trace("Wrote message to outputStream: {}", StringUtils.toHexString(message)); - } - future.setSuccess(); + channelSink.outputStream.write(message.array(), message.readerIndex(), message.readableBytes()); + channelSink.outputStream.flush(); + if (log.isTraceEnabled()) { + log.trace("Wrote message to outputStream: {}", StringUtils.toHexString(message)); + } + future.setSuccess(); - } catch (Exception e) { - future.setFailure(e); + } catch (Exception e) { + future.setFailure(e); + } } } -} -private static class ConnectRunnable implements Runnable { + private static class ConnectRunnable implements Runnable { - private DefaultChannelFuture channelFuture; + private DefaultChannelFuture channelFuture; - private RXTXChannelSink channelSink; - private Channel channel; + private RXTXChannelSink channelSink; - private SocketAddress remoteAddress; + private ConnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink) { + this.channelFuture = channelFuture; + this.channelSink = channelSink; + } - private ConnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink, Channel channel, SocketAddress remoteAddress) { - this.channel = channel; - this.remoteAddress = remoteAddress; - log.debug("RXTXChannelSink ConnectRunnable"); - this.channelFuture = channelFuture; - this.channelSink = channelSink; - } + public void run() { + + if (channelSink.closed) { + channelFuture.setFailure(new Exception("Channel is already closed.")); + } else { + try { + connectInternal(); + log.debug("Successfully connected."); + channelFuture.setSuccess(); + } catch (Exception e) { + log.warn("" + e, e); + channelFuture.setFailure(e); + } + } + + } - public void run() { + private void connectInternal() + throws NoSuchPortException, PortInUseException, UnsupportedCommOperationException, IOException, + TooManyListenersException { - if (channelSink.closed) { - channelFuture.setFailure(new Exception("Channel is already closed.")); - } else { + final CommPort commPort; try { - connectInternal(); - log.debug("Successfully connected."); - channelFuture.setSuccess(); - Channels.fireChannelConnected(channel, remoteAddress); - log.debug("Fired Channel Connected."); - } catch (Exception e) { - log.warn("" + e, e); - channelFuture.setFailure(e); + final CommPortIdentifier cpi = + CommPortIdentifier.getPortIdentifier(channelSink.remoteAddress.getDeviceAddress()); + commPort = cpi.open(this.getClass().getName(), 1000); + + } catch (NoSuchPortException e) { + log.warn("No such port {}: {}", channelSink.remoteAddress.getDeviceAddress(), e.getMessage()); + throw e; + } catch (PortInUseException e) { + log.warn("Port {} in use: {}", channelSink.remoteAddress.getDeviceAddress(), e.getMessage()); + throw e; } - } + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + commPort.close(); + } catch (Exception e) { + log.warn("" + e, e); + } + } + } + ); + + channelSink.serialPort = ((SerialPort) commPort); + log.debug("Adding SerialPortEventListener"); + channelSink.serialPort.addEventListener(new RXTXSerialPortEventListener(channelSink)); + channelSink.serialPort.notifyOnDataAvailable(true); + channelSink.serialPort.setSerialPortParams( + channelSink.config.getBaudrate(), + channelSink.config.getDatabits().getValue(), + channelSink.config.getStopbits().getValue(), + channelSink.config.getParitybit().getValue() + ); + + // TODO do this more generic + channelSink.serialPort.setDTR(false); + channelSink.serialPort.setRTS(false); + + channelSink.outputStream = new BufferedOutputStream(channelSink.serialPort.getOutputStream()); + channelSink.inputStream = new BufferedInputStream(channelSink.serialPort.getInputStream()); + } } - private void connectInternal() - throws NoSuchPortException, PortInUseException, UnsupportedCommOperationException, IOException, - TooManyListenersException { + private static class DisconnectRunnable implements Runnable { - final CommPort commPort; - try { + private DefaultChannelFuture channelFuture; - final CommPortIdentifier cpi = - CommPortIdentifier.getPortIdentifier(channelSink.remoteAddress.getDeviceAddress()); - commPort = cpi.open(this.getClass().getName(), 1000); + private RXTXChannelSink channelSink; - } catch (NoSuchPortException e) { - log.warn("No such port {}: {}", channelSink.remoteAddress.getDeviceAddress(), e.getMessage()); - throw e; - } catch (PortInUseException e) { - log.warn("Port {} in use: {}", channelSink.remoteAddress.getDeviceAddress(), e.getMessage()); - throw e; + public DisconnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink) { + this.channelFuture = channelFuture; + this.channelSink = channelSink; } - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { + public void run() { + if (channelSink.closed) { + channelFuture.setFailure(new Exception("Channel is already closed.")); + } else { try { - commPort.close(); + disconnectInternal(); + log.debug("Successfully disconnected"); + channelSink.channel.doSetClosed(); } catch (Exception e) { log.warn("" + e, e); + channelFuture.setFailure(e); } } } - ); - - channelSink.serialPort = ((SerialPort) commPort); - log.debug("Adding SerialPortEventListener"); - channelSink.serialPort.addEventListener(new RXTXSerialPortEventListener(channelSink)); - channelSink.serialPort.notifyOnDataAvailable(true); - channelSink.serialPort.setSerialPortParams( - channelSink.config.getBaudrate(), - channelSink.config.getDatabits().getValue(), - channelSink.config.getStopbits().getValue(), - channelSink.config.getParitybit().getValue() - ); - - // TODO do this more generic - channelSink.serialPort.setDTR(false); - channelSink.serialPort.setRTS(false); - - channelSink.outputStream = new BufferedOutputStream(channelSink.serialPort.getOutputStream()); - channelSink.inputStream = new BufferedInputStream(channelSink.serialPort.getInputStream()); - } -} -private static class DisconnectRunnable implements Runnable { + private void disconnectInternal() throws Exception { - private DefaultChannelFuture channelFuture; + Exception exception = null; - private RXTXChannelSink channelSink; - - public DisconnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink) { - this.channelFuture = channelFuture; - this.channelSink = channelSink; - } - - public void run() { - if (channelSink.closed) { - channelFuture.setFailure(new Exception("Channel is already closed.")); - } else { try { - disconnectInternal(); - log.debug("Successfully disconnected"); - channelSink.channel.doSetClosed(); - } catch (Exception e) { - log.warn("" + e, e); - channelFuture.setFailure(e); + if (channelSink.inputStream != null) { + channelSink.inputStream.close(); + } + } catch (IOException e) { + log.debug("Failed to close in-stream :" + e, e); + exception = e; } - } - } - - private void disconnectInternal() throws Exception { - - Exception exception = null; - try { - if (channelSink.inputStream != null) { - channelSink.inputStream.close(); + try { + if (channelSink.outputStream != null) { + channelSink.outputStream.close(); + } + } catch (IOException e) { + log.debug("Failed to close out-stream :" + e, e); + exception = e; } - } catch (IOException e) { - log.debug("Failed to close in-stream :" + e, e); - exception = e; - } - try { - if (channelSink.outputStream != null) { - channelSink.outputStream.close(); + if (channelSink.serialPort != null) { + channelSink.serialPort.removeEventListener(); + channelSink.serialPort.close(); } - } catch (IOException e) { - log.debug("Failed to close out-stream :" + e, e); - exception = e; - } - if (channelSink.serialPort != null) { - channelSink.serialPort.removeEventListener(); - channelSink.serialPort.close(); - } + channelSink.inputStream = null; + channelSink.outputStream = null; + channelSink.serialPort = null; - channelSink.inputStream = null; - channelSink.outputStream = null; - channelSink.serialPort = null; + if (exception != null) { + throw exception; + } - if (exception != null) { - throw exception; } - } -} -private static final Logger log = LoggerFactory.getLogger(RXTXChannelSink.class); + private static final Logger log = LoggerFactory.getLogger(RXTXChannelSink.class); -private final Executor executor; + private final Executor executor; -private RXTXChannelConfig config; + private RXTXChannelConfig config; -private RXTXChannel channel; + private RXTXChannel channel; -public RXTXChannelSink(final Executor executor) { - this.executor = executor; - config = new RXTXChannelConfig(); -} - -public boolean isConnected() { - return inputStream != null && outputStream != null; -} + public RXTXChannelSink(final Executor executor) { + this.executor = executor; + config = new RXTXChannelConfig(); + } -public RXTXDeviceAddress getRemoteAddress() { - return remoteAddress; -} + public boolean isConnected() { + return inputStream != null && outputStream != null; + } -public boolean isBound() { - return false; -} + public RXTXDeviceAddress getRemoteAddress() { + return remoteAddress; + } -public ChannelConfig getConfig() { - return config; -} + public boolean isBound() { + return false; + } -public void setChannel(final RXTXChannel channel) { - this.channel = channel; -} + public ChannelConfig getConfig() { + return config; + } -private static class RXTXSerialPortEventListener implements SerialPortEventListener { + public void setChannel(final RXTXChannel channel) { + this.channel = channel; + } - private RXTXChannelSink channelSink; + private static class RXTXSerialPortEventListener implements SerialPortEventListener { - public RXTXSerialPortEventListener(final RXTXChannelSink channelSink) { - this.channelSink = channelSink; - } + private RXTXChannelSink channelSink; - public void serialEvent(final SerialPortEvent event) { - log.trace("{}", event); - switch (event.getEventType()) { - case SerialPortEvent.DATA_AVAILABLE: - try { - if (channelSink.inputStream != null && channelSink.inputStream.available() > 0) { - int available = channelSink.inputStream.available(); - byte[] buffer = new byte[available]; - int read = channelSink.inputStream.read(buffer); - if (read > 0) { - ChannelBuffer channelBuffer = ChannelBuffers.wrappedBuffer(buffer, 0, read); - UpstreamMessageEvent upstreamMessageEvent = new UpstreamMessageEvent( - channelSink.channel, - channelBuffer, - channelSink.getRemoteAddress() - ); - log.trace("read from stream: {}", StringUtils.toHexString(channelBuffer)); - channelSink.channel.getPipeline().sendUpstream(upstreamMessageEvent); + public RXTXSerialPortEventListener(final RXTXChannelSink channelSink) { + this.channelSink = channelSink; + } + public void serialEvent(final SerialPortEvent event) { + log.trace("{}", event); + switch (event.getEventType()) { + case SerialPortEvent.DATA_AVAILABLE: + try { + if (channelSink.inputStream != null && channelSink.inputStream.available() > 0) { + int available = channelSink.inputStream.available(); + byte[] buffer = new byte[available]; + int read = channelSink.inputStream.read(buffer); + if (read > 0) { + ChannelBuffer channelBuffer = ChannelBuffers.wrappedBuffer(buffer, 0, read); + UpstreamMessageEvent upstreamMessageEvent = new UpstreamMessageEvent( + channelSink.channel, + channelBuffer, + channelSink.getRemoteAddress() + ); + log.trace("read from stream: {}", StringUtils.toHexString(channelBuffer)); + channelSink.channel.getPipeline().sendUpstream(upstreamMessageEvent); + } } + } catch (IOException e) { + log.error("" + e, e); + channelSink.channel.close(); } - } catch (IOException e) { - log.error("" + e, e); - channelSink.channel.close(); - } - break; + break; + } } } -} -private RXTXDeviceAddress remoteAddress; + private RXTXDeviceAddress remoteAddress; -private BufferedOutputStream outputStream; + private BufferedOutputStream outputStream; -private BufferedInputStream inputStream; + private BufferedInputStream inputStream; -private SerialPort serialPort; + private SerialPort serialPort; -private volatile boolean closed = false; -public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + private volatile boolean closed = false; - final ChannelFuture future = e.getFuture(); + public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { - if (e instanceof ChannelStateEvent) { + final ChannelFuture future = e.getFuture(); - final ChannelStateEvent stateEvent = (ChannelStateEvent) e; - final ChannelState state = stateEvent.getState(); - final Object value = stateEvent.getValue(); + if (e instanceof ChannelStateEvent) { - switch (state) { - - case OPEN: - if (Boolean.FALSE.equals(value)) { - executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this)); - } - break; + final ChannelStateEvent stateEvent = (ChannelStateEvent) e; + final ChannelState state = stateEvent.getState(); + final Object value = stateEvent.getValue(); - case BOUND: - throw new UnsupportedOperationException(); + switch (state) { - case CONNECTED: - if (value != null) { - remoteAddress = (RXTXDeviceAddress) value; - executor.execute(new ConnectRunnable((DefaultChannelFuture) future, this, pipeline.getChannel(), remoteAddress)); - } else { - executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this)); - } - break; + case OPEN: + if (Boolean.FALSE.equals(value)) { + executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this)); + } + break; - case INTEREST_OPS: - throw new UnsupportedOperationException(); + case BOUND: + throw new UnsupportedOperationException(); - } + case CONNECTED: + if (value != null) { + remoteAddress = (RXTXDeviceAddress) value; + executor.execute(new ConnectRunnable((DefaultChannelFuture) future, this)); + } else { + executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this)); + } + break; + case INTEREST_OPS: + throw new UnsupportedOperationException(); - } else if (e instanceof MessageEvent) { + } - final MessageEvent event = (MessageEvent) e; - if (event.getMessage() instanceof ChannelBuffer) { - executor.execute( - new WriteRunnable((DefaultChannelFuture) future, this, (ChannelBuffer) event.getMessage()) - ); - } else { - throw new IllegalArgumentException( - "Only ChannelBuffer objects are supported to be written onto the RXTXChannelSink! " - + "Please check if the encoder pipeline is configured correctly." - ); + } else if (e instanceof MessageEvent) { + + final MessageEvent event = (MessageEvent) e; + if (event.getMessage() instanceof ChannelBuffer) { + executor.execute( + new WriteRunnable((DefaultChannelFuture) future, this, (ChannelBuffer) event.getMessage()) + ); + } else { + throw new IllegalArgumentException( + "Only ChannelBuffer objects are supported to be written onto the RXTXChannelSink! " + + "Please check if the encoder pipeline is configured correctly." + ); + } } } -} -} +} \ No newline at end of file From faa71a5ea3013da43511939b4e0bf11f83bd7622 Mon Sep 17 00:00:00 2001 From: Dimitrios Amaxilatis Date: Tue, 25 Oct 2011 12:08:50 +0300 Subject: [PATCH 3/3] call Channels.fireChannelConnected upon a new connection --- .../de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java b/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java index 066fc11..8ad6b6d 100644 --- a/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java +++ b/src/main/java/de/uniluebeck/itm/nettyrxtx/RXTXChannelSink.java @@ -33,6 +33,7 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.IOException; +import java.net.SocketAddress; import java.util.TooManyListenersException; import java.util.concurrent.Executor; @@ -75,8 +76,12 @@ private static class ConnectRunnable implements Runnable { private DefaultChannelFuture channelFuture; private RXTXChannelSink channelSink; + private Channel channel; + private SocketAddress remoteAddress; - private ConnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink) { + private ConnectRunnable(final DefaultChannelFuture channelFuture, final RXTXChannelSink channelSink, Channel channel, SocketAddress remoteAddress) { + this.channel = channel; + this.remoteAddress = remoteAddress; this.channelFuture = channelFuture; this.channelSink = channelSink; } @@ -90,6 +95,9 @@ public void run() { connectInternal(); log.debug("Successfully connected."); channelFuture.setSuccess(); + Channels.fireChannelConnected(channel, remoteAddress); + log.debug("Fired Channel Connected."); + } catch (Exception e) { log.warn("" + e, e); channelFuture.setFailure(e); @@ -317,7 +325,7 @@ public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) thro case CONNECTED: if (value != null) { remoteAddress = (RXTXDeviceAddress) value; - executor.execute(new ConnectRunnable((DefaultChannelFuture) future, this)); + executor.execute(new ConnectRunnable((DefaultChannelFuture) future, this, pipeline.getChannel(), remoteAddress)); } else { executor.execute(new DisconnectRunnable((DefaultChannelFuture) future, this)); }