apache-kafka
Aangepaste serializer / deserializer
Zoeken…
Invoering
Kafka bewaart en vervoert byte-arrays in de wachtrij. De (de) serializers zijn verantwoordelijk voor de vertaling tussen de byte-array van Kafka en POJO's.
Syntaxis
- public void configure (Map <String,?> config, boolean isKey);
- public T deserialize (Stringonderwerp, byte [] bytes);
- public byte [] serialize (String-onderwerp, T obj);
parameters
| parameters | gegevens |
|---|---|
| config | de configuratie-eigenschappen ( Properties ) die bij het aanmaken aan de Producer of Consumer doorgegeven als een kaart. Het bevat reguliere kafka-configs, maar kan ook worden uitgebreid met een door de gebruiker gedefinieerde configuratie. Het is de beste manier om argumenten door te geven aan de (de) serializer. |
| is essentieel | aangepaste (de) serializers kunnen worden gebruikt voor sleutels en / of waarden. Deze parameter vertelt u welke van de twee deze instantie zal behandelen. |
| onderwerp | het onderwerp van het huidige bericht. Hiermee kunt u aangepaste logica definiëren op basis van het onderwerp bron / bestemming. |
| bytes | De onbewerkte boodschap om te deserialiseren |
| obj | Het bericht om te serialiseren. De werkelijke klasse hangt af van uw serializer. |
Opmerkingen
Vóór versie 0.9.0.0 gebruikte de Kafka Java API Encoders en Decoders . Ze zijn vervangen door Serializer en Deserializer in de nieuwe API.
Gson (de) serializer
In dit voorbeeld wordt de gson- bibliotheek gebruikt om java-objecten toe te wijzen aan json-strings. De (de) serializers zijn generiek, maar dat hoeven ze niet altijd te zijn!
serializer
Code
public class GsonSerializer<T> implements Serializer<T> {
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
// this is called right after construction
// use it for initialisation
}
@Override
public byte[] serialize(String s, T t) {
return gson.toJson(t).getBytes();
}
@Override
public void close() {
// this is called right before destruction
}
}
Gebruik
Serializers worden gedefinieerd door de vereiste eigenschappen van de key.serializer en value.serializer producenten.
Stel dat we een POJO-klasse met de naam SensorValue en dat we berichten willen produceren zonder sleutel (sleutels ingesteld op null ):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ...
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());
Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ...
producer.close();
( key.serializer is een vereiste configuratie. Omdat we geen berichtsleutels specificeren, houden we de StringSerializer geleverd met kafka, die null aankan).
deserializer
Code
public class GsonDeserializer<T> implements Deserializer<T> {
public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
private Class<T> cls;
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
String clsName = String.valueOf(config.get(configKey));
try {
cls = (Class<T>) Class.forName(clsName);
} catch (ClassNotFoundException e) {
System.err.printf("Failed to configure GsonDeserializer. " +
"Did you forget to specify the '%s' property ?%n",
configKey);
}
}
@Override
public T deserialize(String topic, byte[] bytes) {
return (T) gson.fromJson(new String(bytes), cls);
}
@Override
public void close() {}
}
Gebruik
Deserializers worden gedefinieerd via de vereiste consumenteigenschappen key.deserializer en value.deserializer .
Stel dat we een POJO-klasse met de naam SensorValue en dat we berichten willen produceren zonder sleutel (sleutels ingesteld op null ):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ...
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());
try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
// ... consume messages ...
}
Hier voegen we een aangepaste eigenschap toe aan de consumentenconfiguratie, namelijk CONFIG_VALUE_CLASS . De GsonDeserializer zal het gebruiken in de methode configure() om te bepalen met welke POJO-klasse het moet omgaan (alle eigenschappen die aan props worden toegevoegd, worden doorgegeven aan de configure in de vorm van een kaart).