Robust Message Serialization in Apache Kafka Using Apache Avro, Part 3

Part 3: Configuring Clients

Earlier, we introduced Kafka Serializers and Deserializers that are capable of writing and reading Kafka records in Avro format. In this part, we will see how to configure producers and consumers to use them.

Setting up a Kafka Topic for use as a Schema Store

KafkaTopicSchemaProvider works with a Kafka topic as its persistent store. This topic will contain at most thousands of records: the schemas. It does not need multiple partitions, but it needs to be available even when one of the brokers is down. That's why we configure it with a replication factor of 3 and at least 2 in-sync replicas. We also disable time-based and size-based retention so that schemas are never deleted.

    $ kafka-topics --create --topic __com_cloudera_schemaprovider \
        --partitions 1 --replication-factor 3 --config min.insync.replicas=2 \
        --config retention.ms=-1 --config retention.bytes=-1 \
        --zookeeper $(hostname):2181
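To make the arithmetic behind these settings concrete, here is a minimal sketch in plain Java with no Kafka dependency. The class and method names are hypothetical and exist only for illustration. It assumes producers write with acks set to "all" (equivalently "-1"), in which case a write is rejected once fewer than min.insync.replicas replicas are in sync.

```java
/** Hypothetical helper for illustration; not part of Kafka or of the example project. */
final class TopicAvailability {

    /**
     * Number of replicas that can be lost while producers using acks=all
     * can still write to the partition.
     */
    static int replicasLostWhileWritable(int replicationFactor, int minInSyncReplicas) {
        if (minInSyncReplicas < 1 || minInSyncReplicas > replicationFactor) {
            throw new IllegalArgumentException(
                "expected 1 <= min.insync.replicas <= replication factor");
        }
        return replicationFactor - minInSyncReplicas;
    }

    /** Number of replicas that can be lost while at least one copy of the data survives. */
    static int replicasLostWhileDataSurvives(int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replication factor must be at least 1");
        }
        return replicationFactor - 1;
    }

    public static void main(String[] args) {
        // Settings used for the schema topic: replication factor 3, min.insync.replicas 2.
        System.out.println(replicasLostWhileWritable(3, 2));   // prints 1
        System.out.println(replicasLostWhileDataSurvives(3));  // prints 2
    }
}
```

With the values chosen here, the schema topic stays writable with one broker down, and the stored schemas survive the loss of two.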

Of course, in a production environment, we would set up Apache Sentry rules to allow only certain principals to add schemas.

As a next step, let’s add a schema to this topic with the administration tool created in Part 2.

    $ kafkatopic-schematool --add --name user --version 1 \
        --schema-file ./user_v1_1.avsc --servers $(hostname):9092 \
        --topic __com_cloudera_schemaprovider

Example Schema

The first version of our schema is a simplistic record that captures some attributes of a user.

    {"namespace": "com.cloudera.examples.avroserialization",
     "type": "record",
     "name": "User",
     "fields": [
       {"name": "identifier", "type": "string"},
       {"name": "display_name", "type": "string"},
       {"name": "registration_time", "type": "long"}
     ]
    }

The later version defines a new field and changes some types. For convenience, we set the record's name to User2 in that schema so we can generate classes for both of them in the same project. In a real-life scenario, however, User2 would be a later version of the same class rather than a different class coexisting with User.
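Which type changes a reader can tolerate is governed by Avro's schema resolution rules. The post does not list the exact changes made in the later schema, so the following is only a sketch of the primitive type promotions that the Avro specification permits (writer type on the left, reader type on the right). The class, enum, and method names are hypothetical. Separately from type changes, a field that exists only in the reader's schema must declare a default value there, otherwise resolution fails.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration of Avro's primitive type promotion rules. */
final class AvroPromotion {

    enum Primitive { INT, LONG, FLOAT, DOUBLE, STRING, BYTES }

    // Writer type -> reader types it may be promoted to, per the Avro specification.
    private static final Map<Primitive, Set<Primitive>> PROMOTIONS =
        new EnumMap<>(Primitive.class);

    static {
        PROMOTIONS.put(Primitive.INT,
            EnumSet.of(Primitive.LONG, Primitive.FLOAT, Primitive.DOUBLE));
        PROMOTIONS.put(Primitive.LONG, EnumSet.of(Primitive.FLOAT, Primitive.DOUBLE));
        PROMOTIONS.put(Primitive.FLOAT, EnumSet.of(Primitive.DOUBLE));
        PROMOTIONS.put(Primitive.DOUBLE, EnumSet.noneOf(Primitive.class));
        PROMOTIONS.put(Primitive.STRING, EnumSet.of(Primitive.BYTES));
        PROMOTIONS.put(Primitive.BYTES, EnumSet.of(Primitive.STRING));
    }

    /** True if data written as {@code writer} can be read as {@code reader}. */
    static boolean readerCanRead(Primitive writer, Primitive reader) {
        return writer == reader || PROMOTIONS.get(writer).contains(reader);
    }

    public static void main(String[] args) {
        // A long written by an old producer can be read as a double...
        System.out.println(readerCanRead(Primitive.LONG, Primitive.DOUBLE)); // prints true
        // ...but a double cannot be narrowed back to a long.
        System.out.println(readerCanRead(Primitive.DOUBLE, Primitive.LONG)); // prints false
    }
}
```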

Configuring the Producer

We add some general producer config:

    Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
    producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        IntegerSerializer.class.getName());

then we need to configure our producer to use our Serializer:

    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        KafkaSpecificRecordSerializer.class.getName());

and configure the Serializer to serialize User objects:

    producerProps.put(KafkaSpecificRecordSerializer.VALUE_RECORD_CLASSNAME,
        User.class.getName());
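The class is passed by name because Kafka hands the whole configuration map to the serializer through its configure method, and the serializer has to resolve the class itself. The sketch below shows one way such a lookup could work in plain Java. It is not the project's actual implementation: the key string and the class and method names are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of resolving a record class from a client config map. */
final class RecordClassLookup {

    // Assumed key; the real value of VALUE_RECORD_CLASSNAME is defined by the example project.
    static final String VALUE_RECORD_CLASSNAME = "value.record.classname";

    /** Loads the class named in the config, failing fast with a readable message. */
    static Class<?> resolveRecordClass(Map<String, ?> configs) {
        Object name = configs.get(VALUE_RECORD_CLASSNAME);
        if (!(name instanceof String)) {
            throw new IllegalArgumentException(
                "Missing or non-string config: " + VALUE_RECORD_CLASSNAME);
        }
        try {
            return Class.forName((String) name);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                "Record class not found on the classpath: " + name, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // java.util.ArrayList stands in for a generated Avro class such as User.
        configs.put(VALUE_RECORD_CLASSNAME, java.util.ArrayList.class.getName());
        System.out.println(resolveRecordClass(configs).getName());
    }
}
```

Failing at configuration time, rather than on the first record, surfaces a misspelled class name as soon as the client is constructed.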

Next, we configure the SchemaProvider to use Kafka for schema storage and set the topic name. We also have to set the bootstrap servers. This allows us to use a different Kafka cluster as the schema provider backend than the one we are producing to.

    producerProps.put(SchemaUtils.SCHEMA_PROVIDER_FACTORY_CONFIG,
        KafkaTopicSchemaProviderFactory.class.getName());
    producerProps.put(KafkaTopicSchemaProvider.SCHEMA_PROVIDER_CONF_PREFIX
        + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER);
    producerProps.put(KafkaTopicSchemaProvider.SCHEMA_TOPIC_NAME_CONF,
        "__com_cloudera_schemaprovider");
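The prefix is what keeps the schema provider's bootstrap servers separate from the producer's own. As a rough sketch of the idea, a component can extract its settings from the shared map by selecting the prefixed keys and stripping the prefix. The prefix value "schemaprovider.", the host names, and the class and method names below are all assumptions for illustration, not the project's actual values.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of extracting a prefixed sub-configuration. */
final class PrefixedConfig {

    /** Returns the entries whose key starts with {@code prefix}, with the prefix removed. */
    static Map<String, Object> withPrefixStripped(Map<String, ?> configs, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : configs.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        // Cluster that receives the data records.
        props.put("bootstrap.servers", "data-cluster:9092");
        // Cluster that stores the schemas, under an assumed prefix.
        props.put("schemaprovider.bootstrap.servers", "schema-cluster:9092");

        Map<String, Object> providerProps = withPrefixStripped(props, "schemaprovider.");
        System.out.println(providerProps.get("bootstrap.servers")); // prints schema-cluster:9092
    }
}
```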

We can now use this configuration to create a producer and produce User objects to a Kafka topic.

    Producer<Integer, User> producer = new KafkaProducer<>(producerProps);
    User u = new User("user3", "User, Third", 0L);
    producer.send(new ProducerRecord<>(TOPIC, 42, u)).get();

Configuring the Consumer

We set up a consumer in a quite similar way. After some general configuration

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER);
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "user_reader_group");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        IntegerDeserializer.class.getName());

we specify our Deserializer and the class it will read. We use a different version of the class: User2.

    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        KafkaSpecificRecordDeserializer.class.getName());
    consumerProps.put(KafkaSpecificRecordDeserializer.VALUE_RECORD_CLASSNAME,
        User2.class.getName());

Then we need to set up the schema provider, just as we did for the producer.

    consumerProps.put(SchemaUtils.SCHEMA_PROVIDER_FACTORY_CONFIG,
        KafkaTopicSchemaProviderFactory.class.getName());
    consumerProps.put(KafkaTopicSchemaProvider.SCHEMA_TOPIC_NAME_CONF, SCHEMAPROVIDER_TOPIC_NAME);
    consumerProps.put(KafkaTopicSchemaProvider.SCHEMA_PROVIDER_CONF_PREFIX
        + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER);

With this configuration, we can set up a consumer and start polling for records.

    KafkaConsumer<Integer, User2> consumer = new KafkaConsumer<>(consumerProps);
    consumer.subscribe(Collections.singletonList(TOPIC));

    while (true) {
      consumer.poll(1000).forEach(record -> {
        User2 u = record.value();
        System.out.println(u);
      });
    }

Conclusion

We have shown how Avro can be used in conjunction with Kafka to track evolving versions of a schema over time. In situations where different groups manage the data at different stages of its lifespan, this technique gives each group a clean way to use schemas to manage the handoff as the data evolves.

The code for this blog post can be found in Cloudera's kafka-examples GitHub repository.
