Sunday, July 26, 2015

Spring Integration - simple example


1. intro 

From wiki: Spring Integration is an open source framework for enterprise application integration. It is a lightweight framework that builds upon the core Spring framework. It is designed to enable the development of integration solutions typical of event-driven architectures and messaging-centric architectures.

In shorts, Spring integration is a framework wich can help in creation application with complex data flows, with a lot of different data transformation, filtering, routing and others. 

Schema of flow in this example : 



Description : 
we have collection of payments, this collection is splitting and sending "one by one" into next step : Filter. Filter just allows to go ahead payments with amount!=nll. After that(router), we have 2 types of workflow regarding property isVip: one workflow for regular payments(regular transformer) , another one for vip payments : vip transformer. Results of transformation are sending into paymentProcessor in which has to main logic of our application. In case when we have errors in processing - we are processing errors in fail processor.  


2. Component description. 


    • Inbound Gateways --- bring data from an external system to the integration network, and expect a response message to be provided by some other components, through some channel, to be forward to the external system
    • Channels --- the connectors between the other types of components, i.e. the endpoints. (These are the "pipes" in the "pipes&filters" architectural characterization.) Channels provide a simple API defined in-terms of message passing operations, such as send() and recv() methods.
    •  message router is an endpoint that forwards incoming messages to one (or more) among several configured output channels. The criteria for selecting the output channel is usually based on the message payload and/or the message headers. 

    • Message filters are endpoints that selectively relay messages from an input to an output channel. They can be used to discard messages based on payload or header values, detect invalid messages, or relay non-conforming messages to a separte channel.
    • Message transformers are endpoints that transform messages payload and/or headers. The can be used to perform arbitrary transformations on the payload, convert payload types, and add, change, or remove headers. 
    • The Splitter is a component whose role is to partition a message in several parts, and send the resulting messages to be processed independently. Very often, they are upstream producers in a pipeline that includes an Aggregator.

3. Project structure


4. Main project (pom) file



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demien</groupId>
    <artifactId>spring-integration-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>3.2.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>2.2.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>13.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>6.5.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

5. Context file

Here described all component of application.  Please take a look how implemented logging: interceptors in channels.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/task
                           http://www.springframework.org/schema/task/spring-task.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/integration
                           http://www.springframework.org/schema/integration/spring-integration.xsd"
        >

    <bean id="auditInterceptor" class="com.demien.spring.integration.interceptor.LogInterceptor"/>

    <int:channel id = "newPaymentChannel">
        <int:interceptors>
            <ref bean="auditInterceptor"/>
        </int:interceptors>
    </int:channel>

    <int:gateway id="inPaymentGateway"
                 service-interface="com.demien.spring.integration.gateways.LoadPaymentsGateway">
        <int:method name="loadPayments" request-channel="newPaymentChannel" />
    </int:gateway>

    <int:splitter
            input-channel="newPaymentChannel"
            output-channel="singlePaymentChannel" />
    <int:channel id = "singlePaymentChannel">
        <int:interceptors>
            <ref bean="auditInterceptor"/>
        </int:interceptors>
    </int:channel>


    <int:filter
            input-channel="singlePaymentChannel"
            output-channel="filteredPaymentChannel"
            ref="paymentFilter" />
    <int:channel id = "filteredPaymentChannel">
        <int:interceptors>
            <ref bean="auditInterceptor"/>
        </int:interceptors>
    </int:channel>
    <bean id="paymentFilter" class="com.demien.spring.integration.filters.PaymentFilter"/>

    <int:recipient-list-router input-channel="filteredPaymentChannel">
        <int:recipient channel = "regularPayments" selector-expression="!payload.isVip()"/>
        <int:recipient channel = "vipPayments" selector-expression="payload.isVip()" />
    </int:recipient-list-router>
    <int:channel id = "regularPayments">
        <int:interceptors>
            <ref bean="auditInterceptor"/>
        </int:interceptors>
    </int:channel>
    <int:channel id = "vipPayments">
        <int:interceptors>
            <ref bean="auditInterceptor"/>
        </int:interceptors>
    </int:channel>
    <int:transformer
            input-channel="regularPayments"
            output-channel="processingChannel"
            ref="regularPaymentTransformer" />
    <int:transformer
            input-channel="vipPayments"
            output-channel="processingChannel"
            ref="vipPaymentTransformer" />
    <int:channel id = "processingChannel">
        <int:queue capacity="10" />
        <int:interceptors>
            <ref bean="auditInterceptor"/>
        </int:interceptors>
    </int:channel>
    <bean id="regularPaymentTransformer" class="com.demien.spring.integration.transformers.RegularPaymentTransformer"/>
    <bean id="vipPaymentTransformer" class="com.demien.spring.integration.transformers.VipPaymentTransformer"/>


    <int:service-activator input-channel="processingChannel" ref="paymentProcessor">
        <int:poller fixed-rate="100" error-channel="failedPaymentsChannel" />
    </int:service-activator>
    <bean id="paymentProcessor" class="com.demien.spring.integration.activators.PaymentServiceActivator"/>

    <int:channel id = "failedPaymentsChannel">
        <int:interceptors>
            <ref bean="auditInterceptor"/>
        </int:interceptors>
    </int:channel>


    <int:service-activator input-channel="failedPaymentsChannel" ref="failProcessor" />
    <bean id="failProcessor" class="com.demien.spring.integration.activators.FailedPaymentActivator"/>

