Sök…


Introduktion

Kafka lagrar och transporterar byte-matriser i sin kö. (De) serialiserarna ansvarar för att översätta mellan byteuppsättningen som tillhandahålls av Kafka och POJO: er.

Syntax

  • public void-konfigurering (Map <String,?> config, boolean isKey);
  • public T deserialize (String topic, byte [] byte);
  • public byte [] serialisera (Stringämne, T obj);

parametrar

parametrar detaljer
config konfigurationsegenskaperna ( Properties ) som skickas till Producer eller Consumer vid skapelsen, som en karta. Den innehåller vanliga kafka-konfigurationer, men kan också kompletteras med användardefinierad konfiguration. Det är det bästa sättet att överföra argument till (de) serialiseraren.
isKey anpassade (de) serialiserare kan användas för nycklar och / eller värden. Denna parameter berättar vilken av de två den här instansen kommer att hantera.
ämne ämnet för det aktuella meddelandet. Detta låter dig definiera anpassad logik baserat på källan / destinationsämnet.
bitgrupper Det råa meddelandet att deserialisera
obj Meddelandet som ska serialiseras. Dess faktiska klass beror på din serienummer.

Anmärkningar

Före version 0.9.0.0 använde Kafka Java API Encoders och Decoders . De har ersatts av Serializer och Deserializer i det nya API: et.

Gson (de) serialiserare

Detta exempel använder gson- biblioteket för att kartlägga java-objekt till json-strängar. (De) serialiserare är generiska, men de behöver inte alltid vara det!

serializer

Koda

public class GsonSerializer<T> implements Serializer<T> {

    private Gson gson = new GsonBuilder().create();
    
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // this is called right after construction
        // use it for initialisation
    }
    
    @Override
    public byte[] serialize(String s, T t) {
        return gson.toJson(t).getBytes();
    }
    
    @Override
    public void close() {
        // this is called right before destruction
    }
}

Användande

Serializers definieras genom de nödvändiga key.serializer och value.serializer producentegenskaperna.

Antag att vi har en POJO-klass som heter SensorValue och att vi vill producera meddelanden utan någon nyckel (nycklar inställda på null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ... 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());

Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ... 
producer.close();

( key.serializer är en nödvändig konfiguration. Eftersom vi inte anger meddelandenycklar, behåller vi StringSerializer levererad med kafka, som kan hantera null ).


parallellomformaren

Koda

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
    private Class<T> cls;

    private Gson gson = new GsonBuilder().create();


    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));

        try {
            cls = (Class<T>) Class.forName(clsName);
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
        }
    }


    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) gson.fromJson(new String(bytes), cls);
    }


    @Override
    public void close() {}
}

Användande

Deserializers definieras genom de obligatoriska key.deserializer och value.deserializer .

Antag att vi har en POJO-klass som heter SensorValue och att vi vill producera meddelanden utan någon nyckel (nycklar inställda på null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ... 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());

try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
    // ... consume messages ... 
}

Här lägger vi till en anpassad egenskap till konsumentkonfigurationen, nämligen CONFIG_VALUE_CLASS . GsonDeserializer kommer att använda den i configure() för att bestämma vilken POJO-klass den ska hantera (alla egenskaper som läggs till props överförs till configure i form av en karta).



Modified text is an extract of the original Stack Overflow Documentation
Licensierat under CC BY-SA 3.0
Inte anslutet till Stack Overflow