수색…


소개

Kafka는 대기열에 바이트 배열을 저장하고 전송합니다. (de) serializer는 Kafka와 POJO가 제공하는 바이트 배열 간 변환을 담당합니다.

통사론

  • public void configure (Map <String,?> config, boolean isKey);
  • public T deserialize (String topic, byte [] bytes);
  • 공용 바이트 [] serialize (문자열 주제, T obj);

매개 변수

매개 변수들 세부
구성 구성 속성 ( Properties )를 전달 Producer 또는 Consumer 맵으로, 생성시. 일반 kafka 구성을 포함하지만 사용자 정의 구성으로 보강 할 수도 있습니다. 인수를 (de) serializer에 전달하는 가장 좋은 방법입니다.
isKey custom (de) serializer는 키 및 / 또는 값에 사용할 수 있습니다. 이 매개 변수는이 인스턴스가 처리 할 인스턴스를 지정합니다.
이야기 현재 메시지의 주제 이를 통해 소스 / 대상 주제를 기반으로 사용자 정의 논리를 정의 할 수 있습니다.
바이트 비 직렬화하는 원시 메시지
obj 직렬화 할 메시지입니다. 실제 클래스는 serializer에 따라 다릅니다.

비고

버전 0.9.0.0 이전에는 Kafka 자바 API가 EncodersDecoders . 그들은 새로운 API에서 SerializerDeserializer 로 대체되었습니다.

Gson (de) 시리얼 라이저

이 예제는 gson 라이브러리를 사용하여 java 객체를 json 문자열에 매핑합니다. (비) 시리얼 라이저는 일반적이지만 꼭 그래야 할 필요는 없습니다!

시리얼 화기

암호

public class GsonSerializer<T> implements Serializer<T> {

    private Gson gson = new GsonBuilder().create();
    
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // this is called right after construction
        // use it for initialisation
    }
    
    @Override
    public byte[] serialize(String s, T t) {
        return gson.toJson(t).getBytes();
    }
    
    @Override
    public void close() {
        // this is called right before destruction
    }
}

용법

serializer는 필수 key.serializervalue.serializer 생성자 속성을 통해 정의됩니다.

우리가 SensorValue 라는 이름의 POJO 클래스를 가지고 있고 키없이 메세지를 생성하기를 SensorValue 가정하자 (키는 null 설정된다) :

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ... 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());

Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ... 
producer.close();

( key.serializer 는 필수 구성입니다. 우리는 메시지 키를 지정하지 않기 때문에 null 을 처리 할 수있는 kafka와 함께 제공된 StringSerializer 유지합니다.


디시리얼라이저

암호

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
    private Class<T> cls;

    private Gson gson = new GsonBuilder().create();


    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));

        try {
            cls = (Class<T>) Class.forName(clsName);
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
        }
    }


    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) gson.fromJson(new String(bytes), cls);
    }


    @Override
    public void close() {}
}

용법

디시리얼라이저는 필수 key.deserializervalue.deserializer 소비자 속성을 통해 정의됩니다.

우리가 SensorValue 라는 이름의 POJO 클래스를 가지고 있고 키없이 메세지를 생성하기를 SensorValue 가정하자 (키는 null 설정된다) :

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ... 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());

try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
    // ... consume messages ... 
}

여기서는 소비자 구성에 사용자 정의 속성, 즉 CONFIG_VALUE_CLASS 합니다. GsonDeserializerconfigure() 메소드에서 그것을 사용하여 처리해야하는 POJO 클래스를 판별합니다 ( props 추가 된 모든 특성은 맵의 형태로 configure 메소드로 전달됩니다).



Modified text is an extract of the original Stack Overflow Documentation
아래 라이선스 CC BY-SA 3.0
와 제휴하지 않음 Stack Overflow