Oracle jdk1.8.0_72
MB 3.1.0
Preconditions: ESB should be configured with MB
ESB and MB should up and running
Situation: Create a proxy in ESB which will send incoming message in to jms message store
<proxy name="InformationIncome" startOnLoad="true" trace="disable" transports="http https">
<description/>
<target>
<inSequence>
<log level="full"/>
<property name="FORCE_SC_ACCEPTED" scope="axis2" value="true"/>
<property name="OUT_ONLY" value="true"/>
<property name="content-Type" scope="transport"
type="STRING" value="application/json"/>
<property name="messageType" scope="axis2" type="STRING" value="application/json"/>
<payloadFactory media-type="json">
<format>
{
"processDefinitionId":"myProcess:6:25188",
"variables": [
{
"name":"text",
"value":"$1"
}
]
}
</format>
<args>
<arg evaluator="json" expression="$.text"/>
</args>
</payloadFactory>
<header
expression="fn:concat('Basic ', base64Encode('admin:admin'))"
name="Authorization" scope="transport"/>
<log level="full"/>
<store messageStore="InformationIncomeMS"/>
</inSequence>
</target>
</proxy>
<messageStore
class="org.apache.synapse.message.store.impl.jms.JmsStore" name="InformationIncomeMS">
<parameter name="store.jms.destination">JMSQueue</parameter>
<parameter name="store.producer.guaranteed.delivery.enable">false</parameter>
<parameter name="java.naming.factory.initial">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
<parameter name="java.naming.provider.url">repository/conf/jndi.properties</parameter>
<parameter name="store.jms.JMSSpecVersion">1.1</parameter>
</messageStore>
Add another proxy which will listen to message store queue.
<proxy name="JMSQueue" startOnLoad="true" trace="disable" transports="jms">
<description/>
<target>
<inSequence>
<log level="full">
<property name="Message" value="==============This is listening sequence======"/>
</log>
</inSequence>
</target>
</proxy>
Publish a message to Proxy.
Error:
[2016-07-12 17:51:05,124] ERROR - BaseUtils Unsupported JMS message type org.wso2.andes.client.message.JMSObjectMessage
[2016-07-12 17:51:05,125] ERROR - JMSMessageReceiver Unknown error processing message
org.apache.axis2.transport.base.BaseTransportException: Unsupported JMS message type org.wso2.andes.client.message.JMSObjectMessage
at org.apache.axis2.transport.base.BaseUtils.handleException(BaseUtils.java:168)
at org.apache.axis2.transport.jms.JMSUtils.setSOAPEnvelope(JMSUtils.java:177)
at org.apache.axis2.transport.jms.JMSMessageReceiver.processThoughEngine(JMSMessageReceiver.java:195)
at org.apache.axis2.transport.jms.JMSMessageReceiver.onMessage(JMSMessageReceiver.java:122)
at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.handleMessage(ServiceTaskManager.java:575)
at org.apache.axis2.transport.jms.ServiceTaskManager$MessageListenerTask.run(ServiceTaskManager.java:468)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Solution:
Add a message processor to handle messages in message store
<messageProcessor
class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor"
messageStore="InformationIncomeMS" name="testP" targetEndpoint="aspnet">
<parameter name="client.retry.interval">1000</parameter>
<parameter name="throttle">false</parameter>
<parameter name="max.delivery.attempts">4</parameter>
<parameter name="member.count">1</parameter>
<parameter name="max.delivery.drop">Enabled</parameter>
<parameter name="interval">1000</parameter>
<parameter name="is.active">true</parameter>
<parameter name="target.endpoint">aspnet</parameter>
</messageProcessor>
Sample code for endpoint:
<endpoint xmlns="http://ws.apache.org/ns/synapse" name="aspnet">
<address uri="http://localhost:9000/services/SimpleStockQuoteService">
<suspendOnFailure>
<progressionFactor>1.0</progressionFactor>
</suspendOnFailure>
<markForSuspension>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
<retryDelay>0</retryDelay>
</markForSuspension>
</address>
</endpoint>
Analysis:
Once we store messages in a message store we need to use message processor to process the stored messages. When a message stored in a JMS Message Store, that message is getting serialized before storing it in the JMS Queue. After serializing, it is saved as Object message. It is the implementation behavior and it is getting deserialized when it is processed by the message processor. Basically, without using the message processor, can not consume that message.
Helo,
ReplyDeleteI didn't understand the endpoint "aspnet". Could you post its code?
Thanks.
The endpoint "aspnet" indicate the endpoint that you need to send the message in the message store. Processor will retrieve message from message store and send to the given endpoint (Ex: aspnet). I added sample code to blog post.
Delete