Buscar..


Introducción

Kafka almacena y transporta matrices de bytes en su cola. Los (des) serializadores son responsables de traducir entre la matriz de bytes proporcionada por Kafka y POJOs.

Sintaxis

  • public void configure (Map <String,?> config, boolean isKey);
  • deserializar T público (String topic, byte [] bytes);
  • byte público [] serialize (String topic, T obj);

Parámetros

parámetros detalles
configuración las propiedades de configuración ( Properties ) pasadas al Producer o al Consumer momento de la creación, como un mapa. Contiene configuraciones kafka regulares, pero también puede aumentarse con la configuración definida por el usuario. Es la mejor manera de pasar argumentos al (des) serializador.
es clave Los (des) serializadores personalizados pueden utilizarse para claves y / o valores. Este parámetro le dice con cuál de los dos tratará esta instancia.
tema El tema del mensaje actual. Esto le permite definir una lógica personalizada basada en el tema de origen / destino.
bytes El mensaje en bruto para deserializar.
obj El mensaje a serializar. Su clase real depende de su serializador.

Observaciones

Antes de la versión 0.9.0.0, la API de Kafka Java utilizaba Encoders y Decoders . Han sido reemplazados por Serializer y Deserializer en la nueva API.

Gson (de) serializador

Este ejemplo utiliza la biblioteca gson para asignar objetos java a cadenas json. Los (des) serializadores son genéricos, ¡pero no siempre tienen que serlo!

Serializador

Código

public class GsonSerializer<T> implements Serializer<T> {

    private Gson gson = new GsonBuilder().create();
    
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // this is called right after construction
        // use it for initialisation
    }
    
    @Override
    public byte[] serialize(String s, T t) {
        return gson.toJson(t).getBytes();
    }
    
    @Override
    public void close() {
        // this is called right before destruction
    }
}

Uso

Los serializadores se definen a través de las key.serializer requeridas key.serializer y value.serializer productor.

Supongamos que tenemos una clase POJO llamada SensorValue y que queremos generar mensajes sin ninguna clave (claves configuradas en null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ... 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());

Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ... 
producer.close();

( key.serializer es una configuración requerida. Como no especificamos las claves de los mensajes, mantenemos el StringSerializer entrega con kafka, que puede manejar el null ).


deserializador

Código

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
    private Class<T> cls;

    private Gson gson = new GsonBuilder().create();


    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));

        try {
            cls = (Class<T>) Class.forName(clsName);
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
        }
    }


    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) gson.fromJson(new String(bytes), cls);
    }


    @Override
    public void close() {}
}

Uso

Los deserializadores se definen a través de las key.deserializer requeridas de consumidor key.deserializer y value.deserializer .

Supongamos que tenemos una clase POJO llamada SensorValue y que queremos generar mensajes sin ninguna clave (claves configuradas en null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ... 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());

try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
    // ... consume messages ... 
}

Aquí, agregamos una propiedad personalizada a la configuración del consumidor, a saber, CONFIG_VALUE_CLASS . El GsonDeserializer va a usar en el configure() método para determinar qué clase POJO que debe manejar (todas las propiedades añadidas a props se pasará a la configure método en la forma de un mapa).



Modified text is an extract of the original Stack Overflow Documentation
Licenciado bajo CC BY-SA 3.0
No afiliado a Stack Overflow