diff --git a/.gitignore b/.gitignore
index 541e887..41109ad 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,4 @@
/.project
/.settings
+*.iml
+*~
diff --git a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataConsumerImplTest.java b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataConsumerImplTest.java
index 80f99cc..3baa66e 100644
--- a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataConsumerImplTest.java
+++ b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataConsumerImplTest.java
@@ -21,8 +21,8 @@
import org.junit.Before;
import org.junit.Test;
-import org.risbic.intraconnect.jms.mock.MockJMSDataFlowNode;
-import org.risbic.intraconnect.jms.mock.MockJMSDataProvider;
+import org.risbic.transport.jms.mock.MockJMSDataFlowNode;
+import org.risbic.transport.jms.mock.MockJMSDataProvider;
import static org.junit.Assert.assertEquals;
diff --git a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataProviderImplTest.java b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataProviderImplTest.java
index 6be6fed..4376a09 100644
--- a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataProviderImplTest.java
+++ b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataProviderImplTest.java
@@ -23,7 +23,7 @@
import java.util.List;
import org.junit.Before;
import org.junit.Test;
-import org.risbic.intraconnect.jms.mock.MockJMSDataFlowNode;
+import org.risbic.transport.jms.mock.MockJMSDataFlowNode;
/**
* @author Martyn Taylor
diff --git a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataTransportFlowTest.java b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataTransportFlowTest.java
index 4977e61..f03abfd 100644
--- a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataTransportFlowTest.java
+++ b/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/JMSDataTransportFlowTest.java
@@ -16,7 +16,7 @@
import java.util.ArrayList;
import java.util.List;
-import org.risbic.intraconnect.jms.mock.MockJMSDataFlowNode;
+import org.risbic.transport.jms.mock.MockJMSDataFlowNode;
import org.junit.Before;
diff --git a/jms-data-transport/.gitignore b/jms-data-transport/.gitignore
new file mode 100644
index 0000000..9b08022
--- /dev/null
+++ b/jms-data-transport/.gitignore
@@ -0,0 +1,4 @@
+/.project
+/.classpath
+/.settings
+/target
diff --git a/jms-data-transport/README.md b/jms-data-transport/README.md
new file mode 100644
index 0000000..8555fd3
--- /dev/null
+++ b/jms-data-transport/README.md
@@ -0,0 +1,60 @@
+Running JMS Data Transport Tests
+================================
+
+Download and Install WildFly 8
+-------------------------------
+
+1. Download WildFly here: http://wildfly.org/downloads/
+2. Decompress to your desired folder
+3. Set JBOSS_HOME=
+
+Configure WildFly
+-----------------
+
+1. Add new user
+
+```bash
+ cd $JBOSS_HOME
+ ./bin/add_user.sh
+
+ What type of user do you wish to add?
+ a) Management User (mgmt-users.properties)
+ b) Application User (application-users.properties)
+ (a): a
+
+ Username : jms
+ Password : password
+
+ About to add user 'jms' for realm 'ManagementRealm'
+ Is this correct yes/no? yes
+
+ Is this new user going to be used for one AS process to connect to another AS process?
+ e.g. for a slave host controller connecting to the master or for a Remoting connection for server to server EJB calls.
+ yes/no? no
+```
+
+2. Add Risbic Topic
+
+Add the following XML snippet to $JBOSS_HOME/standalone/configuration/standalone-full.xml as a child element of
+
+```xml
+
+
+
+```
+
+Start Wildfly
+--------------
+
+```bash
+ cd $JBOSS_HOME
+ ./bin/standalone.sh -c standalone-full.xml
+```
+
+Run Tests
+----------
+
+```bash
+ cd
+ mvn clean test
+```
diff --git a/jms-data-transport/intraconnect-basic.iml b/jms-data-transport/intraconnect-basic.iml
new file mode 100644
index 0000000..ec11de8
--- /dev/null
+++ b/jms-data-transport/intraconnect-basic.iml
@@ -0,0 +1,68 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/jms-data-transport/jms-data-transport.iml b/jms-data-transport/jms-data-transport.iml
new file mode 100644
index 0000000..4aa1353
--- /dev/null
+++ b/jms-data-transport/jms-data-transport.iml
@@ -0,0 +1,67 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/jms-data-transport/pom.xml b/jms-data-transport/pom.xml
new file mode 100644
index 0000000..110ad9b
--- /dev/null
+++ b/jms-data-transport/pom.xml
@@ -0,0 +1,84 @@
+
+
+
+
+
+
+
+
+
+ 4.0.0
+
+
+ org.risbic
+ dataflow-utils
+ 1.0.0p1m1
+
+
+ jms-data-transport
+
+ jar
+
+ JMS Data Transport Classes
+
+
+
+ com.arjuna.databroker
+ data-common
+ 1.0.0p1m1
+ provided
+
+
+ javax.jms
+ jms
+ 1.1
+
+
+ org.jboss.spec
+ jboss-javaee-6.0
+ 1.0.0.Final
+ pom
+ provided
+
+
+ junit
+ junit
+ 4.8.1
+ test
+
+
+ org.jboss.logging
+ jboss-logging
+ 3.1.0.GA
+
+
+ org.hornetq
+ hornetq-jms-client
+ 2.4.3.Final
+
+
+ org.wildfly
+ wildfly-ejb-client-bom
+ 8.1.0.Final
+ pom
+ compile
+
+
+
+
+
+
+ maven-surefire-plugin
+ 2.12
+
+
+
+ java.util.logging.manager
+ org.jboss.logmanager.LogManager
+
+
+
+
+
+
+
diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/AbstractJMSDataTransport.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/AbstractJMSDataTransport.java
new file mode 100644
index 0000000..56686d0
--- /dev/null
+++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/AbstractJMSDataTransport.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.risbic.transport.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import java.io.Serializable;
+
+import org.jboss.logging.Logger;
+
+/**
+ * @author Martyn Taylor
+ */
+
+public abstract class AbstractJMSDataTransport implements JMSDataTransportLifeCycle
+{
+ private static final Logger logger = Logger.getLogger(AbstractJMSDataTransport.class.getName());
+
+ protected ConnectionFactory connectionFactory;
+
+ protected Connection connection;
+
+ protected Session session;
+
+ protected Destination destination;
+
+ protected String username;
+
+ protected String password;
+
+ public AbstractJMSDataTransport(ConnectionFactory connectionFactory, Destination destination, String username,String password)
+ {
+ this.destination = destination;
+ this.connectionFactory = connectionFactory;
+ this.username = username;
+ this.password = password;
+ }
+
+ @Override
+ public void start() throws JMSException
+ {
+ connection = connectionFactory.createConnection(username, password);
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ @Override
+ public void stop() throws JMSException
+ {
+ session.close();
+ connection.close();
+ }
+
+ @Override
+ public void destroy() throws JMSException
+ {
+ session = null;
+ connection = null;
+ }
+
+ @Override
+ public void restart() throws JMSException
+ {
+ stop();
+ start();
+ }
+}
diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataConsumerImpl.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataConsumerImpl.java
new file mode 100644
index 0000000..be33e77
--- /dev/null
+++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataConsumerImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.risbic.transport.jms;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import java.io.Serializable;
+
+import com.arjuna.databroker.data.DataConsumer;
+import com.arjuna.databroker.data.DataFlowNode;
+import com.arjuna.databroker.data.DataProvider;
+import org.jboss.logging.Logger;
+
+/**
+ * @author Martyn Taylor
+ */
+
+public class JMSDataConsumerImpl extends AbstractJMSDataTransport implements DataConsumer
+{
+ private static final Logger logger = Logger.getLogger(JMSDataConsumerImpl.class.getName());
+
+ private MessageConsumer consumer;
+
+ private DataFlowNode dfNode;
+
+ private String methodName;
+
+ private Class dataClass;
+
+ public JMSDataConsumerImpl(DataFlowNode dfNode, String methodName, Class dataClass, ConnectionFactory connectionFactory,
+ Destination destination, JMSDataProvider producer, String username, String password)
+ {
+ super(connectionFactory, destination, username, password);
+ this.dfNode = dfNode;
+ this.methodName = methodName;
+ this.dataClass = dataClass;
+ }
+
+ @Override
+ public DataFlowNode getDataFlowNode()
+ {
+ return null;
+ }
+
+ // We don't need this method in JMS. Consumption happens as messages appear in the destination.
+ @Override
+ public void consume(DataProvider dataProvider, Object data)
+ {
+ }
+
+ @Override
+ public void start() throws JMSException
+ {
+ super.start();
+ consumer = session.createConsumer(destination);
+ MessageListener messageListener = new MessageHandler();
+ consumer.setMessageListener(messageListener);
+ connection.start();
+ }
+
+ @Override
+ public void stop() throws JMSException
+ {
+ consumer.close();
+ session.unsubscribe(destination.toString());
+ super.stop();
+ }
+
+ @Override
+ public void destroy() throws JMSException
+ {
+ consumer = null;
+ super.destroy();
+ }
+
+ @Override
+ public void pause()
+ {
+ // Block the Message Handler?
+ }
+
+ private class MessageHandler implements MessageListener
+ {
+ private MessageHandler() {};
+
+ @Override
+ public void onMessage(Message message)
+ {
+ try
+ {
+ if (message instanceof ObjectMessage)
+ {
+ Object data = ((ObjectMessage) message).getObject();
+ Util.invokeMethod(dfNode, methodName, data);
+ }
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProvider.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProvider.java
new file mode 100644
index 0000000..2069896
--- /dev/null
+++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProvider.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.risbic.transport.jms;
+
+import java.util.UUID;
+
+import com.arjuna.databroker.data.DataProvider;
+
+/**
+ * @author Martyn Taylor
+ */
+
+public interface JMSDataProvider extends DataProvider, JMSDataTransportLifeCycle
+{
+ public UUID getId();
+}
diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProviderImpl.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProviderImpl.java
new file mode 100644
index 0000000..f8c6998
--- /dev/null
+++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataProviderImpl.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.risbic.transport.jms;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import java.io.Serializable;
+import java.util.UUID;
+
+import com.arjuna.databroker.data.DataFlowNode;
+import org.jboss.logging.Logger;
+
+/**
+ * @author Martyn Taylor
+ */
+
+public class JMSDataProviderImpl extends AbstractJMSDataTransport implements JMSDataProvider
+{
+ public static final String DATA_FLOW_NODE_ID = "DATA_FLOW_NODE_ID";
+
+ private static final Logger logger = Logger.getLogger(JMSDataProviderImpl.class.getName());
+
+ private MessageProducer producer;
+
+ private UUID id;
+
+ /**
+ * Creates a new JMSDataProvider. This data provider should be backed by a Durable Topic to which consumers will
+ * be dynamically subscribed. Message filters are used to send the correct messages to the Data Consumers. The
+ * filter is currently based on the dfNode name.
+ */
+ public JMSDataProviderImpl(ConnectionFactory connectionFactory, Destination destination, String username, String password)
+ {
+ super(connectionFactory, destination, username, password);
+ id = UUID.randomUUID();
+ }
+
+ @Override
+ public void produce(T data)
+ {
+ try
+ {
+ Message message = session.createObjectMessage(data);
+ message.setStringProperty(DATA_FLOW_NODE_ID, id.toString());
+ producer.send(message);
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return id;
+ }
+
+ @Override
+ public DataFlowNode getDataFlowNode()
+ {
+ return null;
+ }
+
+ // Life Cycle Methods
+ @Override
+ public void start() throws JMSException
+ {
+ super.start();
+ producer = session.createProducer(destination);
+ connection.start();
+ }
+
+ @Override
+ public void stop() throws JMSException
+ {
+ producer.close();
+ session.unsubscribe(destination.toString());
+ super.stop();
+ }
+
+ @Override
+ public void destroy() throws JMSException
+ {
+ producer = null;
+ super.destroy();
+ }
+
+ @Override
+ public void pause()
+ {
+
+ }
+}
diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataTransportLifeCycle.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataTransportLifeCycle.java
new file mode 100644
index 0000000..b4fb8d1
--- /dev/null
+++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/JMSDataTransportLifeCycle.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.risbic.transport.jms;
+
+/**
+ * @author Martyn Taylor
+ */
+
+public interface JMSDataTransportLifeCycle
+{
+ public void start() throws Exception;
+
+ public void stop() throws Exception;
+
+ public void pause() throws Exception;
+
+ public void restart() throws Exception;
+
+ public void destroy() throws Exception;
+}
diff --git a/jms-data-transport/src/main/java/org/risbic/transport/jms/Util.java b/jms-data-transport/src/main/java/org/risbic/transport/jms/Util.java
new file mode 100644
index 0000000..b84634f
--- /dev/null
+++ b/jms-data-transport/src/main/java/org/risbic/transport/jms/Util.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.risbic.transport.jms;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import com.arjuna.databroker.data.DataFlowNode;
+import org.jboss.logging.Logger;
+
+/**
+ * @author Martyn Taylor
+ */
+
+public class Util
+{
+ private static final Logger logger = Logger.getLogger(Util.class.getName());
+
+ public static void invokeMethod(DataFlowNode dfNode, String methodName, Object data)
+ {
+ try
+ {
+ getMethod(dfNode.getClass(), methodName).invoke(dfNode, data);
+ }
+ catch (Throwable throwable)
+ {
+ logger.warn("Problem invoking consumer", throwable);
+ throw new RuntimeException("Could not invoke method: " + methodName + " on Class: " +
+ dfNode.getClass(), throwable);
+ }
+ }
+
+ public static Method getMethod(Class> nodeClass, String nodeMethodName)
+ {
+ try
+ {
+ return nodeClass.getMethod(nodeMethodName, Serializable.class);
+ }
+ catch (Throwable throwable)
+ {
+ logger.warn("Unable to find method \"" + nodeMethodName + "\"", throwable);
+ throw new RuntimeException("Unable to find metohd", throwable);
+ }
+ }
+}
diff --git a/jms-data-transport/src/test/java/org/risbic/transport/jms/AbstractJMSDataTransportTest.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/AbstractJMSDataTransportTest.java
new file mode 100644
index 0000000..ecdb88e
--- /dev/null
+++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/AbstractJMSDataTransportTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.risbic.transport.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.junit.After;
+
+/**
+ * @author Martyn Taylor
+ */
+
+public abstract class AbstractJMSDataTransportTest extends TestCase
+{
+ protected ConnectionFactory connectionFactory;
+
+ protected Connection connection;
+
+ protected Session session;
+
+ protected Topic topic;
+
+ public void setUp() throws NamingException, JMSException, Exception
+ {
+ Properties jndiProperties = new Properties();
+ jndiProperties.put(Context.INITIAL_CONTEXT_FACTORY, org.jboss.naming.remote.client.InitialContextFactory.class.getName());
+ jndiProperties.put(Context.PROVIDER_URL, "http-remoting://localhost:8080");
+ jndiProperties.put(Context.SECURITY_PRINCIPAL, "jms");
+ jndiProperties.put(Context.SECURITY_CREDENTIALS, "password");
+ Context initialContext = new InitialContext(jndiProperties);
+
+ connectionFactory = (ConnectionFactory) initialContext.lookup("jms/RemoteConnectionFactory");
+ connection = connectionFactory.createConnection("jms", "password");
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ topic = (Topic) initialContext.lookup("jms/topic/risbic");
+ }
+
+ @After
+ public void tearDown() throws JMSException
+ {
+ session.close();
+ connection.close();
+ }
+}
diff --git a/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataConsumerImplTest.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataConsumerImplTest.java
new file mode 100644
index 0000000..62e7a0c
--- /dev/null
+++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataConsumerImplTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.risbic.transport.jms;
+
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.risbic.transport.jms.mock.MockJMSDataFlowNode;
+import org.risbic.transport.jms.mock.MockJMSDataProvider;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Martyn Taylor
+ */
+
+public class JMSDataConsumerImplTest extends AbstractJMSDataTransportTest
+{
+ private MessageProducer producer;
+
+ @Override
+ @Before
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ producer = session.createProducer(topic);
+ }
+
+ @Test
+ public void testConsume() throws JMSException, InterruptedException
+ {
+ JMSDataProvider provider = new MockJMSDataProvider();
+ MockJMSDataFlowNode dfNode = new MockJMSDataFlowNode();
+ String methodName = "consume";
+ Class clazz = MockJMSDataFlowNode.class;
+
+ JMSDataConsumerImpl dataConsumer = new JMSDataConsumerImpl(dfNode, methodName, clazz, connectionFactory, topic, provider, "jms", "password");
+ dataConsumer.start();
+
+ String message = "Test Message";
+ List messages = new ArrayList();
+ for(int i=0; i<10; i++)
+ {
+ messages.add(message);
+ }
+ sendMessages(messages);
+
+ // Wait for messages to arrive
+ Thread.sleep(2000);
+ Assert.assertEquals(messages, dfNode.getData());
+ }
+
+ public void sendMessages(List messages) throws JMSException
+ {
+ for (Serializable message : messages)
+ {
+ producer.send(session.createObjectMessage(message));
+ }
+ }
+}
diff --git a/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataProviderImplTest.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataProviderImplTest.java
new file mode 100644
index 0000000..4f86eaf
--- /dev/null
+++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataProviderImplTest.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.risbic.transport.jms;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.risbic.transport.jms.mock.MockJMSDataFlowNode;
+
+/**
+ * @author Martyn Taylor
+ */
+
+public class JMSDataProviderImplTest extends AbstractJMSDataTransportTest
+{
+ private MessageConsumer consumer;
+
+ private TestMessageListener messageListener;
+
+ @Override
+ @Before
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ consumer = session.createConsumer(topic);
+ messageListener = new TestMessageListener();
+ consumer.setMessageListener(messageListener);
+ connection.start();
+ }
+
+ @Test
+ public void testConsume() throws JMSException, InterruptedException
+ {
+ MockJMSDataFlowNode dfNode = new MockJMSDataFlowNode();
+
+ JMSDataProviderImpl provider = new JMSDataProviderImpl(connectionFactory, topic, "jms", "password");
+ provider.start();
+
+ String message = "Test Message";
+ List messages = new ArrayList();
+ for(int i=0; i<10; i++)
+ {
+ provider.produce(message);
+ messages.add(message);
+ }
+
+ // Wait for messages to arrive to message listener.
+ Thread.sleep(2000);
+ assertEquals(messages, messageListener.getMessages());
+ }
+
+ private class TestMessageListener implements MessageListener
+ {
+ private List messages;
+
+ public TestMessageListener()
+ {
+ messages = new ArrayList();
+ }
+
+ @Override
+ public void onMessage(Message message)
+ {
+ try
+ {
+ ObjectMessage objectMessage = (ObjectMessage) message;
+ String messageContent = (String) objectMessage.getObject();
+ messages.add(messageContent);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public List getMessages()
+ {
+ return messages;
+ }
+ }
+}
diff --git a/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataTransportFlowTest.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataTransportFlowTest.java
new file mode 100644
index 0000000..cdcf6b5
--- /dev/null
+++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/JMSDataTransportFlowTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.risbic.transport.jms;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.risbic.transport.jms.mock.MockJMSDataFlowNode;
+
+import org.junit.Before;
+
+/**
+ * @author Martyn Taylor
+ */
+
+public class JMSDataTransportFlowTest extends AbstractJMSDataTransportTest
+{
+ @Before
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ public void testJMSDataProviderToJMSDataConsumerFlow() throws Exception
+ {
+ MockJMSDataFlowNode consumingNode = new MockJMSDataFlowNode();
+
+ JMSDataProvider dataProvider = new JMSDataProviderImpl(connectionFactory, topic, "jms","password");
+ JMSDataConsumerImpl dataConsumer = new JMSDataConsumerImpl(consumingNode, "consume", MockJMSDataFlowNode.class,
+ connectionFactory, topic, dataProvider, "jms", "password");
+
+ dataProvider.start();
+ dataConsumer.start();
+
+ List messages = new ArrayList();
+ for (int i=0; i<10; i++)
+ {
+ String message = "Test Message: " + i;
+ messages.add(message);
+ dataProvider.produce(message);
+ }
+
+ // Wait for messages to be received.
+ Thread.sleep(2000);
+ assertEquals(messages, consumingNode.getData());
+ }
+}
diff --git a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataFlowNode.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataFlowNode.java
similarity index 96%
rename from intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataFlowNode.java
rename to jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataFlowNode.java
index 0458bea..e587644 100644
--- a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataFlowNode.java
+++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataFlowNode.java
@@ -11,7 +11,7 @@
* permissions and limitations under the License.
*/
-package org.risbic.intraconnect.jms.mock;
+package org.risbic.transport.jms.mock;
import java.io.Serializable;
import java.util.ArrayList;
diff --git a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataProvider.java b/jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataProvider.java
similarity index 93%
rename from intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataProvider.java
rename to jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataProvider.java
index b5af7a4..03a9384 100644
--- a/intraconnect-basic/src/test/java/org/risbic/intraconnect/jms/mock/MockJMSDataProvider.java
+++ b/jms-data-transport/src/test/java/org/risbic/transport/jms/mock/MockJMSDataProvider.java
@@ -11,13 +11,13 @@
* permissions and limitations under the License.
*/
-package org.risbic.intraconnect.jms.mock;
+package org.risbic.transport.jms.mock;
import javax.jms.JMSException;
import java.util.UUID;
import com.arjuna.databroker.data.DataFlowNode;
-import org.risbic.intraconnect.jms.JMSDataProvider;
+import org.risbic.transport.jms.JMSDataProvider;
/**
* @author Martyn Taylor
diff --git a/pom.xml b/pom.xml
index 1311734..ee4f326 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,12 +1,5 @@
-
-
-
-
-
-
-
-
+
4.0.0
org.risbic
@@ -51,7 +44,8 @@
intraconnect-basic
- dataflow-utils-ear
+ dataflow-utils-ear
+ jms-data-transport