Friday, 8 December 2017

WildFly: Configure HornetQ to receive a bytes message from a remote queue

The first step is to install HornetQ and configure a queue in /config/stand-alone/non-clustered/hornetq-jms.xml:

   <!-- Non-durable queue "testQueue"; the entry name "queue/testQueue" is the
        name the JMS client below looks up ("/queue/testQueue"). -->
   <queue name="testQueue">
       <entry name="queue/testQueue"/>
       <durable>false</durable>
   </queue>

In the same file we can change the remote port specified in the Netty connector.

Keep all properties inside a JmsContext:

/**
 * Mutable holder for everything needed to publish one file as a JMS bytes message:
 * the JNDI/connection settings, the credentials, and the file payload itself.
 */
public class JmsContext {

    /** Name of the JMS string property that carries the file name. */
    public static final String FILE_NAME_ATTRIBUTE = "FILE_NAME";
    /** Name of the JMS string property that carries the file code. */
    public static final String FILE_CODE_ATTRIBUTE = "FILE_CODE";

    private String initialContextFactory;
    private String remoteConnectionfactoryUrl;
    private String importQueueUrl;

    private String serverUrl;
    private String userName;
    private String password;

    private String fileName;
    private String fileCode;
    private byte[] bytes;

    /** @return the class name of the JNDI initial context factory */
    public String getInitialContextFactory() {
        return initialContextFactory;
    }

    /** @param initialContextFactory the class name of the JNDI initial context factory */
    public void setInitialContextFactory(String initialContextFactory) {
        this.initialContextFactory = initialContextFactory;
    }

    /** @return the JNDI name under which the connection factory is looked up */
    public String getRemoteConnectionfactoryUrl() {
        return remoteConnectionfactoryUrl;
    }

    /** @param remoteConnectionfactoryUrl the JNDI name of the connection factory */
    public void setRemoteConnectionfactoryUrl(String remoteConnectionfactoryUrl) {
        this.remoteConnectionfactoryUrl = remoteConnectionfactoryUrl;
    }

    /** @return the JNDI name under which the target queue is looked up */
    public String getImportQueueUrl() {
        return importQueueUrl;
    }

    /** @param importQueueUrl the JNDI name of the target queue */
    public void setImportQueueUrl(String importQueueUrl) {
        this.importQueueUrl = importQueueUrl;
    }

    /** @return the JNDI provider URL of the server */
    public String getServerUrl() {
        return serverUrl;
    }

    /** @param serverUrl the JNDI provider URL of the server */
    public void setServerUrl(String serverUrl) {
        this.serverUrl = serverUrl;
    }

    /** @return the user name used for the JNDI lookup and the JMS connection */
    public String getUserName() {
        return userName;
    }

    /** @param userName the user name used for the JNDI lookup and the JMS connection */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the password used for the JNDI lookup and the JMS connection */
    public String getPassword() {
        return password;
    }

    /** @param password the password used for the JNDI lookup and the JMS connection */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the file name sent as the {@link #FILE_NAME_ATTRIBUTE} message property */
    public String getFileName() {
        return fileName;
    }

    /** @param fileName the file name sent as the {@link #FILE_NAME_ATTRIBUTE} message property */
    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    /** @return the file code sent as the {@link #FILE_CODE_ATTRIBUTE} message property */
    public String getFileCode() {
        return fileCode;
    }

    /** @param fileCode the file code sent as the {@link #FILE_CODE_ATTRIBUTE} message property */
    public void setFileCode(String fileCode) {
        this.fileCode = fileCode;
    }

    /** @return the message payload (the stored array itself, not a copy) */
    public byte[] getBytes() {
        return bytes;
    }

    /** @param bytes the message payload (stored as-is, not copied) */
    public void setBytes(byte[] bytes) {
        this.bytes = bytes;
    }
}

Create a JmsHelper class to send the message:

import org.apache.log4j.Logger;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.File;
import java.util.Properties;

/**
 * Publishes the file described by a {@link JmsContext} to a remote queue as a
 * JMS {@link BytesMessage}.
 */
public class JmsHelper {

    private static final Logger LOG = Logger.getLogger(JmsHelper.class);

