Saturday, February 14, 2015

JBehave - getting started

First of all: what is JBehave? It's best to read the answer on its original site, http://jbehave.org/:

JBehave is a framework for Behaviour-Driven Development (BDD). BDD is an evolution of test-driven development (TDD) and acceptance-test driven design, and is intended to make these practices more accessible and intuitive to newcomers and experts alike. It shifts the vocabulary from being test-based to behaviour-based, and positions itself as a design philosophy. 

In short, it's an add-on for JUnit that makes the testing process more transparent to business (non-developer) users. For example, with JBehave we can write test descriptions in plain-text (.story) files.

0. Goal. 

As a developer, I want to test my ActiveMQ application using the BDD methodology.
Component of my application:
Active MQ getting started -1: Producer/Consumer(Sender/Receiver) 



So, I want to rewrite my JUnit tests using JBehave.

My test cases:
1. Producer/Consumer: simple test, 1 message has to be received by 1 consumer.
2. Producer/Consumer: multiple-consumer test, with several consumers on one queue, 1 message has to be received by only 1 consumer.
3. Publisher/Subscriber: all subscribers of a topic have to receive the message.

1. Stories

What is "story"?
From jbehave.org:
Behaviour-Driven Development encourages you to start defining the stories via scenarios that express the desired behaviour in a textual format, e.g.:
Given a stock of symbol STK1 and a threshold of 10.0
When the stock is traded at 5.0
Then the alert status should be OFF
The textual scenario should use the language of the business domain and shield away as much as possible the details of the technical implementation. Also, it should be given a name that is expressive of the functionality that is being verified, i.e. trader_is_alerted_of_status.story.

So, let's rewrite our test cases from above in "story" syntax.
For that I created 2 files in the test/resources directory: ProducerConsumer.story and PublisherSubscriber.story. In these files I have to reformulate the test cases using BDD syntax (Scenario, Given, When, Then).

My story files:

ProducerConsumer.story : 
Scenario: simple message receiving by one consumer
Given one producer, one consumer, one message
When producer sends message to queue
Then consumer receives message from queue

Scenario: simple message receiving by several consumers:only one of them have to receive message
Given one producer, several consumers, one message
When producer sends message to queue
Then only one consumer have to receive message from queue

PublisherSubscriber.story: 
Scenario: broadcast message receiving by all subscribers of topic
Given one publisher, several subscribers, one message
When publisher sends message
Then all subscribers have to receive this message
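
To make the structure of these files more concrete, here is a minimal sketch of how such text could be split into scenarios and steps. It uses only the JDK and is not JBehave's real parser; the class name StorySketch and its parse method are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of JBehave: splits story text into scenarios.
class StorySketch {

    // Returns scenario title -> its Given/When/Then lines, in file order.
    static Map<String, List<String>> parse(String storyText) {
        Map<String, List<String>> scenarios = new LinkedHashMap<String, List<String>>();
        List<String> currentSteps = null;
        for (String rawLine : storyText.split("\\r?\\n")) {
            String line = rawLine.trim();
            if (line.startsWith("Scenario:")) {
                // A new scenario starts; following step lines belong to it.
                currentSteps = new ArrayList<String>();
                scenarios.put(line.substring("Scenario:".length()).trim(), currentSteps);
            } else if (currentSteps != null
                    && (line.startsWith("Given ") || line.startsWith("When ") || line.startsWith("Then "))) {
                currentSteps.add(line);
            }
            // Anything else (blank lines, free text) is ignored in this sketch.
        }
        return scenarios;
    }

    public static void main(String[] args) {
        String story =
                "Scenario: broadcast message receiving by all subscribers of topic\n"
              + "Given one publisher, several subscribers, one message\n"
              + "When publisher sends message\n"
              + "Then all subscribers have to receive this message\n";
        for (Map.Entry<String, List<String>> entry : parse(story).entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " steps");
        }
    }
}
```

The point is only that a story is a list of named scenarios, each with an ordered list of step lines; everything else is up to the framework.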

2. Test classes

Now we need JUnit tests that cover our stories. I created 2 files (one for each story file): PublisherSubscriberJB and ProducerConsumerJB. They are empty for now; we will implement them later.

ProducerConsumerJB:

package com.demien.amq.jbehave;

/**
 * Created by dmitry on 14.02.15.
 */
public class ProducerConsumerJB {
}


PublisherSubscriberJB:

package com.demien.amq.jbehave;

/**
 * Created by dmitry on 09.02.15.
 */
public class PublisherSubscriberJB {


}


3. JBehave runner.

As I mentioned before, JBehave is an add-on for JUnit. So we have to create a JBehave runner class, which makes our JUnit run pick up our story files.
I copied it from one of the examples on the JBehave site.

package com.demien.amq.jbehave;

import org.jbehave.core.Embeddable;
import org.jbehave.core.configuration.Configuration;
import org.jbehave.core.configuration.MostUsefulConfiguration;
import org.jbehave.core.i18n.LocalizedKeywords;
import org.jbehave.core.io.LoadFromClasspath;
import org.jbehave.core.io.StoryFinder;
import org.jbehave.core.junit.JUnitStories;
import org.jbehave.core.model.ExamplesTableFactory;
import org.jbehave.core.parsers.RegexStoryParser;
import org.jbehave.core.reporters.StoryReporterBuilder;
import org.jbehave.core.steps.InjectableStepsFactory;
import org.jbehave.core.steps.InstanceStepsFactory;
import org.jbehave.core.steps.ParameterConverters;

import java.text.SimpleDateFormat;
import java.util.List;

import static org.jbehave.core.io.CodeLocations.codeLocationFromClass;
import static org.jbehave.core.reporters.Format.CONSOLE;
import static org.jbehave.core.reporters.Format.HTML;
import static org.jbehave.core.reporters.Format.TXT;
import static org.jbehave.core.reporters.Format.XML;

/**
 * Created by dmitry on 09.02.15.
 */
public class JBehaveRunner extends JUnitStories {

        public JBehaveRunner() {
            configuredEmbedder().embedderControls().doGenerateViewAfterStories(true).doIgnoreFailureInStories(true)
                    .doIgnoreFailureInView(true).useThreads(1).useStoryTimeoutInSecs(60);
        }

        @Override
        public Configuration configuration() {
            Class<? extends Embeddable> embeddableClass = this.getClass();
            // Start from default ParameterConverters instance
            ParameterConverters parameterConverters = new ParameterConverters();
            // factory to allow parameter conversion and loading from external resources (used by StoryParser too)
            ExamplesTableFactory examplesTableFactory = new ExamplesTableFactory(new LocalizedKeywords(), new LoadFromClasspath(embeddableClass), parameterConverters);
            // add custom converters
            parameterConverters.addConverters(new ParameterConverters.DateConverter(new SimpleDateFormat("yyyy-MM-dd")),
                    new ParameterConverters.ExamplesTableConverter(examplesTableFactory));
            return new MostUsefulConfiguration()
                    .useStoryLoader(new LoadFromClasspath(embeddableClass))
                    .useStoryParser(new RegexStoryParser(examplesTableFactory))
                    .useStoryReporterBuilder(new StoryReporterBuilder()
                            .withCodeLocation(codeLocationFromClass(embeddableClass))
                            .withDefaultFormats()
                            .withFormats(CONSOLE, TXT, HTML, XML))
                    .useParameterConverters(parameterConverters);
        }

        @Override
        public InjectableStepsFactory stepsFactory() {
            return new InstanceStepsFactory(configuration(), new PublisherSubscriberJB(), new ProducerConsumerJB());
        }

        @Override
        protected List<String> storyPaths() {
            return new StoryFinder().findPaths(codeLocationFromClass(this.getClass()), "*.story", "*excluded*.story");

        }
}

The most interesting thing here is the stepsFactory method: this is where we list our "steps" classes, the test classes that cover the stories we created in the previous sections.
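
To give a feeling for what a steps factory is good for, here is a rough JDK-only sketch of the general idea: take the listed step instances, look for an annotated method whose text matches a story line, and invoke it. This is not how JBehave is actually implemented; the @Step annotation, DemoSteps and StepMatcherSketch are hypothetical names, and real JBehave annotations (@Given, @When, @Then) carry the step text without the keyword.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for JBehave's step annotations.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Step {
    String value();
}

// Hypothetical steps class, playing the role of ProducerConsumerJB.
class DemoSteps {
    final List<String> log = new ArrayList<String>();

    @Step("Given one producer, one consumer, one message")
    public void given() { log.add("given"); }

    @Step("When producer sends message to queue")
    public void when() { log.add("when"); }
}

class StepMatcherSketch {

    // Invokes the method whose @Step text equals the story line.
    // Returns false when nothing matches (the situation JBehave reports as PENDING).
    static boolean run(String storyLine, Object... stepsInstances) {
        for (Object steps : stepsInstances) {
            for (Method method : steps.getClass().getMethods()) {
                Step step = method.getAnnotation(Step.class);
                if (step != null && step.value().equals(storyLine)) {
                    try {
                        method.invoke(steps);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException("Step failed: " + storyLine, e);
                    }
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        DemoSteps steps = new DemoSteps();
        System.out.println(run("Given one producer, one consumer, one message", steps));
        System.out.println(run("Then consumer receives message from queue", steps));
    }
}
```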


4. First run

Now we can run our JBehave runner (just like a regular JUnit test) to see if everything is fine: we have to be sure that JBehave has found our .story files.
My output after the run:

Processing system properties {}
Using controls EmbedderControls[batch=false,skip=false,generateViewAfterStories=true,ignoreFailureInStories=true,ignoreFailureInView=true,verboseFailures=false,verboseFiltering=false,storyTimeoutInSecs=60,failOnStoryTimeout=false,threads=1]

(BeforeStories)

Running story ProducerConsumer.story

(ProducerConsumer.story)
Scenario: simple message receiving by one consumer
Given one producer, one consumer, one message (PENDING)
When producer sends message to queue (PENDING)
Then consumer receives message from queue (PENDING)
@Given("one producer, one consumer, one message")
@Pending
public void givenOneProducerOneConsumerOneMessage() {
  // PENDING
}

@When("producer sends message to queue")
@Pending
public void whenProducerSendsMessageToQueue() {
  // PENDING
}

@Then("consumer receives message from queue")
@Pending
public void thenConsumerReceivesMessageFromQueue() {
  // PENDING
}


Scenario: simple message receiving by several consumers:only one of them have to receive message
Given one producer, several consumers, one message (PENDING)
When producer sends message to queue (PENDING)
Then only one consumer have to receive message from queue (PENDING)
@Given("one producer, several consumers, one message")
@Pending
public void givenOneProducerSeveralConsumersOneMessage() {
  // PENDING
}

@When("producer sends message to queue")
@Pending
public void whenProducerSendsMessageToQueue() {
  // PENDING
}

@Then("only one consumer have to receive message from queue")
@Pending
public void thenOnlyOneConsumerHaveToReceiveMessageFromQueue() {
  // PENDING
}



Running story PublisherSubscriber.story

(PublisherSubscriber.story)
Scenario: broadcast message receiving by all subscribers of topic
Given one publisher, several subscribers, one message (PENDING)
When publisher sends message (PENDING)
Then all subscribers have to receive this message (PENDING)
@Given("one publisher, several subscribers, one message")
@Pending
public void givenOnePublisherSeveralSubscribersOneMessage() {
  // PENDING
}

@When("publisher sends message")
@Pending
public void whenPublisherSendsMessage() {
  // PENDING
}

@Then("all subscribers have to receive this message")
@Pending
public void thenAllSubscribersHaveToReceiveThisMessage() {
  // PENDING
}


(AfterStories)


 Everything looks good: JBehave found both .story files and even generated stubs (method definitions) for the future implementation!

So we can copy/paste the method definitions into our test classes and fill them in (implement them).
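
The generated method names follow the step text quite directly. As an illustration only (this is a guess at the naming pattern seen in the output above, not JBehave's own code), a hypothetical helper could derive such a name like this:

```java
// Hypothetical helper: derives a camelCase method name from a step line,
// mimicking the names seen in the generated stubs.
class StepNameSketch {

    static String methodName(String stepLine) {
        StringBuilder name = new StringBuilder();
        // Split on anything that is not a letter or digit (spaces, commas, ...).
        for (String word : stepLine.split("[^A-Za-z0-9]+")) {
            if (word.isEmpty()) {
                continue;
            }
            if (name.length() == 0) {
                // The first word (the keyword) starts in lower case.
                name.append(Character.toLowerCase(word.charAt(0)));
            } else {
                name.append(Character.toUpperCase(word.charAt(0)));
            }
            name.append(word.substring(1));
        }
        return name.toString();
    }

    public static void main(String[] args) {
        // prints givenOneProducerOneConsumerOneMessage
        System.out.println(methodName("Given one producer, one consumer, one message"));
    }
}
```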

5. Stories implementation.

Now, let's put code into the generated method definitions.
 
ProducerConsumerJB

package com.demien.amq.jbehave;

import com.demien.amq.Consumer;
import com.demien.amq.Producer;
import com.demien.amq.TestObject;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.jbehave.core.annotations.Given;
import org.jbehave.core.annotations.Then;
import org.jbehave.core.annotations.When;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.*;

/**
 * Created by dmitry on 14.02.15.
 */
public class ProducerConsumerJB {

    public static String brokerURL = "tcp://localhost:61616";
    private final ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
    private final String queueName1="TestQueue1";
    private final String queueName2="TestQueue2";

    Producer<TestObject> producer;
    Consumer<TestObject> consumer;
    List<Consumer<TestObject>> consumers;
/*
    Scenario: simple message receiving by one consumer
    Given one producer, one consumer, one message (PENDING)
    When producer sends message to queue (PENDING)
    Then consumer receives message from queue (PENDING)
*/
    @Given("one producer, one consumer, one message")
    public void givenOneProducerOneConsumerOneMessage() throws JMSException {
        producer=new Producer<TestObject>(factory, queueName1);
        consumer=new Consumer<TestObject>(factory, queueName1);
        consumer.setConsumerId("THE_ONLY_CONSUMER");
    }

    @When("producer sends message to queue")
    public void whenProducerSendsMessageToQueue() throws JMSException, InterruptedException {
        producer.postObjectMessage(new TestObject());
        Thread.sleep(1000);
    }

    @Then("consumer receives message from queue")
    public void thenConsumerReceivesMessageFromQueue() {
        int receivedMessageCount=consumer.getMessages().size();
        assertEquals(1, receivedMessageCount);
    }

/*
    Scenario: simple message receiving by several consumers:only one of them have to receive message
    Given one producer, several consumers, one message (PENDING)
    When producer sends message to queue (PENDING)
    Then only one consumer have to receive message from queue (PENDING)
*/
    @Given("one producer, several consumers, one message")
    public void givenOneProducerSeveralConsumersOneMessage() throws JMSException {
        producer=new Producer<TestObject>(factory, queueName2);
        consumers=new ArrayList<Consumer<TestObject>>();
        for (int i=0;i<10;i++) {
            Consumer<TestObject> eachConsumer=new Consumer<TestObject>(factory, queueName2);
            eachConsumer.setConsumerId("Consumer#"+Integer.toString(i));
            consumers.add(eachConsumer);
        }
    }
/* - already implemented
    @When("producer sends message to queue")
    public void whenProducerSendsMessageToQueue() {
        // PENDING
    }
*/
    @Then("only one consumer have to receive message from queue")
    public void thenOnlyOneConsumerHaveToReceiveMessageFromQueue() {
        int receivedMessageCount=0;
        for (Consumer<TestObject> eachConsumer: consumers) {
            receivedMessageCount=receivedMessageCount+eachConsumer.getMessages().size();
        }
        assertEquals(1, receivedMessageCount);
    }
} 
 
 
PublisherSubscriberJB

package com.demien.amq.jbehave;

import com.demien.amq.Publisher;
import com.demien.amq.Subscriber;
import com.demien.amq.TestObject;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.jbehave.core.annotations.Given;
import org.jbehave.core.annotations.Then;
import org.jbehave.core.annotations.When;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertTrue;

/**
 * Created by dmitry on 09.02.15.
 */
public class PublisherSubscriberJB {
    public static String brokerURL = "tcp://localhost:61616";
    private final ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
    private final String topicName="TestTopic";

    private final int SUBSCRIBERS_COUNT=10;
    Long TEST_ID=-1L;
    String TEST_NAME="test name";

    Publisher<TestObject> publisher;
    List<Subscriber<TestObject>> subscribers;

/*    (PublisherSubscriber.story)
    Scenario: broadcast message receiving by all subscribers of topic
    Given one publisher, several subscribers, one message (PENDING)
    When publisher sends message (PENDING)
    Then all subscribers have to receive this message (PENDING)
*/
    @Given("one publisher, several subscribers, one message")
    public void givenOnePublisherSeveralSubscribersOneMessage() throws JMSException {
        publisher=new Publisher<TestObject>(factory, topicName);

        subscribers=new ArrayList<Subscriber<TestObject>>();
        for (int i=0; i<SUBSCRIBERS_COUNT; i++) {
            Subscriber<TestObject> subscriber=new Subscriber<TestObject>(factory, topicName);
            subscriber.setSubscriberId(Integer.toString(i));
            subscribers.add(subscriber);
        }
    }

    @When("publisher sends message")
    public void whenPublisherSendsMessage() throws JMSException, InterruptedException {

        TestObject testObject=new TestObject();
        testObject.id=new Long(TEST_ID);
        testObject.name=new String(TEST_NAME);

        //post message
        publisher.postObjectMessage(testObject);
        Thread.sleep(1000);
    }

    @Then("all subscribers have to receive this message")
    public void thenAllSubscribersHaveToReceiveThisMessage() {
        for (Subscriber<TestObject> subscriber:subscribers) {
            TestObject received=subscriber.getMessages().get(0);
            assertTrue(received.name.equals(TEST_NAME));
            assertTrue(received.id.equals(TEST_ID));
        }
    }

}


6. Final run and results

Now we can start our message broker and, after that, finally run our JBehave tests.
My results:

Running story ProducerConsumer.story

(ProducerConsumer.story)
Scenario: simple message receiving by one consumer
log4j:WARN No appenders could be found for logger (org.apache.activemq.transport.WireFormatNegotiator).
log4j:WARN Please initialize the log4j system properly.
Given one producer, one consumer, one message
Sending message:com.demien.amq.TestObject@807bbc
Consumer[THE_ONLY_CONSUMER] Message received:
When producer sends message to queue
Then consumer receives message from queue

Scenario: simple message receiving by several consumers:only one of them have to receive message
Given one producer, several consumers, one message
Sending message:com.demien.amq.TestObject@7b0aef
Consumer[Consumer#0] Message received:
When producer sends message to queue
Then only one consumer have to receive message from queue


Running story PublisherSubscriber.story

(PublisherSubscriber.story)
Scenario: broadcast message receiving by all subscribers of topic
Given one publisher, several subscribers, one message
Subscriber[0] received message.
Subscriber[3] received message.
Subscriber[4] received message.
Subscriber[1] received message.
Subscriber[2] received message.
Subscriber[7] received message.
Subscriber[6] received message.
Subscriber[9] received message.
Subscriber[5] received message.
Subscriber[8] received message.
When publisher sends message
Then all subscribers have to receive this message

As we can see, the results are easy to understand even for people who are not involved in development. JBehave can also generate nicely formatted HTML tables with the results (they are located in the /target/jbehave directory), such as:
 

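Story Reports

Stories (Name / Excluded) | Scenarios (Total / Successful / Pending / Failed / Excluded) | GivenStory Scenarios (Total / Successful / Pending / Failed / Excluded) | Steps (Total / Successful / Pending / Failed / Not Performed / Ignorable) | Duration (hh:mm:ss.SSS) | View
AfterStories / 0 | 0 / 0 / 0 / 0 / 0 | 0 / 0 / 0 / 0 / 0 | 0 / 0 / 0 / 0 / 0 / 0 | 00:00:00.000 | stats, html, xml, txt
BeforeStories / 0 | 0 / 0 / 0 / 0 / 0 | 0 / 0 / 0 / 0 / 0 | 0 / 0 / 0 / 0 / 0 / 0 | 00:00:00.000 | stats, xml, html, txt
ProducerConsumer / 0 | 2 / 2 / 0 / 0 / 0 | 0 / 0 / 0 / 0 / 0 | 6 / 6 / 0 / 0 / 0 / 0 | 00:00:03.000 | stats, html, xml, txt
PublisherSubscriber / 0 | 1 / 1 / 0 / 0 / 0 | 0 / 0 / 0 / 0 / 0 | 3 / 3 / 0 / 0 / 0 / 0 | 00:00:01.000 | stats, xml, html, txt
4 / 0 | 3 / 3 / 0 / 0 / 0 | 0 / 0 / 0 / 0 / 0 | 9 / 9 / 0 / 0 / 0 / 0 | 00:00:04.000 | Totals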

The full source code can be downloaded from here.

Saturday, February 7, 2015

Active MQ getting started -2: Publisher/Subscriber

This is a continuation of my previous post about ActiveMQ: Producer/Consumer (Sender/Receiver).
Actually, it will be mostly a copy/paste of the previous post with a few changes, so I suggest reading that one before this one :)

Theoretical information from wikipedia:
The publish/subscribe model supports publishing messages to a particular message topic. Subscribers may register interest in receiving messages on a particular message topic. In this model, neither the publisher nor the subscriber knows about each other. A good analogy for this is an anonymous bulletin board.
  • Zero or more consumers will receive the message.
  • There is a timing dependency between publishers and subscribers. The publisher has to create a message topic for clients to subscribe. The subscriber has to remain continuously active to receive messages, unless it has established a durable subscription. In that case, messages published while the subscriber is not connected will be redistributed whenever it reconnects.
JMS provides a way of separating the application from the transport layer of providing data. The same Java classes can be used to communicate with different JMS providers by using the Java Naming and Directory Interface (JNDI) information for the desired provider. The classes first use a connection factory to connect to the queue or topic, and then use populate and send or publish the messages. On the receiving side, the clients then receive or subscribe to the messages.
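
The two properties listed above (zero or more receivers, and the timing dependency) can be sketched with a tiny in-memory model. This is only an illustration with a made-up class TopicSketch; it has nothing to do with the real JMS or ActiveMQ API, it is not thread-safe, and it ignores durable subscriptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory "topic": every subscriber registered at publish time gets the message.
class TopicSketch<T> {
    private final List<List<T>> inboxes = new ArrayList<List<T>>();

    // Registers a subscriber and returns its inbox.
    List<T> subscribe() {
        List<T> inbox = new ArrayList<T>();
        inboxes.add(inbox);
        return inbox;
    }

    // Delivers the message to all current subscribers (possibly zero).
    void publish(T message) {
        for (List<T> inbox : inboxes) {
            inbox.add(message);
        }
    }

    public static void main(String[] args) {
        TopicSketch<String> topic = new TopicSketch<String>();
        topic.publish("lost");                    // nobody is subscribed yet, so nobody gets it
        List<String> first = topic.subscribe();
        List<String> second = topic.subscribe();
        topic.publish("hello");                   // both subscribers receive this one
        System.out.println(first + " " + second); // prints [hello] [hello]
    }
}
```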
   

1. Publisher.

It's a copy of the Producer class from the previous post with only one difference: instead of a queue, we have to create a topic.

package com.demien.amq;

import javax.jms.*;
import java.io.Serializable;

public class Publisher<T extends Serializable> {
    private Connection connection;
    private Session session;
    private MessageProducer publisher;

    public Publisher(ConnectionFactory factory, String topicName) throws JMSException {
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        publisher = session.createProducer(topic);
    }

    public void postObjectMessage(T object) throws JMSException {
        Message message = session.createObjectMessage(object);
        publisher.send(message);
    }

    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }
} 
 

2. Subscriber.

The Subscriber was not created from scratch either :) It's a copy of the Receiver class. I also added a subscriberId field for testing, to make it easier to see which subscriber received a message.
 
package com.demien.amq;

import org.apache.activemq.command.ActiveMQObjectMessage;

import javax.jms.*;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class Subscriber<T extends Serializable> implements MessageListener {

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;
    private String subscriberId="";

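    // onMessage() is called on a JMS delivery thread while the test thread reads this list.
    private final List<T> messages = java.util.Collections.synchronizedList(new ArrayList<T>());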

    public Subscriber(ConnectionFactory factory, String topicName) throws JMSException {
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        consumer = session.createConsumer(topic);
        consumer.setMessageListener(this);
    }

    public void setSubscriberId(String subscriberId) {
        this.subscriberId=subscriberId;
    }

    public List<T> getMessages() {
        return messages;
    }

    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                System.out.println("Subscriber<"+subscriberId+"> received message.");
                messages.add((T)((ActiveMQObjectMessage) message).getObject());
            } else {
                System.out.println("Invalid message received.");
            }
        } catch (Exception e) {
            System.out.println("Caught:" + e);
            e.printStackTrace();
        }
    }
}
 

3. Test class. 

For the test we can reuse the message broker and TestObject from my previous post. So all we need is to create a test class that posts our TestObject to the topic and checks that every topic subscriber receives this message.
 
 
package com.demien.amq;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertTrue;

public class PublisherSubscriberTest {
    public static String brokerURL = "tcp://localhost:61616";
    private final ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
    private final String topicName="TestTopic";

    private final int SUBSCRIBERS_COUNT=10;

    Publisher<TestObject> publisher;
    List<Subscriber<TestObject>> subscribers;

    @Before
    public void init() throws JMSException {
        publisher=new Publisher<TestObject>(factory, topicName);

        subscribers=new ArrayList<Subscriber<TestObject>>();
        for (int i=0; i<SUBSCRIBERS_COUNT; i++) {
            Subscriber<TestObject> subscriber=new Subscriber<TestObject>(factory, topicName);
            subscriber.setSubscriberId(Integer.toString(i));
            subscribers.add(subscriber);
        }
    }

    @After
    public void finish() throws JMSException {
        publisher.close();
    }

    @Test
    public void multipleMessagesTest() throws JMSException, InterruptedException {
        Long TEST_ID=-1L;
        String TEST_NAME="test name";


        TestObject testObject=new TestObject();
        testObject.id=new Long(TEST_ID);
        testObject.name=new String(TEST_NAME);

        //post message
        publisher.postObjectMessage(testObject);
        Thread.sleep(1000);

        // receive messages
        for (Subscriber<TestObject> subscriber:subscribers) {
            TestObject received=subscriber.getMessages().get(0);
            assertTrue(received.name.equals(TEST_NAME));
            assertTrue(received.id.equals(TEST_ID));
        }

    }


}
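
The test above waits a fixed second with Thread.sleep(1000) before checking the subscribers. One possible alternative, sketched here with JDK classes only, is to let every listener count down a CountDownLatch and to wait for it with a timeout. LatchSketch and its simulated listeners are hypothetical; in the real test the countDown() call would have to be added to Subscriber.onMessage, which the code in this post does not do.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchSketch {

    // Simulates `subscribers` listeners (at least 1) that each receive one message asynchronously.
    // Returns true if all of them reported delivery within the timeout.
    static boolean allDelivered(int subscribers, long timeoutMillis) {
        final CountDownLatch delivered = new CountDownLatch(subscribers);
        ExecutorService pool = Executors.newFixedThreadPool(subscribers);
        try {
            for (int i = 0; i < subscribers; i++) {
                pool.execute(new Runnable() {
                    public void run() {
                        // Stand-in for onMessage(): store the message, then signal.
                        delivered.countDown();
                    }
                });
            }
            // Returns as soon as the count reaches zero, or false after the timeout.
            return delivered.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(allDelivered(10, 1000));
    }
}
```

With this approach the test does not have to guess how long delivery takes: it continues as soon as the last subscriber has signalled, and fails only if the timeout expires.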
 

4. Results.

Of course, before running our test we have to start our message broker.
For me, the results were:
 
log4j:WARN No appenders could be found for logger (org.apache.activemq.transport.WireFormatNegotiator).
log4j:WARN Please initialize the log4j system properly.
Subscriber<0> received message.
Subscriber<2> received message.
Subscriber<1> received message.
Subscriber<4> received message.
Subscriber<5> received message.
Subscriber<8> received message.
Subscriber<6> received message.
Subscriber<9> received message.
Subscriber<3> received message.
Subscriber<7> received message.

Process finished with exit code 0

So, each subscriber received the message! Everyone is happy :)

Source code (together with previous topic) can be downloaded from here.

Tuesday, February 3, 2015

Active MQ getting started -1: Producer/Consumer(Sender/Receiver)

Before ActiveMQ, I worked with JMS only on application servers such as WebLogic and GlassFish.
When I started working with ActiveMQ, I was surprised how easy it is to work with.

In this post I'm going to show how to use ActiveMQ in a simple application: point-to-point message exchange, Producer/Consumer (Sender/Receiver).

Theoretical information.

JMS : http://en.wikipedia.org/wiki/Java_Message_Service
The Java Message Service (JMS) API is a Java Message Oriented Middleware (MOM) API for sending messages between two or more clients. JMS is a part of the Java Platform, Enterprise Edition, and is defined by a specification developed under the Java Community Process as JSR 914. It is a messaging standard that allows application components based on the Java Enterprise Edition (Java EE) to create, send, receive, and read messages. It allows the communication between different components of a distributed application to be loosely coupled, reliable, and asynchronous.

ActiveMQ: http://en.wikipedia.org/wiki/Apache_ActiveMQ
Apache ActiveMQ is an open source message broker written in Java together with a full Java Message Service (JMS) client. It provides "Enterprise Features" which in this case means fostering the communication from more than one client or server. Supported clients include Java via JMS 1.1 as well as several other "cross language" clients. The communication is managed with features such as computer clustering and ability to use any database as a JMS persistence provider besides virtual memory, cache, and journal persistency.

Point-to-point model(from JMS wiki)

In point-to-point messaging system, messages are routed to an individual consumer which maintains a queue of "incoming" messages. This messaging type is built on the concept of message queues, senders, and receivers. Each message is addressed to a specific queue, and the receiving clients extract messages from the queues established to hold their messages. While any number of producers can send messages to the queue, each message is guaranteed to be delivered, and consumed by one consumer. Queues retain all messages sent to them until the messages are consumed or until the messages expire. If no consumers are registered to consume the messages, the queue holds them until a consumer registers to consume them.
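
The key points of this model (messages wait in the queue until someone consumes them, and each message goes to exactly one consumer) can be shown with a small in-memory sketch. QueueSketch is a made-up class, not the JMS API, and the round-robin order in which consumers take messages is just an assumption of this example; a real broker decides on its own who gets which message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical in-memory "queue" for illustrating point-to-point delivery.
class QueueSketch {

    // Puts all messages into a queue first, then lets `consumers` (at least 1) poll in turn.
    // Returns what each consumer received.
    static List<List<String>> deliver(List<String> messages, int consumers) {
        // Messages are held by the queue until some consumer takes them.
        Queue<String> queue = new ConcurrentLinkedQueue<String>(messages);
        List<List<String>> received = new ArrayList<List<String>>();
        for (int i = 0; i < consumers; i++) {
            received.add(new ArrayList<String>());
        }
        int next = 0;
        String message;
        // poll() removes the message, so no other consumer can get it again.
        while ((message = queue.poll()) != null) {
            received.get(next).add(message);
            next = (next + 1) % consumers;
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<String>();
        messages.add("m1");
        // One message, three consumers: only one of them receives it.
        System.out.println(deliver(messages, 3)); // prints [[m1], [], []]
    }
}
```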


Now let's create an application (actually, it will be 2 applications) to try out how ActiveMQ works.

1. ActiveMQ message broker. 

First of all, before creating our application, we need to run a message broker: the ActiveMQ "server".
Let's create a folder for it: mbroker.
Next step: the file mbroker/pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.demien.amq</groupId>
  <artifactId>mbroker</artifactId>
  <packaging>jar</packaging>
  <version>0.1</version>
  <name>mbroker</name>
  <url>http://maven.apache.org</url>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.activemq.tooling</groupId>
        <artifactId>maven-activemq-plugin</artifactId>
        <version>5.1</version>
        <configuration>
          <configUri>xbean:file:./conf/activemq.xml</configUri>
          <fork>false</fork>
          <systemProperties>
            <property>
              <name>javax.net.ssl.keyStorePassword</name>
              <value>password</value>
            </property>
            <property>
              <name>org.apache.activemq.default.directory.prefix</name>
              <value>./target/</value>
            </property>
          </systemProperties>
        </configuration>
        <dependencies>
          <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring</artifactId>
            <version>2.5.5</version>
          </dependency>
        </dependencies>
      </plugin>
    </plugins>
  </build>
</project>

We also need a configuration file: mbroker/conf/activemq.xml
<?xml version="1.0"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core
  http://activemq.apache.org/schema/core/activemq-core.xsd
  ">
  <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="./data">
    <!-- The transport connectors ActiveMQ will listen to -->
    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://localhost:61616"/>
    </transportConnectors>
  </broker>
</beans>

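Now we can start our message broker with the command below. Run it from the mbroker directory, because the configUri in pom.xml is a relative path: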
mvn org.apache.activemq.tooling:maven-activemq-plugin:5.2.0:run
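
Before moving on, it can be handy to check that something is really listening on the openwire port configured in activemq.xml. Below is a minimal sketch that uses only the JDK. The class name BrokerPortCheck and the method isListening are made up for this illustration, and a successful TCP connect only shows that the port is open, not that the listener is ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the original project.
final class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // connection refused, timeout, unknown host, ...
            return false;
        }
    }

    public static void main(String[] args) {
        // localhost:61616 is the transportConnector uri from conf/activemq.xml
        System.out.println("broker reachable: " + isListening("localhost", 61616, 1000));
    }
}
```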


2. Test application. 

Now that our message broker is up and running, let's create our test application.
Create the directory test-app and place the following pom.xml file inside it:
test-app/pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.demien.amq</groupId>
  <artifactId>consumer</artifactId>
  <packaging>jar</packaging>
  <version>0.1</version>
  <name>consumer</name>
  <url>http://maven.apache.org</url>
  <dependencies>
   <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.8.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
      <version>1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-core</artifactId>
      <version>5.2.0</version>
    </dependency>
  </dependencies>
  <repositories>
    <repository>
      <id>jboss</id>
      <url>http://repository.jboss.com/maven2</url>
      <releases>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>
</project>



3. Producer.

To publish our messages to the message broker, we have to create the Producer class in test-app/src/main/java/com/demien/amq/

package com.demien.amq;
import javax.jms.*;
import java.io.Serializable;

public class Producer<T extends Serializable> {
    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public Producer(ConnectionFactory factory, String queueName) throws JMSException {
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(queueName);
        producer = session.createProducer(destination);
    }

    public void postObjectMessage(T object) throws JMSException {
        Message message = session.createObjectMessage(object);
        producer.send(message);
    }

    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }
}


4. Consumer

The Consumer class will receive our messages from the message broker and store them in local storage (an ArrayList).
test-app/src/main/java/com/demien/amq/

package com.demien.amq;

import javax.jms.*;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class Consumer<T extends Serializable> implements MessageListener {

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    // payloads of all object messages received so far
    private List<T> messages = new ArrayList<T>();

    public Consumer(ConnectionFactory factory, String queueName) throws JMSException {
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(queueName);
        consumer = session.createConsumer(destination);
        // register this object for asynchronous delivery
        consumer.setMessageListener(this);
    }

    public List<T> getMessages() {
        return messages;
    }

    // called by the JMS provider for every message that arrives on the queue
    @SuppressWarnings("unchecked")
    public void onMessage(Message message) {
        try {
            if (message instanceof ObjectMessage) {
                // the standard JMS interface is enough here, no ActiveMQ-specific class is needed
                messages.add((T) ((ObjectMessage) message).getObject());
            } else {
                System.out.println("Invalid message received.");
            }
        } catch (Exception e) {
            System.out.println("Caught:" + e);
            e.printStackTrace();
        }
    }
}
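
One thing to keep in mind: onMessage is called on a delivery thread owned by the JMS provider, while getMessages is called from the test thread, and ArrayList is not designed for that. Below is a minimal sketch of a thread-safe alternative for the storage, with no JMS involved. The class ReceivedMessages and its methods are hypothetical names for this illustration, and CopyOnWriteArrayList is just one possible choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the "messages" field of Consumer.
final class ReceivedMessages<T> {

    // CopyOnWriteArrayList lets one thread add while another thread reads.
    private final List<T> messages = new CopyOnWriteArrayList<T>();

    // Would be called from onMessage, i.e. from the provider's delivery thread.
    void add(T payload) {
        messages.add(payload);
    }

    // Returns a copy, so callers cannot modify the internal list.
    List<T> snapshot() {
        return new ArrayList<T>(messages);
    }

    public static void main(String[] args) throws InterruptedException {
        final ReceivedMessages<String> received = new ReceivedMessages<String>();
        // this thread plays the role of the delivery thread
        Thread delivery = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 100; i++) {
                    received.add("message " + i);
                }
            }
        });
        delivery.start();
        delivery.join();
        System.out.println(received.snapshot().size()); // prints 100
    }
}
```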

5. Simple test object class for use in tests.

Now we can create a simple class whose instances we will send through the message broker:

package com.demien.amq;
import java.io.Serializable;

public class TestObject implements Serializable {
    public Long id;
    public String name;
}
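
The class has to be Serializable because an object message carries its payload in serialized form, so the consumer gets a copy of the object and not the same instance. Below is a minimal sketch of such a round trip with plain JDK streams, which is roughly what happens to TestObject on its way through the broker. The classes SerializationRoundTrip and Payload are hypothetical names for this illustration; Payload has the same fields as TestObject.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class, not part of the original project.
final class SerializationRoundTrip {

    // Same shape as TestObject.
    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        public Long id;
        public String name;
    }

    // Serializes the object into a byte array.
    static byte[] toBytes(Serializable object) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
        }
        return buffer.toByteArray();
    }

    // Rebuilds an object from the bytes produced by toBytes.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Payload original = new Payload();
        original.id = -1L;
        original.name = "test name";

        Payload copy = (Payload) fromBytes(toBytes(original));

        System.out.println(copy != original);                // true: a different instance
        System.out.println(copy.id.equals(original.id));     // true: same field values
        System.out.println(copy.name.equals(original.name)); // true
    }
}
```

This is also why the test compares the fields of the received object and not the object reference.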

6. The test itself.

In our test, we will post data to our queue using the producer and read it back using the consumer.


package com.demien.amq;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import java.util.List;
import static org.junit.Assert.*;

public class AppTest  {

    public static String brokerURL = "tcp://localhost:61616";
    private final ConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
    private final String queueName="TestQueue";

    Producer<TestObject> producer;
    Consumer<TestObject> consumer;

    @Before
    public void init() throws JMSException {
        producer=new Producer<TestObject>(factory, queueName);
        consumer=new Consumer<TestObject>(factory, queueName);
    }

    @After
    public void finish() throws JMSException {
        producer.close();
    }

    @Test
    public void singleMessageTest() throws JMSException, InterruptedException {
        Long TEST_ID=-1L;
        String TEST_NAME="test name";

        TestObject testObject=new TestObject();
        testObject.id=TEST_ID;
        testObject.name=TEST_NAME;

        //post message
        producer.postObjectMessage(testObject);
        Thread.sleep(1000);

        //receive message
        List<TestObject> messages=consumer.getMessages();
        assertTrue(messages.size()>0);
        TestObject received=messages.get(0);
        // assertEquals reports expected and actual values when the check fails
        assertEquals(TEST_NAME, received.name);
        assertEquals(TEST_ID, received.id);
    }
}

If the message broker was started before running the test, the test should pass.
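
The fixed Thread.sleep(1000) in the test always costs a full second and can still be too short on a slow machine. Below is a minimal sketch of a helper that polls a condition until it becomes true or a timeout expires. The class Await and the method until are hypothetical names for this illustration, and the code assumes Java 8 or newer because of BooleanSupplier and lambdas.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical test helper, not part of the original project.
final class Await {

    // Polls the condition every 10 ms; returns true as soon as it holds,
    // or false if timeoutMillis passes first.
    static boolean until(BooleanSupplier condition, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(10);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean arrived = new AtomicBoolean(false);
        // this thread simulates a message that arrives after 50 ms
        new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            arrived.set(true);
        }).start();
        System.out.println(Await.until(arrived::get, 2000)); // true, well before the 2 s timeout
    }
}
```

In the test, the sleep could then be replaced by something like assertTrue(Await.until(() -> !consumer.getMessages().isEmpty(), 1000)).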

Full sources can be downloaded from here.