</beans> 
 

6. Test dto(Payment)

Very simple object. Please take a look at method isVip- we will use it later.

package com.demien.spring.integration.dto;

import java.math.BigDecimal;
import java.util.Date;

public class Payment {
    BigDecimal amount;
    String description;

    public Payment(String description, BigDecimal amount) {
        this.description = description;
        this.amount = amount;
    }

    public boolean isVip() {
        if (amount!=null && amount.compareTo(new BigDecimal(10000))==1) {
            return true;
        } else {
            return false;
        }
    }

    public BigDecimal getAmount() {
        return amount;
    }

    public void setAmount(BigDecimal amount) {
        this.amount = amount;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }


    @Override
    public String toString() {
        return "Payment{" +
                "amount=" + amount +
                ", description='" + description + '\'' +
                '}';
    }
}

 

7. Interceptors

As I mentioned before - we will use interceptor for logging messages which came into channel. 

package com.demien.spring.integration.interceptor;

import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.channel.interceptor.ChannelInterceptorAdapter;

public class LogInterceptor extends ChannelInterceptorAdapter {

    public Message<?> preSend(Message<?> message,
                              MessageChannel channel) {
        System.out.println("[["+channel.toString()+"]] "+message.getPayload());
        return message;
    }

}

8. Filters

We can filter object which came into channel. In this example, if received object don't have amount - it will be filtered (and will not move forward).

package com.demien.spring.integration.filters;

import com.demien.spring.integration.dto.Payment;
import org.springframework.integration.annotation.Filter;

public class PaymentFilter {

    @Filter
    public boolean checkMandatoryFields(Payment payment) {
        if (payment==null || payment.getAmount()==null) {
            System.out.println("REJECTED:"+payment);
            return false;
        } else {
            return true;
        }
    }
}

9. Transformers

I made 2 branches of workflow here : 2 transformers : for regular and vip payments. 


package com.demien.spring.integration.transformers;

import com.demien.spring.integration.dto.Payment;

public interface PaymentTransformer {
    String paymentToSting(Payment payment);
} 
 
 

package com.demien.spring.integration.transformers;

import com.demien.spring.integration.dto.Payment;
import org.springframework.integration.annotation.Transformer;

public class RegularPaymentTransformer implements PaymentTransformer {

    @Override
    @Transformer
    public String paymentToSting(Payment payment) {
        return payment.getDescription()+" "+payment.getAmount();
    }


} 
 
 

package com.demien.spring.integration.transformers;

import com.demien.spring.integration.dto.Payment;
import org.springframework.integration.annotation.Transformer;

public class VipPaymentTransformer implements PaymentTransformer {

    @Override
    @Transformer
    public String paymentToSting(Payment payment) {
        return "!!! "+ " VIP PAYMENT:"+payment.getDescription()+" "+payment.getAmount();
    }
}