    /**
     * Looks up the connection factory and queue through JNDI, then sends the
     * context's bytes as a single non-transacted, auto-acknowledged message.
     * The file name and file code travel as string properties on the message.
     *
     * <p>Failures are logged and not rethrown, so callers get no checked
     * exceptions from this method.
     *
     * @param jmsContext connection settings, credentials and payload to send
     */
    public static void sendMessage(JmsContext jmsContext) {

        InitialContext ic = null;
        Connection connection = null;

        try {

            LOG.info("Running " + JmsHelper.class.toString() +
                     " on current path " + new File(".").getAbsolutePath());
            LOG.info("Publishing " + jmsContext.getFileName() +
                     " to destination '" + jmsContext.getImportQueueUrl() + "'\n");

            ic = new InitialContext(buildJndiProperties(jmsContext));
            LOG.info("Initial context created.");

            final ConnectionFactory factory = (ConnectionFactory) ic.lookup(
                    jmsContext.getRemoteConnectionfactoryUrl());
            LOG.info("Connection factory " + jmsContext.getRemoteConnectionfactoryUrl() +
                     " acquired.");

            final Queue queue = (Queue) ic.lookup(jmsContext.getImportQueueUrl());
            LOG.info("Found queue: " + jmsContext.getImportQueueUrl());

            connection = factory.createConnection(jmsContext.getUserName(),
                    jmsContext.getPassword());
            // Session and producer are not closed separately: closing the
            // connection in the finally block closes them as well.
            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer msgProducer = session.createProducer(queue);

            final BytesMessage msg = session.createBytesMessage();
            msg.setStringProperty(JmsContext.FILE_NAME_ATTRIBUTE,
                    jmsContext.getFileName());
            msg.setStringProperty(JmsContext.FILE_CODE_ATTRIBUTE,
                    jmsContext.getFileCode());
            msg.writeBytes(jmsContext.getBytes());

            msgProducer.send(msg);
            LOG.info("Bytes file message was published to server.");

        } catch (Exception e) {
            LOG.error("Failed to publish bytes message.", e);
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    LOG.error("Failed to close JMS connection.", e);
                }
            }
            if (ic != null) {
                try {
                    ic.close();
                } catch (javax.naming.NamingException e) {
                    LOG.error("Failed to close initial context.", e);
                }
            }
        }
    }

    /** Builds the JNDI environment (factory, provider URL, credentials) from the context. */
    private static Properties buildJndiProperties(JmsContext jmsContext) {
        final Properties props = new Properties();
        putIfPresent(props, Context.INITIAL_CONTEXT_FACTORY, jmsContext.getInitialContextFactory());
        putIfPresent(props, Context.PROVIDER_URL, jmsContext.getServerUrl());
        putIfPresent(props, Context.SECURITY_PRINCIPAL, jmsContext.getUserName());
        putIfPresent(props, Context.SECURITY_CREDENTIALS, jmsContext.getPassword());
        return props;
    }

    /** Adds the entry only when a value is set; {@link Properties} rejects null values. */
    private static void putIfPresent(Properties props, String key, String value) {
        if (value != null) {
            props.put(key, value);
        }
    }
}

Create the test class that connects directly to the HornetQ server (see the server URL and the initial context factory class):

import javax.jms.JMSException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Manual test driver: reads a file from disk and publishes it to the HornetQ
 * test queue through {@link JmsHelper}, connecting directly to the server's
 * JNDI endpoint.
 */
public class JmsHornetqTest {

    // File to send; fill in before running.
    private final String fileName = "...";
    private final String filePath = "...";
    private final String fileCode = "E1";

    // Connection settings for the HornetQ server.
    private final String serverUrl = "jnp://localhost:1099";//"remote://localhost:5445";
    private final String initialContextFactory = "org.jnp.interfaces.NamingContextFactory";
    private final String remoteConnectionFactoryUrl = "/ConnectionFactory";
    private final String hornetqUser = "guest";
    private final String hornetqPassword = "guest";
    private final String hornetqImportQueueUrl = "/queue/testQueue";

