Wednesday, April 2, 2014

WSO2 ESB: Huge File Processing inside a VFS Listener Using the Smooks Mediator

You can use WSO2 ESB to process large files (gigabytes in size) with the VFS transport. Rather than loading the full file content into the proxy payload, you can use the Smooks mediator inside the VFS listener proxy to read records from the file and route them to a destination queue, database or flat file for further processing.


Building on this concept, the example below shows how to process a large file containing millions of CSV records and route each record to a JMS queue for further processing. The steps let you copy and paste the files directly into the relevant deployment folders; alternatively, you can create the same configuration from the ESB management console.
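The core idea, forwarding each record as soon as it is read instead of building the whole file into one payload, can be illustrated in plain Python. This is only a sketch of the streaming pattern, not how the Smooks mediator is implemented; `route_lines` and the `send` callback are hypothetical stand-ins for the Smooks reader and the JMS router.

```python
import io
from typing import Callable, Iterable, List


def route_lines(lines: Iterable[str], send: Callable[[str], None]) -> int:
    """Send each non-empty line to `send` as soon as it is read; return how many were sent.

    Only the current line is held in memory, so the input can be much larger than the heap.
    """
    sent = 0
    for line in lines:
        record = line.rstrip("\r\n")
        if record:  # skip blank lines
            send(record)
            sent += 1
    return sent


if __name__ == "__main__":
    # For a real file: with open(path, newline="") as f: route_lines(f, producer)
    demo = io.StringIO("name1,lastname1,Male,30,country1\n\nname2,lastname2,Female,40,country2\n")
    out: List[str] = []
    print(route_lines(demo, out.append))  # prints 2
```

Because the function accepts any iterable of lines, the same loop works for an open file handle, which Python reads lazily line by line.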

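Step 1: Download and install WSO2 ESB. This example also needs an Apache ActiveMQ broker running at tcp://localhost:61616 (see the JNDI properties in Step 4), with the ActiveMQ client JARs copied into <WSO2_ESB>/repository/components/lib.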


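Step 2: Enable the VFS and JMS transports on the ESB.

  • In <WSO2_ESB>/repository/conf/axis2/axis2.xml, uncomment the ActiveMQ JMS transportReceiver section; the JMS listener proxy in Step 7 uses the myQueueConnectionFactory defined there.
  • In the same file, uncomment the two VFS lines below: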


<transportSender name="vfs" class="org.apache.synapse.transport.vfs.VFSTransportSender"/>
<transportReceiver name="vfs" class="org.apache.synapse.transport.vfs.VFSTransportListener"/>



Step 3: Add the Smooks configuration

  • Copy the smooks-config-mapping.xml file to <WSO2_ESB>/repository/resources/smooks/
smooks-config-mapping.xml

<smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd"
                      xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.1.xsd"
                      xmlns:jms="http://www.milyn.org/xsd/smooks/jms-routing-1.2.xsd"
                      xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd">

    <params>
        <param name="stream.filter.type">SAX</param>
        <param name="stream.filter.readerPoolSize">100</param>
    </params>

    <csv:reader fields="firstname,lastname,gender,age,country" separator="," quote="'" skipLines="1" />

    <resource-config selector="csv-record">
        <resource>org.milyn.delivery.DomModelCreator</resource>
    </resource-config>

    <ftl:freemarker applyOnElement="csv-record">
        <ftl:template>/repository/resources/smooks/csv_record_as_xml.ftl</ftl:template>
        <ftl:use>
            <ftl:bindTo id="csv_record_as_xml"/>
        </ftl:use>
    </ftl:freemarker>

    <jms:router routeOnElement="csv-record" beanId="csv_record_as_xml" destination="TestQueue">
        <jms:message>
            <!-- Need to use special FreeMarker variable ".vars" -->
            <jms:correlationIdPattern>${.vars["csv-record"].age}</jms:correlationIdPattern>
        </jms:message>
        <jms:jndi properties="/repository/resources/smooks/activemq.sr.jndi.properties" />
        <jms:highWaterMark mark="10000000"/>
    </jms:router>
</smooks-resource-list>
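To see what the `csv:reader` attributes mean for your input, here is a rough Python equivalent using the standard `csv` module: `separator` maps to `delimiter`, `quote` to `quotechar`, and `skipLines="1"` drops the first line before any record is produced. It only mirrors the configuration for illustration; Smooks' own CSV parser may behave differently in edge cases such as surrounding whitespace or escaping.

```python
import csv
import io
import itertools
from typing import Iterable, Iterator

# Values copied from the <csv:reader> element above.
FIELDS = "firstname,lastname,gender,age,country".split(",")
SEPARATOR = ","
QUOTE = "'"
SKIP_LINES = 1


def csv_records(lines: Iterable[str]) -> Iterator[dict]:
    """Yield dicts shaped like the <csv-record> elements Smooks produces (illustrative only)."""
    body = itertools.islice(lines, SKIP_LINES, None)  # skipLines="1": drop the first line
    for row in csv.reader(body, delimiter=SEPARATOR, quotechar=QUOTE):
        if row:  # ignore blank lines
            yield dict(zip(FIELDS, row))


if __name__ == "__main__":
    data = io.StringIO(
        "firstname,lastname,gender,age,country\n"
        "'Smith, Jr.',lastname1,Male,30,country1\n"
    )
    for rec in csv_records(data):
        print(rec["firstname"], rec["age"])  # prints: Smith, Jr. 30
```

The single quote as `quotechar` is what allows a value such as `'Smith, Jr.'` to contain the separator.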



  • To register the Smooks configuration as a local entry, copy smook_config.xml to <WSO2_ESB>/repository/deployment/server/synapse-configs/default/local-entries

smook_config.xml

<?xml version="1.0" encoding="UTF-8"?>
<localEntry xmlns="http://ws.apache.org/ns/synapse"
            key="smook_config"
            src="file:repository/resources/smooks/smooks-config-mapping.xml">
    <description/>
</localEntry>




Step 4: Add the JMS configuration and the FTL template that the Smooks mediator uses to split the message and build the output format.

  • Add the two files activemq.sr.jndi.properties and csv_record_as_xml.ftl to <WSO2_ESB>/repository/resources/smooks.

activemq.sr.jndi.properties

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
queue.TestQueue = TestQueue
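ActiveMQ's JNDI context factory lets you declare destinations directly in this file with entries of the form `queue.<jndiName> = <physicalName>`, which is what `queue.TestQueue = TestQueue` does. The Python sketch below only shows how the file reads as key/value pairs and which queues it declares; `parse_properties` is a hypothetical, deliberately simplified reader (no line continuations or escapes), not something the ESB uses.

```python
from typing import Dict, Iterable

PROPERTIES_TEXT = """\
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
queue.TestQueue = TestQueue
"""


def parse_properties(lines: Iterable[str]) -> Dict[str, str]:
    """Simplified .properties reader: one 'key = value' (or 'key: value') pair per line."""
    props: Dict[str, str] = {}
    for raw in lines:
        line = raw.strip()
        if not line or line[0] in "#!":
            continue  # blank line or comment
        seps = [i for i in (line.find("="), line.find(":")) if i >= 0]
        if not seps:
            props[line] = ""
            continue
        idx = min(seps)  # split on the first separator only; values may contain ':'
        props[line[:idx].strip()] = line[idx + 1:].strip()
    return props


def declared_queues(props: Dict[str, str]) -> Dict[str, str]:
    """Map each JNDI name declared as 'queue.<jndiName>' to its physical queue name."""
    prefix = "queue."
    return {k[len(prefix):]: v for k, v in props.items() if k.startswith(prefix)}


if __name__ == "__main__":
    props = parse_properties(PROPERTIES_TEXT.splitlines())
    print(props["java.naming.provider.url"])  # prints tcp://localhost:61616
    print(declared_queues(props))             # prints {'TestQueue': 'TestQueue'}
```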



csv_record_as_xml.ftl

<#assign csvrec = .vars["csv-record"]> <#-- special assignment because csv-record contains a hyphen -->
<Customer>
    <firstname>${csvrec.firstname}</firstname>
    <lastname>${csvrec.lastname}</lastname>
    <gender>${csvrec.gender}</gender>
    <age>${csvrec.age}</age>
    <country>${csvrec.country}</country>
</Customer>
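For a quick sanity check of the message shape, the Python function below builds the same `<Customer>` element the template renders for one record. It is a hypothetical test helper, not part of the ESB setup. Unlike the template as written, it escapes XML special characters (in FreeMarker the `?xml` built-in serves that purpose), so a value such as `R&D` does not produce malformed XML.

```python
from xml.sax.saxutils import escape

FIELDS = ("firstname", "lastname", "gender", "age", "country")


def record_to_xml(rec: dict) -> str:
    """Render one CSV record as the <Customer> element produced by csv_record_as_xml.ftl."""
    children = "".join(f"<{name}>{escape(rec[name])}</{name}>" for name in FIELDS)
    return f"<Customer>{children}</Customer>"


if __name__ == "__main__":
    rec = dict(zip(FIELDS, ["name1", "lastname1", "Male", "30", "R&D"]))
    print(record_to_xml(rec))
```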




Step 5: Create the VFS listener proxy that reads the file and passes it to the Smooks mediator

  • Copy the TestSmookRecVFS.xml proxy into <WSO2_ESB>/repository/deployment/server/synapse-configs/default/proxy-services, and change the in, out and failed file URIs to directories that exist on your machine.
TestSmookRecVFS.xml
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="TestSmookRecVFS"
       transports="vfs"
       startOnLoad="true"
       trace="disable">
    <description/>
    <target>
        <inSequence>
            <log>
                <property name="JK*********" value="INSIDE VFS******"/>
            </log>
            <!-- Do not write the Smooks result back into the message payload -->
            <property name="DISABLE_SMOOKS_RESULT_PAYLOAD" value="true"/>
            <smooks config-key="smook_config">
                <input type="text"/>
                <output type="xml"/>
            </smooks>
        </inSequence>
    </target>
    <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
    <parameter name="transport.PollInterval">10</parameter>
    <parameter name="transport.vfs.MoveAfterProcess">file:///home/jayalal/Work/SmookRes/out</parameter>
    <parameter name="transport.vfs.FileURI">file:///home/jayalal/Work/SmookRes/in</parameter>
    <parameter name="transport.vfs.MoveAfterFailure">file:///home/jayalal/Work/SmookRes/failed</parameter>
    <!-- Regular expression: escape the dot so only *.txt files are picked up -->
    <parameter name="transport.vfs.FileNamePattern">.*\.txt</parameter>
    <parameter name="transport.vfs.ContentType">text/plain</parameter>
    <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
</proxy>
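`transport.vfs.FileNamePattern` is a regular expression, so an unescaped dot matches any character: the form `.*.txt` also accepts names such as `data_txt`. The Python check below illustrates the difference, assuming the pattern is matched against the whole file name (as Java's `String.matches` does); `picked_up` is a hypothetical helper, not part of the VFS transport.

```python
import re

UNESCAPED = r".*.txt"  # '.' before 'txt' matches any character
ESCAPED = r".*\.txt"   # '\.' matches only a literal dot


def picked_up(pattern: str, file_name: str) -> bool:
    """True if the whole file name matches the pattern (assumed full-match semantics)."""
    return re.fullmatch(pattern, file_name) is not None


if __name__ == "__main__":
    for name in ["Input.txt", "data_txt", "reporttxt", "Input.csv"]:
        print(f"{name:10} unescaped={picked_up(UNESCAPED, name)} escaped={picked_up(ESCAPED, name)}")
```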




Step 6: Test and Verify


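Put a flat file containing CSV records into the transport.vfs.FileURI location (with the file name pattern above, a .txt file). The VFS listener will pick up the file and route each record to the JMS queue. Because the Smooks CSV reader is configured with skipLines="1", the first line of the file is skipped, so start the file with a header line.


Input.txt

firstname,lastname,gender,age,country
name1,lastname1,Male,30,country1
name2,lastname2,Female,40,country2
name3,lastname3,Female,40,country3
name4,lastname4,Female,30,country4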
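To try the setup with a realistically large input, you can generate a file of synthetic records. The script below is a hypothetical helper (the file name `big_input.txt` and the value patterns are arbitrary); it writes a header line, which `skipLines="1"` discards, followed by N records, streaming rows to disk so the whole file is never held in memory.

```python
import csv
import os
import tempfile

HEADER = ["firstname", "lastname", "gender", "age", "country"]


def write_test_file(path: str, n_records: int) -> None:
    """Write a header plus n_records synthetic CSV rows, one row at a time."""
    with open(path, "w", newline="") as f:
        writer = csv.writer(f, lineterminator="\n")
        writer.writerow(HEADER)
        for i in range(1, n_records + 1):
            gender = "Male" if i % 2 else "Female"
            writer.writerow([f"name{i}", f"lastname{i}", gender, 20 + i % 50, f"country{i}"])


if __name__ == "__main__":
    # Use a few million records for a real load test; 1000 keeps this demo quick.
    path = os.path.join(tempfile.gettempdir(), "big_input.txt")
    write_test_file(path, 1000)
    print(path, os.path.getsize(path), "bytes")
```

Copy the generated file into the transport.vfs.FileURI directory once it is complete, so the listener does not pick up a half-written file.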



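Please note that the queue will not accept an unlimited backlog: the Smooks JMS router waits once the number of pending messages reaches its high-water mark (200 by default, which is why the configuration above raises it with <jms:highWaterMark mark="10000000"/>). It is therefore better to have a listener reading from the other end (as explained in the next step), so you can verify that all of your millions of records were successfully queued into the JMS queue.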
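The reason a consumer matters is back-pressure: once a bounded destination is full, the producer has to wait. The plain-Python analogy below (a bounded `queue.Queue` with arbitrary sizes, not Smooks or ActiveMQ code) shows a producer that gives up after a timeout when nothing drains the queue, and that gets every message through once a consumer thread is running.

```python
import queue
import threading


def produce(q: queue.Queue, n: int, timeout: float) -> int:
    """Put n messages; return how many were enqueued before a put timed out."""
    for i in range(n):
        try:
            q.put(i, timeout=timeout)  # blocks while the queue is at maxsize
        except queue.Full:
            return i
    return n


def consume(q: queue.Queue, n: int) -> None:
    """Take n messages off the queue (stand-in for the JMS listener proxy)."""
    for _ in range(n):
        q.get()
        q.task_done()


if __name__ == "__main__":
    # No consumer: only maxsize messages fit before the producer gives up.
    q = queue.Queue(maxsize=200)
    print(produce(q, 1000, timeout=0.1))  # prints 200

    # With a consumer draining the queue, all messages get through.
    q = queue.Queue(maxsize=200)
    t = threading.Thread(target=consume, args=(q, 1000))
    t.start()
    print(produce(q, 1000, timeout=5.0))  # prints 1000
    t.join()
```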

Step 7: Verify with a JMS Listener


You can read the messages arriving in TestQueue with a JMS listener proxy inside the ESB and process them further.

Add the JMS listener proxy to process the messages from the queue:
- Put the proxy service file JKTestJMSListner.xml into <WSO2_ESB>/repository/deployment/server/synapse-configs/default/proxy-services


JKTestJMSListner.xml


<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="JKTestJMSListner"
       transports="jms"
       startOnLoad="true"
       trace="disable">
    <description/>
    <target>
        <inSequence>
            <log/>
        </inSequence>
    </target>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/xml</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.ConcurrentConsumers">1</parameter>
    <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
    <parameter name="transport.jms.SessionTransacted">false</parameter>
    <parameter name="transport.jms.Destination">TestQueue</parameter>
    <parameter name="transport.jms.CacheLevel">consumer</parameter>
    <parameter name="transport.jms.MaxConcurrentConsumers">1</parameter>
</proxy>
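The `transport.jms.ContentType` rules tell the listener to take the content type from the `contentType` JMS property of each message and to fall back to `application/xml` when that property is absent. The resolution order is sketched below in Python; the function name and the dict standing in for a message's JMS properties are hypothetical.

```python
from typing import Mapping, Optional

DEFAULT_CONTENT_TYPE = "application/xml"  # the <default> value in the proxy


def resolve_content_type(jms_properties: Mapping[str, str],
                         default: str = DEFAULT_CONTENT_TYPE) -> str:
    """Mimic <rules>: use the 'contentType' JMS property if present, else the default."""
    value: Optional[str] = jms_properties.get("contentType")
    return value if value is not None else default


if __name__ == "__main__":
    print(resolve_content_type({}))                             # prints application/xml
    print(resolve_content_type({"contentType": "text/plain"}))  # prints text/plain
```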




NOTES:

We added the property below to keep the large file content out of the payload, since the file content has already been consumed and routed into the JMS queue.

<property name="DISABLE_SMOOKS_RESULT_PAYLOAD" value="true"/>


If this property is set to false (the default), the file content is loaded into the message context and may cause an out-of-memory error like the one below. The property is available from ESB 4.9 onward; for older ESB versions you need the fix for https://wso2.org/jira/browse/ESBJAVA-3031.

This is the exception you get when the property is not set and the JVM runs out of heap memory:

java.lang.OutOfMemoryError: Java heap space
Dumping heap to /home/jayalal/Software/wso2esb-4.7.0/repository/logs/heap-dump.hprof ...
Heap dump file created [2015812579 bytes in 11.531 secs]
[2014-03-13 06:23:42,973] ERROR - NativeWorkerPool Uncaught exception
java.lang.OutOfMemoryError: Java heap space
at org.apache.axiom.om.impl.llom.factory.OMLinkedListImplFactory.createOMText(OMLinkedListImplFactory.java:192)
at org.apache.axiom.om.impl.builder.StAXBuilder.createOMText(StAXBuilder.java:294)
at org.apache.axiom.om.impl.builder.StAXBuilder.createOMText(StAXBuilder.java:250)
at org.apache.axiom.om.impl.builder.StAXOMBuilder.next(StAXOMBuilder.java:252)
at org.apache.axiom.om.impl.llom.OMSerializableImpl.build(OMSerializableImpl.java:78)
at org.apache.axiom.om.impl.llom.OMElementImpl.build(OMElementImpl.java:722)
at org.apache.axiom.om.impl.llom.OMElementImpl.detach(OMElementImpl.java:700)
at org.apache.axiom.om.impl.llom.OMNodeImpl.setParent(OMNodeImpl.java:105)
at org.apache.axiom.om.impl.llom.OMElementImpl.addChild(OMElementImpl.java:296)
at org.apache.axiom.om.impl.llom.OMElementImpl.addChild(OMElementImpl.java:212)
at org.apache.axiom.soap.impl.llom.SOAPBodyImpl.addChild(SOAPBodyImpl.java:231)
at org.wso2.carbon.mediator.transform.Output.setXMLPayload(Output.java:228)
at org.wso2.carbon.mediator.transform.Output.process(Output.java:95)
at org.wso2.carbon.mediator.transform.SmooksMediator.mediate(SmooksMediator.java:125)
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:114)
at org.apache.synapse.core.axis2.ProxyServiceMessageReceiver.receive(ProxyServiceMessageReceiver.java:162)
at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
at org.apache.axis2.transport.base.AbstractTransportListener.handleIncomingMessage(AbstractTransportListener.java:328)
at org.apache.synapse.transport.vfs.VFSTransportListener.processFile(VFSTransportListener.java:590)
at org.apache.synapse.transport.vfs.VFSTransportListener.scanFileOrDirectory(VFSTransportListener.java:324)
at org.apache.synapse.transport.vfs.VFSTransportListener.poll(VFSTransportListener.java:158)
at org.apache.synapse.transport.vfs.VFSTransportListener.poll(VFSTransportListener.java:107)
at org.apache.axis2.transport.base.AbstractPollingTransportListener$1$1.run(AbstractPollingTransportListener.java:67)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
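To get a feel for why keeping the result payload matters, the Python sketch below compares peak memory when every rendered record is kept (roughly what happens when the Smooks result is written back into the payload) with forwarding and discarding each record. It is an analogy measured with `tracemalloc`, not a model of the ESB's actual memory use; the record strings are made up.

```python
import tracemalloc
from typing import Callable, Iterator


def peak_bytes(fn: Callable[[], object]) -> int:
    """Return the peak traced memory (in bytes) while running fn."""
    tracemalloc.start()
    try:
        fn()
        _, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak


def records(n: int) -> Iterator[str]:
    for i in range(n):
        yield f"<Customer><firstname>name{i}</firstname></Customer>"


def keep_all(n: int) -> int:
    payload = list(records(n))  # whole result held in memory at once
    return len(payload)


def forward_each(n: int) -> int:
    count = 0
    for _ in records(n):  # each record can be freed once it has been "sent"
        count += 1
    return count


if __name__ == "__main__":
    n = 100_000
    print("keep all:    ", peak_bytes(lambda: keep_all(n)))
    print("forward each:", peak_bytes(lambda: forward_each(n)))
```

Peak memory for `keep_all` grows with the number of records, while `forward_each` stays roughly constant, which is the behaviour DISABLE_SMOOKS_RESULT_PAYLOAD is meant to preserve for large files.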

2 comments:

  1. Hi

    Have you tried using the "DISABLE_SMOOKS_RESULT_PAYLOAD" property as explained in this post?
    Also, if you do not have the latest builds of 4.8.1, you may have to use 4.9.

    Thanks
    Jayalal

  2. Hi,

    Instead of using a queue, can we put the records into a database inside Smooks?
