Thursday, 8 August 2013

Argot, Big-Data and Hazelcast

Hazelcast is an in-memory data grid that provides a scalable data store with a rich set of features.  Over the past year I've been using it in other applications and I've been impressed with its capabilities.  A new feature in the recently released Hazelcast 3.0 is the ability to plug in your own serialization code.  Because Argot can now be used both in the in-memory data grid and as the transport for Internet of Things applications, the same code developed for Argot can be reused in Big-Data applications.  The combination of Hazelcast and Argot means less code, less development time and more scale.

Recently the Hazelcast team made available a small benchmark that both provides examples of how various serialization libraries can be embedded into Hazelcast and gives a rudimentary performance comparison of the different methods.  The code is made available here.  I've forked the code and included Argot as an additional example.

The rest of this post covers the details of how I integrated with Hazelcast.  It also includes a discussion of how Argot's performance compares to the other serialization techniques used in the benchmark.

Argot Hazelcast Integration

In this example the 'sample' object is defined in Argot as follows. The default Argot common dictionary doesn't include the generic long_array or double_array types, so I've also defined them here:

definition long_array 1.0:
    (meta.sequence [
        (meta.array
            (meta.reference #uvint28)
            (meta.reference #int64))
    ]);
 
definition double_array 1.0:
    (meta.sequence [
        (meta.array
            (meta.reference #uvint28)
            (meta.reference #double))
    ]);


definition sample 1.0:
{
  @intVal #int32;
  @floatVal #float;
  @shortVal #int16;
  @byteArr #u16binary;
  @longArr #long_array;
 /* @dblArr #double_array; */
  @str #u32utf8;
};


The SampleObject class is modified to include Java annotations.  These bind the names of the variables in the definition to the Java fields.

@ArgotMarshaller(TypeAnnotationMarshaller.class)
public class SampleObject implements java.io.Serializable, KryoSerializable {

    public static final String ARGOT_NAME = "sample";
    
    @ArgotTag("intVal")
    public int intVal;
    
    @ArgotTag("floatVal")
    public float floatVal;
    
    @ArgotTag("shortVal")
    public short shortVal;
    
    @ArgotTag("byteArr")
    public byte[] byteArr;
    
    @ArgotTag("longArr")
    public long[] longArr;
    /*
    @ArgotTag("dblArr")
    public double[] dblArr;
    */
    @ArgotTag("str")
    public String str;
 
    ... 


With the data definition in place, it is compiled and bound to Java classes using the following Argot loader class.

public class SampleArgotLoader 
extends ArgotCompilerLoader 
{
    private static final String NAME = "sample.argot";
 
    public SampleArgotLoader() 
    {
        super(NAME);
    }

    @Override
    public void bind( TypeLibrary library )
    throws TypeException
    {
        library.bind( library.getTypeId(SampleObject.ARGOT_NAME, "1.0"), SampleObject.class );
        library.bind( library.getTypeId("long_array", "1.0"), new TypeArrayMarshaller(), new TypeArrayMarshaller(), long[].class);
        library.bind( library.getTypeId("double_array", "1.0"), new TypeArrayMarshaller(), new TypeArrayMarshaller(), double[].class);
    }
}

The above elements are required for any Argot-based implementation and are not a special requirement of the Hazelcast integration.  I've included them here for completeness of the example.

The bridge between Hazelcast and Argot is defined by a StreamSerializer implementation.  The ArgotSerializer provides a generic implementation that can be used by any object.  Its read and write methods read the object from, and write it to, the stream.

    public Object read(ObjectDataInput input) 
    throws IOException 
    {
        try 
        {
            TypeInputStream typeIn = new TypeInputStream( (InputStream) input, _map);
            return (SampleObject) typeIn.readObject(_typeId);
        } 
        catch (TypeException e) 
        {
            throw new IOException(e.getMessage(), e);
        }
    }
 
    public void write(ObjectDataOutput output, Object object) 
    throws IOException
    {
        try 
        {
            TypeOutputStream typeOut = new TypeOutputStream( (OutputStream) output, _map );
            typeOut.writeObject( _typeId, object );
        } 
        catch (TypeException e) 
        {
            throw new IOException(e.getMessage(), e);
        }
    }
 
The Hazelcast API makes registering and using custom serialization reasonably simple to configure.  A class is bound to a serializer.  In this example I'm binding the SampleObject class to the ArgotSerializer:

SerializationConfig config = new SerializationConfig();
config.addSerializerConfig(new SerializerConfig().
    setTypeClass(SampleObject.class).
    setImplementation(new ArgotSerializer(typeMap,SampleObject.ARGOT_NAME)));

That's it, the Sample object can now be serialized to the Internet of Things and can also be stored in a Big-Data environment.

Argot Performance

Having not looked at Argot performance for some time, I was very interested to see how Argot would perform in the benchmark.  While performance is not necessarily the number one factor in choosing a serialization protocol, it is reasonably important.

The first test I conducted didn't include either the array of long values or the array of double values.  The results were:

Serializer                    Size (bytes)   Time (ms)
Argot Serialization           4172           348
Java Serialization            4336           411
DataSerializable              4237           263
IdentifiedDataSerializable    4186           170
Portable                      4205           228
Kryo                          4336           277
Kryo-unsafe                   4336           256

While Argot didn't come first in the first performance test, it wasn't last, which isn't bad, especially considering Argot hasn't had a lot of performance tuning done.  Argot did win at using the least amount of data. This is achieved because it uses metadata to describe the structure of the binary format, which results in less data being stored at runtime.
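The general idea, keeping the structure description out of each serialized object, can be illustrated with a small JDK-only sketch. This is not Argot's format or API; the `Point` class and the `javaSize`/`fieldsOnlySize` helpers are hypothetical names made up for illustration. It compares Java serialization, which embeds a class descriptor in the stream, with writing only the field values and assuming the reader already knows the layout.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class MetadataOverhead {

    // Hypothetical sample type, not the benchmark's SampleObject.
    static class Point implements Serializable {
        private static final long serialVersionUID = 1L;
        int x = 1;
        int y = 2;
    }

    // Java serialization writes a class descriptor alongside the values.
    static int javaSize(Point p) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(p);
        }
        return bytes.size();
    }

    // Structure is described elsewhere, so only the two int values are written.
    static int fieldsOnlySize(Point p) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(p.x);
            out.writeInt(p.y);
        }
        return bytes.size();
    }

    public static void main(String[] args) throws IOException {
        Point p = new Point();
        System.out.println("Java serialization: " + javaSize(p) + " bytes");
        System.out.println("Fields only:        " + fieldsOnlySize(p) + " bytes");
    }
}
```

The fields-only stream is 8 bytes (two 4-byte ints), while the Java serialization stream is larger because it carries the class name and field descriptions with the data.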

Being reasonably happy with Argot's performance, my next step was to include the array of long values in the sample object.  The benchmark stores 3000 long values in the array.  The initial results were very bad for Argot and made me realise that the implementation of int64 in Argot was based on some old code that wrote individual bytes to the stream.  After fixing this up, the following results were achieved:

Serializer                    Size (bytes)   Time (ms)
Argot Serialization           28174          5587
Java Serialization            28374          862
DataSerializable              28241          918
IdentifiedDataSerializable    28190          851
Portable                      28213          904
Kryo                          28374          786
Kryo-unsafe                   28374          727

Ouch! Argot's performance when writing an array of 3000 int64s was about six times worse than the closest performer (5587 ms against 918 ms).  This test showed me two things: first, that this benchmark is heavily biased towards arrays, and second, that Argot's default array marshaller is terrible at dealing with large arrays of simple types.

After some further investigation I discovered that performance for arrays is all gained or lost in the number of read or write calls made to the stream.  If you can minimize stream writes you can improve the performance greatly.  Not one to give up, I used Argot's support for custom marshalling to implement a fast int64 array marshaller.  The writer for the marshaller is as follows:

        public void write(TypeOutputStream out, Object o) 
        throws TypeException, IOException 
        {
            long[] longArray = (long[]) o;            
            
            _writer.write(out, longArray.length);
            OutputStream output = out.getStream();
            byte[] bytes = new byte[longArray.length*8];
            int pos = 0;
            
            for (int x=0; x<longArray.length;x++)
            {
                long s = longArray[x];
                bytes[pos] = (byte)(s >>> 56);
                bytes[pos+1] = (byte)(s >>> 48);
                bytes[pos+2] = (byte)(s >>> 40);
                bytes[pos+3] = (byte)(s >>> 32);
                bytes[pos+4] = (byte)(s >>> 24);
                bytes[pos+5] = (byte)(s >>> 16);
                bytes[pos+6] = (byte)(s >>> 8);
                bytes[pos+7] = (byte)(s);
                
                pos+=8;
            }

            output.write(bytes,0,pos);
        }


This implementation minimises writes to the stream by allocating the full byte array and preparing the data before writing it out in a single call.  A similar reader was written which reads the full array in before parsing it.  Connecting the new marshaller to the long array was a simple matter of changing the binding:

  library.bind( library.getTypeId("long_array", "1.0"), 
                new LongArrayReader(), new LongArrayWriter(), long[].class);
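
The reader itself isn't shown in this post, so here is a minimal sketch of the same bulk approach on the read side, using only the JDK rather than Argot's TypeInputStream. The `BulkLongArrayCodec` class and its method names are hypothetical, and the 4-byte length prefix is an assumption made to keep the example self-contained (the Argot definition above uses uvint28 for the length).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;

public class BulkLongArrayCodec {

    // Encode every long into one buffer, then write the payload in a single call.
    static void write(OutputStream out, long[] values) throws IOException {
        DataOutputStream data = new DataOutputStream(out);
        data.writeInt(values.length); // assumed 4-byte length prefix
        byte[] bytes = new byte[values.length * 8];
        int pos = 0;
        for (long v : values) {
            for (int shift = 56; shift >= 0; shift -= 8) {
                bytes[pos++] = (byte) (v >>> shift); // big-endian, as in the writer above
            }
        }
        data.write(bytes, 0, pos);
        data.flush();
    }

    // Read the whole payload in one go, then decode it from memory.
    static long[] read(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        int length = data.readInt();
        byte[] bytes = new byte[length * 8];
        data.readFully(bytes); // blocks until the full array has been read
        long[] values = new long[length];
        int pos = 0;
        for (int i = 0; i < length; i++) {
            long v = 0;
            for (int b = 0; b < 8; b++) {
                v = (v << 8) | (bytes[pos++] & 0xFF);
            }
            values[i] = v;
        }
        return values;
    }

    public static void main(String[] args) throws IOException {
        long[] original = {0L, 1L, -1L, Long.MAX_VALUE, Long.MIN_VALUE};
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        write(buffer, original);
        long[] decoded = read(new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println(Arrays.equals(original, decoded));
    }
}
```

Masking each byte with 0xFF before combining matters here: without it, negative bytes would sign-extend and corrupt the higher bits of the decoded value.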

After running the benchmark again, I got the following results:

Serializer                    Size (bytes)   Time (ms)
Argot Serialization           28174          768
Java Serialization            28374          832
DataSerializable              28241          899
IdentifiedDataSerializable    28190          816
Portable                      28213          887
Kryo                          28374          713
Kryo-unsafe                   28374          703

With the optimized array marshaller Argot comes third in the benchmark!  Not a bad result.
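
The difference in stream calls behind this improvement can be made visible with a small JDK-only sketch. The `CountingSink` class and the two helper methods are hypothetical and are not part of Argot or the benchmark; the sink simply discards data and counts how many times it is called, once for a byte-at-a-time writer and once for a writer that fills a buffer first.

```java
import java.io.IOException;
import java.io.OutputStream;

public class WriteCallCount {

    // Hypothetical sink: discards all data and counts the calls it receives.
    static class CountingSink extends OutputStream {
        int calls = 0;

        @Override
        public void write(int b) {
            calls++;
        }

        @Override
        public void write(byte[] b, int off, int len) {
            calls++;
        }
    }

    // One stream call per byte, i.e. eight calls for every long.
    static int perByteCalls(long[] values) throws IOException {
        CountingSink sink = new CountingSink();
        for (long v : values) {
            for (int shift = 56; shift >= 0; shift -= 8) {
                sink.write((int) (v >>> shift) & 0xFF);
            }
        }
        return sink.calls;
    }

    // Fill a buffer in memory, then hand it to the stream in one call.
    static int bulkCalls(long[] values) throws IOException {
        CountingSink sink = new CountingSink();
        byte[] bytes = new byte[values.length * 8];
        int pos = 0;
        for (long v : values) {
            for (int shift = 56; shift >= 0; shift -= 8) {
                bytes[pos++] = (byte) (v >>> shift);
            }
        }
        sink.write(bytes, 0, pos);
        return sink.calls;
    }

    public static void main(String[] args) throws IOException {
        long[] values = new long[3000]; // same array length as the benchmark
        System.out.println("per-byte calls: " + perByteCalls(values)); // 24000
        System.out.println("bulk calls:     " + bulkCalls(values));    // 1
    }
}
```

For the benchmark's 3000 values, the byte-at-a-time approach makes 24000 calls where the buffered approach makes one. The sketch only counts calls; it says nothing about how expensive each call is in a real Hazelcast stream.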

In conclusion, it's been very interesting to see how Argot compares to other serialization techniques in both implementation and performance.  I've concluded that for objects with many simple fields Argot performs reasonably; however, additional buffering would likely bring Argot in line with some of the faster serialization implementations.  I will put that on the task list for Argot 1.4.  In addition, the serialization of large arrays is best done by specialised implementations. Where performance is important, it's good to know that Argot is flexible enough to allow this to be configured.