    /**
     * Fills a {@link JmsContext} from the fields above and hands it to
     * {@link JmsHelper#sendMessage(JmsContext)}. A failure to read the file is
     * reported on standard error and the message is not sent.
     */
    public void sendMessage() {

        try {

            JmsContext jmsContext = new JmsContext();
            jmsContext.setServerUrl(serverUrl);
            jmsContext.setUserName(hornetqUser);
            jmsContext.setPassword(hornetqPassword);
            jmsContext.setRemoteConnectionfactoryUrl(remoteConnectionFactoryUrl);
            jmsContext.setInitialContextFactory(initialContextFactory);
            jmsContext.setImportQueueUrl(hornetqImportQueueUrl);
            jmsContext.setFileName(fileName);
            jmsContext.setFileCode(fileCode);
            jmsContext.setBytes(getBytes());

            JmsHelper.sendMessage(jmsContext);

        } catch (IOException e) {
            // Reading the file is the only step here that throws a checked exception.
            System.err.println("Could not read file '" + fileName + "' from '" + filePath + "'");
            e.printStackTrace();
        }
    }

    /**
     * Reads the whole file {@code filePath/fileName} into memory.
     *
     * @return the file content
     * @throws IOException if the file cannot be read
     */
    private byte[] getBytes() throws IOException {
        Path fileLocation = Paths.get(filePath, fileName);
        return Files.readAllBytes(fileLocation);
    }

    public static void main(String[] args) throws JMSException {
        new JmsHornetqTest().sendMessage();
    }
}

Now the only thing left to do is to configure HornetQ in our WildFly server (jms-context.xml) to listen for and receive the bytes message. We do not specify the destination itself, only the destination name from the HornetQ server and a destination resolver:

<!--
  Lazily initialised HornetQ connection factory that connects over Netty to
  localhost:5445. The list may hold further TransportConfiguration beans for
  backup servers.
  NOTE(review): the first constructor argument ("true") is presumably the HA
  flag, reconnectAttempts=-1 presumably means "retry forever", and
  consumerWindowSize=0 presumably disables client-side message buffering;
  confirm against the HornetQ documentation.
-->
<bean id="jmsHQImportConnectionFactory" lazy-init="true" 
            class="org.hornetq.jms.client.HornetQConnectionFactory">
    <constructor-arg value="true"/>
    <constructor-arg>
        <list>
            <bean class="org.hornetq.api.core.TransportConfiguration">
                <constructor-arg value="org.hornetq.core.remoting.impl.netty.NettyConnectorFactory"/>
                <constructor-arg>
                    <map key-type="java.lang.String" value-type="java.lang.Object">                        
                        <entry key="host" value="localhost"/>
                        <entry key="port" value="5445"/>
                    </map>
                </constructor-arg>
            </bean>

            <!-- more beans if we have backup servers -->
        </list>
    </constructor-arg>

    <property name="reconnectAttempts" value="-1"/>
    <property name="failoverOnInitialConnection" value="true"/>
    <property name="initialConnectAttempts" value="15"/>
    <property name="consumerWindowSize" value="0"/>
</bean>
<!-- Wraps the HornetQ connection factory so that connections are created
     with the guest/guest credentials. -->
<bean id="jmsHQImportConnectionFactoryProxy" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
    <property name="targetConnectionFactory" ref="jmsHQImportConnectionFactory" />
    <property name="username" value="guest" />
    <property name="password" value="guest" />
</bean>
<!-- JMS transaction manager bound to the credential-wrapping connection
     factory; used by the listener container below. -->
<bean id="jmsHQImportTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="jmsHQImportConnectionFactoryProxy"/>
</bean>
<!-- Resolves destination names through JNDI, with caching enabled.
     NOTE(review): fallbackToDynamicDestination=true presumably lets a name
     that is not found in JNDI be resolved as a dynamic destination on the
     broker; confirm against the Spring JndiDestinationResolver documentation. -->
<bean id="jmsHQImportDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
    <property name="cache" value="true"/>
    <property name="fallbackToDynamicDestination" value="true"/>
</bean>
<!-- Listener container with a single concurrent consumer on the destination
     named "testQueue" (resolved by jmsHQImportDestinationResolver). Received
     messages are handed to the jmsImportMessageListener bean, which is not
     defined in this snippet and must be declared elsewhere. -->
<bean id="jmsHQImportMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="concurrentConsumers" value="1"/>
    <property name="connectionFactory" ref="jmsHQImportConnectionFactoryProxy"/>
    <property name="transactionManager" ref="jmsHQImportTransactionManager"/>
    <property name="destinationName" value="testQueue"/>
    <property name="destinationResolver" ref="jmsHQImportDestinationResolver"/>
    <property name="messageListener" ref="jmsImportMessageListener"/>
</bean>