From 007cd3fa0b262d42f5c850776440a0c315850e1d Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Fri, 19 Sep 2014 10:26:55 +0100 Subject: [PATCH 1/2] Ignore Idea and Vim temp files --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 541e887..41109ad 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ /.project /.settings +*.iml +*~ From 7fb4cfdf962b505d741fbb5e3d2170ed02e2b2a0 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Fri, 19 Sep 2014 10:25:33 +0100 Subject: [PATCH 2/2] renamed JMS Transport --- .../jms/JMSDataConsumerImplTest.java | 4 +- .../jms/JMSDataProviderImplTest.java | 2 +- .../jms/JMSDataTransportFlowTest.java | 2 +- jms-data-transport/.gitignore | 4 + jms-data-transport/README.md | 60 +++++++++ jms-data-transport/intraconnect-basic.iml | 68 ++++++++++ jms-data-transport/jms-data-transport.iml | 67 ++++++++++ jms-data-transport/pom.xml | 84 +++++++++++++ .../jms/AbstractJMSDataTransport.java | 80 ++++++++++++ .../transport/jms/JMSDataConsumerImpl.java | 119 ++++++++++++++++++ .../risbic/transport/jms/JMSDataProvider.java | 27 ++++ .../transport/jms/JMSDataProviderImpl.java | 108 ++++++++++++++++ .../jms/JMSDataTransportLifeCycle.java | 31 +++++ .../java/org/risbic/transport/jms/Util.java | 55 ++++++++ .../jms/AbstractJMSDataTransportTest.java | 65 ++++++++++ .../jms/JMSDataConsumerImplTest.java | 77 ++++++++++++ .../jms/JMSDataProviderImplTest.java | 99 +++++++++++++++ .../jms/JMSDataTransportFlowTest.java | 58 +++++++++ .../jms/mock/MockJMSDataFlowNode.java | 2 +- .../jms/mock/MockJMSDataProvider.java | 4 +- pom.xml | 12 +- 21 files changed, 1012 insertions(+), 16 deletions(-) create mode 100644 jms-data-transport/.gitignore create mode 100644 jms-data-transport/README.md create mode 100644 jms-data-transport/intraconnect-basic.iml create mode 100644 jms-data-transport/jms-data-transport.iml create mode 100644 jms-data-transport/pom.xml create mode 100644 jms-data-transport/src/main/java/org/risbic/transport/jms/AbstractJMSDataTransport.java create mode 100644 jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataConsumerImpl.java create mode 100644 jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProvider.java create mode 100644 jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProviderImpl.java create mode 100644 jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataTransportLifeCycle.java create mode 100644 jms-data-transport/src/main/java/org/risbic/transport/jms/Util.java create mode 100644 jms-data-transport/src/test/java/org/risbic/transport/jms/AbstractJMSDataTransportTest.java create mode 100644 jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataConsumerImplTest.java create mode 100644 jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataProviderImplTest.java create mode 100644 jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataTransportFlowTest.java rename {intraconnect-basic/src/test/java/org/risbic/intraconnect => jms-data-transport/src/test/java/org/risbic/transport}/jms/mock/MockJMSDataFlowNode.java (96%) rename {intraconnect-basic/src/test/java/org/risbic/intraconnect => jms-data-transport/src/test/java/org/risbic/transport}/jms/mock/MockJMSDataProvider.java (93%) diff --git a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataConsumerImplTest.java b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataConsumerImplTest.java index 80f99cc..3baa66e 100644 --- a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataConsumerImplTest.java +++ b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataConsumerImplTest.java @@ -21,8 +21,8 @@ import org.junit.Before; import org.junit.Test; -import org.risbic.intraconnect.jms.mock.MockJMSDataFlowNode; -import org.risbic.intraconnect.jms.mock.MockJMSDataProvider; +import org.risbic.transport.jms.mock.MockJMSDataFlowNode; +import org.risbic.transport.jms.mock.MockJMSDataProvider; import static org.junit.Assert.assertEquals; diff --git a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataProviderImplTest.java b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataProviderImplTest.java index 6be6fed..4376a09 100644 --- a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataProviderImplTest.java +++ b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataProviderImplTest.java @@ -23,7 +23,7 @@ import java.util.List; import org.junit.Before; import org.junit.Test; -import org.risbic.intraconnect.jms.mock.MockJMSDataFlowNode; +import org.risbic.transport.jms.mock.MockJMSDataFlowNode; /** * @author Martyn Taylor diff --git a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataTransportFlowTest.java b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataTransportFlowTest.java index 4977e61..f03abfd 100644 --- a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataTransportFlowTest.java +++ b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataTransportFlowTest.java @@ -16,7 +16,7 @@ import java.util.ArrayList; import java.util.List; -import org.risbic.intraconnect.jms.mock.MockJMSDataFlowNode; +import org.risbic.transport.jms.mock.MockJMSDataFlowNode; import org.junit.Before; diff --git a/jms-data-transport/.gitignore b/jms-data-transport/.gitignore new file mode 100644 index 0000000..9b08022 --- /dev/null +++ b/jms-data-transport/.gitignore @@ -0,0 +1,4 @@ +/.project +/.classpath +/.settings +/target diff --git a/jms-data-transport/README.md b/jms-data-transport/README.md new file mode 100644 index 0000000..8555fd3 --- /dev/null +++ b/jms-data-transport/README.md @@ -0,0 +1,60 @@ +Running JMS Data Transport Tests +================================ + +Download and Install WildFly 8 +------------------------------- + +1. Download WildFly here: http://wildfly.org/downloads/ +2. Decompress to your desired folder +3. Set JBOSS_HOME= + +Configure WildFly +----------------- + +1. Add new user + +```bash + cd $JBOSS_HOME + ./bin/add_user.sh + + What type of user do you wish to add? + a) Management User (mgmt-users.properties) + b) Application User (application-users.properties) + (a): a + + Username : jms + Password : password + + About to add user 'jms' for realm 'ManagementRealm' + Is this correct yes/no? yes + + Is this new user going to be used for one AS process to connect to another AS process? + e.g. for a slave host controller connecting to the master or for a Remoting connection for server to server EJB calls. + yes/no? no +``` + +2. Add Risbic Topic + +Add the following XML snippet to $JBOSS_HOME/standalone/configuration/standalone-full.xml as a child element of + +```xml + + + +``` + +Start Wildfly +-------------- + +```bash + cd $JBOSS_HOME + ./bin/standalone.sh -c standalone-full.xml +``` + +Run Tests +---------- + +```bash + cd + mvn clean test +``` diff --git a/jms-data-transport/intraconnect-basic.iml b/jms-data-transport/intraconnect-basic.iml new file mode 100644 index 0000000..ec11de8 --- /dev/null +++ b/jms-data-transport/intraconnect-basic.iml @@ -0,0 +1,68 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/jms-data-transport/jms-data-transport.iml b/jms-data-transport/jms-data-transport.iml new file mode 100644 index 0000000..4aa1353 --- /dev/null +++ b/jms-data-transport/jms-data-transport.iml @@ -0,0 +1,67 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/jms-data-transport/pom.xml b/jms-data-transport/pom.xml new file mode 100644 index 0000000..110ad9b --- /dev/null +++ b/jms-data-transport/pom.xml @@ -0,0 +1,84 @@ + + + + + + + + + + 4.0.0 + + + org.risbic + dataflow-utils + 1.0.0p1m1 + + + jms-data-transport + + jar + + JMS Data Transport Classes + + + + com.arjuna.databroker + data-common + 1.0.0p1m1 + provided + + + javax.jms + jms + 1.1 + + + org.jboss.spec + jboss-javaee-6.0 + 1.0.0.Final + pom + provided + + + junit + junit + 4.8.1 + test + + + org.jboss.logging + jboss-logging + 3.1.0.GA + + + org.hornetq + hornetq-jms-client + 2.4.3.Final + + + org.wildfly + wildfly-ejb-client-bom + 8.1.0.Final + pom + compile + + + + + + + maven-surefire-plugin + 2.12 + + + + java.util.logging.manager + org.jboss.logmanager.LogManager + + + + + + + diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/AbstractJMSDataTransport.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/AbstractJMSDataTransport.java new file mode 100644 index 0000000..56686d0 --- /dev/null +++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/AbstractJMSDataTransport.java @@ -0,0 +1,80 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.risbic.transport.jms; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Session; +import java.io.Serializable; + +import org.jboss.logging.Logger; + +/** + * @author Martyn Taylor + */ + +public abstract class AbstractJMSDataTransport implements JMSDataTransportLifeCycle +{ + private static final Logger logger = Logger.getLogger(AbstractJMSDataTransport.class.getName()); + + protected ConnectionFactory connectionFactory; + + protected Connection connection; + + protected Session session; + + protected Destination destination; + + protected String username; + + protected String password; + + public AbstractJMSDataTransport(ConnectionFactory connectionFactory, Destination destination, String username,String password) + { + this.destination = destination; + this.connectionFactory = connectionFactory; + this.username = username; + this.password = password; + } + + @Override + public void start() throws JMSException + { + connection = connectionFactory.createConnection(username, password); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + @Override + public void stop() throws JMSException + { + session.close(); + connection.close(); + } + + @Override + public void destroy() throws JMSException + { + session = null; + connection = null; + } + + @Override + public void restart() throws JMSException + { + stop(); + start(); + } +} diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataConsumerImpl.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataConsumerImpl.java new file mode 100644 index 0000000..be33e77 --- /dev/null +++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataConsumerImpl.java @@ -0,0 +1,119 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.risbic.transport.jms; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import java.io.Serializable; + +import com.arjuna.databroker.data.DataConsumer; +import com.arjuna.databroker.data.DataFlowNode; +import com.arjuna.databroker.data.DataProvider; +import org.jboss.logging.Logger; + +/** + * @author Martyn Taylor + */ + +public class JMSDataConsumerImpl extends AbstractJMSDataTransport implements DataConsumer +{ + private static final Logger logger = Logger.getLogger(JMSDataConsumerImpl.class.getName()); + + private MessageConsumer consumer; + + private DataFlowNode dfNode; + + private String methodName; + + private Class dataClass; + + public JMSDataConsumerImpl(DataFlowNode dfNode, String methodName, Class dataClass, ConnectionFactory connectionFactory, + Destination destination, JMSDataProvider producer, String username, String password) + { + super(connectionFactory, destination, username, password); + this.dfNode = dfNode; + this.methodName = methodName; + this.dataClass = dataClass; + } + + @Override + public DataFlowNode getDataFlowNode() + { + return null; + } + + // We don't need this method in JMS. Consumption happens as messages appear in the destination. + @Override + public void consume(DataProvider dataProvider, Object data) + { + } + + @Override + public void start() throws JMSException + { + super.start(); + consumer = session.createConsumer(destination); + MessageListener messageListener = new MessageHandler(); + consumer.setMessageListener(messageListener); + connection.start(); + } + + @Override + public void stop() throws JMSException + { + consumer.close(); + session.unsubscribe(destination.toString()); + super.stop(); + } + + @Override + public void destroy() throws JMSException + { + consumer = null; + super.destroy(); + } + + @Override + public void pause() + { + // Block the Message Handler? + } + + private class MessageHandler implements MessageListener + { + private MessageHandler() {}; + + @Override + public void onMessage(Message message) + { + try + { + if (message instanceof ObjectMessage) + { + Object data = ((ObjectMessage) message).getObject(); + Util.invokeMethod(dfNode, methodName, data); + } + } + catch(Exception e) + { + throw new RuntimeException(e); + } + } + } +} diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProvider.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProvider.java new file mode 100644 index 0000000..2069896 --- /dev/null +++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProvider.java @@ -0,0 +1,27 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.risbic.transport.jms; + +import java.util.UUID; + +import com.arjuna.databroker.data.DataProvider; + +/** + * @author Martyn Taylor + */ + +public interface JMSDataProvider extends DataProvider, JMSDataTransportLifeCycle +{ + public UUID getId(); +} diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProviderImpl.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProviderImpl.java new file mode 100644 index 0000000..f8c6998 --- /dev/null +++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProviderImpl.java @@ -0,0 +1,108 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.risbic.transport.jms; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import java.io.Serializable; +import java.util.UUID; + +import com.arjuna.databroker.data.DataFlowNode; +import org.jboss.logging.Logger; + +/** + * @author Martyn Taylor + */ + +public class JMSDataProviderImpl extends AbstractJMSDataTransport implements JMSDataProvider +{ + public static final String DATA_FLOW_NODE_ID = "DATA_FLOW_NODE_ID"; + + private static final Logger logger = Logger.getLogger(JMSDataProviderImpl.class.getName()); + + private MessageProducer producer; + + private UUID id; + + /** + * Creates a new JMSDataProvider. This data provider should be backed by a Durable Topic to which consumers will + * be dynamically subscribed. Message filters are used to send the correct messages to the Data Consumers. The + * filter is currently based on the dfNode name. + */ + public JMSDataProviderImpl(ConnectionFactory connectionFactory, Destination destination, String username, String password) + { + super(connectionFactory, destination, username, password); + id = UUID.randomUUID(); + } + + @Override + public void produce(T data) + { + try + { + Message message = session.createObjectMessage(data); + message.setStringProperty(DATA_FLOW_NODE_ID, id.toString()); + producer.send(message); + } + catch(Exception e) + { + throw new RuntimeException(e); + } + } + + @Override + public UUID getId() + { + return id; + } + + @Override + public DataFlowNode getDataFlowNode() + { + return null; + } + + // Life Cycle Methods + @Override + public void start() throws JMSException + { + super.start(); + producer = session.createProducer(destination); + connection.start(); + } + + @Override + public void stop() throws JMSException + { + producer.close(); + session.unsubscribe(destination.toString()); + super.stop(); + } + + @Override + public void destroy() throws JMSException + { + producer = null; + super.destroy(); + } + + @Override + public void pause() + { + + } +} diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataTransportLifeCycle.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataTransportLifeCycle.java new file mode 100644 index 0000000..b4fb8d1 --- /dev/null +++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataTransportLifeCycle.java @@ -0,0 +1,31 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.risbic.transport.jms; + +/** + * @author Martyn Taylor + */ + +public interface JMSDataTransportLifeCycle +{ + public void start() throws Exception; + + public void stop() throws Exception; + + public void pause() throws Exception; + + public void restart() throws Exception; + + public void destroy() throws Exception; +} diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/Util.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/Util.java new file mode 100644 index 0000000..b84634f --- /dev/null +++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/Util.java @@ -0,0 +1,55 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.risbic.transport.jms; + +import java.io.Serializable; +import java.lang.reflect.Method; +import com.arjuna.databroker.data.DataFlowNode; +import org.jboss.logging.Logger; + +/** + * @author Martyn Taylor + */ + +public class Util +{ + private static final Logger logger = Logger.getLogger(Util.class.getName()); + + public static void invokeMethod(DataFlowNode dfNode, String methodName, Object data) + { + try + { + getMethod(dfNode.getClass(), methodName).invoke(dfNode, data); + } + catch (Throwable throwable) + { + logger.warn("Problem invoking consumer", throwable); + throw new RuntimeException("Could not invoke method: " + methodName + " on Class: " + + dfNode.getClass(), throwable); + } + } + + public static Method getMethod(Class nodeClass, String nodeMethodName) + { + try + { + return nodeClass.getMethod(nodeMethodName, Serializable.class); + } + catch (Throwable throwable) + { + logger.warn("Unable to find method \"" + nodeMethodName + "\"", throwable); + throw new RuntimeException("Unable to find metohd", throwable); + } + } +} diff --git a/jms-data-transport/src/test/java/org/risbic/transport/jms/AbstractJMSDataTransportTest.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/AbstractJMSDataTransportTest.java new file mode 100644 index 0000000..ecdb88e --- /dev/null +++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/AbstractJMSDataTransportTest.java @@ -0,0 +1,65 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.risbic.transport.jms; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.Topic; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Properties; + +import junit.framework.TestCase; + +import org.junit.After; + +/** + * @author Martyn Taylor + */ + +public abstract class AbstractJMSDataTransportTest extends TestCase +{ + protected ConnectionFactory connectionFactory; + + protected Connection connection; + + protected Session session; + + protected Topic topic; + + public void setUp() throws NamingException, JMSException, Exception + { + Properties jndiProperties = new Properties(); + jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, org.jboss.naming.remote.client.InitialContextFactory.class.getName()); + jndiProperties.put(Context.PROVIDER_URL, "http-remoting://localhost:8080"); + jndiProperties.put(Context.SECURITY_PRINCIPAL, "jms"); + jndiProperties.put(Context.SECURITY_CREDENTIALS, "password"); + Context initialContext = new InitialContext(jndiProperties); + + connectionFactory = (ConnectionFactory) initialContext.lookup("jms/RemoteConnectionFactory"); + connection = connectionFactory.createConnection("jms", "password"); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + topic = (Topic) initialContext.lookup("jms/topic/risbic"); + } + + @After + public void tearDown() throws JMSException + { + session.close(); + connection.close(); + } +} diff --git a/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataConsumerImplTest.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataConsumerImplTest.java new file mode 100644 index 0000000..62e7a0c --- /dev/null +++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataConsumerImplTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.risbic.transport.jms; + +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.risbic.transport.jms.mock.MockJMSDataFlowNode; +import org.risbic.transport.jms.mock.MockJMSDataProvider; + +import static org.junit.Assert.assertEquals; + +/** + * @author Martyn Taylor + */ + +public class JMSDataConsumerImplTest extends AbstractJMSDataTransportTest +{ + private MessageProducer producer; + + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + producer = session.createProducer(topic); + } + + @Test + public void testConsume() throws JMSException, InterruptedException + { + JMSDataProvider provider = new MockJMSDataProvider(); + MockJMSDataFlowNode dfNode = new MockJMSDataFlowNode(); + String methodName = "consume"; + Class clazz = MockJMSDataFlowNode.class; + + JMSDataConsumerImpl dataConsumer = new JMSDataConsumerImpl(dfNode, methodName, clazz, connectionFactory, topic, provider, "jms", "password"); + dataConsumer.start(); + + String message = "Test Message"; + List messages = new ArrayList(); + for(int i=0; i<10; i++) + { + messages.add(message); + } + sendMessages(messages); + + // Wait for messages to arrive + Thread.sleep(2000); + Assert.assertEquals(messages, dfNode.getData()); + } + + public void sendMessages(List messages) throws JMSException + { + for (Serializable message : messages) + { + producer.send(session.createObjectMessage(message)); + } + } +} diff --git a/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataProviderImplTest.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataProviderImplTest.java new file mode 100644 index 0000000..4f86eaf --- /dev/null +++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataProviderImplTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.risbic.transport.jms; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.ObjectMessage; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import org.risbic.transport.jms.mock.MockJMSDataFlowNode; + +/** + * @author Martyn Taylor + */ + +public class JMSDataProviderImplTest extends AbstractJMSDataTransportTest +{ + private MessageConsumer consumer; + + private TestMessageListener messageListener; + + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + consumer = session.createConsumer(topic); + messageListener = new TestMessageListener(); + consumer.setMessageListener(messageListener); + connection.start(); + } + + @Test + public void testConsume() throws JMSException, InterruptedException + { + MockJMSDataFlowNode dfNode = new MockJMSDataFlowNode(); + + JMSDataProviderImpl provider = new JMSDataProviderImpl(connectionFactory, topic, "jms", "password"); + provider.start(); + + String message = "Test Message"; + List messages = new ArrayList(); + for(int i=0; i<10; i++) + { + provider.produce(message); + messages.add(message); + } + + // Wait for messages to arrive to message listener. + Thread.sleep(2000); + assertEquals(messages, messageListener.getMessages()); + } + + private class TestMessageListener implements MessageListener + { + private List messages; + + public TestMessageListener() + { + messages = new ArrayList(); + } + + @Override + public void onMessage(Message message) + { + try + { + ObjectMessage objectMessage = (ObjectMessage) message; + String messageContent = (String) objectMessage.getObject(); + messages.add(messageContent); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + public List getMessages() + { + return messages; + } + } +} diff --git a/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataTransportFlowTest.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataTransportFlowTest.java new file mode 100644 index 0000000..cdcf6b5 --- /dev/null +++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataTransportFlowTest.java @@ -0,0 +1,58 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.risbic.transport.jms; + +import java.util.ArrayList; +import java.util.List; + +import org.risbic.transport.jms.mock.MockJMSDataFlowNode; + +import org.junit.Before; + +/** + * @author Martyn Taylor + */ + +public class JMSDataTransportFlowTest extends AbstractJMSDataTransportTest +{ + @Before + public void setUp() throws Exception + { + super.setUp(); + } + + public void testJMSDataProviderToJMSDataConsumerFlow() throws Exception + { + MockJMSDataFlowNode consumingNode = new MockJMSDataFlowNode(); + + JMSDataProvider dataProvider = new JMSDataProviderImpl(connectionFactory, topic, "jms","password"); + JMSDataConsumerImpl dataConsumer = new JMSDataConsumerImpl(consumingNode, "consume", MockJMSDataFlowNode.class, + connectionFactory, topic, dataProvider, "jms", "password"); + + dataProvider.start(); + dataConsumer.start(); + + List messages = new ArrayList(); + for (int i=0; i<10; i++) + { + String message = "Test Message: " + i; + messages.add(message); + dataProvider.produce(message); + } + + // Wait for messages to be received. + Thread.sleep(2000); + assertEquals(messages, consumingNode.getData()); + } +} diff --git a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataFlowNode.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataFlowNode.java similarity index 96% rename from intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataFlowNode.java rename to jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataFlowNode.java index 0458bea..e587644 100644 --- a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataFlowNode.java +++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataFlowNode.java @@ -11,7 +11,7 @@ * permissions and limitations under the License. */ -package org.risbic.intraconnect.jms.mock; +package org.risbic.transport.jms.mock; import java.io.Serializable; import java.util.ArrayList; diff --git a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataProvider.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataProvider.java similarity index 93% rename from intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataProvider.java rename to jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataProvider.java index b5af7a4..03a9384 100644 --- a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataProvider.java +++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataProvider.java @@ -11,13 +11,13 @@ * permissions and limitations under the License. */ -package org.risbic.intraconnect.jms.mock; +package org.risbic.transport.jms.mock; import javax.jms.JMSException; import java.util.UUID; import com.arjuna.databroker.data.DataFlowNode; -import org.risbic.intraconnect.jms.JMSDataProvider; +import org.risbic.transport.jms.JMSDataProvider; /** * @author Martyn Taylor diff --git a/pom.xml b/pom.xml index 1311734..ee4f326 100644 --- a/pom.xml +++ b/pom.xml @@ -1,12 +1,5 @@ - - - - - - - - + 4.0.0 org.risbic @@ -51,7 +44,8 @@ intraconnect-basic - dataflow-utils-ear + dataflow-utils-ear + jms-data-transport