Szukaj…


Wprowadzenie

Kafka przechowuje i transportuje tablice bajtów w kolejce. (De) serializatory są odpowiedzialne za translację między tablicą bajtów dostarczaną przez Kafkę i POJO.

Składnia

  • public void config (Map <String,?> config, boolean isKey);
  • public T deserialize (ciąg znaków, bajty [] bajty);
  • public byte [] serialize (Temat ciąg, T obj);

Parametry

parametry Detale
config właściwości konfiguracyjne ( Properties ) przekazane Producer lub Consumer podczas tworzenia, jako mapa. Zawiera regularne konfiguracje kafka, ale można go również rozszerzyć o konfigurację zdefiniowaną przez użytkownika. Jest to najlepszy sposób przekazywania argumentów do (de) serializatora.
to klucz Niestandardowe (de) serializatory mogą być używane dla kluczy i / lub wartości. Ten parametr informuje, z którym z dwóch przypadków wystąpi to wystąpienie.
temat temat bieżącej wiadomości. Umożliwia to zdefiniowanie niestandardowej logiki na podstawie źródła / miejsca docelowego.
bajty Surowy komunikat do deserializacji
obj Wiadomość do serializacji. Rzeczywista klasa zależy od twojego serializatora.

Uwagi

Przed wersją 0.9.0.0 Kafki Java API wykorzystywane Encoders i Decoders . Zostały one zastąpione przez Serializer i Deserializer w nowym API.

Serializator Gson (de)

W tym przykładzie użyto biblioteki gson do mapowania obiektów Java na ciągi Json . (De) serializatory są ogólne, ale nie zawsze muszą być!

Serializer

Kod

public class GsonSerializer<T> implements Serializer<T> {

    private Gson gson = new GsonBuilder().create();
    
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // this is called right after construction
        // use it for initialisation
    }
    
    @Override
    public byte[] serialize(String s, T t) {
        return gson.toJson(t).getBytes();
    }
    
    @Override
    public void close() {
        // this is called right before destruction
    }
}

Stosowanie

Serializatory są definiowane za pomocą wymaganych właściwości producenta key.serializer i value.serializer .

Załóżmy, że mamy klasę POJO o nazwie SensorValue i że chcemy tworzyć wiadomości bez żadnego klucza (klucze ustawione na null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ... 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());

Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ... 
producer.close();

( key.serializer jest wymaganą konfiguracją. Ponieważ nie określamy kluczy komunikatów, dostarczamy StringSerializer wraz z StringSerializer , która jest w stanie obsłużyć null ).


deserializer

Kod

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
    private Class<T> cls;

    private Gson gson = new GsonBuilder().create();


    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));

        try {
            cls = (Class<T>) Class.forName(clsName);
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
        }
    }


    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) gson.fromJson(new String(bytes), cls);
    }


    @Override
    public void close() {}
}

Stosowanie

Deserializatory są definiowane za pomocą wymaganych właściwości konsumenta key.deserializer i value.deserializer .

Załóżmy, że mamy klasę POJO o nazwie SensorValue i że chcemy tworzyć wiadomości bez żadnego klucza (klucze ustawione na null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ... 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());

try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
    // ... consume messages ... 
}

Tutaj dodajemy niestandardową właściwość do konfiguracji konsumenta, a mianowicie CONFIG_VALUE_CLASS . GsonDeserializer użyje go w metodzie configure() , aby określić, jaką klasę POJO powinien obsługiwać (wszystkie właściwości dodane do props zostaną przekazane do metody configure w postaci mapy).



Modified text is an extract of the original Stack Overflow Documentation
Licencjonowany na podstawie CC BY-SA 3.0
Nie związany z Stack Overflow