खोज…


परिचय

कफ़्का अपनी कतार में बाइट एरेज़ को स्टोर और ट्रांसपोर्ट करती है। काफ़्का और पीओजेओ द्वारा प्रदान की जाने वाली बाइट सरणी के बीच अनुवाद करने के लिए धारावाहिक (डी) धारावाहिक जिम्मेदार हैं।

वाक्य - विन्यास

  • सार्वजनिक शून्य कॉन्फ़िगर करें (मानचित्र <स्ट्रिंग;> कॉन्फ़िगरेशन, बूलियन isey);
  • सार्वजनिक टी deserialize (स्ट्रिंग विषय, बाइट [] बाइट्स);
  • सार्वजनिक बाइट [] क्रमबद्ध करें (स्ट्रिंग विषय, टी obj);

पैरामीटर

मापदंडों विवरण
config कॉन्फ़िगरेशन गुण ( Properties ) Producer या Consumer एक मानचित्र के रूप में, निर्माण के लिए दिए गए। इसमें नियमित रूप से काफ्का कॉन्फ़िगरेशन शामिल है, लेकिन उपयोगकर्ता-परिभाषित कॉन्फ़िगरेशन के साथ भी संवर्धित किया जा सकता है। यह (डी) धारावाहिक के लिए तर्कों को पारित करने का सबसे अच्छा तरीका है।
यह कुंजी है कस्टम (डी) धारावाहिकों का उपयोग कुंजी और / या मूल्यों के लिए किया जा सकता है। यह पैरामीटर आपको बताता है कि दोनों में से कौन से उदाहरण से निपटेगा।
विषय वर्तमान संदेश का विषय। यह आपको स्रोत / गंतव्य विषय के आधार पर कस्टम तर्क को परिभाषित करने देता है।
बाइट्स कच्चे संदेश deserialize करने के लिए
obj सीरियल करने का संदेश। इसका वास्तविक वर्ग आपके धारावाहिक पर निर्भर करता है।

टिप्पणियों

संस्करण 0.9.0.0 से पहले काफ्का जावा API का उपयोग किया Encoders और Decoders । उन्हें नए API में Serializer और Deserializer द्वारा प्रतिस्थापित किया गया है।

गन्सन (डी) के धारावाहिक निर्माता

यह उदाहरण जावा लाइब्रेरी का उपयोग करता है जावा वस्तुओं को जॉगिंग स्ट्रिंग में मैप करने के लिए। (डी) धारावाहिकों सामान्य हैं, लेकिन वे हमेशा होने की जरूरत नहीं है!

serializer

कोड

public class GsonSerializer<T> implements Serializer<T> {

    private Gson gson = new GsonBuilder().create();
    
    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        // this is called right after construction
        // use it for initialisation
    }
    
    @Override
    public byte[] serialize(String s, T t) {
        return gson.toJson(t).getBytes();
    }
    
    @Override
    public void close() {
        // this is called right before destruction
    }
}

प्रयोग

Serializers आवश्यक key.serializer और value.serializer निर्माता गुणों के माध्यम से परिभाषित किए गए हैं।

मान लें कि हम नाम के एक POJO वर्ग है SensorValue और हम किसी भी कुंजी (कुंजियों पर सेट के बिना उत्पादन संदेशों करना चाहते हैं कि null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other producer properties ... 
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", GsonSerializer.class.getName());

Producer<String, SensorValue> producer = new KafkaProducer<>(properties);
// ... produce messages ... 
producer.close();

( key.serializer एक आवश्यक कॉन्फ़िगरेशन है। चूँकि हम संदेश कुंजियाँ निर्दिष्ट नहीं करते हैं, हम StringSerializer को kafka के साथ StringSerializer रहते हैं, जो null को संभालने में सक्षम है)।


deserializer

कोड

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "key.deserializer.class";
    private Class<T> cls;

    private Gson gson = new GsonBuilder().create();


    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));

        try {
            cls = (Class<T>) Class.forName(clsName);
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                    "Did you forget to specify the '%s' property ?%n",
                    configKey);
        }
    }


    @Override
    public T deserialize(String topic, byte[] bytes) {
        return (T) gson.fromJson(new String(bytes), cls);
    }


    @Override
    public void close() {}
}

प्रयोग

Deserializers को आवश्यक key.deserializer और value.deserializer उपभोक्ता गुणों के माध्यम से परिभाषित किया गया है।

मान लें कि हम नाम के एक POJO वर्ग है SensorValue और हम किसी भी कुंजी (कुंजियों पर सेट के बिना उत्पादन संदेशों करना चाहते हैं कि null ):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// ... other consumer properties ... 
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", GsonDeserializer.class.getName());
props.put(GsonDeserializer.CONFIG_VALUE_CLASS, SensorValue.class.getName());

try (KafkaConsumer<String, SensorValue> consumer = new KafkaConsumer<>(props)) {
    // ... consume messages ... 
}

यहां, हम उपभोक्ता कॉन्फ़िगरेशन में एक कस्टम संपत्ति जोड़ते हैं, अर्थात् CONFIG_VALUE_CLASSGsonDeserializer इसका उपयोग configure() विधि में यह निर्धारित करने के लिए करेगा कि इसे कौन से POJO वर्ग को संभालना चाहिए ( props जोड़े गए सभी गुण नक्शे के रूप में configure विधि में पारित हो जाएंगे)।



Modified text is an extract of the original Stack Overflow Documentation
के तहत लाइसेंस प्राप्त है CC BY-SA 3.0
से संबद्ध नहीं है Stack Overflow