国产成人精品久久免费动漫-国产成人精品天堂-国产成人精品区在线观看-国产成人精品日本-a级毛片无码免费真人-a级毛片毛片免费观看久潮喷

您的位置:首頁技術(shù)文章
文章詳情頁

使用spring boot 整合kafka,延遲啟動消費者

瀏覽:71日期:2023-02-18 15:50:56
spring boot 整合kafka,延遲啟動消費者

spring boot整合kafka的時候一般使用@KafkaListener來設(shè)置消費者,但是這種方式在spring啟動的時候就會立即開啟消費者。如果有需要根據(jù)配置信息延遲開啟指定的消費者就不能使用這種方式。

參考了類:KafkaListenerAnnotationBeanPostProcessor,我提取了一部分代碼。可以根據(jù)需要隨時動態(tài)的開啟消費者。還可以很方便的啟動多個消費者。

為了方便使用,我自定義了一個注解:

import org.springframework.kafka.annotation.TopicPartition; import java.lang.annotation.ElementType;import java.lang.annotation.Retention;import java.lang.annotation.RetentionPolicy;import java.lang.annotation.Target; @Target({ ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)public @interface DelayKafkaConsumer { String id() default ''; String[] topics() default {}; String errorHandler() default ''; String groupId() default ''; TopicPartition[] topicPartitions() default {}; String beanRef() default '__listener';}配合注解使用的factory:

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.aop.framework.Advised;import org.springframework.aop.support.AopUtils;import org.springframework.beans.BeansException;import org.springframework.beans.factory.BeanFactory;import org.springframework.beans.factory.BeanFactoryAware;import org.springframework.beans.factory.ListableBeanFactory;import org.springframework.beans.factory.ObjectFactory;import org.springframework.beans.factory.config.*;import org.springframework.context.expression.StandardBeanExpressionResolver;import org.springframework.core.MethodIntrospector;import org.springframework.core.annotation.AnnotationUtils;import org.springframework.core.convert.converter.Converter;import org.springframework.core.convert.converter.GenericConverter;import org.springframework.format.Formatter;import org.springframework.format.FormatterRegistry;import org.springframework.format.support.DefaultFormattingConversionService;import org.springframework.kafka.annotation.KafkaListenerConfigurer;import org.springframework.kafka.annotation.PartitionOffset;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.kafka.config.KafkaListenerEndpoint;import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;import org.springframework.kafka.config.MethodKafkaListenerEndpoint;import org.springframework.kafka.listener.KafkaListenerErrorHandler;import org.springframework.kafka.support.KafkaNull;import org.springframework.kafka.support.TopicPartitionInitialOffset;import org.springframework.messaging.converter.GenericMessageConverter;import org.springframework.messaging.handler.annotation.support.*;import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;import org.springframework.stereotype.Service;import org.springframework.util.Assert;import org.springframework.util.ReflectionUtils;import org.springframework.util.StringUtils; import java.lang.reflect.Method;import java.util.*;import java.util.concurrent.atomic.AtomicInteger; @Servicepublic class MyKafkaConsumerFactory implements KafkaListenerConfigurer,BeanFactoryAware { private static final Logger logger = LoggerFactory.getLogger(MyKafkaConsumerFactory.class); private KafkaListenerEndpointRegistrar kafkaListenerEndpointRegistrar; private final AtomicInteger counter = new AtomicInteger(); private BeanFactory beanFactory; private BeanExpressionResolver resolver = new StandardBeanExpressionResolver(); private BeanExpressionContext expressionContext; private final ListenerScope listenerScope = new ListenerScope(); private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory = new KafkaHandlerMethodFactoryAdapter(); @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {this.kafkaListenerEndpointRegistrar = registrar;addFormatters(messageHandlerMethodFactory.defaultFormattingConversionService); } public void startConsumer(KafkaListenerEndpoint endpoint){kafkaListenerEndpointRegistrar.registerEndpoint(endpoint); } public void startConsumer(Object target){logger.info('start consumer {} ...',target.getClass());Class<?> targetClass = AopUtils.getTargetClass(target);Map<Method, Set<DelayKafkaConsumer>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,new MethodIntrospector.MetadataLookup<Set<DelayKafkaConsumer>>() { @Override public Set<DelayKafkaConsumer> inspect(Method method) {Set<DelayKafkaConsumer> listenerMethods = findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null); } });if (annotatedMethods.size()==0) throw new IllegalArgumentException(target.getClass()+' need have method with @DelayKafkaConsumer');for (Map.Entry<Method, Set<DelayKafkaConsumer>> entry : annotatedMethods.entrySet()) { Method method = entry.getKey(); logger.info('find message listen handler method : {} , object : {}',method.getName(),target.getClass()); for (DelayKafkaConsumer listener : entry.getValue()) {if(listener.topics().length==0) { logger.info('topics value is empty , will skip it , method : {} , target object : {}',method.getName(),target.getClass()); continue;}processKafkaListener(listener,method,target);logger.info('register method {} success , target object : {}',method.getName(),target.getClass()); }}logger.info('{} consumer start complete .',target.getClass()); } protected void processKafkaListener(DelayKafkaConsumer kafkaListener, Method method, Object bean) {Method methodToUse = checkProxy(method, bean);MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint();endpoint.setMethod(methodToUse);endpoint.setBeanFactory(this.beanFactory);String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), 'errorHandler');if (StringUtils.hasText(errorHandlerBeanName)) { endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));}processListener(endpoint, kafkaListener, bean, methodToUse); } protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, DelayKafkaConsumer kafkaListener, Object bean, Object adminTarget) {String beanRef = kafkaListener.beanRef();if (StringUtils.hasText(beanRef)) { this.listenerScope.addListener(beanRef, bean);}endpoint.setBean(bean);endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);endpoint.setId(getEndpointId(kafkaListener));endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));endpoint.setTopics(resolveTopics(kafkaListener));endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);if (StringUtils.hasText(beanRef)) { this.listenerScope.removeListener(beanRef);} } private String getEndpointId(DelayKafkaConsumer kafkaListener) {if (StringUtils.hasText(kafkaListener.id())) { return resolve(kafkaListener.id());}else { return 'Custom-Consumer' + this.counter.getAndIncrement();} } private String getEndpointGroupId(DelayKafkaConsumer kafkaListener, String id) {String groupId = null;if (StringUtils.hasText(kafkaListener.groupId())) { groupId = resolveExpressionAsString(kafkaListener.groupId(), 'groupId');}if (groupId == null && StringUtils.hasText(kafkaListener.id())) { groupId = id;}return groupId; } private String[] resolveTopics(DelayKafkaConsumer kafkaListener) {String[] topics = kafkaListener.topics();List<String> result = new ArrayList<>();if (topics.length > 0) { for (int i = 0; i < topics.length; i++) {Object topic = resolveExpression(topics[i]);resolveAsString(topic, result); }}return result.toArray(new String[result.size()]); } private void resolveAsString(Object resolvedValue, List<String> result) {if (resolvedValue instanceof String[]) { for (Object object : (String[]) resolvedValue) {resolveAsString(object, result); }}else if (resolvedValue instanceof String) { result.add((String) resolvedValue);}else if (resolvedValue instanceof Iterable) { for (Object object : (Iterable<Object>) resolvedValue) {resolveAsString(object, result); }}else { throw new IllegalArgumentException(String.format( '@DelayKafkaConsumer can’t resolve ’%s’ as a String', resolvedValue));} } private TopicPartitionInitialOffset[] resolveTopicPartitions(DelayKafkaConsumer kafkaListener) {TopicPartition[] topicPartitions = kafkaListener.topicPartitions();List<TopicPartitionInitialOffset> result = new ArrayList<>();if (topicPartitions.length > 0) { for (TopicPartition topicPartition : topicPartitions) {result.addAll(resolveTopicPartitionsList(topicPartition)); }}return result.toArray(new TopicPartitionInitialOffset[result.size()]); } private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {Object topic = resolveExpression(topicPartition.topic());Assert.state(topic instanceof String,'topic in @TopicPartition must resolve to a String, not ' + topic.getClass());Assert.state(StringUtils.hasText((String) topic), 'topic in @TopicPartition must not be empty');String[] partitions = topicPartition.partitions();PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();Assert.state(partitions.length > 0 || partitionOffsets.length > 0,'At least one ’partition’ or ’partitionOffset’ required in @TopicPartition for topic ’' + topic + '’');List<TopicPartitionInitialOffset> result = new ArrayList<>();for (int i = 0; i < partitions.length; i++) { resolvePartitionAsInteger((String) topic, resolveExpression(partitions[i]), result);} for (PartitionOffset partitionOffset : partitionOffsets) { Object partitionValue = resolveExpression(partitionOffset.partition()); Integer partition; if (partitionValue instanceof String) {Assert.state(StringUtils.hasText((String) partitionValue),'partition in @PartitionOffset for topic ’' + topic + '’ cannot be empty');partition = Integer.valueOf((String) partitionValue); } else if (partitionValue instanceof Integer) {partition = (Integer) partitionValue; } else {throw new IllegalArgumentException(String.format('@PartitionOffset for topic ’%s’ can’t resolve ’%s’ as an Integer or String, resolved to ’%s’',topic, partitionOffset.partition(), partitionValue.getClass())); } Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset()); Long initialOffset; if (initialOffsetValue instanceof String) {Assert.state(StringUtils.hasText((String) initialOffsetValue),'’initialOffset’ in @PartitionOffset for topic ’' + topic + '’ cannot be empty');initialOffset = Long.valueOf((String) initialOffsetValue); } else if (initialOffsetValue instanceof Long) {initialOffset = (Long) initialOffsetValue; } else {throw new IllegalArgumentException(String.format('@PartitionOffset for topic ’%s’ can’t resolve ’%s’ as a Long or String, resolved to ’%s’',topic, partitionOffset.initialOffset(), initialOffsetValue.getClass())); } Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent()); Boolean relativeToCurrent; if (relativeToCurrentValue instanceof String) {relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue); } else if (relativeToCurrentValue instanceof Boolean) {relativeToCurrent = (Boolean) relativeToCurrentValue; } else {throw new IllegalArgumentException(String.format('@PartitionOffset for topic ’%s’ can’t resolve ’%s’ as a Boolean or String, resolved to ’%s’',topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass())); } TopicPartitionInitialOffset topicPartitionOffset = new TopicPartitionInitialOffset((String) topic, partition, initialOffset, relativeToCurrent); if (!result.contains(topicPartitionOffset)) {result.add(topicPartitionOffset); } else {throw new IllegalArgumentException(String.format('@TopicPartition can’t have the same partition configuration twice: [%s]',topicPartitionOffset)); }}return result; } private void resolvePartitionAsInteger(String topic, Object resolvedValue, List<TopicPartitionInitialOffset> result) {if (resolvedValue instanceof String[]) { for (Object object : (String[]) resolvedValue) {resolvePartitionAsInteger(topic, object, result); }}else if (resolvedValue instanceof String) { Assert.state(StringUtils.hasText((String) resolvedValue), 'partition in @TopicPartition for topic ’' + topic + '’ cannot be empty'); result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));}else if (resolvedValue instanceof Integer[]) { for (Integer partition : (Integer[]) resolvedValue) {result.add(new TopicPartitionInitialOffset(topic, partition)); }}else if (resolvedValue instanceof Integer) { result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));}else if (resolvedValue instanceof Iterable) { for (Object object : (Iterable<Object>) resolvedValue) {resolvePartitionAsInteger(topic, object, result); }}else { throw new IllegalArgumentException(String.format( '@DelayKafkaConsumer for topic ’%s’ can’t resolve ’%s’ as an Integer or String', topic, resolvedValue));} } private Set<DelayKafkaConsumer> findListenerAnnotations(Method method) {Set<DelayKafkaConsumer> listeners = new HashSet<>();DelayKafkaConsumer ann = AnnotationUtils.findAnnotation(method, DelayKafkaConsumer.class);if (ann != null) { listeners.add(ann);}return listeners; } private Method checkProxy(Method methodArg, Object bean) {Method method = methodArg;if (AopUtils.isJdkDynamicProxy(bean)) { try {method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();for (Class<?> iface : proxiedInterfaces) { try {method = iface.getMethod(method.getName(), method.getParameterTypes());break; } catch (NoSuchMethodException noMethod) { }} } catch (SecurityException ex) {ReflectionUtils.handleReflectionException(ex); } catch (NoSuchMethodException ex) {throw new IllegalStateException(String.format('target method ’%s’ found on bean target class ’%s’, ' +'but not found in any interface(s) for bean JDK proxy. Either ' +'pull the method up to an interface or switch to subclass (CGLIB) ' +'proxies by setting proxy-target-class/proxyTargetClass ' +'attribute to ’true’', method.getName(), method.getDeclaringClass().getSimpleName()), ex); }}return method; } @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException {this.beanFactory = beanFactory;if (beanFactory instanceof ConfigurableListableBeanFactory) { this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver(); this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, this.listenerScope);} } private String resolveExpressionAsString(String value, String attribute) {Object resolved = resolveExpression(value);if (resolved instanceof String) { return (String) resolved;}else { throw new IllegalStateException('The [' + attribute + '] must resolve to a String. ' + 'Resolved to [' + resolved.getClass() + '] for [' + value + ']');} } private Object resolveExpression(String value) {String resolvedValue = resolve(value);return this.resolver.evaluate(resolvedValue, this.expressionContext); } /** * Resolve the specified value if possible. * @param value the value to resolve * @return the resolved value * @see ConfigurableBeanFactory#resolveEmbeddedValue */ private String resolve(String value) {if (this.beanFactory instanceof ConfigurableBeanFactory) { return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);}return value; } private void addFormatters(FormatterRegistry registry) {for (Converter<?, ?> converter : getBeansOfType(Converter.class)) { registry.addConverter(converter);}for (GenericConverter converter : getBeansOfType(GenericConverter.class)) { registry.addConverter(converter);}for (org.springframework.format.Formatter<?> formatter : getBeansOfType(Formatter.class)) { registry.addFormatter(formatter);} } private <T> Collection<T> getBeansOfType(Class<T> type) {if (this.beanFactory instanceof ListableBeanFactory) { return ((ListableBeanFactory) this.beanFactory).getBeansOfType(type).values();}else { return Collections.emptySet();} } private static class ListenerScope implements Scope { private final Map<String, Object> listeners = new HashMap<>(); ListenerScope() { super();} public void addListener(String key, Object bean) { this.listeners.put(key, bean);} public void removeListener(String key) { this.listeners.remove(key);} @Overridepublic Object get(String name, ObjectFactory<?> objectFactory) { return this.listeners.get(name);} @Overridepublic Object remove(String name) { return null;} @Overridepublic void registerDestructionCallback(String name, Runnable callback) {} @Overridepublic Object resolveContextualObject(String key) { return this.listeners.get(key);} @Overridepublic String getConversationId() { return null;} } private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory { private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService(); private MessageHandlerMethodFactory messageHandlerMethodFactory; public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) { this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;} @Overridepublic InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) { return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);} private MessageHandlerMethodFactory getMessageHandlerMethodFactory() { if (this.messageHandlerMethodFactory == null) {this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory(); } return this.messageHandlerMethodFactory;} private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() { DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory(); defaultFactory.setBeanFactory(MyKafkaConsumerFactory.this.beanFactory); ConfigurableBeanFactory cbf = (MyKafkaConsumerFactory.this.beanFactory instanceof ConfigurableBeanFactory ? (ConfigurableBeanFactory) MyKafkaConsumerFactory.this.beanFactory : null); defaultFactory.setConversionService(this.defaultFormattingConversionService); List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>(); // Annotation-based argument resolution argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf)); argumentResolvers.add(new HeadersMethodArgumentResolver()); // Type-based argument resolution final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService); argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter)); argumentResolvers.add(new PayloadArgumentResolver(messageConverter) { @Overrideprotected boolean isEmptyPayload(Object payload) { return payload == null || payload instanceof KafkaNull;} }); defaultFactory.setArgumentResolvers(argumentResolvers); defaultFactory.afterPropertiesSet(); return defaultFactory;} }}

通過startConsumer來啟動一個消費者(多次調(diào)用會啟動多個消費者)。target必須至少包含一個有@DelayKafkaConsumer注解的方法。這里類似@KafkaListener。我去掉了一部分功能,保留了比較常用的部分。

這里提供了一個通過注解的方式在spring boot項目中動態(tài)控制consumer的方法。還有其他的方法來達到這種效果,不過我覺得這種方法比較方便。

java項目集成springboot使用kafka消費者,啟動失敗報錯 Failed to construct kafka consumer

之前博客里面提到本公司為物聯(lián)網(wǎng)項目。項目中使用mqtt+kafka進行與設(shè)備端的通訊,之前的協(xié)議格式為json格式,現(xiàn)在改成字節(jié)數(shù)組byte[]格式進行通信。

集成springboot后,具體的demo網(wǎng)上很多,接下來有時間會出一份kafka的demo。

報錯信息如下:

Failed to start bean ’org.springframework.kafka.config.internalKafkaListenerEndpointRegistry’; nested exception is org.apache.kafka.common.KafkaException:Failed to construct kafka consumer

原因分析:

之前json格式通信時候,構(gòu)建kafka消費工廠的時候,其中ConcurrentMessageListenerContainer的key為String類型,而value現(xiàn)在為byte[]類型,所以構(gòu)建消費者工廠的時候需要指定正確的value類型。

代碼如下:

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerByteFactory() { ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<String, byte[]>(); factory.setConsumerFactory(consumerByteFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); return factory; }

整體kafka生產(chǎn)者+kafka消費者的demo會在接下來的博客中陸續(xù)整理。

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持好吧啦網(wǎng)。

標簽: Spring
主站蜘蛛池模板: 精品真实国产乱文在线 | 亚洲一区二区三区四区在线 | 国产二区自拍 | 亚洲天堂二区 | 日本特爽特黄特刺激大片 | 国内精品一区二区2021在线 | 亚洲美女福利视频在线 | 国产年成美女网站视频免费看 | 全球成人网 | 欧美视频xxxxx | 国产成人精品免费视频软件 | 亚洲黄网址| 国内精品久久久久久久亚洲 | 国产精品亚洲片在线va | 精品午夜久久网成年网 | 国产成人十八黄网片 | 日本国产在线视频 | 在线午夜影院 | 中国老太卖淫播放毛片 | 成人性生免费视频 | 久久99精品久久久久久秒播放器 | 在线国产网站 | 91色久 | 九九九九热精品免费视频 | 成年人色网站 | 国产视频自拍一区 | 欧美aaaa在线观看视频免费 | 亚洲国产精品成 | 久草久在线 | 中国一级淫片aaa毛片毛片 | 免费国产成人高清在线观看不卡 | 俄罗斯一级毛片免费播放 | 亚洲成人偷拍 | 9191精品国产观看 | 亚洲综合一区二区精品久久 | 免费毛片a线观看 | 国产精品成人免费视频 | 韩国一级毛片在线观看 | 521a久久九九久久精品 | 1717she国产精品免费视频 | 中文 日本 免费 高清 |