Wednesday, April 2, 2014

WSO2 ESB: File processing using VFS transport and mediation inside a sequence

The VFS transport in WSO2 ESB can poll a file system location periodically and process the files it finds according to the proxy configuration. In practice, you can use it to move files from one location to another with mediation in between.
The example below shows how to pick up a simple CSV flat file from a folder, split it into individual records inside an ESB proxy service, and forward each record to a separate sequence for further processing.
We use the Smooks mediator to read the CSV records from the file and load them into the proxy payload as XML records.

Step 1: Download and install WSO2 ESB




Step 2: Enable the VFS transport on ESB

  • Uncomment the following two lines in <WSO2_ESB>/repository/conf/axis2/axis2.xml


<transportSender name="vfs" class="org.apache.synapse.transport.vfs.VFSTransportSender"/>
<transportReceiver name="vfs" class="org.apache.synapse.transport.vfs.VFSTransportListener"/>


Step 3: Add a local entry that refers to the Smooks configuration file

  • Go to the WSO2 ESB Management Console (https://localhost:9443/carbon)
  • Select “Local Entries” from the left panel and add a new “Source URL Entry” with the following values
    • Name : smook_config
    • URL : file:repository/resources/smooks/smooks-config-mapping.xml
  • Also copy the smooks-config-mapping.xml file into the <WSO2_ESB>/repository/resources/smooks/ folder

smooks-config-mapping.xml
<smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.0.xsd">
    <!--Configure the CSVParser to parse the message into a stream of SAX events. -->
    <resource-config selector="org.xml.sax.driver">
        <resource>org.milyn.csv.CSVParser</resource>
        <param name="fields" type="string-list">firstname,lastname,gender,age,country</param>
    </resource-config>
</smooks-resource-list>
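For reference, the XML that the CSVParser hands back to the proxy should look roughly like the following for the first two records of the sample input. The element names come from the `fields` parameter, the `csv-set` root matches the iterate expression used in the proxy, and the `number` attribute matches the log output shown in Step 5; treat the exact layout as an approximation rather than the parser's guaranteed output.

```xml
<!-- Approximate Smooks CSVParser output: one csv-record per input line -->
<csv-set>
    <csv-record number="1">
        <firstname>name1</firstname>
        <lastname>lastname1</lastname>
        <gender>Male</gender>
        <age>30</age>
        <country>country1</country>
    </csv-record>
    <csv-record number="2">
        <firstname>name2</firstname>
        <lastname>lastname2</lastname>
        <gender>Female</gender>
        <age>40</age>
        <country>country2</country>
    </csv-record>
    <!-- ... remaining records ... -->
</csv-set>
```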


Step 4: Create Proxy as VFS Listener and Sequence to process each record


Add the sequence that the proxy uses to process each record:

<?xml version="1.0" encoding="UTF-8"?>
<sequence xmlns="http://ws.apache.org/ns/synapse" name="RecProcessSequence">
    <log level="full">
        <property name="==Processing record =====" value="=="/>
    </log>
</sequence>
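RecProcessSequence above logs the whole envelope. As a minimal sketch of working with individual fields instead, the variant below uses a custom log with XPath expressions. It assumes each message body holds a single csv-record element without a namespace, as in the sample log output; the property names are illustrative only.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<sequence xmlns="http://ws.apache.org/ns/synapse" name="RecProcessSequence">
    <!-- Log selected fields of the current record rather than the full envelope -->
    <log level="custom">
        <property name="firstname" expression="//csv-record/firstname"/>
        <property name="age" expression="//csv-record/age"/>
        <property name="country" expression="//csv-record/country"/>
    </log>
</sequence>
```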

Add the proxy:

This listener proxy reads files from the input path, loads their content into the proxy payload, iterates over each record, and calls the RecProcessSequence sequence to process each record further.
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="TestSmookRecVFS"
       transports="vfs"
       startOnLoad="true"
       trace="disable">
    <description/>
    <target>
        <inSequence>
            <!-- Convert the CSV text into csv-set/csv-record XML using the smook_config local entry -->
            <smooks config-key="smook_config">
                <input type="text"/>
                <output type="xml"/>
            </smooks>
            <!-- Send each csv-record to RecProcessSequence as a separate message -->
            <iterate xmlns:ns2="http://org.apache.synapse/xsd"
                     xmlns:ns="http://org.apache.synapse/xsd"
                     xmlns:sec="http://secservice.samples.esb.wso2.org"
                     expression="//csv-set/csv-record">
                <target>
                    <sequence>
                        <clone>
                            <target sequence="RecProcessSequence"/>
                        </clone>
                    </sequence>
                </target>
            </iterate>
        </inSequence>
    </target>
    <!-- Move processed files to "out" and failed files to "failed" -->
    <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
    <!-- Poll the input folder every 10 seconds -->
    <parameter name="transport.PollInterval">10</parameter>
    <parameter name="transport.vfs.MoveAfterProcess">file:///home/jayalal/Work/SmookRes/out</parameter>
    <parameter name="transport.vfs.FileURI">file:///home/jayalal/Work/SmookRes/in</parameter>
    <parameter name="transport.vfs.MoveAfterFailure">file:///home/jayalal/Work/SmookRes/failed</parameter>
    <parameter name="transport.vfs.FileNamePattern">.*.txt</parameter>
    <parameter name="transport.vfs.ContentType">text/plain</parameter>
    <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
</proxy>
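The clone inside the iterate target only makes one more copy of each record before calling the sequence. A slightly simpler inSequence that should behave the same is sketched below, with the iterate target calling RecProcessSequence directly; it would replace the inSequence of the proxy above. The sketch also tightens the file name pattern: transport.vfs.FileNamePattern is a regular expression, so in `.*.txt` the unescaped dot matches any character (a file named `inputtxt` would also be picked up), while `.*\.txt` only matches names ending in `.txt`.

```xml
<inSequence>
    <smooks config-key="smook_config">
        <input type="text"/>
        <output type="xml"/>
    </smooks>
    <!-- One message per csv-record, handed straight to the processing sequence -->
    <iterate expression="//csv-set/csv-record">
        <target sequence="RecProcessSequence"/>
    </iterate>
</inSequence>

<!-- Only pick up files whose names end in ".txt" -->
<parameter name="transport.vfs.FileNamePattern">.*\.txt</parameter>
```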


Step 5: Test and verify

Copy an input.txt file with the following content into the folder configured as “transport.vfs.FileURI”:
name1,lastname1,Male,30,country1
name2,lastname2,Female,40,country2
name3,lastname3,Female,40,country3
name4,lastname4,Female,30,country4
name5,lastname5,Female,40,country5


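The ESB log prints each record as it is processed. (The sample log below was captured with an input file whose country column contained SriLanka, so it differs slightly from the input.txt above.)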
[2014-01-29 15:11:41,922] INFO - LogMediator To: , WSAction: urn:mediate, SOAPAction: urn:mediate, MessageID: urn:uuid:de01ae68-6afc-4539-9c0e-f820fa181c19, Direction: request, ==Processing record ===== = ==, Envelope: <?xml version='1.0' encoding='utf-8'?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"><soapenv:Body><csv-record number="1"><firstname>name1</firstname><lastname>lastname1</lastname><gender>Male</gender><age>30</age><country>SriLanka</country></csv-record></soapenv:Body></soapenv:Envelope>
[2014-01-29 15:11:41,924] INFO - LogMediator To: , WSAction: urn:mediate, SOAPAction: urn:mediate, MessageID: urn:uuid:b436bd8d-a30b-40e1-83c2-00f109b793a5, Direction: request, ==Processing record ===== = ==, Envelope: <?xml version='1.0' encoding='utf-8'?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"><soapenv:Body><csv-record number="2"><firstname>name2</firstname><lastname>lastname2</lastname><gender>Female</gender><age>40</age><country>SriLanka</country></csv-record></soapenv:Body></soapenv:Envelope>


WSO2 ESB: Huge File Processing inside a VFS Listener using the Smooks Mediator

You can use WSO2 ESB to process large files (several GB) with the VFS transport. Inside the VFS listener, the Smooks mediator reads records from the file and routes them to a destination such as a queue, database, or flat file for further processing, instead of loading the full file content into the proxy payload.


Building on this idea, the example below shows how to process a large file containing millions of CSV records and route each record to a JMS queue for further processing. The steps deploy the services by copying files directly into the relevant deployment folders; you can also create the same configuration from the ESB Management Console instead.

Step 1: Download and install WSO2 ESB


Step 2: Enable the VFS and JMS transports on ESB

  • Uncomment the following two lines in <WSO2_ESB>/repository/conf/axis2/axis2.xml to enable the VFS transport


<transportSender name="vfs" class="org.apache.synapse.transport.vfs.VFSTransportSender"/>
<transportReceiver name="vfs" class="org.apache.synapse.transport.vfs.VFSTransportListener"/>
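The two lines above only enable VFS; the JKTestJMSListner proxy in Step 7 also needs the JMS transport. Below is a sketch of an ActiveMQ receiver entry for axis2.xml, along the lines of the commented-out JMS samples in that file. It assumes a broker at tcp://localhost:61616 (the URL used in activemq.sr.jndi.properties) and that the ActiveMQ client JARs have been copied into <WSO2_ESB>/repository/components/lib. The factory name myQueueConnectionFactory has to match the transport.jms.ConnectionFactory parameter of the listener proxy.

```xml
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
    <!-- Referenced by transport.jms.ConnectionFactory in the JMS listener proxy -->
    <parameter name="myQueueConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
    <!-- Used by JMS proxies that do not name a connection factory -->
    <parameter name="default" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">tcp://localhost:61616</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
    </parameter>
</transportReceiver>
```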



Step 3: Add the Smooks configuration

  • Copy the smooks-config-mapping.xml file into <WSO2_ESB>/repository/resources/smooks/
smooks-config-mapping.xml

<smooks-resource-list xmlns="http://www.milyn.org/xsd/smooks-1.1.xsd"
                      xmlns:csv="http://www.milyn.org/xsd/smooks/csv-1.1.xsd"
                      xmlns:jms="http://www.milyn.org/xsd/smooks/jms-routing-1.2.xsd"
                      xmlns:ftl="http://www.milyn.org/xsd/smooks/freemarker-1.1.xsd">

    <!-- Stream the input with SAX instead of building a DOM for the whole file -->
    <params>
        <param name="stream.filter.type">SAX</param>
        <param name="stream.filter.readerPoolSize">100</param>
    </params>

    <csv:reader fields="firstname,lastname,gender,age,country" separator="," quote="'" skipLines="1" />

    <!-- Build a small DOM for each csv-record so the template can read its fields -->
    <resource-config selector="csv-record">
        <resource>org.milyn.delivery.DomModelCreator</resource>
    </resource-config>

    <!-- Render each record with the FTL template and bind the result as csv_record_as_xml -->
    <ftl:freemarker applyOnElement="csv-record">
        <ftl:template>/repository/resources/smooks/csv_record_as_xml.ftl</ftl:template>
        <ftl:use>
            <ftl:bindTo id="csv_record_as_xml"/>
        </ftl:use>
    </ftl:freemarker>

    <!-- Send each rendered record to TestQueue -->
    <jms:router routeOnElement="csv-record" beanId="csv_record_as_xml" destination="TestQueue">
        <jms:message>
            <!-- Need to use special FreeMarker variable ".vars" -->
            <jms:correlationIdPattern>${.vars["csv-record"].age}</jms:correlationIdPattern>
        </jms:message>
        <jms:jndi properties="/repository/resources/smooks/activemq.sr.jndi.properties" />
        <jms:highWaterMark mark="10000000"/>
    </jms:router>
</smooks-resource-list>
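One detail of this configuration is easy to miss: skipLines="1" tells the CSV reader to skip the first line of the file as a header. If your files have no header line, like the Input.txt used in Step 6, a reader configured as below (same fields, skipLines set to 0) should keep the first record instead of dropping it. This is a sketch of the single changed element; the rest of the configuration stays as above.

```xml
<!-- CSV reader for files without a header line: no lines are skipped -->
<csv:reader fields="firstname,lastname,gender,age,country" separator="," quote="'" skipLines="0" />
```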



  • To add the config key as a local entry, copy smook_config.xml into <WSO2_ESB>/repository/deployment/server/synapse-configs/default/local-entries

smook_config.xml

<?xml version="1.0" encoding="UTF-8"?>
<localEntry xmlns="http://ws.apache.org/ns/synapse"
            key="smook_config"
            src="file:repository/resources/smooks/smooks-config-mapping.xml">
    <description/>
</localEntry>




Step 4: Add the JMS configuration and the FTL template that the Smooks mediator uses to split the message and format the output

  • Add the two files activemq.sr.jndi.properties and csv_record_as_xml.ftl to <WSO2_ESB>/repository/resources/smooks.

activemq.sr.jndi.properties

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://localhost:61616
queue.TestQueue = TestQueue



csv_record_as_xml.ftl

<#assign csvrec = .vars["csv-record"]> <#-- special assignment because csv-record has a hyphen -->
<Customer>
<firstname>${csvrec.firstname}</firstname>
<lastname>${csvrec.lastname}</lastname>
<gender>${csvrec.gender}</gender>
<age>${csvrec.age}</age>
<country>${csvrec.country}</country>
</Customer>
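To make the template concrete: for the input line name2,lastname2,Female,40,country2, the template should produce a message like the one below, which the JMS router sends to TestQueue with the age (40) as the JMS correlation ID, per the correlationIdPattern in the Smooks configuration. Whitespace in the real output may differ slightly.

```xml
<!-- Expected rendering of csv_record_as_xml.ftl for one record -->
<Customer>
<firstname>name2</firstname>
<lastname>lastname2</lastname>
<gender>Female</gender>
<age>40</age>
<country>country2</country>
</Customer>
```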




Step 5: Create your VFS listener proxy to read the file and pass it to the Smooks mediator

  • Now copy the TestSmookRecVFS.xml proxy into <WSO2_ESB>/repository/deployment/server/synapse-configs/default/proxy-services
TestSmookRecVFS.xml
<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="TestSmookRecVFS"
       transports="vfs"
       startOnLoad="true"
       trace="disable">
    <description/>
    <target>
        <inSequence>
            <log>
                <property name="JK*********" value="INSIDE VFS******"/>
            </log>
            <!-- Keep the Smooks result out of the payload; the records are routed to JMS by Smooks itself -->
            <property name="DISABLE_SMOOKS_RESULT_PAYLOAD" value="true"/>
            <smooks config-key="smook_config">
                <input type="text"/>
                <output type="xml"/>
            </smooks>
        </inSequence>
    </target>
    <parameter name="transport.vfs.ActionAfterProcess">MOVE</parameter>
    <parameter name="transport.PollInterval">10</parameter>
    <parameter name="transport.vfs.MoveAfterProcess">file:///home/jayalal/Work/SmookRes/out</parameter>
    <parameter name="transport.vfs.FileURI">file:///home/jayalal/Work/SmookRes/in</parameter>
    <parameter name="transport.vfs.MoveAfterFailure">file:///home/jayalal/Work/SmookRes/failed</parameter>
    <parameter name="transport.vfs.FileNamePattern">.*.txt</parameter>
    <parameter name="transport.vfs.ContentType">text/plain</parameter>
    <parameter name="transport.vfs.ActionAfterFailure">MOVE</parameter>
</proxy>




Step 6: Test and verify


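Put your flat file containing CSV records into the “transport.vfs.FileURI” location. The VFS listener picks up the file and sends each record to the JMS queue. Note that the csv:reader in Step 3 is configured with skipLines="1", so the first line of the file is skipped as a header; with the Input.txt below, the name1 record will not be queued unless you add a header line or set skipLines to 0.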


Input.txt

name1,lastname1,Male,30,country1
name2,lastname2,Female,40,country2
name3,lastname3,Female,40,country3
name4,lastname4,Female,30,country4



Please note that by default ActiveMQ will not store more than 200 messages, so it is better to have a listener reading from the other end (as explained in the next step). That way you can verify that all of your millions of records were successfully queued.

Step 7: Verify with a JMS listener


You can read the messages arriving at “TestQueue” with a JMS listener proxy inside the ESB and process them further.

Add the JMS listener to process the messages from the queue:
- Put the proxy service file JKTestJMSListner.xml into <WSO2_ESB>/repository/deployment/server/synapse-configs/default/proxy-services


JKTestJMSListner.xml


<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="JKTestJMSListner"
       transports="jms"
       startOnLoad="true"
       trace="disable">
    <description/>
    <target>
        <inSequence>
            <log/>
        </inSequence>
    </target>
    <!-- Use the message's contentType JMS property if present, otherwise treat it as XML -->
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/xml</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.ConcurrentConsumers">1</parameter>
    <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
    <parameter name="transport.jms.SessionTransacted">false</parameter>
    <parameter name="transport.jms.Destination">TestQueue</parameter>
    <parameter name="transport.jms.CacheLevel">consumer</parameter>
    <parameter name="transport.jms.MaxConcurrentConsumers">1</parameter>
</proxy>
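The plain <log/> in this proxy prints only message headers such as the message ID and direction. To see the record content, the inSequence could log selected Customer fields as sketched below. This assumes the JMS message body is built as the <Customer> XML produced by the FTL template (the proxy falls back to application/xml); the property names are illustrative.

```xml
<inSequence>
    <!-- Log a few fields of each queued record instead of the default simple log -->
    <log level="custom">
        <property name="firstname" expression="//Customer/firstname"/>
        <property name="age" expression="//Customer/age"/>
        <property name="country" expression="//Customer/country"/>
    </log>
</inSequence>
```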




NOTES:

We added the following property to keep the large file content out of the payload, since the content has already been consumed and routed to the JMS queue:

<property name="DISABLE_SMOOKS_RESULT_PAYLOAD" value="true"/>


If this property is set to false (the default), the file content is loaded into the message context and may cause an out-of-memory error like the one below. The property is available from ESB 4.9 onward; for older ESB versions, apply the patch for https://wso2.org/jira/browse/ESBJAVA-3031.

You will get the following exception if you have not set the DISABLE_SMOOKS_RESULT_PAYLOAD property and the JVM runs out of heap memory:

java.lang.OutOfMemoryError: Java heap space
Dumping heap to /home/jayalal/Software/wso2esb-4.7.0/repository/logs/heap-dump.hprof ...
Heap dump file created [2015812579 bytes in 11.531 secs]
[2014-03-13 06:23:42,973] ERROR - NativeWorkerPool Uncaught exception
java.lang.OutOfMemoryError: Java heap space
at org.apache.axiom.om.impl.llom.factory.OMLinkedListImplFactory.createOMText(OMLinkedListImplFactory.java:192)
at org.apache.axiom.om.impl.builder.StAXBuilder.createOMText(StAXBuilder.java:294)
at org.apache.axiom.om.impl.builder.StAXBuilder.createOMText(StAXBuilder.java:250)
at org.apache.axiom.om.impl.builder.StAXOMBuilder.next(StAXOMBuilder.java:252)
at org.apache.axiom.om.impl.llom.OMSerializableImpl.build(OMSerializableImpl.java:78)
at org.apache.axiom.om.impl.llom.OMElementImpl.build(OMElementImpl.java:722)
at org.apache.axiom.om.impl.llom.OMElementImpl.detach(OMElementImpl.java:700)
at org.apache.axiom.om.impl.llom.OMNodeImpl.setParent(OMNodeImpl.java:105)
at org.apache.axiom.om.impl.llom.OMElementImpl.addChild(OMElementImpl.java:296)
at org.apache.axiom.om.impl.llom.OMElementImpl.addChild(OMElementImpl.java:212)
at org.apache.axiom.soap.impl.llom.SOAPBodyImpl.addChild(SOAPBodyImpl.java:231)
at org.wso2.carbon.mediator.transform.Output.setXMLPayload(Output.java:228)
at org.wso2.carbon.mediator.transform.Output.process(Output.java:95)
at org.wso2.carbon.mediator.transform.SmooksMediator.mediate(SmooksMediator.java:125)
at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:71)
at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:114)
at org.apache.synapse.core.axis2.ProxyServiceMessageReceiver.receive(ProxyServiceMessageReceiver.java:162)
at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
at org.apache.axis2.transport.base.AbstractTransportListener.handleIncomingMessage(AbstractTransportListener.java:328)
at org.apache.synapse.transport.vfs.VFSTransportListener.processFile(VFSTransportListener.java:590)
at org.apache.synapse.transport.vfs.VFSTransportListener.scanFileOrDirectory(VFSTransportListener.java:324)
at org.apache.synapse.transport.vfs.VFSTransportListener.poll(VFSTransportListener.java:158)
at org.apache.synapse.transport.vfs.VFSTransportListener.poll(VFSTransportListener.java:107)
at org.apache.axis2.transport.base.AbstractPollingTransportListener$1$1.run(AbstractPollingTransportListener.java:67)
at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)