Suche…


Einführung

Kafka speichert und transportiert Byte-Arrays in seiner Warteschlange. Die (De) Serializer sind für die Übersetzung zwischen dem von Kafka und POJO bereitgestellten Byte-Array verantwortlich.

Syntax

  • public void configure (Map <String,?> config, boolean isKey);
  • public T deserialize (String-Thema, Byte [] Bytes);
  • public byte [] serialize (String-Thema, T obj);

Parameter

Parameter Einzelheiten
Konfig die Konfigurationseigenschaften ( Properties ), die bei der Erstellung als Producer oder Producer an den Producer oder Consumer werden Es enthält reguläre Kafka-Konfigurationen, kann jedoch auch mit benutzerdefinierten Konfigurationen erweitert werden. Dies ist der beste Weg, um Argumente an den (De) Serializer zu übergeben.
isKey Benutzerdefinierte (De) Serialisierer können für Schlüssel und / oder Werte verwendet werden. Dieser Parameter gibt an, mit welcher der beiden Instanzen diese Instanz umgehen wird.
Thema das Thema der aktuellen Nachricht. Auf diese Weise können Sie eine benutzerdefinierte Logik basierend auf dem Quell- / Zielthema definieren.
Bytes Die rohe Botschaft zur Deserialisierung
obj Die Nachricht, die serialisiert werden soll. Die tatsächliche Klasse hängt von Ihrem Serializer ab.

Bemerkungen

Vor der Version 0.9.0.0 verwendete die Kafka Java API Encoders und Decoders . Sie wurden in der neuen API durch Serializer und Deserializer ersetzt.

Gson (de) Serialisierer

In diesem Beispiel wird die Gson- Bibliothek verwendet, um Java-Objekte Json-Strings zuzuordnen . Die (De) Serialisierer sind generisch, müssen aber nicht immer!

Serializer

Code

public class GsonSerializer<T> implements Serializer<T> {

    private Gson gson = new GsonBuilder().create();
    
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // this is called right after construction
        // use it for initialisation
    }
    
    @Override
    public byte[] serialize(String s, T t) {
        return gson.toJson(t).getBytes();
    }
    
    @Override
    public void close() {
        // this is called right before destruction
    }
}

Verwendungszweck

Serialisierer werden über die erforderlichen key.serializer und value.serializer definiert.

Angenommen, wir haben eine POJO-Klasse mit dem Namen SensorValue und möchten, dass Nachrichten ohne Schlüssel erzeugt werden (Schlüssel sind auf null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ... 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());

Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ... 
producer.close();

( key.serializer ist eine erforderliche Konfiguration. Da wir keine Meldungsschlüssel angeben, wird der StringSerializer mit kafka ausgeliefert, der mit null umgehen kann).


Deserialisierer

Code

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
    private Class<T> cls;

    private Gson gson = new GsonBuilder().create();


    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));

        try {
            cls = (Class<T>) Class.forName(clsName);
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
        }
    }


    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) gson.fromJson(new String(bytes), cls);
    }


    @Override
    public void close() {}
}

Verwendungszweck

Deserialisierer werden durch die erforderlichen Konsumenteneigenschaften key.deserializer und value.deserializer definiert.

Angenommen, wir haben eine POJO-Klasse mit dem Namen SensorValue und möchten, dass Nachrichten ohne Schlüssel erzeugt werden (Schlüssel sind auf null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ... 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());

try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
    // ... consume messages ... 
}

Hier fügen wir der Consumer-Konfiguration eine benutzerdefinierte Eigenschaft hinzu, nämlich CONFIG_VALUE_CLASS . Der GsonDeserializer verwendet sie in der configure() Methode, um zu bestimmen, welche POJO-Klasse behandelt werden soll (alle Eigenschaften, die zu props hinzugefügt werden, werden in Form einer Map an die configure Methode übergeben).



Modified text is an extract of the original Stack Overflow Documentation
Lizenziert unter CC BY-SA 3.0
Nicht angeschlossen an Stack Overflow