apache-kafka
Benutzerdefinierter Serializer / Deserializer
Suche…
Einführung
Kafka speichert und transportiert Byte-Arrays in seiner Warteschlange. Die (De) Serializer sind für die Übersetzung zwischen dem von Kafka und POJO bereitgestellten Byte-Array verantwortlich.
Syntax
- public void configure (Map <String,?> config, boolean isKey);
- public T deserialize (String-Thema, Byte [] Bytes);
- public byte [] serialize (String-Thema, T obj);
Parameter
| Parameter | Einzelheiten |
|---|---|
| Konfig | die Konfigurationseigenschaften ( Properties ), die bei der Erstellung als Producer oder Producer an den Producer oder Consumer werden Es enthält reguläre Kafka-Konfigurationen, kann jedoch auch mit benutzerdefinierten Konfigurationen erweitert werden. Dies ist der beste Weg, um Argumente an den (De) Serializer zu übergeben. |
| isKey | Benutzerdefinierte (De) Serialisierer können für Schlüssel und / oder Werte verwendet werden. Dieser Parameter gibt an, mit welcher der beiden Instanzen diese Instanz umgehen wird. |
| Thema | das Thema der aktuellen Nachricht. Auf diese Weise können Sie eine benutzerdefinierte Logik basierend auf dem Quell- / Zielthema definieren. |
| Bytes | Die rohe Botschaft zur Deserialisierung |
| obj | Die Nachricht, die serialisiert werden soll. Die tatsächliche Klasse hängt von Ihrem Serializer ab. |
Bemerkungen
Vor der Version 0.9.0.0 verwendete die Kafka Java API Encoders und Decoders . Sie wurden in der neuen API durch Serializer und Deserializer ersetzt.
Gson (de) Serialisierer
In diesem Beispiel wird die Gson- Bibliothek verwendet, um Java-Objekte Json-Strings zuzuordnen . Die (De) Serialisierer sind generisch, müssen aber nicht immer!
Serializer
Code
public class GsonSerializer<T> implements Serializer<T> {
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
// this is called right after construction
// use it for initialisation
}
@Override
public byte[] serialize(String s, T t) {
return gson.toJson(t).getBytes();
}
@Override
public void close() {
// this is called right before destruction
}
}
Verwendungszweck
Serialisierer werden über die erforderlichen key.serializer und value.serializer definiert.
Angenommen, wir haben eine POJO-Klasse mit dem Namen SensorValue und möchten, dass Nachrichten ohne Schlüssel erzeugt werden (Schlüssel sind auf null ):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ...
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());
Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ...
producer.close();
( key.serializer ist eine erforderliche Konfiguration. Da wir keine Meldungsschlüssel angeben, wird der StringSerializer mit kafka ausgeliefert, der mit null umgehen kann).
Deserialisierer
Code
public class GsonDeserializer<T> implements Deserializer<T> {
public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
private Class<T> cls;
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
String clsName = String.valueOf(config.get(configKey));
try {
cls = (Class<T>) Class.forName(clsName);
} catch (ClassNotFoundException e) {
System.err.printf("Failed to configure GsonDeserializer. " +
"Did you forget to specify the '%s' property ?%n",
configKey);
}
}
@Override
public T deserialize(String topic, byte[] bytes) {
return (T) gson.fromJson(new String(bytes), cls);
}
@Override
public void close() {}
}
Verwendungszweck
Deserialisierer werden durch die erforderlichen Konsumenteneigenschaften key.deserializer und value.deserializer definiert.
Angenommen, wir haben eine POJO-Klasse mit dem Namen SensorValue und möchten, dass Nachrichten ohne Schlüssel erzeugt werden (Schlüssel sind auf null ):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ...
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());
try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
// ... consume messages ...
}
Hier fügen wir der Consumer-Konfiguration eine benutzerdefinierte Eigenschaft hinzu, nämlich CONFIG_VALUE_CLASS . Der GsonDeserializer verwendet sie in der configure() Methode, um zu bestimmen, welche POJO-Klasse behandelt werden soll (alle Eigenschaften, die zu props hinzugefügt werden, werden in Form einer Map an die configure Methode übergeben).