国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

Java kafka如何實(shí)現(xiàn)自定義分區(qū)類和攔截器

瀏覽:13日期:2022-08-31 13:14:07

生產(chǎn)者發(fā)送到對(duì)應(yīng)的分區(qū)有以下幾種方式:

(1)指定了patition,則直接使用;(可以查閱對(duì)應(yīng)的java api, 有多種參數(shù))

(2)未指定patition但指定key,通過對(duì)key的value進(jìn)行hash出一個(gè)patition;

(3)patition和key都未指定,使用輪詢選出一個(gè)patition。

但是kafka提供了,自定義分區(qū)算法的功能,由業(yè)務(wù)手動(dòng)實(shí)現(xiàn)分布:

1、實(shí)現(xiàn)一個(gè)自定義分區(qū)類,CustomPartitioner實(shí)現(xiàn)Partitioner

import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner { /** * * @param topic 當(dāng)前的發(fā)送的topic * @param key 當(dāng)前的key值 * @param keyBytes 當(dāng)前的key的字節(jié)數(shù)組 * @param value 當(dāng)前的value值 * @param valueBytes 當(dāng)前的value的字節(jié)數(shù)組 * @param cluster * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //這邊根據(jù)返回值就是分區(qū)號(hào), 這邊就是固定發(fā)送到三號(hào)分區(qū) return 3; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { }}

2、producer配置文件指定,具體的分區(qū)類

// 具體的分區(qū)類props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner');

技巧:可以使用ProducerConfig中提供的配置ProducerConfig

kafka producer攔截器

攔截器(interceptor)是在Kafka 0.10版本被引入的。

interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機(jī)會(huì)對(duì)消息做一些定制化需求,比如修改消息等。

許用戶指定多個(gè)interceptor按序作用于同一條消息從而形成一個(gè)攔截鏈(interceptor chain)。

所使用的類為:

org.apache.kafka.clients.producer.ProducerInterceptor

我們可以編碼測(cè)試下:

1、定義消息攔截器,實(shí)現(xiàn)消息處理(可以是加時(shí)間戳等等,unid等等。)

import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Map;import java.util.UUID;public class MessageInterceptor implements ProducerInterceptor<String, String> { @Override public void configure(Map<String, ?> configs) { System.out.println('這是MessageInterceptor的configure方法'); } /** * 這個(gè)是消息發(fā)送之前進(jìn)行處理 * * @param record * @return */ @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { // 創(chuàng)建一個(gè)新的record,把uuid入消息體的最前部 System.out.println('為消息添加uuid'); return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),UUID.randomUUID().toString().replace('-', '') + ',' + record.value()); } /** * 這個(gè)是生產(chǎn)者回調(diào)函數(shù)調(diào)用之前處理 * @param metadata * @param exception */ @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { System.out.println('MessageInterceptor攔截器的onAcknowledgement方法'); } @Override public void close() { System.out.println('MessageInterceptor close 方法'); }}

2、定義計(jì)數(shù)攔截器

import java.util.Map;import org.apache.kafka.clients.producer.ProducerInterceptor;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;public class CounterInterceptor implements ProducerInterceptor<String, String>{ private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { System.out.println('這是CounterInterceptor的configure方法'); } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { System.out.println('CounterInterceptor計(jì)數(shù)過濾器不對(duì)消息做任何操作'); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 統(tǒng)計(jì)成功和失敗的次數(shù) System.out.println('CounterInterceptor過濾器執(zhí)行統(tǒng)計(jì)失敗和成功數(shù)量'); if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存結(jié)果 System.out.println('Successful sent: ' + successCounter); System.out.println('Failed sent: ' + errorCounter); }}

3、producer客戶端:

import org.apache.kafka.clients.producer.*;import java.util.ArrayList;import java.util.List;import java.util.Properties;public class Producer1 { public static void main(String[] args) throws Exception { Properties props = new Properties(); // Kafka服務(wù)端的主機(jī)名和端口號(hào) props.put('bootstrap.servers', 'localhost:9092'); // 等待所有副本節(jié)點(diǎn)的應(yīng)答 props.put('acks', 'all'); // 消息發(fā)送最大嘗試次數(shù) props.put('retries', 0); // 一批消息處理大小 props.put('batch.size', 16384); // 請(qǐng)求延時(shí),可能生產(chǎn)數(shù)據(jù)太快了 props.put('linger.ms', 1); // 發(fā)送緩存區(qū)內(nèi)存大小,數(shù)據(jù)是先放到生產(chǎn)者的緩沖區(qū) props.put('buffer.memory', 33554432); // key序列化 props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // value序列化 props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer'); // 具體的分區(qū)類 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 'kafka.CustomPartitioner'); //定義攔截器 List<String> interceptors = new ArrayList<>(); interceptors.add('kafka.MessageInterceptor'); interceptors.add('kafka.CounterInterceptor'); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 1; i++) { producer.send(new ProducerRecord<String, String>('test_0515', i + '', 'xxx-' + i), new Callback() {public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println('這是producer回調(diào)函數(shù)');} }); } /*System.out.println('現(xiàn)在執(zhí)行關(guān)閉producer'); producer.close();*/ producer.close(); }}

總結(jié),我們可以知道攔截器鏈各個(gè)方法的執(zhí)行順序,假如有A、B攔截器,在一個(gè)攔截器鏈中:

(1)執(zhí)行A的configure方法,執(zhí)行B的configure方法

(2)執(zhí)行A的onSend方法,B的onSend方法

(3)生產(chǎn)者發(fā)送完畢后,執(zhí)行A的onAcknowledgement方法,B的onAcknowledgement方法。

(4)執(zhí)行producer自身的callback回調(diào)函數(shù)。

(5)執(zhí)行A的close方法,B的close方法。

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持好吧啦網(wǎng)。

標(biāo)簽: Java
相關(guān)文章:
主站蜘蛛池模板: 永久免费毛片在线播放 | 欧美另类69xxxxx极品 | 俄罗斯三级毛片 | 日本加勒比网站 | 日本免费毛片在线高清看 | 看全色黄大色黄大片毛片 | 久久黄色一级视频 | 亚洲第一视频在线播放 | 亚洲国产成人久久笫一页 | 日本三级香港三级三级人 | 国产一级黄色网 | 亚洲人成网7777777国产 | 国产三级精品久久三级国专区 | 五月久久噜噜噜色影 | 日本成人免费在线观看 | 日韩中文字幕免费在线观看 | 国产精品欧美一区二区 | 亚洲成a人片毛片在线 | 成年午夜一级毛片视频 | 国产91成人精品亚洲精品 | 亚洲国产剧情在线精品视 | 久久免费播放 | 国产99久久精品 | 很黄的网站在线观看 | 欧美一级特黄aa大片视频 | 国产欧美日韩成人 | 久久视屏这里只有精品6国产 | 九九免费精品视频 | 亚洲综合日韩精品欧美综合区 | 国产精品高清视亚洲一区二区 | 中文字幕日本一区波多野不卡 | 欧美性色黄大片在线观看 | 一级做a爰片久久毛片看看 一级做a爰片久久毛片鸭王 | 国产精品久久久久久小说 | 亚洲国产欧美一区 | 午夜在线播放免费人成无 | 一区二区成人国产精品 | baby在线观看免费观看 | 欧美野外性xxxxfeexxxxx | 国产三级毛片视频 | 美女扒开腿让男生桶爽网站 |