Quickstart Guide - Part 2

If you have not already seen it start with Quickstart Part 1 . This page expands on the example and expands its capabilities.

Introduction

We are now going to build a simple producer/consumer example. The consumer process handles messages generated by one or more producers. To do this we must move from the in-process persistent store to using the remote store and JemmServer.

Step 1 - Initialise a JemmServer

This relies on the classpath being set (as in part one) to include the jemm jar and dependencies. To start a server you need to run the below command specifying the data directory and the port to run on. The data directory must exist before running the server. The command sequence is thus:

> mkdir datadir/
> java org.sourceforge.jemm.JemmServer -a start -d datadir/ 

Will start a server running on the default port (5001) and using datadir to store its persistent data. This server can be stopped by running the command:

> java org.sourceforge.jemm.JemmServer -a stop

Running with the -h switch will show help and all the configuration options.

Step 2 - Create a message class

Next we need to create a message class as below. This class stores a String message which will be the communication mechanism between the producer and consumer.

package com.test;

import org.sourceforge.jemm.Entity;

@Entity
public class Message {
        protected final String message;
        
        public Message(String message) {
                this.message = message;
        }
        
        public String getMessage() {
                return message;
        }       
}

Step 3 - Create a consumer of messages

Next we create a consumer. The consumer checks the queue every second for 2 minutes and if there are messages on the queue prints them out.

package com.test;

import org.sourceforge.jemm.RemoteStore;
import org.sourceforge.jemm.Session;
import org.sourceforge.jemm.collections.JemmList;

public class Consumer {

        public static void main( String[] args ) {
                Session.setStore(new RemoteStore(args[0],Integer.parseInt(args[1])));

                System.out.println("Consumer started");
                JemmList<Message> queue = getQueue();
                
                for(int i=0;i<120;i++) {
                        while(queue.size() > 0) {
                                Message message = queue.remove(0);
                                System.out.println("Got message: " + message.getMessage());
                        }
                        
                        try {
                                Thread.sleep(1000);
                        } catch (InterruptedException e) {
                                // do nothing
                        }
                }
                
                Session.shutdown();
                System.out.println("Consumer finished");
    }

        // put here so the code can be shared with Producer
        @SuppressWarnings("unchecked")
        public static JemmList<Message> getQueue() {
                JemmList<Message> queue = (JemmList<Message>) Session.getRoot("messageQueue");                          
                if(queue == null)
                        queue = (JemmList<Message>) Session.setRootIfNull("messageQueue", 
                                        new JemmList<Message>());
                return queue;
        }
}

Step 4 - Create a message producer

Next we create a producer as below. This Producer is creating 10 messages to the same queue as the Consumer. It produces one of these messages with a delay of 1ms up to 4 seconds. The message produced contains the delay introduced.

package com.test;

import java.util.Random;

import org.sourceforge.jemm.RemoteStore;
import org.sourceforge.jemm.Session;
import org.sourceforge.jemm.collections.JemmList;

public class Producer {

        public static void main( String[] args ) {
                Session.setStore(new RemoteStore(args[0],Integer.parseInt(args[1])));
                
                System.out.println("Producer started");
                JemmList<Message> queue = Consumer.getQueue();

                Random random = new Random();
                for(int i=0;i<10;i++) {
                        try {
                                int delay = random.nextInt(4000)+1;
                                System.out.println("Waiting " + delay + "ms");
                                Thread.sleep(delay);
                        } catch (InterruptedException e) {
                                // do nothing
                        }
                        
                        Message message = new Message("Hello " + i);
                        System.out.println("Sending message " + message.getMessage());
                        queue.add(message);
                }

                Session.shutdown();
                System.out.println("Producer finished");
    }
}

Step 5 - Compile it

To run the example first compile.

> mvn install

Step 6 - Run it!

All thats left to do is to run the application. In this version you should only run one consumer (otherwise there is a race condition between queue.size() and queue.remove(0) . The producers do not need to running at the same time as the consumers (the queue is persistent). And any number of producers can be running at once as adding to a JemmList is threadsafe.

To run a producer simply:

> java -javaagent:<path to jemm jar> com.test.Producer localhost 5001

To run the consumer:

> java -javaagent:<path to jemm jar> com.test.Consumer localhost 5001

Thats it. You now have a working multi-process producer/consumer running in less than 100 lines of code. JEMM makes it easy to do some otherwise complex things. For example you could within your message pass another message queue for replies. Creating a process farm can be done with a queue and lots of consumers.

JEMM also makes your domain logic cleaner. Whether the List is running within the same process or in a different one does not change the code needed.