apache-kafka
커스텀 시리얼 라이저 / 디시리얼라이저
수색…
소개
Kafka는 대기열에 바이트 배열을 저장하고 전송합니다. (de) serializer는 Kafka와 POJO가 제공하는 바이트 배열 간 변환을 담당합니다.
통사론
- public void configure (Map <String,?> config, boolean isKey);
- public T deserialize (String topic, byte [] bytes);
- 공용 바이트 [] serialize (문자열 주제, T obj);
매개 변수
| 매개 변수들 | 세부 |
|---|---|
| 구성 | 구성 속성 ( Properties )를 전달 Producer 또는 Consumer 맵으로, 생성시. 일반 kafka 구성을 포함하지만 사용자 정의 구성으로 보강 할 수도 있습니다. 인수를 (de) serializer에 전달하는 가장 좋은 방법입니다. |
| isKey | custom (de) serializer는 키 및 / 또는 값에 사용할 수 있습니다. 이 매개 변수는이 인스턴스가 처리 할 인스턴스를 지정합니다. |
| 이야기 | 현재 메시지의 주제 이를 통해 소스 / 대상 주제를 기반으로 사용자 정의 논리를 정의 할 수 있습니다. |
| 바이트 | 비 직렬화하는 원시 메시지 |
| obj | 직렬화 할 메시지입니다. 실제 클래스는 serializer에 따라 다릅니다. |
비고
버전 0.9.0.0 이전에는 Kafka 자바 API가 Encoders 와 Decoders . 그들은 새로운 API에서 Serializer 와 Deserializer 로 대체되었습니다.
Gson (de) 시리얼 라이저
이 예제는 gson 라이브러리를 사용하여 java 객체를 json 문자열에 매핑합니다. (비) 시리얼 라이저는 일반적이지만 꼭 그래야 할 필요는 없습니다!
시리얼 화기
암호
public class GsonSerializer<T> implements Serializer<T> {
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
// this is called right after construction
// use it for initialisation
}
@Override
public byte[] serialize(String s, T t) {
return gson.toJson(t).getBytes();
}
@Override
public void close() {
// this is called right before destruction
}
}
용법
serializer는 필수 key.serializer 및 value.serializer 생성자 속성을 통해 정의됩니다.
우리가 SensorValue 라는 이름의 POJO 클래스를 가지고 있고 키없이 메세지를 생성하기를 SensorValue 가정하자 (키는 null 설정된다) :
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ...
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());
Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ...
producer.close();
( key.serializer 는 필수 구성입니다. 우리는 메시지 키를 지정하지 않기 때문에 null 을 처리 할 수있는 kafka와 함께 제공된 StringSerializer 유지합니다.
디시리얼라이저
암호
public class GsonDeserializer<T> implements Deserializer<T> {
public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
private Class<T> cls;
private Gson gson = new GsonBuilder().create();
@Override
public void configure(Map<String, ?> config, boolean isKey) {
String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
String clsName = String.valueOf(config.get(configKey));
try {
cls = (Class<T>) Class.forName(clsName);
} catch (ClassNotFoundException e) {
System.err.printf("Failed to configure GsonDeserializer. " +
"Did you forget to specify the '%s' property ?%n",
configKey);
}
}
@Override
public T deserialize(String topic, byte[] bytes) {
return (T) gson.fromJson(new String(bytes), cls);
}
@Override
public void close() {}
}
용법
디시리얼라이저는 필수 key.deserializer 및 value.deserializer 소비자 속성을 통해 정의됩니다.
우리가 SensorValue 라는 이름의 POJO 클래스를 가지고 있고 키없이 메세지를 생성하기를 SensorValue 가정하자 (키는 null 설정된다) :
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ...
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());
try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
// ... consume messages ...
}
여기서는 소비자 구성에 사용자 정의 속성, 즉 CONFIG_VALUE_CLASS 합니다. GsonDeserializer 는 configure() 메소드에서 그것을 사용하여 처리해야하는 POJO 클래스를 판별합니다 ( props 추가 된 모든 특성은 맵의 형태로 configure 메소드로 전달됩니다).