How to Integrate RabbitMQ with webMethods?

How to Integrate RabbitMQ with webMethods?

RabbitMq Web-methods Integration

RabbitMQ, a message broker, is enterprise-level message-queuing software. It’s equipped with multiple features for reliable delivery, routing, and federation to cater to extensible business requirements beyond the throughput. RabbitMQ currently powers 35000+ projects for startups and large enterprises. The fact that it can implement AMQP, an open wire protocol for messaging with powerful routing features, is what makes RabbitMQ highly popular for an open-source messaging queuing broker. It’s one of the earliest enterprise-grade messaging software to achieve quality compliance in terms of features, dev tools, client libraries, and quality documentation.

Java has always had messaging standards like JMS. However, it was a pain to find the right message broker for non-Java applications, which had distributed messaging, but limited to integration scenarios, monolithic or microservices. With the advent of AMQP, cross-language flexibility has become feasible for open-source message brokers.

Guide: RabbitMQ Message Broker Integration with Software AG's webMethods

RabbitMQ can be integrated into webMethods using Java Client programs. Here is a step-by-step guide to performing smooth integration.

Technologies used

  • webMethods Integration Server 9.X
  • RabbitMQ Server Version 3.7.4 with Erlang 20.3
  • Jar files to place in Integration Server classpath (rabbitmq-client.jar & amqp-client-5.5.1.jar)
RabbitMQ Webmethods Integration1

Once both webMethods Integration Server and RabbitMQ Server are installed, consumers on the Queue need to be created to listen to the messages from RabbitMQ and invoke flow service by passing data received in message payload to perform business logic in the flow service.

Receiving messages from RabbitMQ

Following are the steps to perform successful RabbitMQ integration with Java service:

Step 1: Create RabbitMQ Connection
We need to create connection to RabbitMQ for receiving messages from RabbitMQ to webMethods.

com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
                        
factory.setHost("localhost");
factory.setUsername("Guest");
factory.setPassword("Guest");
factory.setPort(5672);
Connection connection = factory.newConnection();

Step 2: Create Channel
Channel channel = connection.createChannel();
channel.queueDeclare("RMQ_Out_Queue", true, false, false, null);

Step 3: Create Consumer
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
String headers = properties.getHeaders().toString();
};
channel.basicConsume("RMQ_Out_Queue",true,consumer);

Step 4: Invoke flow service by passing inputs from Receive Client code
                        
NSName nsName = NSName.create( "RabbitMQTest.services:printMessage" );
com.wm.app.b2b.server.User user = com.wm.app.b2b.server.UserManager.getUser("Administrator");
Session s = StateManager.createContext(0x7fffffffL, "system", user);
s.setUser(user);
s.clearModified();
Service.doThreadInvoke("RabbitMQTest.services","printMessage", s, idata);
StateManager.deleteContext(s.getSessionID());

Step 5: Implement flow service 
The flow service can perform actual business logic with the inputs passed from the received java client program.

 

RabbitMQ Webmethods Integration

Step 6: Publish message from RabbitMQ
Once the consumer is created for the queue from the Java client program, publish message on the same queue. 

RabbitMQ Webmethods Integration 1RabbitMQ Message Help


Step 7: Once the message is published successfully in RabbitMQ, the consumer created by the received java client program gets the message and passes it to the flow service. You can see the debug message printed in Server logs of Integration Server as shown below.

RabbitMQ Webmethods Integration 4

Send messages to RabbitMQ
Step 1: Create RabbitMQ Connection
We need to create connection to RabbitMQ for receiving messages from webMethods to RabbitMQ.

com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
                        
factory.setHost("localhost");
factory.setUsername("Guest");
factory.setPassword("Guest");
factory.setPort(5672);
Connection connection = factory.newConnection();

Step 2: Create Channel
Channel channel = connection.createChannel();
channel.queueDeclare("RMQ_In_Queue", true, false, false, null);

Step 3: Publish Message
String message = "Hello!! this message is from webMethods.";
channel.basicPublish("", "RMQ_In_Queue", new AMQP.BasicProperties.Builder().headers(headers).build(), message.getBytes("UTF-8"));
Once you run the Send client program successfully, you can see the message being sent to queue in RabbitMQ.

RabbitMQ Webmethods Integration Message

Complete Java Client code    
Receiver Client Code:

package RabbitMQTest.client;

import com.wm.data.*;
import com.wm.util.Values;
import com.wm.app.b2b.server.Service;
import com.wm.app.b2b.server.ServiceException;
import com.wm.app.b2b.server.Session;
import com.wm.app.b2b.server.StateManager;
import com.wm.lang.ns.NSName;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.Connection;
import com.rabbitmq.client.Channel;
import com.wm.util.JournalLogger;

public final class receiveMessage_SVC

{

