apache-kafka
Serializzatore / deserializzatore personalizzato
Ricerca…
introduzione
Kafka memorizza e trasporta gli array di byte nella sua coda. I (de) serializzatori sono responsabili della traduzione tra l'array di byte fornito da Kafka e POJO.
Sintassi
- public void configure (Map <String,?> config, boolean isKey);
- public T deserialize (argomento String, byte [] byte);
- byte pubblico [] serialize (argomento String, T obj);
Parametri
| parametri | dettagli |
|---|---|
| config | le proprietà di configurazione ( Properties ) passate al Producer o al Consumer momento della creazione, come una mappa. Contiene configurazioni kafka regolari, ma può anche essere ampliato con la configurazione definita dall'utente. È il modo migliore per passare argomenti al (de) serializzatore. |
| isKey | i serializzatori personalizzati (de) possono essere utilizzati per chiavi e / o valori. Questo parametro indica quale dei due questa istanza tratterà. |
| argomento | l'argomento del messaggio corrente. Ciò consente di definire la logica personalizzata in base all'argomento sorgente / destinazione. |
| byte | Il messaggio grezzo da deserializzare |
| obj | Il messaggio da serializzare. La sua classe effettiva dipende dal tuo serializzatore. |
Osservazioni
Prima della versione 0.9.0.0 l'API Java di Kafka utilizzava Encoders and Decoders . Sono stati sostituiti da Serializer e Deserializer nella nuova API.
Gson (de) serializzatore
Questo esempio usa la libreria gson per mappare oggetti java su stringhe json. I (de) serializzatori sono generici, ma non sempre devono essere!
Serializer
Codice
public class GsonSerializer<T> implements Serializer<T> {
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
// this is called right after construction
// use it for initialisation
}
@Override
public byte[] serialize(String s, T t) {
return gson.toJson(t).getBytes();
}
@Override
public void close() {
// this is called right before destruction
}
}
uso
I serializzatori vengono definiti tramite le proprietà del produttore key.serializer e value.serializer richieste.
Supponiamo di avere una classe POJO denominata SensorValue e che vogliamo produrre messaggi senza alcuna chiave (le chiavi sono impostate su null ):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ...
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());
Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ...
producer.close();
( key.serializer è una configurazione richiesta. Poiché non specifichiamo le chiavi dei messaggi, manteniamo StringSerializer fornito con StringSerializer , che è in grado di gestire null ).
deserializzatore
Codice
public class GsonDeserializer<T> implements Deserializer<T> {
public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
private Class<T> cls;
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
String clsName = String.valueOf(config.get(configKey));
try {
cls = (Class<T>) Class.forName(clsName);
} catch (ClassNotFoundException e) {
System.err.printf("Failed to configure GsonDeserializer. " +
"Did you forget to specify the '%s' property ?%n",
configKey);
}
}
@Override
public T deserialize(String topic, byte[] bytes) {
return (T) gson.fromJson(new String(bytes), cls);
}
@Override
public void close() {}
}
uso
I deserializzatori sono definiti attraverso le proprietà necessarie di key.deserializer e value.deserializer .
Supponiamo di avere una classe POJO denominata SensorValue e che vogliamo produrre messaggi senza alcuna chiave (le chiavi sono impostate su null ):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ...
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());
try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
// ... consume messages ...
}
Qui, aggiungiamo una proprietà personalizzata alla configurazione del consumatore, ovvero CONFIG_VALUE_CLASS . GsonDeserializer lo utilizzerà nel metodo configure() per determinare quale classe POJO deve gestire (tutte le proprietà aggiunte agli props saranno passate al metodo configure sotto forma di una mappa).