
Tag Archives: Avro

Publishing Apache Avro messages on an Apache Kafka topic

In earlier posts I played around with both Apache Avro and Apache Kafka. The natural next goal was to combine the two and start publishing binary Apache Avro data on an Apache Kafka topic.

Generating Java from the Avro schema

I use the Avro schema “location.avsc” from my earlier post.

$ java -jar avro-tools-1.8.1.jar compile schema location.avsc .

This generates Location.java for our project.

/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package nl.rubix.avro;

import org.apache.avro.specific.SpecificData;
// ... and more stuff

Make sure we have the Maven dependencies right in our pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.8.1</version>
    </dependency>
</dependencies>

We can now use the Location object in Java to build our binary Avro message:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

public ByteArrayOutputStream GenerateAvroStream() throws IOException
{
    // The generated class already carries its schema, so there is no need to parse it again
    Schema s = Location.getClassSchema();
    System.out.println("Schema: " + s);

    // Write an Avro data file into memory; the schema is embedded as metadata along with the data.
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    DatumWriter<Location> writer = new SpecificDatumWriter<Location>(s);
    DataFileWriter<Location> dataFileWriter = new DataFileWriter<Location>(writer);
    dataFileWriter.create(s, outputStream);

    // Build Avro message
    Location location = new Location();
    location.setVehicleId(new org.apache.avro.util.Utf8("VHC-001"));
    location.setTimestamp(System.currentTimeMillis() / 1000L);
    location.setLatitude(51.687402);
    location.setLongtitude(5.307759);
    System.out.println("Message location " + location.toString());

    // Append the record and close the writer so everything is flushed to the stream
    dataFileWriter.append(location);
    dataFileWriter.close();
    // Print the size only: the stream holds binary data, not readable text
    System.out.println("Encoded " + outputStream.size() + " bytes");

    return outputStream;
}

Once we have our ByteArrayOutputStream we can start publishing it on an Apache Kafka topic.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public void ProduceKafkaByte()
{
    try
    {
        // Get the Apache Avro message
        ByteArrayOutputStream data = GenerateAvroStream();
        System.out.println("Here comes the data: " + data.size() + " bytes");

        // Configure the producer: String keys, raw byte[] values.
        // The old "serializer.class" setting is not a config of this producer API, so it is left out.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        // Publish to topic "test" with key "1"; close() waits for pending sends to complete
        KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props);
        ProducerRecord<String, byte[]> producerRecord =
            new ProducerRecord<String, byte[]>("test", "1", data.toByteArray());
        messageProducer.send(producerRecord);
        messageProducer.close();
    }
    catch(IOException ex)
    {
        System.out.println ("Well this error happened: " + ex.toString());
    }
}

When we subscribe to our topic we can see the byte stream cruising by:

INFO Processed session termination for sessionid: 0x157d8bec7530002 (org.apache.zookeeper.server.PrepRequestProcessor)
Objavro.schema#####ype":"record","name":"Location","namespace":"nl.rubix.avro","fields":[{"name":"vehicle_id","type":"string","doc":"id of the vehicle"},{"name":"timestamp","type":"long","doc":"time in seconds"},{"name":"latitude","type":"double"},{"name":"longtitude","type":"double"}],"doc:":"A schema for vehicle movement events"}##<##O#P#######HC-001#ڲ#
=######@#####;@##<##O#P#######016-10-18 19:06:24,005] INFO Expiring session 0x157d8bec7530005, timeout of 30000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-18 19:06:24,005] INFO Processed session termination for sessionid: 0x157d8bec7530005 (org.apache.zookeeper.server.PrepRequestProcessor)
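Printing binary Avro data as text, as in the console output above, mangles most of the bytes. Here is a minimal sketch of a hex dump helper for inspecting the payload instead. The names HexDumpSketch and toHex are hypothetical and not part of Kafka or Avro.

```java
// Hypothetical helper for illustration; not part of Kafka or Avro.
final class HexDumpSketch {

    // Renders bytes as lowercase hex pairs, 16 per line, separated by spaces.
    static String toHex(byte[] data) {
        StringBuilder sb = new StringBuilder(data.length * 3);
        for (int i = 0; i < data.length; i++) {
            if (i > 0) {
                sb.append(i % 16 == 0 ? '\n' : ' ');
            }
            // Mask with 0xFF so negative byte values print as two hex digits
            sb.append(String.format("%02x", data[i] & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The first four bytes of an Avro data file: 'O', 'b', 'j', 1
        byte[] magic = {'O', 'b', 'j', 1};
        System.out.println(toHex(magic));
    }
}
```

Calling toHex(data.toByteArray()) on the stream built earlier should show a payload starting with 4f 62 6a 01, the "Obj" marker followed by the byte 1 that opens an Avro data file.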

All code is available on GitHub.


Posted on 19-10-2016 in Uncategorized

 


Playing around with Apache Avro

When entering the world of Apache Kafka, Apache Spark and data streams, sooner or later you will come across another Apache project: Apache Avro. So…

What is Apache Avro?

Avro is a remote procedure call and data serialization framework developed within Apache’s Hadoop project (source: Wikipedia). It is similar to Apache Thrift and Google Protocol Buffers. Probably the main reason for Avro’s growing popularity is that Hadoop-based big data platforms natively support serialization and deserialization of data in Avro format. Avro schemas are defined in JSON, and messages can be encoded in either JSON or binary format. When the binary Avro data file format is used, the schema is stored together with the actual data.

Playing with Apache Avro from the command line

So first let’s create an Avro schema “location.avsc” for the data records:

{"namespace": "nl.rubix.avro",
  "type": "record",
  "name": "Location",
  "fields": [
    {"name": "vehicle_id", "type": "string", "doc" : "id of the vehicle"},
    {"name": "timestamp", "type": "long", "doc" : "time in seconds"},
    {"name": "latitude", "type": "double"},
    {"name": "longtitude",  "type": "double"}
  ],
  "doc:" : "A schema for vehicle movement events"
}

And we have this example file “location1.json” with a valid data record:

{"vehicle_id": "1", "timestamp": 1476005672, "latitude": 51.687402, "longtitude": 5.307759}
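To get a feel for what the binary form of this record looks like, here is a minimal sketch that hand-encodes it following the binary encoding rules of the Avro specification: longs as zig-zag variable-length integers, strings as a length followed by UTF-8 bytes, doubles as 8 little-endian bytes, and record fields written in schema order. The class LocationBinarySketch and its methods are hypothetical helpers written for illustration, not part of the Avro library. The output is the bare record only, without the data file header that wraps it in an Avro data file.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; real code should use the Avro library's encoders.
final class LocationBinarySketch {

    // Avro writes long values as zig-zag encoded variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long z = (value << 1) ^ (value >> 63); // zig-zag: small magnitudes map to small unsigned values
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 payload bits plus a continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    // A string is its UTF-8 byte length (encoded as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // A double is 8 bytes in little-endian IEEE 754 format.
    static void writeDouble(ByteArrayOutputStream out, double value) {
        byte[] b = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putDouble(value).array();
        out.write(b, 0, b.length);
    }

    // Fields are written one after another in schema order, with no names or separators.
    static byte[] encode(String vehicleId, long timestamp, double latitude, double longtitude) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, vehicleId);
        writeLong(out, timestamp);
        writeDouble(out, latitude);
        writeDouble(out, longtitude);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = encode("1", 1476005672L, 51.687402, 5.307759);
        System.out.println(bytes.length + " bytes");
    }
}
```

For the record in location1.json this comes to 23 bytes: 2 for the vehicle id, 5 for the timestamp and 8 for each double. Field names never appear in the encoded record, which is why a reader needs the schema to make sense of it.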

Working with the Avro tools

Download the latest version of the Avro tools (currently 1.8.1) from the Avro Releases page.

$ java -jar avro-tools-1.8.1.jar
Version 1.8.1 of Apache Avro
Copyright 2010-2015 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
----------------
Available tools:
          cat extracts samples from files
      compile Generates Java code for the given schema.
       concat Concatenates avro files without re-compressing.
   fragtojson Renders a binary-encoded Avro datum as JSON.
     fromjson Reads JSON records and writes an Avro data file.
     fromtext Imports a text file into an avro data file.
      getmeta Prints out the metadata of an Avro data file.
    getschema Prints out schema of an Avro data file.
          idl Generates a JSON schema from an Avro IDL file
 idl2schemata Extract JSON schemata of the types from an Avro IDL file
       induce Induce schema/protocol from Java class/interface via reflection.
   jsontofrag Renders a JSON-encoded Avro datum as binary.
       random Creates a file with randomly generated instances of a schema.
      recodec Alters the codec of a data file.
       repair Recovers data from a corrupt Avro Data file
  rpcprotocol Output the protocol of a RPC service
   rpcreceive Opens an RPC Server and listens for one message.
      rpcsend Sends a single RPC message.
       tether Run a tethered mapreduce job.
       tojson Dumps an Avro data file as JSON, record per line or pretty.
       totext Converts an Avro data file to a text file.
     totrevni Converts an Avro data file to a Trevni file.
  trevni_meta Dumps a Trevni file's metadata as JSON.
trevni_random Create a Trevni file filled with random instances of a schema.
trevni_tojson Dumps a Trevni file as JSON.

Converting a data record from JSON to Avro

$ java -jar avro-tools-1.8.1.jar fromjson --schema-file location.avsc location1.json > location.avro

The result is an output file “location.avro” containing the Avro binary. The interesting thing about Avro is that it encapsulates both the schema and the content in its binary message.

Objavro.schema#####ype":"record","name":"Location","namespace":"nl.rubix.avro","fields":[{"name":"vehicle_id","type":"string","doc":"id of the vehicle"},{"name":"timestamp","type":"long","doc":"time in seconds"},{"name":"latitude","type":"double"},{"name":"longtitude","type":"double"}],"doc:":"A schema for vehicle movement events"}avro.codenull~##5############.1м##

Retrieving the JSON message from Avro data

$ java -jar avro-tools-1.8.1.jar tojson location.avro > location_output.json

Retrieving the Avro schema from Avro data

And because the schema is present in the data we can retrieve the schema as well.

$ java -jar avro-tools-1.8.1.jar getschema location.avro > location_output.avsc
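To show why the schema can be recovered, here is a minimal sketch that reads the header of an Avro data file using only the JDK. It assumes the layout described in the Avro specification: the four magic bytes 'O', 'b', 'j', 1, followed by a metadata map of string keys to byte values that holds the schema under the key avro.schema. The class AvroHeaderSketch and its methods are hypothetical names, and this is not how avro-tools itself is implemented. In real code the Avro library's DataFileStream and its getSchema() method are the better choice.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Avro library.
final class AvroHeaderSketch {

    // Reads one zig-zag encoded variable-length long.
    static long readLong(InputStream in) throws IOException {
        long z = 0;
        int shift = 0;
        while (true) {
            int b = in.read();
            if (b < 0) throw new EOFException("truncated varint");
            z |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) break;
            shift += 7;
            if (shift > 63) throw new IOException("varint too long");
        }
        return (z >>> 1) ^ -(z & 1); // undo the zig-zag mapping
    }

    // Reads a length-prefixed byte sequence (also used for UTF-8 strings).
    static byte[] readBytes(InputStream in) throws IOException {
        long length = readLong(in);
        if (length < 0 || length > Integer.MAX_VALUE) throw new IOException("bad length " + length);
        byte[] buf = new byte[(int) length];
        new DataInputStream(in).readFully(buf);
        return buf;
    }

    // Checks the magic bytes and reads the metadata map that follows them.
    static Map<String, byte[]> readMetadata(InputStream in) throws IOException {
        byte[] magic = new byte[4];
        new DataInputStream(in).readFully(magic);
        if (magic[0] != 'O' || magic[1] != 'b' || magic[2] != 'j' || magic[3] != 1) {
            throw new IOException("not an Avro data file");
        }
        Map<String, byte[]> meta = new LinkedHashMap<>();
        // A map is a series of blocks; a block count of 0 ends it.
        for (long count = readLong(in); count != 0; count = readLong(in)) {
            if (count < 0) {
                count = -count;
                readLong(in); // a negative count is followed by the block size in bytes, unused here
            }
            for (long i = 0; i < count; i++) {
                String key = new String(readBytes(in), StandardCharsets.UTF_8);
                meta.put(key, readBytes(in));
            }
        }
        return meta;
    }

    static String readSchemaJson(InputStream in) throws IOException {
        byte[] schema = readMetadata(in).get("avro.schema");
        if (schema == null) throw new IOException("no avro.schema entry in header");
        return new String(schema, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Usage: java AvroHeaderSketch location.avro
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            System.out.println(readSchemaJson(in));
        }
    }
}
```

Run against location.avro, this should print the same schema JSON that getschema writes, although possibly formatted differently.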


Posted on 18-10-2016 in Uncategorized

 
