Tuesday, July 12, 2016

[WSO2][ESB][JMS] Configuring jms message store without message processor

Environment - ESB 4.9.0
                         Oracle jdk1.8.0_72
                         MB 3.1.0

Preconditions: ESB should be configured with MB
                          ESB and MB should  up and running

Situation: Create a proxy in ESB which will send incoming message in to jms message store 

 <proxy name="InformationIncome" startOnLoad="true" trace="disable" transports="http https">  
     <description/>  
     <target>  
       <inSequence>  
         <log level="full"/>  
         <property name="FORCE_SC_ACCEPTED" scope="axis2" value="true"/>  
         <property name="OUT_ONLY" value="true"/>  
         <property name="content-Type" scope="transport"  
           type="STRING" value="application/json"/>  
         <property name="messageType" scope="axis2" type="STRING" value="application/json"/>  
         <payloadFactory media-type="json">  
           <format>  
         {  
         "processDefinitionId":"myProcess:6:25188",  
         "variables": [  
          {  
           "name":"text",  
           "value":"$1"  
          }  
         ]  
       }   
       </format>  
           <args>  
             <arg evaluator="json" expression="$.text"/>  
           </args>  
         </payloadFactory>  
         <header  
           expression="fn:concat('Basic ', base64Encode('admin:admin'))"  
           name="Authorization" scope="transport"/>  
         <log level="full"/>  
         <store messageStore="InformationIncomeMS"/>  
       </inSequence>  
     </target>  
   </proxy>  

  <messageStore  
     class="org.apache.synapse.message.store.impl.jms.JmsStore" name="InformationIncomeMS">  
     <parameter name="store.jms.destination">JMSQueue</parameter>  
     <parameter name="store.producer.guaranteed.delivery.enable">false</parameter>  
     <parameter name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>  
     <parameter name="java.naming.provider.url">repository/conf/jndi.properties</parameter>  
     <parameter name="store.jms.JMSSpecVersion">1.1</parameter>  
   </messageStore>  

Add another proxy which will listen to message store queue.
 <proxy name="JMSQueue" startOnLoad="true" trace="disable" transports="jms">  
     <description/>  
     <target>  
       <inSequence>  
         <log level="full">  
           <property name="Message" value="==============This is listening sequence======"/>  
         </log>  
       </inSequence>  
     </target>  
   </proxy>  

Publish a message to Proxy.

Error:
 [2016-07-12 17:51:05,124] ERROR - BaseUtils Unsupported JMS message type org.wso2.andes.client.message.JMSObjectMessage  
 [2016-07-12 17:51:05,125] ERROR - JMSMessageReceiver Unknown error processing message  
 org.apache.axis2.transport.base.BaseTransportException: Unsupported JMS message type org.wso2.andes.client.message.JMSObjectMessage  
      at org.apache.axis2.transport.base.BaseUtils.handleException(BaseUtils.java:168)  
      at org.apache.axis2.transport.jms.JMSUtils.setSOAPEnvelope(JMSUtils.java:177)  
      at org.apache.axis2.transport.jms.JMSMessageReceiver.processThoughEngine(JMSMessageReceiver.java:195)  
      at org.apache.axis2.transport.jms.JMSMessageReceiver.onMessage(JMSMessageReceiver.java:122)  
      at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.handleMessage(ServiceTaskManager.java:575)  
      at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:468)  
      at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)  
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)  
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)  
      at java.lang.Thread.run(Thread.java:745)  


Solution:

Add a message processor to handle messages in message store
   <messageProcessor  
     class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"  
     messageStore="InformationIncomeMS" name="testP" targetEndpoint="aspnet">  
     <parameter name="client.retry.interval">1000</parameter>  
     <parameter name="throttle">false</parameter>  
     <parameter name="max.delivery.attempts">4</parameter>  
     <parameter name="member.count">1</parameter>  
     <parameter name="max.delivery.drop">Enabled</parameter>  
     <parameter name="interval">1000</parameter>  
     <parameter name="is.active">true</parameter>  
     <parameter name="target.endpoint">aspnet</parameter>  
   </messageProcessor>  



Sample code for endpoint:

 <endpoint xmlns="http://ws.apache.org/ns/synapse" name="aspnet">  
   <address uri="http://localhost:9000/services/SimpleStockQuoteService">  
    <suspendOnFailure>  
      <progressionFactor>1.0</progressionFactor>  
    </suspendOnFailure>  
    <markForSuspension>  
      <retriesBeforeSuspension>0</retriesBeforeSuspension>  
      <retryDelay>0</retryDelay>  
    </markForSuspension>  
   </address>  
 </endpoint>  

Analysis:


Once we store messages in a message store we need to use message processor to process the stored messages. When a message stored in a JMS Message Store, that message is getting serialized before storing it in the JMS Queue. After serializing, it is saved as Object message. It is the implementation behavior and it is getting deserialized when it is processed by the message processor. Basically, without using the message processor, can not consume that message.


2 comments:

  1. Helo,

    I didn't understand the endpoint "aspnet". Could you post its code?

    Thanks.

    ReplyDelete
    Replies
    1. The endpoint "aspnet" indicate the endpoint that you need to send the message in the message store. Processor will retrieve message from message store and send to the given endpoint (Ex: aspnet). I added sample code to blog post.

      Delete