サーチ…


前書き

カフカはその配列にバイト配列を格納し、転送します。 (de)シリアライザは、KafkaとPOJOによって提供されるバイト配列間の変換を行います。

構文

  • public void configure(Map <String、?>設定、ブール値isKey);
  • public T deserialize(文字列トピック、バイト[]バイト);
  • public byte [] serialize(文字列トピック、T obj);

パラメーター

パラメーター詳細
設定作成時にProducerまたはConsumerに渡される構成プロパティ( Properties )。通常のカフカ設定が含まれていますが、ユーザ定義の設定で追加することもできます。引数を(シ)シリアライザに渡す最善の方法です。
isKey カスタム(de)シリアライザは、キーや値に使用できます。このパラメータは、このインスタンスが処理する2つのインスタンスのどちらを処理するかを示します。
トピック現在のメッセージのトピックこれにより、ソース/宛先トピックに基づいてカスタムロジックを定義できます。
バイトデシリアライズする生のメッセージ
オブジェクトシリアライズするメッセージ。実際のクラスはシリアライザによって異なります。

備考

バージョン0.9.0.0以前では、Kafka Java APIはEncodersDecoders使用していました。彼らは置き換えられましたSerializerDeserializer新しいAPIで。

Gson(de)シリアライザ

この例では、 gsonライブラリを使用してJavaオブジェクトをjson文字列にマップします。 (シ)シリアライザは一般的ですが、必ずしもそうである必要はありません!

シリアライザ

コード

public class GsonSerializer<T> implements Serializer<T> {

    private Gson gson = new GsonBuilder().create();
    
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // this is called right after construction
        // use it for initialisation
    }
    
    @Override
    public byte[] serialize(String s, T t) {
        return gson.toJson(t).getBytes();
    }
    
    @Override
    public void close() {
        // this is called right before destruction
    }
}

使用法

シリアライザは、必要なkey.serializerおよびvalue.serializerプロデューサのプロパティによって定義されます。

SensorValueという名前のPOJOクラスがあり、キーなしでメッセージを生成したいとします(キーはnull設定されていnull )。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ... 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());

Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ... 
producer.close();

key.serializerは必須の設定です。私たちはメッセージキーを指定しないので、 nullを扱うことができるStringSerializer出荷されたStringSerializerを保持しnull )。


デシリアライザ

コード

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
    private Class<T> cls;

    private Gson gson = new GsonBuilder().create();


    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));

        try {
            cls = (Class<T>) Class.forName(clsName);
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
        }
    }


    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) gson.fromJson(new String(bytes), cls);
    }


    @Override
    public void close() {}
}

使用法

デシリアライザは、必要なkey.deserializerおよびvalue.deserializerコンシューマプロパティによって定義されます。

SensorValueという名前のPOJOクラスがあり、キーなしでメッセージを生成したいとします(キーはnull設定されていnull )。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ... 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());

try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
    // ... consume messages ... 
}

ここでは、コンシューマ設定にカスタムプロパティ、つまりCONFIG_VALUE_CLASSを追加します。 GsonDeserializerconfigure()メソッドでそれを使用して、処理する必要があるPOJOクラスを決定しconfigure() props追加されたすべてのプロパティは、マップの形式でconfigureメソッドに渡されます)。



Modified text is an extract of the original Stack Overflow Documentation
ライセンスを受けた CC BY-SA 3.0
所属していない Stack Overflow