Поиск…


Вступление

Kafka хранит и передает байтовые массивы в свою очередь. Сериализаторы (de) отвечают за перевод между массивом байтов, предоставленным Kafka и POJO.

Синтаксис

  • public void configure (Map <String,?> config, boolean isKey);
  • public T deserialize (String topic, byte [] bytes);
  • public byte [] serialize (String topic, T obj);

параметры

параметры подробности
конфиг свойства конфигурации ( Properties ) переданы Producer или Consumer при создании в качестве карты. Он содержит регулярные конфиги kafka, но также может быть дополнен пользовательской конфигурацией. Это лучший способ передать аргументы в сериализатор (de).
IsKey Пользовательские (де) сериализаторы могут использоваться для ключей и / или значений. Этот параметр указывает, с какими из двух экземпляров будет рассмотрен этот экземпляр.
тема тема текущего сообщения. Это позволяет определить пользовательскую логику на основе темы источника / назначения.
байтов Необработанное сообщение для десериализации
OBJ Сообщение для сериализации. Его фактический класс зависит от вашего сериализатора.

замечания

Перед версией 0.9.0.0 API Java Kafka использовал Encoders и Decoders . Они были заменены Serializer и Deserializer в новом API.

Серийный анализатор Gson (de)

В этом примере используется gson- библиотека для сопоставления объектов java с json-строками. Сериализаторы (de) являются универсальными, но они не всегда должны быть!

Serializer

Код

public class GsonSerializer<T> implements Serializer<T> {

    private Gson gson = new GsonBuilder().create();
    
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // this is called right after construction
        // use it for initialisation
    }
    
    @Override
    public byte[] serialize(String s, T t) {
        return gson.toJson(t).getBytes();
    }
    
    @Override
    public void close() {
        // this is called right before destruction
    }
}

использование

Сериализаторы определяются с помощью необходимых key.serializer и value.serializer .

Предположим, что у нас есть класс POJO с именем SensorValue и что мы хотим создавать сообщения без какого-либо ключа (ключи установлены в null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ... 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());

Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ... 
producer.close();

( key.serializer - требуемая конфигурация. Поскольку мы не указываем ключи сообщения, мы сохраняем StringSerializer поставляемый с kafka, который способен обрабатывать null ).


десериализатор

Код

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
    private Class<T> cls;

    private Gson gson = new GsonBuilder().create();


    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));

        try {
            cls = (Class<T>) Class.forName(clsName);
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
        }
    }


    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) gson.fromJson(new String(bytes), cls);
    }


    @Override
    public void close() {}
}

использование

Десериализаторы определяются с помощью необходимых key.deserializer value.deserializer key.deserializer и value.deserializer .

Предположим, что у нас есть класс POJO с именем SensorValue и что мы хотим создавать сообщения без какого-либо ключа (ключи установлены в null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ... 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());

try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
    // ... consume messages ... 
}

Здесь мы добавляем настраиваемое свойство к потребительской конфигурации, а именно CONFIG_VALUE_CLASS . GsonDeserializer будет использовать его в методе configure() чтобы определить, какой класс POJO он должен обрабатывать (все свойства, добавленные в props будут переданы методу configure в форме карты).



Modified text is an extract of the original Stack Overflow Documentation
Лицензировано согласно CC BY-SA 3.0
Не связан с Stack Overflow