Friday, July 12, 2013

Efficient Multi-threading application with JPA Hibernate, Ehcache, C3P0


Scenario: You want to develop a multi-threading standalone application in which every worker thread can make database transactions (insert or update). These transactions must not delay the thread in any case.


Thoughts: A simple typical transaction can be between 4ms to 6ms for localhost databases. Of course this not a catastrophic overhead in most applications but in applications that the concurrency is important, this overhead should be managed, let alone when the database is not in the same server.

Recommendation: To develop that, there must be an independent thread that will manage all transactions. That thread should have a Queue which will be filled up from the worker threads.

Notes: JPA EntityManager are not synchronized. That means that you cannot create an instance of EntityManager and do all the transactions from that instance. However, JPA EntityManagerFactory is a synchronized object and hence you can create as many EntityManager you want from an instance of EntityManagerFactory.

Java offers a bunch of special Queues that are capable of staying synchronized and waiting until new elements added to that kind of Queue. All that Queues (ArrayBlockingQueue, DelayQueue, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue) impelement the BlockingQueue interface of the java.util.concurrent.

In this example, I am using MySql as Database with Hibernate 4 as Persistence Provider.

First of all, we should create an eclipse JPA project. If you have already created a simple java project you can add JPA Facet from its project properties. Below is a recommended persistence.xml.
 <?xml version="1.0" encoding="UTF-8"?>   
  <persistence version="2.0" xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd">   
    <persistence-unit name="BlogExample">   
    <provider>org.hibernate.ejb.HibernatePersistence</provider>   
       <shared-cache-mode>ALL</shared-cache-mode>   
       <properties>   
         <property name="hibernate.dialect" value="org.hibernate.dialect.MySQLDialect" />   
         <property name="hibernate.show_sql" value="false" />   
         <property name="hibernate.connection.url" value="jdbc:mysql://localhost/Database?characterEncoding=UTF-8" />   
         <property name="hibernate.connection.username" value="root" />   
         <property name="hibernate.connection.driver_class" value="com.mysql.jdbc.Driver" />   
         <property name="hibernate.connection.password" value="***" />   
         <property name="hibernate.cache.region.factory_class" value="org.hibernate.cache.ehcache.SingletonEhCacheRegionFactory"/>   
         <property name="hibernate.cache.use_second_level_cache" value="true"/>   
         <property name="hibernate.cache.use_query_cache" value="true"/>   
         <property name="hibernate.connection.provider_class" value="org.hibernate.connection.C3P0ConnectionProvider" />   
         <property name="hibernate.c3p0.testConnectionOnCheckin" value="true"/>   
         <property name="hibernate.c3p0.idleConnectionTestPeriod" value="300"/>   
         <property name="hibernate.c3p0.min_size" value="5" />   
         <property name="hibernate.c3p0.max_size" value="50" />   
         <property name="hibernate.c3p0.timeout" value="1800" />   
         <property name="hibernate.c3p0.max_statements" value="50" />   
       </properties>   
    </persistence-unit>   
  </persistence>   
That persistence includes all the properties for Second Level Caching using EhCache as well as a connection provider (C3P0) that is responsible for the connection with the database.
Afterwards, we create a class that main task is to instantiate an EntityManagerFactory, which will provide the required EntityManagers.
 package gr.doeyetea.blog.database;   
  import javax.persistence.EntityManager;   
  import javax.persistence.EntityManagerFactory;   
  import javax.persistence.Persistence;   
  public class EntityManagerHelper {   
    private static EntityManagerFactory emf;   
    public EntityManagerHelper(){   
    }   
     public void closeEmf() {   
      if(emf.isOpen() || emf != null) {   
       emf.close();   
      }   
      emf = null;   
     }   
     public static EntityManager getEntity(){   
         if(emf==null){   
            emf=Persistence.createEntityManagerFactory("BlogExample");   
         }   
         return emf.createEntityManager();   
       }   
  }   

Next step is to create our database service that manages the transactions and the calls from our database. Because this is a standalone application, outside of a container, we are responsible for the transactions and the rollback procedures. That's why, we get the transaction from the entity manager.
 package gr.doeyetea.blog.service;  
 import gr.doeyetea.blog.entities.AnEntity;  
 import javax.persistence.EntityManager;  
 public class DatabaseService {  
      public static void insertEntity(AnEntity entity){  
           EntityManager em = EntityManagerHelper.getEntity();  
           try{  
                em.getTransaction().begin();  
                em.merge(entity);  
                em.getTransaction().commit();  
           }catch(Exception e){  
                logger.fatal("Entity Insert Exception:",e);  
                em.getTransaction().rollback();  
           }  
           finally{  
                em.close();  
           }  
      }  
      public static AnEntity findEntitybyId(int id){  
           EntityManager em = EntityManagerHelper.getEntity();  
           AnEntity anEntity = em.find(AnEntity.class, id);  
           em.close();  
           return anEntity;  
      }  
 }  


The important class which makes the calls in our service, that runs on an independent thread, has to implement the Runnable interface and have a static BlockingQueue. Our example uses a LinkedBlockingQueue. For details about every BlockingQueue, you can see the Java API. As most of you know, a class that implements the Runnable interface must override the public void run(){} function. In the run method, we create a loop that will call the BlockingQueue take() function. That function takes an element from the Queue if any or waits until one is added by the worker Thread.
 package gr.doeyetea.blog.handler;  
 import gr.doeyetea.blog.entities.AnEntity;  
 import java.util.concurrent.BlockingQueue;  
 import java.util.concurrent.LinkedBlockingQueue;  
 public class DatabaseHandler implements Runnable {  
      private static BlockingQueue<Object> databaseQueue = new LinkedBlockingQueue<Object>();  
      private boolean running;  
      public DatabaseHandler(){  
           running = true;  
      }  
      @Override  
      public void run() {  
           while(running){  
                try {  
                     Object anObject = databaseQueue.take();  
                     if(anObject instanceof AnEntity){  
                          DatabaseController.insertEntity((AnEntity)anObject);  
                     }  
                } catch (InterruptedException e) {  
                     logger.fatal("Couldn't retrieve from Queue", e);  
                     continue;  
                }  
           }  
      }  
      public static BlockingQueue<Object> getDatabaseQueue() {  
           return databaseQueue;  
      }  
      public static void setDatabaseQueue(BlockingQueue<Object> databaseQueue) {  
           DatabaseHandler.databaseQueue = databaseQueue;  
      }       
 }  
 }  

Now the worker Threads can call the static method DatabaseHandler.getDatabaseQueue().put(AnEntity entity). Remember to instantiate the DatabaseHandler in your main method and start the thread . Whenever a worker thread puts an element to the queue, the DatabaseHandler will be responsible to make the calls to the service, without any delay for the worker thread. The delay to put an object in a queue is smaller than 1ms.

Have a nice day...