apache-kafka
Sérialiseur / Désérialiseur personnalisé
Recherche…
Introduction
Kafka stocke et transporte des tableaux d'octets dans sa file d'attente. Les sérialiseurs (de) sont responsables de la traduction entre le tableau d'octets fourni par Kafka et les POJO.
Syntaxe
- public void configure (Map <String,?> config, boolean isKey);
- public T deserialize (Sujet de chaîne, octet [] octets);
- octet public [] sérialiser (sujet de chaîne, T obj);
Paramètres
| paramètres | détails |
|---|---|
| config | les propriétés de configuration ( Properties ) transmises au Producer ou au Consumer lors de la création, sous forme de carte. Il contient des configurations standard de kafka, mais peut également être complété par une configuration définie par l'utilisateur. C'est le meilleur moyen de transmettre des arguments au sérialiseur (de). |
| C est la clé | Les sérialiseurs (de) personnalisés peuvent être utilisés pour les clés et / ou les valeurs. Ce paramètre vous indique lequel des deux cette instance va traiter. |
| sujet | le sujet du message actuel. Cela vous permet de définir une logique personnalisée en fonction du sujet source / destination. |
| octets | Le message brut à désérialiser |
| obj | Le message à sérialiser. Sa classe réelle dépend de votre sérialiseur. |
Remarques
Avant la version 0.9.0.0, l'API Java Kafka utilisait les Encoders et les Decoders . Ils ont été remplacés par Serializer et Deserializer dans la nouvelle API.
Sérialiseur Gson (de)
Cet exemple utilise la bibliothèque gson pour mapper des objets Java sur des chaînes json. Les sérialiseurs (de) sont génériques, mais ils n'ont pas toujours besoin d'être!
Sérialiseur
Code
public class GsonSerializer<T> implements Serializer<T> {
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
// this is called right after construction
// use it for initialisation
}
@Override
public byte[] serialize(String s, T t) {
return gson.toJson(t).getBytes();
}
@Override
public void close() {
// this is called right before destruction
}
}
Usage
Les sérialiseurs sont définis via les propriétés de production key.serializer et value.serializer .
Supposons que nous ayons une classe POJO nommée SensorValue et que nous voulons produire des messages sans aucune clé (les clés sont définies sur null ):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ...
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());
Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ...
producer.close();
( key.serializer est une configuration requise. Comme nous ne key.serializer pas les clés de message, nous conservons le StringSerializer livré avec kafka, qui peut gérer les StringSerializer null ).
désérialiseur
Code
public class GsonDeserializer<T> implements Deserializer<T> {
public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
private Class<T> cls;
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
String clsName = String.valueOf(config.get(configKey));
try {
cls = (Class<T>) Class.forName(clsName);
} catch (ClassNotFoundException e) {
System.err.printf("Failed to configure GsonDeserializer. " +
"Did you forget to specify the '%s' property ?%n",
configKey);
}
}
@Override
public T deserialize(String topic, byte[] bytes) {
return (T) gson.fromJson(new String(bytes), cls);
}
@Override
public void close() {}
}
Usage
Les désérialiseurs sont définis via les propriétés de consommateur key.deserializer et value.deserializer .
Supposons que nous ayons une classe POJO nommée SensorValue et que nous voulons produire des messages sans aucune clé (les clés sont définies sur null ):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ...
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());
try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
// ... consume messages ...
}
Ici, nous ajoutons une propriété personnalisée à la configuration du consommateur, à savoir CONFIG_VALUE_CLASS . GsonDeserializer l'utilisera dans la méthode configure() pour déterminer la classe POJO à gérer (toutes les propriétés ajoutées aux props seront transmises à la méthode configure sous la forme d'une carte).