Publishing Apache Avro messages on an Apache Kafka topic

In earlier posts I played around with both Apache Avro and Apache Kafka. The next goal was naturally to combine both and start publishing binary Apache Avro data on an Apache Kafka topic.


Generating Java from the Avro schema

I use the Avro schema “location.avsc” from my earlier post.

$ java -jar avro-tools-1.8.1.jar compile schema location.avsc .

This generates the Location class for our project.

/**
 * Autogenerated by Avro
 */
package nl.rubix.avro;

import org.apache.avro.specific.SpecificData;
// ... and more stuff

Make sure we have the Maven dependencies right in our pom.xml:


We can now use the Location object in Java to build our binary Avro message:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;

public ByteArrayOutputStream GenerateAvroStream() throws IOException
{
    // Schema (taken straight from the generated class)
    Schema s = Location.getClassSchema();
    System.out.println("Schema parsed: " + s);

    // Encode the data using the JSON schema and embed the schema as metadata along with the data.
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    DatumWriter<Location> writer = new SpecificDatumWriter<Location>(s);
    DataFileWriter<Location> dataFileWriter = new DataFileWriter<Location>(writer);
    dataFileWriter.create(s, outputStream);

    // Build AVRO message
    Location location = new Location();
    location.setVehicleId(new org.apache.avro.util.Utf8("VHC-001"));
    location.setTimestamp(System.currentTimeMillis() / 1000L);
    System.out.println("Message location " + location.toString());

    // Append the record and close the writer so everything is flushed to the stream
    dataFileWriter.append(location);
    dataFileWriter.close();
    System.out.println("Encode outputStream: " + outputStream);

    return outputStream;
}

Once we have our ByteArrayOutputStream we can start publishing it on an Apache Kafka topic.

public void ProduceKafkaByte()
{
    try
    {
        // Get the Apache AVRO message
        ByteArrayOutputStream data = GenerateAvroStream();
        System.out.println("Here comes the data: " + data);

        // Start KAFKA publishing
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props);
        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<String, byte[]>("test", "1", data.toByteArray());

        // Send the record and close the producer so the message is flushed to the broker
        messageProducer.send(producerRecord);
        messageProducer.close();
    }
    catch(IOException ex)
    {
        System.out.println("Well this error happened: " + ex.toString());
    }
}

When we subscribe to our topic we can see the byte stream cruising by:

INFO Processed session termination for sessionid: 0x157d8bec7530002 (org.apache.zookeeper.server.PrepRequestProcessor)
Objavro.schema#####ype":"record","name":"Location","namespace":"nl.rubix.avro","fields":[{"name":"vehicle_id","type":"string","doc":"id of the vehicle"},{"name":"timestamp","type":"long","doc":"time in seconds"},{"name":"latitude","type":"double"},{"name":"longtitude","type":"double"}],"doc:":"A schema for vehicle movement events"}##<##O#P#######HC-001#ڲ#
=######@#####;@##<##O#P#######016-10-18 19:06:24,005] INFO Expiring session 0x157d8bec7530005, timeout of 30000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-18 19:06:24,005] INFO Processed session termination for sessionid: 0x157d8bec7530005 (org.apache.zookeeper.server.PrepRequestProcessor)
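
The payload in the console output above starts with "Obj" because an Avro data file always begins with the three ASCII bytes 'O', 'b', 'j' followed by the byte 1. A consumer could use that to sanity-check what it receives. The following is only a sketch with plain JDK classes; the class and method names are made up for this illustration and are not part of Avro or Kafka.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative helper (hypothetical name): checks for the Avro data file magic bytes. */
class AvroContainerCheck {

    /** Avro data files begin with the ASCII bytes 'O', 'b', 'j' followed by the byte 1. */
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    static boolean looksLikeAvroContainer(byte[] payload) {
        if (payload == null || payload.length < MAGIC.length) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (payload[i] != MAGIC[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Simplified stand-in for a message value read from the topic
        byte[] fake = "Obj\u0001avro.schema".getBytes(StandardCharsets.US_ASCII);
        System.out.println(looksLikeAvroContainer(fake));
        System.out.println(looksLikeAvroContainer("plain text".getBytes(StandardCharsets.US_ASCII)));
    }
}
```

A check like this only tells you that the bytes claim to be an Avro data file; actually decoding them is still a job for the Avro library.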

All code is available on GitHub.

Posted on 19-10-2016 in Uncategorized

Playing around with Apache Avro

When entering the world of Apache Kafka, Apache Spark and data streams, sooner or later you will come across another Apache project: Apache Avro. So ...

What is Apache Avro?


Avro is a remote procedure call and data serialization framework developed within Apache’s Hadoop project (source: Wikipedia). It serves much the same purpose as Apache Thrift and Google Protocol Buffers. Probably the main reason for Avro’s growing popularity is that Hadoop-based big data platforms natively support serialization and deserialization of data in Avro format. Avro uses JSON-based schemas, and messages can be sent in both JSON and binary format. In the binary data file format the schema is stored together with the actual data.

Playing with Apache Avro from the command line

So first let’s create an Avro schema “location.avsc” for the data records

{"namespace": "nl.rubix.avro",
  "type": "record",
  "name": "Location",
  "fields": [
    {"name": "vehicle_id", "type": "string", "doc" : "id of the vehicle"},
    {"name": "timestamp", "type": "long", "doc" : "time in seconds"},
    {"name": "latitude", "type": "double"},
    {"name": "longtitude",  "type": "double"}
  ],
  "doc:" : "A schema for vehicle movement events"
}

And we have this example file “location1.json” with a valid data record

{"vehicle_id": "1", "timestamp": 1476005672, "latitude": 51.687402, "longtitude": 5.307759}

Working with the Avro tools

Download the latest version of the Avro tools (currently 1.8.1) from the Avro Releases page.

$ java -jar avro-tools-1.8.1.jar
Version 1.8.1 of Apache Avro
Copyright 2010-2015 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (
Available tools:
          cat extracts samples from files
      compile Generates Java code for the given schema.
       concat Concatenates avro files without re-compressing.
   fragtojson Renders a binary-encoded Avro datum as JSON.
     fromjson Reads JSON records and writes an Avro data file.
     fromtext Imports a text file into an avro data file.
      getmeta Prints out the metadata of an Avro data file.
    getschema Prints out schema of an Avro data file.
          idl Generates a JSON schema from an Avro IDL file
 idl2schemata Extract JSON schemata of the types from an Avro IDL file
       induce Induce schema/protocol from Java class/interface via reflection.
   jsontofrag Renders a JSON-encoded Avro datum as binary.
       random Creates a file with randomly generated instances of a schema.
      recodec Alters the codec of a data file.
       repair Recovers data from a corrupt Avro Data file
  rpcprotocol Output the protocol of a RPC service
   rpcreceive Opens an RPC Server and listens for one message.
      rpcsend Sends a single RPC message.
       tether Run a tethered mapreduce job.
       tojson Dumps an Avro data file as JSON, record per line or pretty.
       totext Converts an Avro data file to a text file.
     totrevni Converts an Avro data file to a Trevni file.
  trevni_meta Dumps a Trevni file's metadata as JSON.
trevni_random Create a Trevni file filled with random instances of a schema.
trevni_tojson Dumps a Trevni file as JSON.

Generate data record from JSON to Avro

$ java -jar avro-tools-1.8.1.jar fromjson --schema-file location.avsc location1.json > location.avro

The result is an output file “location.avro” containing the Avro binary. The interesting thing about Avro is that it encapsulates both the schema and the content in its binary message.

Objavro.schema#####ype":"record","name":"Location","namespace":"nl.rubix.avro","fields":[{"name":"vehicle_id","type":"string","doc":"id of the vehicle"},{"name":"timestamp","type":"long","doc":"time in seconds"},{"name":"latitude","type":"double"},{"name":"longtitude","type":"double"}],"doc:":"A schema for vehicle movement events"}avro.codenull~##5############.1м##
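
The unreadable bytes after the schema are the record itself. As far as the Avro specification goes, a long is written as a zigzag variable-length integer, a string as its UTF-8 length followed by the bytes, a double as 8 little-endian bytes, and a record as its fields back to back in schema order. The sketch below re-implements just that by hand with plain JDK classes to show the idea; the class and method names are invented for this post, and in real code you would let the Avro library do the encoding.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/** Hand-rolled sketch (hypothetical names) of Avro's binary encoding for the types in location.avsc. */
class AvroPrimitivesSketch {

    /** Avro "long": zigzag-encode the sign, then emit 7 bits per byte, lowest group first. */
    static byte[] encodeLong(long value) {
        long zigzag = (value << 1) ^ (value >> 63);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((zigzag & ~0x7FL) != 0) {
            out.write((int) ((zigzag & 0x7F) | 0x80)); // high bit set: more bytes follow
            zigzag >>>= 7;
        }
        out.write((int) zigzag);
        return out.toByteArray();
    }

    /** Avro "string": the UTF-8 byte length as an Avro long, followed by the UTF-8 bytes. */
    static byte[] encodeString(String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        byte[] length = encodeLong(utf8.length);
        return ByteBuffer.allocate(length.length + utf8.length).put(length).put(utf8).array();
    }

    /** Avro "double": 8 bytes, IEEE 754, little-endian. */
    static byte[] encodeDouble(double value) {
        return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putDouble(value).array();
    }

    /** A record is its fields concatenated in schema order, without names or separators. */
    static byte[] encodeLocation(String vehicleId, long timestamp, double latitude, double longitude) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[][] fields = {
            encodeString(vehicleId), encodeLong(timestamp),
            encodeDouble(latitude), encodeDouble(longitude)
        };
        for (byte[] field : fields) {
            out.write(field, 0, field.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Same values as the record in location1.json
        byte[] body = encodeLocation("1", 1476005672L, 51.687402, 5.307759);
        System.out.println("Encoded record is " + body.length + " bytes");
    }
}
```

Because the field names live only in the schema, the record body stays very small compared to the JSON version of the same record.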

Retrieving the JSON message from Avro data

$ java -jar avro-tools-1.8.1.jar tojson location.avro > location_output.json

Retrieving the Avro schema from Avro data

And because the schema is present in the data we can retrieve the schema as well.

$ java -jar avro-tools-1.8.1.jar getschema location.avro > location_output.avsc


Posted on 18-10-2016 in Uncategorized

Using the Oracle Database to store and present XML data

Because we wanted to store our (old) BPM Human Task data outside the SOAINFRA database (for archiving, metrics and search queries on recent history), we looked into a few possibilities in a spike / PoC. Because the Task data is actually structured XML data, and we do not yet know what future needs will require, the safest solution was to store the complete XML document in a datastore.

Luckily the Oracle database has the option to store XML data and use views to represent the data “the old-fashioned way”. At a high level the design looks like this.


First we create a table:


Then insert a HumanTask (task) XML element into the table.
To make sure we don’t get any errors like:

  • “ORA-31011: XML parsing failed”
  • “SQL Error: ORA-01704: string literal too long; Cause: The string literal is longer than 4000 characters.”

we declare a variable to hold the XML string before we update/insert it.

Declare
  vXmlStr xmltype := xmltype('<task><title>My Task</title><payload><CaseNumber>Case-1</CaseNumber><DocumentUrl>http://mydocument</DocumentUrl><DocumentNaam>myDocument</DocumentNaam></payload><taskDefinitionURI>default/Process_1.0!1600.93239/htMyTask</taskDefinitionURI><ownerRole>MyCasus_1.0.Users</ownerRole><priority>3</priority><identityContext></identityContext><systemAttributes><xmlstuff>much stuff</xmlstuff><taskDefinitionName>htMyTask</taskDefinitionName><xmlstuff>more stuff</xmlstuff></systemAttributes><systemMessageAttributes><numberAttribute1>0.0</numberAttribute1></systemMessageAttributes><sca><applicationName>default</applicationName><xmlstuff>more stuff</xmlstuff></sca></task>');
Begin
  Update TEST_TAAK set PAYLOAD = vXmlStr where ID=1;
End;
/


, TASKDEFINITIONNAME VARCHAR2(40) PATH 'systemAttributes/taskDefinitionName'
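
To get a feel for what such a PATH mapping selects, the same relative path can be tried out against a task document outside the database. This is a rough sketch using the JDK's own XPath support, not Oracle's XML functions; it assumes the row context is the root task element (the view definition itself is not shown here), and the class and method names are made up for the example.

```java
import java.io.StringReader;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

/** Illustrative helper (hypothetical name): evaluates a column path against a task document. */
class TaskXPathSketch {

    /** Evaluates a path relative to the root task element and returns its text ("" if absent). */
    static String extract(String taskXml, String relativePath) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            // Assumption: the column paths are relative to /task
            return xpath.evaluate("/task/" + relativePath, new InputSource(new StringReader(taskXml)));
        } catch (XPathExpressionException ex) {
            throw new IllegalArgumentException("Could not evaluate path: " + relativePath, ex);
        }
    }

    public static void main(String[] args) {
        // Trimmed-down version of the task document inserted above
        String xml = "<task><title>My Task</title><systemAttributes>"
                + "<taskDefinitionName>htMyTask</taskDefinitionName></systemAttributes></task>";
        System.out.println(extract(xml, "systemAttributes/taskDefinitionName")); // prints htMyTask
        System.out.println(extract(xml, "title"));                               // prints My Task
    }
}
```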

And the result, voila:




Posted on 22-09-2016 in Oracle

Getting started with Apache Kafka

Apache Kafka is a publish-subscribe messaging solution rethought as a distributed commit log.


The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity (page views, searches, or other actions users may take) is published to central topics with one topic per activity type. These feeds are available for subscription for a range of use cases including real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting.

Some use cases for Kafka are stream processing, event sourcing, metrics and all other (large sets of) data that go from publisher to 1-n subscriber(s). A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients making it a very efficient (and also easy to scale) high volume messaging solution.

So Kafka is actually a good alternative to more traditional (JMS / MQ) message brokers. Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc.). Compared to most messaging systems, Kafka has better throughput, built-in partitioning, replication and fault tolerance, which makes it a good solution for large-scale message processing applications. And all of this is free.
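
The "commit log" idea from the introduction can be pictured with a toy model: messages are only ever appended, and every subscriber keeps its own position (offset) in the log, so one slow reader never affects another. The sketch below is an in-memory illustration only, with invented class and subscriber names; it says nothing about how Kafka actually implements partitions, replication or storage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model (hypothetical name) of a topic as an append-only log with per-subscriber offsets. */
class CommitLogSketch {

    private final List<String> log = new ArrayList<>();            // append-only message store
    private final Map<String, Integer> offsets = new HashMap<>();  // next offset per subscriber

    /** Appends a message and returns the offset it was stored at. */
    int publish(String message) {
        log.add(message);
        return log.size() - 1;
    }

    /** Returns everything the subscriber has not seen yet and advances only its own offset. */
    List<String> poll(String subscriber) {
        int from = offsets.getOrDefault(subscriber, 0);
        List<String> unseen = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(subscriber, log.size());
        return unseen;
    }

    public static void main(String[] args) {
        CommitLogSketch topic = new CommitLogSketch();
        topic.publish("page-view");
        topic.publish("search");
        System.out.println(topic.poll("monitoring"));     // prints [page-view, search]
        topic.publish("page-view");
        System.out.println(topic.poll("monitoring"));     // prints [page-view]
        System.out.println(topic.poll("hadoop-loader"));  // prints [page-view, search, page-view]
    }
}
```

Note that reading does not remove anything from the log, which is the main difference with a traditional queue where a consumed message is gone.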

Getting Started

The Kafka website has an excellent quickstart tutorial. Download the latest version and work through the tutorial to send and receive your first messages from the console.

Playing around with Java

First we create a test topic.

bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testIteration
Created topic "testIteration".

Earlier versions of Kafka came with a default serializer, but that created a lot of confusion. As of 0.8.2 you need to pick a serializer yourself, either the StringSerializer or ByteArraySerializer that come with the API, or build your own. Since both our key and value in the example will be strings, we use the StringSerializer.
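
Conceptually a serializer does nothing more than turn a key or value into the bytes that go over the wire. The sketch below shows that with a stand-in interface that has the same shape as Kafka's Serializer (a serialize method taking the topic and the data), so it runs without the Kafka library on the classpath. The SerializerSketch class and its two constants are made up for this illustration.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative stand-ins (hypothetical names) for what a string and a byte array serializer do. */
class SerializerSketch {

    /** Stand-in with the same shape as Kafka's Serializer interface; not the real Kafka type. */
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** String serialization: the text as UTF-8 bytes (null stays null). */
    static final Serializer<String> STRING = (topic, data) ->
            data == null ? null : data.getBytes(StandardCharsets.UTF_8);

    /** Byte array serialization: the bytes are passed through untouched. */
    static final Serializer<byte[]> BYTE_ARRAY = (topic, data) -> data;

    public static void main(String[] args) {
        byte[] key = STRING.serialize("testIteration", "10");
        System.out.println(key.length); // prints 2, one byte for '1' and one for '0'
    }
}
```

If you build your own serializer for real, you implement the interface from the Kafka client library instead and register its class name under key.serializer or value.serializer.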

Use the following Apache Kafka library as a Maven dependency (pom.xml).


The following code publishes 10 messages on the Kafka topic.

public void ProduceIteration()
{
    int amountMessages = 10; // 10 is enough for the demo

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(props);

    for(int i = 1; i <= amountMessages; i++)
    {
        ProducerRecord<String, String> data = new ProducerRecord<String, String>("testIteration", Integer.toString(i), Integer.toString(i));
        System.out.println ("Publish message " + Integer.toString(i) + " - " + data);
        // Hand the record to the producer
        producer.send(data);
    }

    // Close the producer so all buffered messages are flushed to the broker
    producer.close();
}


The messages can be received from the topic:

jvzoggel$ bin/ --zookeeper localhost:2181 --topic testIteration --property print.key=true --property print.timestamp=true

CreateTime:1474354960268        1       1
CreateTime:1474354960284        2       2
CreateTime:1474354960284        3       3
CreateTime:1474354960285        4       4
CreateTime:1474354960285        5       5
CreateTime:1474354960285        6       6
CreateTime:1474354960285        7       7
CreateTime:1474354960285        8       8
CreateTime:1474354960285        9       9
CreateTime:1474354960285        10      10
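
The CreateTime column is the message timestamp in milliseconds since the Unix epoch. A small sketch to make one of these lines human readable, using only the JDK; the class and method names are invented for this example and the parsing assumes the whitespace-separated layout shown above.

```java
import java.time.Instant;

/** Illustrative helper (hypothetical name): reads the timestamp from a console consumer line. */
class CreateTimeSketch {

    /** Parses a "CreateTime:<millis> <key> <value>" line and returns the timestamp. */
    static Instant createTime(String consoleLine) {
        String[] columns = consoleLine.trim().split("\\s+");
        String millis = columns[0].substring("CreateTime:".length());
        return Instant.ofEpochMilli(Long.parseLong(millis));
    }

    public static void main(String[] args) {
        // First line of the output above
        System.out.println(createTime("CreateTime:1474354960268        1       1"));
        // prints 2016-09-20T07:02:40.268Z
    }
}
```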



Posted on 20-09-2016 in Uncategorized

What about CMMN (Case Management Model and Notation)?

When we started our Case Management adventure, the Case Management Model and Notation (CMMN) standard was still relatively new and unknown (the official 1.0 version was released in May 2014). Neither our design tool (Enterprise Architect) nor our implementation software (Oracle Adaptive Case Management) supported CMMN, so we created our own “way-of-modelling”.

In our teams that means that the process analysts/designers use powerful tools like Sparx Systems Enterprise Architect (EA) with its integrated BPM(N) and SOA support, but still have to manually add text documents describing how a specific process or task relates to the case as a whole. You would think that CMMN could easily be integrated into one of the leading architecture and design tools out there. There are some niche products that see a market for CMMN modelling, like Trisotech, but Sparx Systems seems to have no plans at all.

What about the CMMN adoption by software vendors?

It seems that only Camunda and IBM adopted CMMN in their business process management offering. Camunda even supports the beta CMMN 1.1 definition. The other (major) BPM vendors seem to hold back. If a small company like Camunda can do it, you would expect the other large vendors to be able to adopt the standard fairly easily as well.

I assume that the move to cloud and trends like big data and mobile (including new challenges like better API management) have been the primary focus for most software vendors: a big distraction from extending the core functionality of their current BPM/CM offering. A shame, especially since Gartner states that case management software is currently an 8 billion dollar untapped market. Adopting standards, and especially gaining market share, should sound interesting.

So these questions pop up and make me wonder:

  • What is the future of CMMN?
  • Will companies like Oracle, Appian, TIBCO and PEGA eventually adopt CMMN in their on-premise BPM offering?
  • And with the Gartner prediction in mind and the unstoppable move to cloud, can we expect any cloud-based Case Management solution in the near future? And if so, will it support CMMN?

I guess only time will tell.


Posted on 08-09-2016 in Uncategorized

Case management for the knowledge worker era

Companies have been using workflow and content management for many, many years. In IT we got used to the term Business Process Management (BPM) whenever we spoke of automated process execution. Not so strange since the term BPM was adopted by all the major software vendors and has received a lot of attention as “the” way to achieve process automation and integration.

There are however many different definitions of BPM, where some are technical and others are business oriented. But if we look at the Gartner definition:

Business process management (BPM) is a discipline that uses various methods to discover, model, analyze, measure, improve, and optimize business processes.  A business process coordinates the behavior of people, systems, information, and things to produce business outcomes in support of a business strategy. Processes can be structured and repeatable or unstructured and variable. Though not required, technologies are often used with BPM. BPM is key to align IT/OT investments to business strategy

An interesting (and correct) remark by Gartner is that technology is not required for BPM; however, the general idea (in IT) has been that BPM equals process automation. The business process is worked out into a complete definition, down to the level at which it can be executed as a program. The efficiency gains of automating the business process have to exceed the up-front effort to fully identify it and make it executable, an effort which can initially be very large and whose cost is only repaid over many instances of the process. So this approach only works for highly predictable work that is highly repeatable. In some ways it is identical to a factory production line, where a huge volume of products justifies the initial up-front factory costs.


The Tesla factory production line

Using this approach to discover, implement and execute complex and dynamic processes will be less economically beneficial. We assume that the less complex a process is, the lower the initial investment required. And the more often the process is executed, the greater the financial gain and/or business case.

When we talk about low-complexity, high-volume processes, we can also identify this as so-called “routine work”. Routine work is well known and can be planned to a high level of detail. The process uses a general pattern and is done in a very similar way every time. Because routine work is so predictable and repeatable it has been an ideal candidate for successful BPM implementations, simply because the return on investment for routine work is more likely and more easily gained.

Agility and Adaptability

Back in 1958 James March & Herbert Simon wrote the management science classic Organizations [2] and concluded that an organization is successful not because it does everything in exactly the same way every time, but because it is flexible enough to adapt and respond to the changes around it. Looking at the present, with changing regulations, new laws, new markets and changing business models, this statement seems more true than ever. We can therefore conclude that a rigidly specified organization, one that has every process fully planned in detail, will find itself unable to respond to changes.

We might accept that in real life no work is 100% predictable, nor is any work 100% unpredictable. So it seems that automating more complex and dynamic business processes requires more flexibility.

In 1992 the legendary business management guru Peter Drucker explained that matching a company’s strengths to the changes that have already taken place produces, in effect, a plan of action. Competitive advantage comes not from steely corporate rigor, but from organizational agility and the adaptability of support systems. [4]

Unpredictable work

Scientific management as once defined by Frederick Winslow Taylor has been at the heart of business process automation to date. And we have seen how it works for predictable and repeatable processes, since they are an ideal candidate for executable BPM. But not all work is predictable, and many organizations have processes which are highly unpredictable. When we talk about actions and events in processes being unpredictable, we mean that the sequence of human acts is not known in advance, and the course of the process may vary greatly for every instance. We can easily think of examples like medical care, law enforcement and complex industrial and financial processes.

Rise of the Knowledge Worker

Whenever we read up on the knowledge worker terminology we always come across one individual, namely management guru Peter F. Drucker. Drucker made the first reference to knowledge work in his 1959 book Landmarks of Tomorrow. He loosely defined a knowledge worker as “someone who knows more about his or her job than anyone else in the organization.” What is important is that Drucker already then understood the uniqueness of the job that a knowledge worker performs.


Peter F. Drucker 

Drucker predicted during the 90’s that the most valuable asset of a 21st-century institution (business or non-business) will be its knowledge workers and their productivity [5]. He even went further and claimed that the productivity of knowledge and knowledge workers is likely to become the decisive competitive factor in the world economy [6].

So even back then there was already an understanding among business management leaders that the most valuable assets during the industrial revolution were a company’s mass production equipment. Nowadays we can recognize high-volume business processes executed in BPM as a form of mass production: an upfront investment which generates value over time by repetitively producing the same expected result. So if we recognize common ground between industrial mass production and BPM production work nowadays, what can we say about automating knowledge work in the present or near future? Keep in mind that Davenport more recently (2005) expressed the importance of knowledge-worker productivity, stating that “Within organizations, knowledge workers tend to be closely aligned with the organization’s growth prospects” [7]. So with this in mind, how can we efficiently support these knowledge workers?

Today’s jobs are more dynamic, more ad hoc and require more skills and interaction with other specialists. Besides that, the amount of information available and necessary for a single worker has increased dramatically. So knowledge work cannot be mapped onto a traditional rigid business process (where work is performed according to a detailed plan prepared in advance), because as knowledge work proceeds, the sequence of activities depends on the situational information. This is the nature of knowledge work. As the available information or external influences change, the process has to be dynamic. It is not simply a matter of plan and execute but a continuous dynamic set of activities in an adaptive flow of execution. And while rules like regulations, laws and organization standards will certainly constrain the actions of the knowledge worker, no plan is ever final until the end is reached.

Case Management

To digitally support knowledge workers in acting dynamically in their cases, we need some sort of case management solution: a way to dynamically start activities like processes and tasks throughout the case lifecycle, in such a way that the knowledge worker can act on the information available to determine which actions to take. The case management solution should therefore be “information” driven.


Information drives the case

Forrester [3] defines case management as a highly structured but collaborative, dynamic, and information-intensive process driven by outside events requiring incremental and progressive responses from the business domain handling the case.

(Unstructured) data like documents in an enterprise content management (ECM) system and processes in business process management (BPM) suites are clearly key requirements for any case management offering. But processes and content alone are insufficient. By adding advanced business rules and clear business analytics to the mix, you get the 4 essential components of a case management solution.


The 4 essential components for case management

Advanced business rules allow for guidance to make sure certain actions are made available or withdrawn during the case lifecycle. And dashboards with case analytics allow for real-time insight on all running cases.

These 4 core components combined with good (knowledge) user experience and the ability to integrate with your service oriented landscape are basically the case management product you want. Add some policies management and even some social collaboration and you have a set of existing proven technologies, which together have a very interesting proposition for the modern knowledge worker needs.



  1. Keith D. Swenson; Mastering the Unpredictable: How Adaptive Case Management Will Revolutionize the Way That Knowledge Workers Get Things Done; 2010
  2. James G. March & Herbert A. Simon; Organizations; 1958
  3. Connie Moore, Craig Le Clair & Ralph Vitti (Forrester Research); Dynamic Case Management, an old idea catches new fire; 2009
  4. Peter F. Drucker; Planning for Uncertainty; 1992
  5. Peter F. Drucker; Knowledge Worker Productivity, The Biggest Challenge; 1999
  6. Peter F. Drucker; Rise of the Knowledge Worker; 1998
  7. Thomas H. Davenport; Process Management for Knowledge Work; 2005




Posted on 16-08-2016 in Uncategorized

Cleaning up your old excess Docker containers

Every docker run command creates a container on your system. When you exit the container (and it is no longer running) it is still there on disk and can be seen with:

docker ps -a
ea520a35da1f jvzoggel/kafka "/bin/bash" 2 minutes ago Exited (0) 2 minutes ago kafka1
46067fe01dc4 jvzoggel/kafka "/bin/bash" 3 minutes ago Exited (0) 2 minutes ago kafka2
0b7bcb382e65 jvzoggel/kafka "/bin/bash" 3 minutes ago Exited (0) 3 minutes ago kafka3

The containers that are not running will not consume any system resources except disk space, but it is usually good to clean up after yourself, so ...

Automatically clean up after yourself

The Docker documentation describes how to automatically clean up the container and remove the file system when the container exits:
   --rm=false: Automatically remove the container when it exits (incompatible with -d)
The above shows that by default containers are not removed, so adding --rm=true, or just the shorthand --rm, will do the trick:

docker run -i -t --rm jvzoggel/kafka /bin/bash

When you exit from the container it will be automatically removed from disk.

Manually clean up your stuff

Another method (all credits: Guillaume J. Charmes) is the command:

docker rm `docker ps --no-trunc -aq`

which removes all stopped containers in an elegant way (running containers are skipped with an error unless you add -f).


Posted on 05-08-2016 in Uncategorized