apache-kafka
Serializador / Deserializador personalizado
Buscar..
Introducción
Kafka almacena y transporta matrices de bytes en su cola. Los (des) serializadores son responsables de traducir entre la matriz de bytes proporcionada por Kafka y POJOs.
Sintaxis
- public void configure (Map <String,?> config, boolean isKey);
- deserializar T público (String topic, byte [] bytes);
- byte público [] serialize (String topic, T obj);
Parámetros
| parámetros | detalles |
|---|---|
| configuración | las propiedades de configuración ( Properties ) pasadas al Producer o al Consumer momento de la creación, como un mapa. Contiene configuraciones kafka regulares, pero también puede aumentarse con la configuración definida por el usuario. Es la mejor manera de pasar argumentos al (des) serializador. |
| es clave | Los (des) serializadores personalizados pueden utilizarse para claves y / o valores. Este parámetro le dice con cuál de los dos tratará esta instancia. |
| tema | El tema del mensaje actual. Esto le permite definir una lógica personalizada basada en el tema de origen / destino. |
| bytes | El mensaje en bruto para deserializar. |
| obj | El mensaje a serializar. Su clase real depende de su serializador. |
Observaciones
Antes de la versión 0.9.0.0, la API de Kafka Java utilizaba Encoders y Decoders . Han sido reemplazados por Serializer y Deserializer en la nueva API.
Gson (de) serializador
Este ejemplo utiliza la biblioteca gson para asignar objetos java a cadenas json. Los (des) serializadores son genéricos, ¡pero no siempre tienen que serlo!
Serializador
Código
public class GsonSerializer<T> implements Serializer<T> {
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
// this is called right after construction
// use it for initialisation
}
@Override
public byte[] serialize(String s, T t) {
return gson.toJson(t).getBytes();
}
@Override
public void close() {
// this is called right before destruction
}
}
Uso
Los serializadores se definen a través de las key.serializer requeridas key.serializer y value.serializer productor.
Supongamos que tenemos una clase POJO llamada SensorValue y que queremos generar mensajes sin ninguna clave (claves configuradas en null ):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ...
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());
Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ...
producer.close();
( key.serializer es una configuración requerida. Como no especificamos las claves de los mensajes, mantenemos el StringSerializer entrega con kafka, que puede manejar el null ).
deserializador
Código
public class GsonDeserializer<T> implements Deserializer<T> {
public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
private Class<T> cls;
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
String clsName = String.valueOf(config.get(configKey));
try {
cls = (Class<T>) Class.forName(clsName);
} catch (ClassNotFoundException e) {
System.err.printf("Failed to configure GsonDeserializer. " +
"Did you forget to specify the '%s' property ?%n",
configKey);
}
}
@Override
public T deserialize(String topic, byte[] bytes) {
return (T) gson.fromJson(new String(bytes), cls);
}
@Override
public void close() {}
}
Uso
Los deserializadores se definen a través de las key.deserializer requeridas de consumidor key.deserializer y value.deserializer .
Supongamos que tenemos una clase POJO llamada SensorValue y que queremos generar mensajes sin ninguna clave (claves configuradas en null ):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ...
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());
try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
// ... consume messages ...
}
Aquí, agregamos una propiedad personalizada a la configuración del consumidor, a saber, CONFIG_VALUE_CLASS . El GsonDeserializer va a usar en el configure() método para determinar qué clase POJO que debe manejar (todas las propiedades añadidas a props se pasará a la configure método en la forma de un mapa).