Saturday, February 25, 2017

Apache Kafka: Multiple Ways to Produce or Push Messages to Kafka Topics

Today, I am going to describe the various ways in Apache Kafka to put messages into topics. Apache Kafka supports several languages and provides APIs for Java. One reason is that Java is the primary language of the JVM, and most JVM-based languages can use Java libraries easily.

Kafka has concepts such as topics and partitions, which you can explore in the Apache Kafka documentation or the Confluent documentation. To put messages into a Kafka queue, Kafka supports serialization in various message formats. Some formats Kafka provides by default, but the recommended format is Apache Avro. Avro is a lightweight and type-safe format for serialized data. For more, you can explore Apache Avro.

Kafka has the concept of a Producer and a Consumer: the producer pushes data into the queue, and the consumer pulls data from it. Today we are creating various Kafka producers to publish data to a Kafka topic.

Prerequisites:

  • Install JDK 8.
  • Download Apache Kafka.
  • Download Zookeeper.
  • Download the Confluent Platform (for the Schema Registry used in the Avro examples). 
  • IDE
  • Build tool (we are using SBT; a sample build.sbt sketch follows this list) 
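For reference, a minimal set of build.sbt dependencies for these examples might look like the sketch below. The versions are assumptions from around the time of writing; adjust them to match your Kafka and Confluent installation. The Confluent serializer artifact lives in Confluent's own Maven repository:

// build.sbt (sketch; versions are assumptions)
resolvers += "Confluent" at "http://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients"         % "0.10.2.0",
  "org.apache.avro"  % "avro"                  % "1.8.1",
  "io.confluent"     % "kafka-avro-serializer" % "3.2.0"
)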

I. Simple Producer: 

First, we create a simple Kafka producer that publishes messages to a Kafka topic using Java. 

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, String> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    // Fire-and-forget: send the record and ignore the result.
    public static void fireAndForget(ProducerRecord<String, String> record) {
        kafkaProducer.send(record);
    }

    // Asynchronous send: the callback runs once the broker acknowledges the
    // write or the send fails. Check the exception first, because
    // recordMetaData may be null on failure.
    public static void asyncSend(ProducerRecord<String, String> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException {
        ProducerRecord<String, String> record1 = new ProducerRecord<>("CustomerCountry",
                "Record 1", "Japan1");

        ProducerRecord<String, String> record2 = new ProducerRecord<>("CustomerCountry",
                "Record 2", "Punjab1");

        fireAndForget(record1);
        asyncSend(record2);

        // Give the async send time to complete before the JVM exits.
        Thread.sleep(10000);
    }
}

In this example, we just produce data into the queue with Kafka's default `StringSerializer`.
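Besides fire-and-forget and asynchronous sends, the producer API also offers a synchronous send: `send()` returns a `Future<RecordMetadata>`, and calling `get()` on it blocks until the broker acknowledges the write, so failures surface immediately as exceptions. A minimal sketch of such a helper for the class above (this `syncSend` method is my addition, and it also needs imports for `java.util.concurrent.ExecutionException` and `org.apache.kafka.clients.producer.RecordMetadata`):

    // Synchronous send: block on the Future returned by send().
    public static void syncSend(ProducerRecord<String, String> record)
            throws ExecutionException, InterruptedException {
        RecordMetadata metadata = kafkaProducer.send(record).get();
        System.out.println("Acked at offset: " + metadata.offset());
    }

This is the simplest way to confirm a write succeeded, at the cost of waiting one round trip per record.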

II. Apache Avro Serialization Generic Format: 

To use Apache Avro, we need to create a schema for our messages, because the schema lets us deserialize messages with type safety. Various build-tool plugins can generate our POJO classes from an Avro schema file. As a good practice, we should use such tools, because our messages may become quite complex, and writing the classes manually invites mistakes.

NOTE: To use Avro with Kafka, we need to start the Confluent Schema Registry server, because the registry manages the message schemas while the messages themselves flow through the Kafka queues. For more details, please visit the documentation.
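For local development, a typical startup order with the Confluent Platform is ZooKeeper, then the Kafka broker, then the Schema Registry. The commands below are a sketch; the script names come from the Confluent distribution, and the exact paths depend on your installation:

# Start each service in its own terminal (paths are assumptions)
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
./bin/kafka-server-start ./etc/kafka/server.properties
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties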

For SBT, I am using the sbt-avro plugin. It depends on which build tool you are using; you can also write the generated classes by hand.
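A plugin setup might look like the following sketch; the plugin coordinates and version are assumptions, so check the sbt-avro README for a release matching your sbt version. With the plugin enabled, schema files placed under its source directory (typically src/main/avro) are compiled into Java classes during `sbt compile`:

// project/plugins.sbt (sketch; coordinates and version are assumptions)
addSbtPlugin("com.cavorite" % "sbt-avro" % "1.1.1")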
As we discussed, for Avro we need to create a schema first. In my example, I have the following schema: 

{"namespace": "com.harmeetsingh13.java",
    "type": "record",
    "name": "Customer",
    "fields": [{"name": "id","type": "int"},
            {"name": "name","type": "string"}]
}

The sbt-avro plugin generates the POJO class from this schema automatically; we will use that generated `Customer` class in the specific-format examples below. First, we create a Kafka producer using Avro's generic format, where records are built against the parsed schema at runtime:

import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroGenericProducer {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, GenericRecord> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        // The serializer registers the record schema with the Confluent Schema Registry.
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, GenericRecord> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, GenericRecord> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        // Generic format: parse the schema at runtime and populate records by
        // field name, instead of using a generated Customer class.
        Schema schema = new Schema.Parser().parse(AvroGenericProducer.class
                .getClassLoader().getResourceAsStream("avro/customer.avsc"));

        GenericRecord customer1 = new GenericData.Record(schema);
        customer1.put("id", 1001);
        customer1.put("name", "Jimmy");

        GenericRecord customer2 = new GenericData.Record(schema);
        customer2.put("id", 1002);
        customer2.put("name", "James");

        ProducerRecord<String, GenericRecord> record1 = new ProducerRecord<>("AvroGenericProducerTopic",
                "KeyOne", customer1);
        ProducerRecord<String, GenericRecord> record2 = new ProducerRecord<>("AvroGenericProducerTopic",
                "KeyOne", customer2);

        asyncSend(record1);
        asyncSend(record2);

        Thread.sleep(1000);
    }
}

One thing to note in this example is that, for serializing the message key, we are still using Kafka's `StringSerializer` class. The reason is that when we deserialize a string value with Avro's `KafkaAvroDeserializer`, we face this issue:

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 351
Caused by: org.apache.kafka.common.errors.SerializationException:
string specified by the writers schema could not be instantiated to find the readers schema

For more details, you can look into this discussion. 

As discussed, there are various ways to serialize messages to a Kafka queue using Avro. In the above example we used the generic format for message serialization, but we can also serialize messages in a more specific way, which we discuss in the next examples.

III. Apache Avro Serialization Specific Format One: 

Another Avro example serializes messages in a specific way, passing the generated `Customer` class directly as the record value:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroSpecificProducerOne {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // KafkaAvroSerializer handles the generated specific record (Customer) directly.
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException {
        // Customer is the POJO generated from avro/customer.avsc.
        Customer customer1 = new Customer(1001, "Jimmy");
        Customer customer2 = new Customer(1002, "James");

        ProducerRecord<String, Customer> record1 = new ProducerRecord<>("AvroSpecificProducerOneTopic",
                "KeyOne", customer1);
        ProducerRecord<String, Customer> record2 = new ProducerRecord<>("AvroSpecificProducerOneTopic",
                "KeyOne", customer2);

        asyncSend(record1);
        asyncSend(record2);

        Thread.sleep(1000);
    }
}

IV. Apache Avro Serialization Specific Format Two: 

Another way to serialize messages using Avro is to encode the record to a byte array manually, as below: 

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroSpecificProducerTwo {

    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, byte[]> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // The value is already Avro-encoded by convertCustomerToAvroBytes below;
        // KafkaAvroSerializer also accepts primitive types such as byte[].
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, byte[]> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, byte[]> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: " + recordMetaData.offset());
            System.out.println("Topic: " + recordMetaData.topic());
            System.out.println("Partition: " + recordMetaData.partition());
            System.out.println("Timestamp: " + recordMetaData.timestamp());
        });
    }

    // Serialize the Customer to Avro binary manually, using the schema file
    // from the classpath and a SpecificDatumWriter.
    private static byte[] convertCustomerToAvroBytes(Customer customer) throws IOException {
        Schema schema = new Schema.Parser().parse(AvroSpecificProducerTwo.class
                .getClassLoader().getResourceAsStream("avro/customer.avsc"));

        SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
            writer.write(customer, encoder);
            encoder.flush();

            return os.toByteArray();
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1001, "Jimmy");
        Customer customer2 = new Customer(1002, "James");

        byte[] customer1AvroBytes = convertCustomerToAvroBytes(customer1);
        byte[] customer2AvroBytes = convertCustomerToAvroBytes(customer2);

        ProducerRecord<String, byte[]> record1 = new ProducerRecord<>("AvroSpecificProducerTwoTopic",
                "KeyOne", customer1AvroBytes);
        ProducerRecord<String, byte[]> record2 = new ProducerRecord<>("AvroSpecificProducerTwoTopic",
                "KeyOne", customer2AvroBytes);

        asyncSend(record1);
        asyncSend(record2);

        Thread.sleep(1000);
    }
}

There are still more ways to serialize messages for Kafka, using Avro or other serializers. In the next post, we will discuss Kafka consumers and the various ways to consume messages from a Kafka queue using Avro.

You can also download the code for the above examples from the GitHub repo. 
