Zoeken…


Invoering

Kafka bewaart en vervoert byte-arrays in de wachtrij. De (de) serializers zijn verantwoordelijk voor de vertaling tussen de byte-array van Kafka en POJO's.

Syntaxis

  • public void configure (Map <String,?> config, boolean isKey);
  • public T deserialize (Stringonderwerp, byte [] bytes);
  • public byte [] serialize (String-onderwerp, T obj);

parameters

parameters gegevens
config de configuratie-eigenschappen ( Properties ) die bij het aanmaken aan de Producer of Consumer doorgegeven als een kaart. Het bevat reguliere kafka-configs, maar kan ook worden uitgebreid met een door de gebruiker gedefinieerde configuratie. Het is de beste manier om argumenten door te geven aan de (de) serializer.
is essentieel aangepaste (de) serializers kunnen worden gebruikt voor sleutels en / of waarden. Deze parameter vertelt u welke van de twee deze instantie zal behandelen.
onderwerp het onderwerp van het huidige bericht. Hiermee kunt u aangepaste logica definiëren op basis van het onderwerp bron / bestemming.
bytes De onbewerkte boodschap om te deserialiseren
obj Het bericht om te serialiseren. De werkelijke klasse hangt af van uw serializer.

Opmerkingen

Vóór versie 0.9.0.0 gebruikte de Kafka Java API Encoders en Decoders . Ze zijn vervangen door Serializer en Deserializer in de nieuwe API.

Gson (de) serializer

In dit voorbeeld wordt de gson- bibliotheek gebruikt om java-objecten toe te wijzen aan json-strings. De (de) serializers zijn generiek, maar dat hoeven ze niet altijd te zijn!

serializer

Code

public class GsonSerializer<T> implements Serializer<T> {

    private Gson gson = new GsonBuilder().create();
    
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // this is called right after construction
        // use it for initialisation
    }
    
    @Override
    public byte[] serialize(String s, T t) {
        return gson.toJson(t).getBytes();
    }
    
    @Override
    public void close() {
        // this is called right before destruction
    }
}

Gebruik

Serializers worden gedefinieerd door de vereiste eigenschappen van de key.serializer en value.serializer producenten.

Stel dat we een POJO-klasse met de naam SensorValue en dat we berichten willen produceren zonder sleutel (sleutels ingesteld op null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ... 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());

Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ... 
producer.close();

( key.serializer is een vereiste configuratie. Omdat we geen berichtsleutels specificeren, houden we de StringSerializer geleverd met kafka, die null aankan).


deserializer

Code

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
    private Class<T> cls;

    private Gson gson = new GsonBuilder().create();


    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));

        try {
            cls = (Class<T>) Class.forName(clsName);
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
        }
    }


    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) gson.fromJson(new String(bytes), cls);
    }


    @Override
    public void close() {}
}

Gebruik

Deserializers worden gedefinieerd via de vereiste consumenteigenschappen key.deserializer en value.deserializer .

Stel dat we een POJO-klasse met de naam SensorValue en dat we berichten willen produceren zonder sleutel (sleutels ingesteld op null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ... 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());

try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
    // ... consume messages ... 
}

Hier voegen we een aangepaste eigenschap toe aan de consumentenconfiguratie, namelijk CONFIG_VALUE_CLASS . De GsonDeserializer zal het gebruiken in de methode configure() om te bepalen met welke POJO-klasse het moet omgaan (alle eigenschappen die aan props worden toegevoegd, worden doorgegeven aan de configure in de vorm van een kaart).



Modified text is an extract of the original Stack Overflow Documentation
Licentie onder CC BY-SA 3.0
Niet aangesloten bij Stack Overflow