    /** 
     * The primary method for the Java service
     *
     * @param pipeline
     *            The IData pipeline
     * @throws ServiceException
     */
    public static final void receiveMessage(IData pipeline) throws ServiceException {
        String log="";
        String MsgFromQueue = "";
        String queueName = "RMQ_Out_Queue";
        try{
            com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
        
            factory.setHost("localhost");
            factory.setUsername("Guest");
            factory.setPassword("Guest");
            factory.setPort(5672);
            com.rabbitmq.client.Connection connection = factory.newConnection();
        
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, true, false, false, null);
        
            JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"function", "[**] Waiting for messages [**]");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    String headers = properties.getHeaders().toString();
        
                    JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"function", message); 
                    IData idata = new IDataFactory().create();
                    IDataCursor idc = idata.getCursor();
                    IDataUtil.put(idc, "jsonString", headers+"::"+message);
                    IData pipelineIn = IDataUtil.clone(idata);
                    JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"@@@@", "Before calling service.doInvoke()");
                    try {
                        NSName nsName = NSName.create( "RabbitMQTest.services:printMessage" );
                        com.wm.app.b2b.server.User user = com.wm.app.b2b.server.UserManager.getUser("Administrator");
                        Session s = StateManager.createContext(0x7fffffffL, "system", user);
                        s.setUser(user);
                        s.clearModified();
        
                        Service.doThreadInvoke("RabbitMQTest.services","printMessage", s, idata);
                        StateManager.deleteContext(s.getSessionID());
                        JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"@@@@", "After calling Service.doInvoke()");
                    } catch (Exception e) 
                    {
                        JournalLogger.log(4, JournalLogger.FAC_FLOW_SVC, JournalLogger.DEBUG,"*** EXCEPTION***", e.toString());
                        e.printStackTrace();
                    }
                }
            };
            channel.basicConsume(queueName,true,consumer);            
            log = "Consumer created Successfully for RabbitMQ Queue";
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "message",log );
            pipelineCursor.destroy();
        }
        catch(Exception e){
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "Error", e.getStackTrace());
            IDataUtil.put( pipelineCursor, "log-levl", log);
            pipelineCursor.destroy();
        }
            
    }
    
    // --- <<IS-BEGIN-SHARED-SOURCE-AREA>> ---
    
    
    
    // --- <<IS-END-SHARED-SOURCE-AREA>> ---

    /**
     * The service implementations given below are read-only and show only the
     * method definitions and not the complete implementation.
     */
    public static final void sendMessage(IData pipeline) throws ServiceException {
    }

    final static receiveMessage_SVC _instance = new receiveMessage_SVC();

    static receiveMessage_SVC _newInstance() { return new receiveMessage_SVC(); }

    static receiveMessage_SVC _cast(Object o) { return (receiveMessage_SVC)o; }


 
Sender Client Code: 
 

package RabbitMQTest.client;

import com.wm.data.*;
import com.wm.util.Values;
import com.wm.app.b2b.server.Service;
import com.wm.app.b2b.server.ServiceException;
import com.wm.app.b2b.server.Session;
import com.wm.app.b2b.server.StateManager;
import com.wm.lang.ns.NSName;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.Connection;
import com.rabbitmq.client.Channel;
import com.wm.util.JournalLogger;

public final class sendMessage_SVC

{

    /** 
     * The primary method for the Java service
     *
     * @param pipeline
     *            The IData pipeline
     * @throws ServiceException
     */
    public static final void sendMessage(IData pipeline) throws ServiceException {
        String queueName = "RMQ_In_Queue";
        try{
            com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
            factory.setHost("localhost");
            factory.setUsername("Guest");
            factory.setPassword("Guest");
            factory.setPort(5672);
            com.rabbitmq.client.Connection connection = factory.newConnection();
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("ID",  12345);
            headers.put("Name", "Kellton");
            Channel channel = connection.createChannel();
            channel.queueDeclare(queueName, true, false, false, null);
            String message = "Hello!! this message is from webMethods.";
            channel.basicPublish("", queueName, new AMQP.BasicProperties.Builder().headers(headers).build(), message.getBytes("UTF-8"));
            channel.close();
            connection.close();
        
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "message", "Message Sent To RabbitMQ Successfully !" );
            pipelineCursor.destroy();
        }
        catch(Exception e){
            IDataCursor pipelineCursor = pipeline.getCursor();
            IDataUtil.put( pipelineCursor, "Error", e.getLocalizedMessage() );
            IDataUtil.put( pipelineCursor, "StackTrace", e.getStackTrace() );
            IDataUtil.put( pipelineCursor, "ErrorString", e.toString());
            pipelineCursor.destroy();
        }
            
    }
    
    // --- <<IS-BEGIN-SHARED-SOURCE-AREA>> ---
    
    
    
    // --- <<IS-END-SHARED-SOURCE-AREA>> ---

    /**
     * The service implementations given below are read-only and show only the
     * method definitions and not the complete implementation.
     */
    public static final void receiveMessage(IData pipeline) throws ServiceException {
    }

    final static sendMessage_SVC _instance = new sendMessage_SVC();

    static sendMessage_SVC _newInstance() { return new sendMessage_SVC(); }

    static sendMessage_SVC _cast(Object o) { return (sendMessage_SVC)o; }

}