10. Activators

Usually, the main processing logic. In this example I just printed in-object.

package com.demien.spring.integration.activators;

import org.springframework.integration.annotation.ServiceActivator;

public class PaymentServiceActivator {

    @ServiceActivator
    public void processPayment(String payment) throws Exception {
         if (payment.toUpperCase().contains("TEST")) {
             throw new Exception("Test payment was not processed");
         }
        // some logic have to be here
        System.out.println("PROCESSED:"+payment);
    }
}

package com.demien.spring.integration.activators;

import org.springframework.integration.Message;
import org.springframework.integration.MessageHandlingException;
import org.springframework.integration.annotation.ServiceActivator;

public class FailedPaymentActivator {
    @ServiceActivator
    public void handleFailedOrder(Message<MessageHandlingException> message) {
        String payment=(String)message.getPayload().getFailedMessage().getPayload();
        System.out.println("FAILED:"+payment+" WITH ERROR:"+message.getPayload().getMessage());
    }
}

11. Main starter file

Here I'm creating 4 test objects and sending them into workflow by gateway. 

package com.demien.spring.integration;

import com.demien.spring.integration.dto.Payment;
import com.demien.spring.integration.gateways.LoadPaymentsGateway;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

public class App {

    static LoadPaymentsGateway gateway;

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context=new ClassPathXmlApplicationContext("/expences-context.xml");

        Payment payment1=new Payment("First payment", new BigDecimal(666) );
        Payment payment2=new Payment("Second payment", new BigDecimal(20000) );
        Payment payment3=new Payment("Null payment", null );
        Payment payment4=new Payment("Test payment", new BigDecimal(1) );


        List<Payment> payments=new ArrayList<Payment>();
        payments.add(payment1);
        payments.add(payment2);
        payments.add(payment3);
        payments.add(payment4);

        gateway=(LoadPaymentsGateway)context.getBean("inPaymentGateway");


        gateway.loadPayments(payments);
        context.close();
    }
}

12. Results

I ordered strings in log in order of processing. So we can see what newPaymentChannel received collection of 4 objects. After that they were sent one-by-one into singlePaymentChannel. Filtered payment channel got only 3 objects : one was rejected because of null amount. Later based on value of amount (and property isVip) 2objects were sent into regularPayments and 1 - vipPayments, and transformed by corresponding transformers into another objects(into Strings in this example). At the end transformed objects were processed by PaymentServiceActivators(processingChannel). Rejected objects are moving forward to failedPaymentChannel and processed by FailedServiceActivator.   
 

[[newPaymentChannel]] [Payment{amount=666, description='First payment'}, Payment{amount=20000, description='Second payment'}, Payment{amount=null, description='Null payment'}, Payment{amount=1, description='Test payment'}]

[[singlePaymentChannel]] Payment{amount=666, description='First payment'}

[[singlePaymentChannel]] Payment{amount=20000, description='Second payment'}
[[singlePaymentChannel]] Payment{amount=null, description='Null payment'}
[[singlePaymentChannel]] Payment{amount=1, description='Test payment'}
  
[[filteredPaymentChannel]] Payment{amount=666, description='First payment'}
[[filteredPaymentChannel]] Payment{amount=20000, description='Second payment'}

[[filteredPaymentChannel]] Payment{amount=1, description='Test payment'}
REJECTED:Payment{amount=null, description='Null payment'}

[[regularPayments]] Payment{amount=666, description='First payment'}
[[regularPayments]] Payment{amount=1, description='Test payment'}

[[vipPayments]] Payment{amount=20000, description='Second payment'}

[[processingChannel]] First payment 666
PROCESSED:First payment 666


[[processingChannel]] !!!  VIP PAYMENT:Second payment 20000
PROCESSED:!!!  VIP PAYMENT:Second payment 20000
 

[[processingChannel]] Test payment 1
FAILED:Test payment 1 WITH ERROR:java.lang.Exception: Test payment was not processed

[[failedPaymentsChannel]] org.springframework.integration.MessageHandlingException: java.lang.Exception: Test payment was not processed
 


13. The end

Source code can be downloaded from here.