
Tag Archives: Java

Publishing Apache Avro messages on an Apache Kafka topic

In earlier posts I played around with both Apache Avro and Apache Kafka. The next goal was naturally to combine both and start publishing binary Apache Avro data on an Apache Kafka topic.


Generating Java from the Avro schema

I use the Avro schema “location.avsc” from my earlier post.
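For reference, the schema embedded in the consumer output further down in this post reconstructs to roughly this location.avsc (note that the longtitude spelling comes straight from the schema):

{
  "type": "record",
  "name": "Location",
  "namespace": "nl.rubix.avro",
  "doc": "A schema for vehicle movement events",
  "fields": [
    { "name": "vehicle_id", "type": "string", "doc": "id of the vehicle" },
    { "name": "timestamp", "type": "long", "doc": "time in seconds" },
    { "name": "latitude", "type": "double" },
    { "name": "longtitude", "type": "double" }
  ]
}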

$ java -jar avro-tools-1.8.1.jar compile schema location.avsc .

This results in the Location.java for our project.

/**
* Autogenerated by Avro
*
* DO NOT EDIT DIRECTLY
*/
package nl.rubix.avro;

import org.apache.avro.specific.SpecificData;
// ... and more stuff

Make sure we have the Maven dependencies right in our pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.8.1</version>
    </dependency>
</dependencies>
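As a side note: avro-maven-plugin is a build plugin rather than a library, so instead of running avro-tools by hand you could also let Maven generate the classes during the build. A rough sketch of what that plugin configuration could look like (the source and output directories here are assumptions, adjust them to your project layout):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.1</version>
            <executions>
                <execution>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                        <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>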

We can now use the Location object in Java to build our binary Avro message:

public ByteArrayOutputStream GenerateAvroStream() throws IOException
{
    // Schema
    String schemaDescription = Location.getClassSchema().toString();
    Schema s = new Schema.Parser().parse(schemaDescription);
    System.out.println("Schema parsed: " + s);

    // Write an Avro container to the stream: the schema is embedded as metadata in the header,
    // followed by the binary-encoded data.
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(s);
    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(writer);
    dataFileWriter.create(s, outputStream);

    // Build AVRO message
    Location location = new Location();
    location.setVehicleId(new org.apache.avro.util.Utf8("VHC-001"));
    location.setTimestamp(System.currentTimeMillis() / 1000L);
    location.setLatitude(51.687402);
    location.setLongtitude(5.307759);
    System.out.println("Message location " + location.toString());

    dataFileWriter.append(location);
    dataFileWriter.close();
    System.out.println("Encode outputStream: " + outputStream);

    return outputStream;
}

When we have our ByteArrayOutputStream we can start publishing it on an Apache Kafka topic.

public void ProduceKafkaByte()
{
    try
    {
        // Get the Apache AVRO message
        ByteArrayOutputStream data = GenerateAvroStream();
        System.out.println("Here comes the data: " + data);

        // Start KAFKA publishing
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props);
        ProducerRecord<String, byte[]> producerRecord =
            new ProducerRecord<String, byte[]>("test", "1", data.toByteArray());
        messageProducer.send(producerRecord);
        messageProducer.close();
    }
    catch(IOException ex)
    {
        System.out.println ("Well this error happened: " + ex.toString());
    }
}

When we subscribe to our topic we can see the bytestream cruising by:

INFO Processed session termination for sessionid: 0x157d8bec7530002 (org.apache.zookeeper.server.PrepRequestProcessor)
Objavro.schema#####ype":"record","name":"Location","namespace":"nl.rubix.avro","fields":[{"name":"vehicle_id","type":"string","doc":"id of the vehicle"},{"name":"timestamp","type":"long","doc":"time in seconds"},{"name":"latitude","type":"double"},{"name":"longtitude","type":"double"}],"doc:":"A schema for vehicle movement events"}##<##O#P#######HC-001#ڲ#
=######@#####;@##<##O#P#######016-10-18 19:06:24,005] INFO Expiring session 0x157d8bec7530005, timeout of 30000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2016-10-18 19:06:24,005] INFO Processed session termination for sessionid: 0x157d8bec7530005 (org.apache.zookeeper.server.PrepRequestProcessor)
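Since every Kafka message value here is a complete Avro container (schema plus data), a consumer can decode it without knowing the schema up front. A minimal decoding sketch, assuming the byte[] value comes from a KafkaConsumer configured with the ByteArrayDeserializer, could look like this:

// needs: org.apache.avro.file.DataFileStream, org.apache.avro.generic.GenericDatumReader,
// org.apache.avro.generic.GenericRecord, org.apache.avro.io.DatumReader, java.io.ByteArrayInputStream
public void DecodeAvroBytes(byte[] message) throws IOException
{
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
    DataFileStream<GenericRecord> dataFileStream =
        new DataFileStream<GenericRecord>(new ByteArrayInputStream(message), reader);

    // the schema travels inside the container header, so we can simply read it back
    System.out.println("Embedded schema: " + dataFileStream.getSchema());
    while (dataFileStream.hasNext())
    {
        GenericRecord record = dataFileStream.next();
        System.out.println("Decoded location: " + record);
    }
    dataFileStream.close();
}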

All code is available on GitHub here.


Posted on 19-10-2016 in Uncategorized

 


Getting started with Apache Kafka

Apache Kafka is a publish-subscribe messaging solution rethought as a distributed commit log.


The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity (page views, searches, or other actions users may take) is published to central topics with one topic per activity type. These feeds are available for subscription for a range of use cases including real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting.

Some use cases for Kafka are stream processing, event sourcing, metrics, and any other (large sets of) data that go from a publisher to 1-n subscribers. A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients, making it a very efficient (and easy to scale) high-volume messaging solution.

So Kafka is actually a good alternative to more traditional (JMS/MQ) message brokers. Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc.). In comparison to most messaging systems, Kafka has better throughput, built-in partitioning, replication, and fault-tolerance, which makes it a good solution for large-scale message processing applications. And all of this is free.

Getting Started

The Kafka website has an excellent quickstart tutorial here. Download the latest version here and work through the tutorial to send and receive your first messages from the console.

Playing around with Java

First we create a test topic.

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testIteration
Created topic "testIteration".

Earlier versions of Kafka came with a default serializer, but that created a lot of confusion. Since 0.8.2 you need to pick a serializer yourself: either the StringSerializer or ByteArraySerializer that come with the API, or build your own. Since both our key and value in the example will be strings, we use the StringSerializer.

Use the following Apache Kafka library as a Maven dependency (pom.xml).

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.0.1</version>
    </dependency>
</dependencies>

The following lines of code produce/publish 10 messages on the Kafka topic.


public void ProduceIteration()
{
    int amountMessages = 10; // 10 is enough for the demo

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(props);

    for (int i = 1; i <= amountMessages; i++)
    {
        ProducerRecord<String, String> data = new ProducerRecord<String, String>("testIteration", Integer.toString(i), Integer.toString(i));
        System.out.println("Publish message " + Integer.toString(i) + " - " + data);
        producer.send(data);
    }

    producer.close();
}

The messages can be received from the topic:

jvzoggel$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testIteration --property print.key=true --property print.timestamp=true

CreateTime:1474354960268        1       1
CreateTime:1474354960284        2       2
CreateTime:1474354960284        3       3
CreateTime:1474354960285        4       4
CreateTime:1474354960285        5       5
CreateTime:1474354960285        6       6
CreateTime:1474354960285        7       7
CreateTime:1474354960285        8       8
CreateTime:1474354960285        9       9
CreateTime:1474354960285        10      10
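Consuming these messages from Java instead of the console works in a similar way. A minimal consumer sketch (the group.id "testGroup" is just an assumption for this example):

// needs: org.apache.kafka.clients.consumer.*, java.util.Collections, java.util.Properties
public void ConsumeIteration()
{
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "testGroup"); // hypothetical consumer group for the demo
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "earliest");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Collections.singletonList("testIteration"));

    // poll a couple of times and print whatever comes in
    for (int i = 0; i < 10; i++)
    {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records)
        {
            System.out.println("Received message " + record.key() + " - " + record.value());
        }
    }
    consumer.close();
}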

 

 

Posted on 20-09-2016 in Uncategorized

 


How to Configure WebLogic Server to Send a Notification When Its Configuration is Changed

My former colleague, Java maven (no, not that one, this one) and friend Pierluigi contacted me about my post regarding the WebLogic Security Audit Provider. As always, Pier is very politically correct ;)


To my positive surprise he found a great solution for the limitation of the security audit provider. He discovered a way to configure WebLogic Server to send a notification when its configuration is changed [Knowledge Base ID 1377733.1].

This is awesome if you have a large Oracle environment and maintenance team and want to keep track of all the changes. I wish we had known this last year at the huge environment I was working on back then.

His blogpost contains all the code, scripts, etc so go and check it out!!! :)

And to end with his favourite quote:

Failure is not an option

References:

 

Posted on 21-02-2013 in Java, Oracle, Weblogic, WLST

 


Oracle ADF custom Validator for BSN check

In the Netherlands, all people of ages 14 and up receive a Burgerservicenummer (BSN) (Citizen’s Service Number). It is printed on driving licenses, passports and international ID cards, under the header Personal Number. The number is unique and may not contain any information about the person to whom it is assigned (no information such as gender, date of birth, etc.). (source: Wikipedia)

The BSN consists of 9 digits and uses an algorithm often called “the 11-check”. This 11-check algorithm works like this:

Let's say we perform the 11-check on BSN 123456782
1st digit = 1, 9 * 1 = 9
2nd digit = 2, 8 * 2 = 16
3rd digit = 3, 7 * 3 = 21
4th digit = 4, 6 * 4 = 24
5th digit = 5, 5 * 5 = 25
6th digit = 6, 4 * 6 = 24
7th digit = 7, 3 * 7 = 21
8th digit = 8, 2 * 8 = 16
9th digit = 2, -1 * 2 = -2 (last digit is not added but subtracted)

total: 154

Because 154 can be divided by 11 we can assume 123456782 is valid (154 / 11 = 14, remainder 0).
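If you want to sanity-check the algorithm outside ADF first, a quick standalone sketch (a hypothetical helper, not part of the validator below) could look like this:

public class BsnCheck
{
    public static boolean isValidBsn(String bsn)
    {
        if (bsn == null || bsn.length() != 9)
        {
            return false;
        }
        // weighted sum: 9 * digit1 + 8 * digit2 + ... + 2 * digit8
        int checksum = 0;
        for (int i = 0; i < 8; i++)
        {
            checksum += Character.digit(bsn.charAt(i), 10) * (9 - i);
        }
        checksum -= Character.digit(bsn.charAt(8), 10); // the last digit is subtracted
        return checksum % 11 == 0;
    }

    public static void main(String[] args)
    {
        System.out.println(isValidBsn("123456782")); // prints true (154 is divisible by 11)
    }
}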

So let’s start with creating a custom Validator in our ADF project. This is excellently documented by Mohammed Jabr on his blog.

Create a Java class that implements the javax.faces.validator.Validator interface.


Use the following code:


package nl.rubix;

import javax.faces.application.FacesMessage;
import javax.faces.component.UIComponent;
import javax.faces.context.FacesContext;
import javax.faces.validator.Validator;
import javax.faces.validator.ValidatorException;

public class BsnValidator implements Validator
{
    public BsnValidator()
    {
        super();
    }

    public void validate(FacesContext facesContext, UIComponent uIComponent,
                         Object object) throws ValidatorException
    {
        String BSN = object.toString();
        if (BSN.length() == 9)
        {
            int checksum = 0;
            for (int i = 0; i < 8; i++)
            {
                checksum = checksum + (Character.digit(BSN.charAt(i), 10) * (9 - i));
            }
            checksum = checksum - Character.digit(BSN.charAt(8), 10);
            // check modulus for checksum
            if ((checksum % 11) != 0)
            {
                System.out.println("checksum % 11 is not 0");
                FacesMessage fm = new FacesMessage("BSN fails 11-check");
                throw new ValidatorException(fm);
            }
        }
        else
        {
            FacesMessage fm = new FacesMessage("BSN must be 9 digits");
            throw new ValidatorException(fm);
        }
    }
}

Register the custom validator in your WEB-INF/faces-config.xml file.
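The registration itself (shown as a screenshot in the original post) boils down to a few lines; based on the validatorId used further down it should look roughly like this:

<validator>
    <validator-id>bsnvalidator</validator-id>
    <validator-class>nl.rubix.BsnValidator</validator-class>
</validator>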


Configure the inputText to use the custom validator.


Go to the properties of your inputText and make sure you use the correct validatorId:


<af:inputText label="BSN:" id="it1">
  <f:validator validatorId="bsnvalidator"/>
</af:inputText>

And at runtime this is the result (in Dutch, I know):
[screenshot: the validation message shown at runtime]

References:

 

Posted on 10-01-2013 in ADF, Oracle

 


Get value from inputText in Oracle ADF

No rocket science in this blogpost for the ADF gurus out there, but since I’m the new kid in town regarding Oracle ADF, I decided to note down some of the stuff I found very useful.

In our Oracle BPM project we generate our human task screens; however, the customer wants them heavily modified, and at one point we needed the value of an inputText component which had no binding.


FacesContext facesContext = FacesContext.getCurrentInstance();
UIViewRoot root = facesContext.getViewRoot();
RichInputText inputText = (RichInputText) root.findComponent("it1");
String myString = inputText.getValue().toString();

Here “it1” is the ID of your inputText component.
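One caveat worth mentioning (my addition, not from the original reference): findComponent with a plain id only works when the component is not nested inside a naming container (a region, table, etc.). A sketch of an alternative using the standard JSF invokeOnComponent, which takes the full client id:

// hypothetical variant: resolve the component via its client id, which also works
// when the inputText sits inside a naming container (use the full id, e.g. "r1:it1")
// needs: javax.faces.component.ContextCallback, javax.faces.component.UIComponent
final String[] result = new String[1];
FacesContext facesContext = FacesContext.getCurrentInstance();
facesContext.getViewRoot().invokeOnComponent(facesContext, "it1", new ContextCallback()
{
    public void invokeContextCallback(FacesContext context, UIComponent target)
    {
        result[0] = ((RichInputText) target).getValue().toString();
    }
});
String myString = result[0];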

Reference and all credits:

 

Posted on 19-12-2012 in Oracle

 


Oracle Service Bus duplicate message check using Coherence

In a situation where you need some sort of duplicate message check for an Oracle Service Bus project, you need some custom code. Since the Oracle Service Bus is stateless, when it handles a proxy service call it will not know whether this specific message was handled before. So there needs to be some sort of logic in your service for validating that it is a new, unique message ID.

Given the fact that every message on our ESB has a unique messageID element in the SOAP header, we could store this on disk, in a database, or in memory. With the help of Oracle Coherence this last option, in memory, is relatively simple.

Remark: these changes (whether disk, database or Coherence) make your stateless OSB a bit more stateful. Be careful as a developer or architect when you encounter requirements like these; the OSB wasn’t designed as a fast and stateless message-handling ESB for nothing. We use this mechanism in a process flow with a low message load, where an old mainframe system with a custom (old) adapter occasionally (and unwantedly) triggers duplicate messages. So in our case memory load is not a huge issue and we can proceed with our default monitoring tools.

OK, warnings aside, let’s check our example custom SOAP header:


<tns:RubixHeader xmlns:tns="http://www.rubix.nl/header">
  <CorrelationId>AF1204DD-17D3-28A3-09A2-0888F2FFC123</CorrelationId>
  <MessageId>3F2504E0-4F89-11D3-9A0C-0305E82C3301</MessageId>
  <MessageType/>
  <Timestamp>2012-08-18T12:10:00</Timestamp>
  <Source/>
</tns:RubixHeader>

Every SOAP message through our OSB contains such a custom SOAP header, and every MessageId is generated as a GUID and can be considered unique. So here are a few lines of Java using the Coherence lib:

/**
 *
 */
package nl.rubix.coherence;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import weblogic.logging.LoggingHelper;
import java.util.logging.Logger;

/**
 * @author JvZoggel
 *
 */
public class OsbCoherenceHelper
{

    public OsbCoherenceHelper()
    {
    }

    public static boolean CheckDuplicate(String messageid, String datetime)
    {
        NamedCache myCache = CacheFactory.getCache("mycache");
        boolean resultaat;
        Logger logger = LoggingHelper.getServerLogger();

        if (myCache.containsKey(messageid))
        {
            logger.warning(">> MessageID=" + messageid + " already exists with value=" + myCache.get(messageid));
            resultaat = true;
        }
        else
        {
            logger.warning(">> MessageID=" + messageid + " doesn't exists yet, now storing with value=" + datetime);
            myCache.put(messageid, datetime);
            resultaat = false;
        }
        return resultaat;
    }

    public static String getValue(String messageid)
    {
        NamedCache myCache = CacheFactory.getCache("mycache");
        return myCache.get(messageid).toString();
    }

    public static String showValues()
    {
        NamedCache myCache = CacheFactory.getCache("mycache");
        return myCache.values().toString();
    }

    /**
     * @param args
     */
    public static void main(String[] args)
    {
        // CacheFactory.ensureCluster();
        // System.out.println(">>>> Cluster = " + CacheFactory.getCluster());
        // boolean x1 = CheckDuplicate("id0001", "2001-10-26T21:00:00");
        // System.out.println("X1 = " + x1);
        // x1 = CheckDuplicate("id0001", "2001-10-27T22:00:00");
        // System.out.println("X1 = " + x1);
    }
}

These few lines of code (ab)use the Coherence memory cache to store the MessageId element of the XML message as the key, and the Timestamp element as the value. You could also store the CorrelationId, or any other element; values don’t have to be unique, as long as the key is. I use the timestamp so I can create an overview of old messages (and maybe delete old keys with an additional Java method).
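Since old keys will pile up over time, such a cleanup method based on the stored timestamps is easy to add. A hypothetical sketch (it assumes the values are ISO-8601 strings like 2012-08-18T12:10:00, which compare correctly as plain text):

// hypothetical cleanup: remove entries whose stored timestamp is older than the given cut-off
public static int purgeOlderThan(String cutoffTimestamp)
{
    NamedCache myCache = CacheFactory.getCache("mycache");
    int removed = 0;
    // copy the key set first so we can safely remove entries while iterating
    for (Object key : new java.util.HashSet<Object>(myCache.keySet()))
    {
        Object value = myCache.get(key);
        if (value != null && value.toString().compareTo(cutoffTimestamp) < 0)
        {
            myCache.remove(key);
            removed++;
        }
    }
    return removed;
}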

So let’s see what this looks like in the Oracle Service Bus:

Input for the Java callout are these 2 elements from our custom SOAP header. The resulting boolean variable is checked in the If-Then action: if TRUE, the proxy will throw an error because the message is a duplicate that already passed earlier on.

Firing the service request to both managed servers results in the following logging:

Managed Server 1:

####<Aug 18 ..> <Warning> <> <server1.local> <rbx_osb_dev_01> <[ACTIVE] ExecuteThread: '2' for queue: 'weblogic.kernel.Default (self-tuning)'> <<anonymous>> <> <..> <..> <BEA-000000> <>> MessageID=3D2504E0-4F89-11D3-9A0C-0305E82C3301 doesn't exists yet, now storing with value=2012-08-18T12:10:00>

and firing again to Managed Server 2:

####<Aug 18 ..> <Error> <> <server2.local> <rbx_osb_dev_02> <[ACTIVE] ExecuteThread: '6' for queue: 'weblogic.kernel.Default (self-tuning)'> <<anonymous>> <> <..> <..> <BEA-000000> <>> MessageID=3D2504E0-4F89-11D3-9A0C-0305E82C3301 already exists with value=2012-08-18T12:10:00>

Remember that by default we are using the default Coherence caching strategy, which means we store these cache entries in the same WebLogic JVM that hosts the Oracle Service Bus services, messages, transformations, JDBC connections & JMS modules. To protect your JVM you can use an external Coherence server, which is explained by William Markito Oliveira in his blog on OTN.

I really love how Oracle integrated Coherence into the OSB by extending the Business Service capabilities to use result caching (see the blog from Mark Smith if you don’t know this feature). The code used in this blogpost is not rocket science, and I hope to see features like this out-of-the-box in future upgrades of the Oracle Service Bus.

References:

 

Posted on 20-08-2012 in Coherence, Java, Oracle, OSB

 


JavaZone 2012

The JavaZone 2012 conference released this brilliant short movie called “The Java Heist”.
Looking at the amount of curse words, it’s obviously a European conference. ;-)

It’s all ones and zeros nowadays and that’s what we’re gonna be stealing here …

Enjoy!

 

Posted on 25-06-2012 in Java

 
