Ricerca…


introduzione

Kafka memorizza e trasporta gli array di byte nella sua coda. I (de) serializzatori sono responsabili della traduzione tra l'array di byte fornito da Kafka e POJO.

Sintassi

  • public void configure (Map <String,?> config, boolean isKey);
  • public T deserialize (argomento String, byte [] byte);
  • byte pubblico [] serialize (argomento String, T obj);

Parametri

parametri dettagli
config le proprietà di configurazione ( Properties ) passate al Producer o al Consumer momento della creazione, come una mappa. Contiene configurazioni kafka regolari, ma può anche essere ampliato con la configurazione definita dall'utente. È il modo migliore per passare argomenti al (de) serializzatore.
isKey i serializzatori personalizzati (de) possono essere utilizzati per chiavi e / o valori. Questo parametro indica quale dei due questa istanza tratterà.
argomento l'argomento del messaggio corrente. Ciò consente di definire la logica personalizzata in base all'argomento sorgente / destinazione.
byte Il messaggio grezzo da deserializzare
obj Il messaggio da serializzare. La sua classe effettiva dipende dal tuo serializzatore.

Osservazioni

Prima della versione 0.9.0.0 l'API Java di Kafka utilizzava Encoders and Decoders . Sono stati sostituiti da Serializer e Deserializer nella nuova API.

Gson (de) serializzatore

Questo esempio usa la libreria gson per mappare oggetti java su stringhe json. I (de) serializzatori sono generici, ma non sempre devono essere!

Serializer

Codice

public class GsonSerializer<T> implements Serializer<T> {

    private Gson gson = new GsonBuilder().create();
    
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // this is called right after construction
        // use it for initialisation
    }
    
    @Override
    public byte[] serialize(String s, T t) {
        return gson.toJson(t).getBytes();
    }
    
    @Override
    public void close() {
        // this is called right before destruction
    }
}

uso

I serializzatori vengono definiti tramite le proprietà del produttore key.serializer e value.serializer richieste.

Supponiamo di avere una classe POJO denominata SensorValue e che vogliamo produrre messaggi senza alcuna chiave (le chiavi sono impostate su null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ... 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());

Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ... 
producer.close();

( key.serializer è una configurazione richiesta. Poiché non specifichiamo le chiavi dei messaggi, manteniamo StringSerializer fornito con StringSerializer , che è in grado di gestire null ).


deserializzatore

Codice

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
    private Class<T> cls;

    private Gson gson = new GsonBuilder().create();


    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));

        try {
            cls = (Class<T>) Class.forName(clsName);
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
        }
    }


    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) gson.fromJson(new String(bytes), cls);
    }


    @Override
    public void close() {}
}

uso

I deserializzatori sono definiti attraverso le proprietà necessarie di key.deserializer e value.deserializer .

Supponiamo di avere una classe POJO denominata SensorValue e che vogliamo produrre messaggi senza alcuna chiave (le chiavi sono impostate su null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ... 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());

try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
    // ... consume messages ... 
}

Qui, aggiungiamo una proprietà personalizzata alla configurazione del consumatore, ovvero CONFIG_VALUE_CLASS . GsonDeserializer lo utilizzerà nel metodo configure() per determinare quale classe POJO deve gestire (tutte le proprietà aggiunte agli props saranno passate al metodo configure sotto forma di una mappa).



Modified text is an extract of the original Stack Overflow Documentation
Autorizzato sotto CC BY-SA 3.0
Non affiliato con Stack Overflow