首页>>后端>>java->聊聊如何实现一个带幂等模板的Kafka消费者

聊聊如何实现一个带幂等模板的Kafka消费者

时间:2023-12-06 本站 点击:0

前言

不知道大家有没有这样的体验,你跟你团队的成员,宣导一些开发时注意事项,比如在使用消息队列时,在消费端处理消息时,需根据业务场景,考虑一下幂等。后面走查代码的时,会发现一些资浅的开发,在需要幂等判断的场景的情况下,仍然没做幂等判断。既然宣导无效,就干脆实现一个带幂等模板的消费者,然后开发基于这个模板进行消费端业务处理。本文就以spring-Kafka举例,聊聊如何实现一个带幂等模板的kafka消费者

实现步骤

1、kafka自动提交改为手动提交

spring:    kafka:        consumer:            #  是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量            enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:false}

2、定义消费端模板抽象基类

@Slf4jpublic abstract class BaseComusmeListener {    @KafkaHandler    public final void receive(@Payload String data, @Header(value = KafkaHeaders.RECEIVED_TOPIC,required = false) String receivedTopic,                              @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY,required = false) String receivedMessageKey, @Header(value = KafkaHeaders.RECEIVED_TIMESTAMP,required = false) long receivedTimestamp, Acknowledgment ack){        KafkaComsumePayLoad kafkaComsumePayLoad = buildKafkaComsumePayLoad(data,receivedTimestamp,receivedTopic,receivedMessageKey);        boolean isRepeateConsume = isRepeateConsume(kafkaComsumePayLoad);        if(isRepeateConsume){            log.warn("messageKey:【{}】,topic:【{}】存在重复消息数据-->【{}】",receivedMessageKey,receivedTopic,data);            //手工确认            ack.acknowledge();            return;        }        if(doBiz(kafkaComsumePayLoad)){            //手工确认            ack.acknowledge();        }    }    /**     * 是否重复消费     * @param kafkaComsumePayLoad     * @return     */    public abstract boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad);    /**     * 业务处理     * @param kafkaComsumerPayLoad     */    public abstract boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad);    private KafkaComsumePayLoad buildKafkaComsumePayLoad(String data, long receivedTimestamp, String receivedTopic, String receivedMessageKey){        return KafkaComsumePayLoad.builder()                .data(data)                .receivedTimestamp(receivedTimestamp)                .receivedTopic(receivedTopic)                .receivedMessageKey(receivedMessageKey)                .build();    }}

3、自定义监听注解【可选】

@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })@Retention(RetentionPolicy.RUNTIME)@KafkaListener@Documented@Componentpublic @interface LybGeekKafkaListener {    @AliasFor(annotation = KafkaListener.class, attribute = "id")    String id() default "";    @AliasFor(annotation = KafkaListener.class, attribute = "containerFactory")    String containerFactory() default "";    @AliasFor(annotation = KafkaListener.class, attribute = "topics")    String[] topics() default {};    @AliasFor(annotation = KafkaListener.class, attribute = "topicPattern")    String topicPattern() default "";    @AliasFor(annotation = KafkaListener.class, attribute = "topicPartitions")    TopicPartition[] topicPartitions() default {};    @AliasFor(annotation = KafkaListener.class, attribute = "containerGroup")    String containerGroup() default "";    @AliasFor(annotation = KafkaListener.class, attribute = "errorHandler")    String errorHandler() default "";    @AliasFor(annotation = KafkaListener.class, attribute = "groupId")    String groupId() default "";    @AliasFor(annotation = KafkaListener.class, attribute = "idIsGroup")    boolean idIsGroup() default true;    @AliasFor(annotation = KafkaListener.class, attribute = "clientIdPrefix")    String clientIdPrefix() default "";    @AliasFor(annotation = KafkaListener.class, attribute = "beanRef")    String beanRef() default "__listener";    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")    String concurrency() default "";    @AliasFor(annotation = KafkaListener.class, attribute = "autoStartup")    String autoStartup() default "";    @AliasFor(annotation = KafkaListener.class, attribute = "properties")    String[] properties() default {};    @AliasFor(annotation = Component.class, attribute = "value")    String value() default "";}

3、重写KafkaListener注解后置处理器【可选】

注: 因示例项目的springboot版本比较低,直接使用@LybGeekKafkaListener不起作用

public class LybGeekKafkaListenerAnnotationBeanPostProcessor<K, V>        implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {    private static final String GENERATED_ID_PREFIX = "org.springframework.kafka.KafkaListenerEndpointContainer#";    /**     * The bean name of the default {@link KafkaListenerContainerFactory}.     */    public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";    private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(64));    private final Log logger = LogFactory.getLog(getClass());    private final ListenerScope listenerScope = new ListenerScope();    private KafkaListenerEndpointRegistry endpointRegistry;    private String defaultContainerFactoryBeanName = DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME;    private DefaultListableBeanFactory beanFactory;    private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =            new KafkaHandlerMethodFactoryAdapter();    private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();    private final AtomicInteger counter = new AtomicInteger();    private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();    private BeanExpressionContext expressionContext;    private Charset charset = StandardCharsets.UTF_8;    @Override    public int getOrder() {        return LOWEST_PRECEDENCE;    }    /**     * Set the {@link KafkaListenerEndpointRegistry} that will hold the created     * endpoint and manage the lifecycle of the related listener container.     * @param endpointRegistry the {@link KafkaListenerEndpointRegistry} to set.     */    public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) {        this.endpointRegistry = endpointRegistry;    }    /**     * Set the name of the {@link KafkaListenerContainerFactory} to use by default.     * <p>If none is specified, "kafkaListenerContainerFactory" is assumed to be defined.     * @param containerFactoryBeanName the {@link KafkaListenerContainerFactory} bean name.     */    public void setDefaultContainerFactoryBeanName(String containerFactoryBeanName) {        this.defaultContainerFactoryBeanName = containerFactoryBeanName;    }    /**     * Set the {@link MessageHandlerMethodFactory} to use to configure the message     * listener responsible to serve an endpoint detected by this processor.     * <p>By default, {@link DefaultMessageHandlerMethodFactory} is used and it     * can be configured further to support additional method arguments     * or to customize conversion and validation support. See     * {@link DefaultMessageHandlerMethodFactory} Javadoc for more details.     * @param messageHandlerMethodFactory the {@link MessageHandlerMethodFactory} instance.     */    public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory messageHandlerMethodFactory) {        this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(messageHandlerMethodFactory);    }    /**     * Making a {@link BeanFactory} available is optional; if not set,     * {@link KafkaListenerConfigurer} beans won't get autodetected and an     * {@link #setEndpointRegistry endpoint registry} has to be explicitly configured.     * @param beanFactory the {@link BeanFactory} to be used.     */    @Override    public void setBeanFactory(BeanFactory beanFactory) {        this.beanFactory = (DefaultListableBeanFactory) beanFactory;        if (beanFactory instanceof ConfigurableListableBeanFactory) {            this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();            this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,                    this.listenerScope);        }    }    /**     * Set a charset to use when converting byte[] to String in method arguments.     * Default UTF-8.     * @param charset the charset.     * @since 2.2     */    public void setCharset(Charset charset) {        Assert.notNull(charset, "'charset' cannot be null");        this.charset = charset;    }    @Override    public void afterSingletonsInstantiated() {        this.registrar.setBeanFactory(this.beanFactory);        if (this.beanFactory instanceof ListableBeanFactory) {            Map<String, KafkaListenerConfigurer> instances =                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);            for (KafkaListenerConfigurer configurer : instances.values()) {                configurer.configureKafkaListeners(this.registrar);            }        }        if (this.registrar.getEndpointRegistry() == null) {            if (this.endpointRegistry == null) {                Assert.state(this.beanFactory != null,                        "BeanFactory must be set to find endpoint registry by bean name");                this.endpointRegistry = this.beanFactory.getBean(                        KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,                        KafkaListenerEndpointRegistry.class);            }            this.registrar.setEndpointRegistry(this.endpointRegistry);        }        if (this.defaultContainerFactoryBeanName != null) {            this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);        }        // Set the custom handler method factory once resolved by the configurer        MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();        if (handlerMethodFactory != null) {            this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);        }        else {            addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);        }        // Actually register all listeners        this.registrar.afterPropertiesSet();        beanFactory.removeBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME);    }    @Override    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {        return bean;    }    @Override    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {        if (!this.nonAnnotatedClasses.contains(bean.getClass())) {            Class<?> targetClass = AopUtils.getTargetClass(bean);            Collection<LybGeekKafkaListener> classLevelListeners = findListenerAnnotations(targetClass);            final boolean hasClassLevelListeners = classLevelListeners.size() > 0;            final List<Method> multiMethods = new ArrayList<>();            Map<Method, Set<LybGeekKafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,                    (MethodIntrospector.MetadataLookup<Set<LybGeekKafkaListener>>) method -> {                        Set<LybGeekKafkaListener> listenerMethods = findListenerAnnotations(method);                        return (!listenerMethods.isEmpty() ? listenerMethods : null);                    });            if (hasClassLevelListeners) {                Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,                        (ReflectionUtils.MethodFilter) method ->                                AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);                multiMethods.addAll(methodsWithHandler);            }            if (annotatedMethods.isEmpty()) {                this.nonAnnotatedClasses.add(bean.getClass());                if (this.logger.isTraceEnabled()) {                    this.logger.trace("No @LybGeekKafkaListener annotations found on bean type: " + bean.getClass());                }            }            else {                // Non-empty set of methods                for (Map.Entry<Method, Set<LybGeekKafkaListener>> entry : annotatedMethods.entrySet()) {                    Method method = entry.getKey();                    for (LybGeekKafkaListener listener : entry.getValue()) {                        processKafkaListener(listener, method, bean, beanName);                    }                }                if (this.logger.isDebugEnabled()) {                    this.logger.debug(annotatedMethods.size() + " @LybGeekKafkaListener methods processed on bean '"                            + beanName + "': " + annotatedMethods);                }            }            if (hasClassLevelListeners) {                processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);            }        }        return bean;    }    /*     * AnnotationUtils.getRepeatableAnnotations does not look at interfaces     */    private Collection<LybGeekKafkaListener> findListenerAnnotations(Class<?> clazz) {        Set<LybGeekKafkaListener> listeners = new HashSet<>();        LybGeekKafkaListener ann = AnnotationUtils.findAnnotation(clazz, LybGeekKafkaListener.class);        if (ann != null) {            listeners.add(ann);        }        return listeners;    }    /*     * AnnotationUtils.getRepeatableAnnotations does not look at interfaces     */    private Set<LybGeekKafkaListener> findListenerAnnotations(Method method) {        Set<LybGeekKafkaListener> listeners = new HashSet<>();        LybGeekKafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, LybGeekKafkaListener.class);        if (ann != null) {            listeners.add(ann);        }        return listeners;    }    private void processMultiMethodListeners(Collection<LybGeekKafkaListener> classLevelListeners, List<Method> multiMethods,            Object bean, String beanName) {        List<Method> checkedMethods = new ArrayList<>();        Method defaultMethod = null;        for (Method method : multiMethods) {            Method checked = checkProxy(method, bean);            KafkaHandler annotation = AnnotationUtils.findAnnotation(method, KafkaHandler.class);            if (annotation != null && annotation.isDefault()) {                final Method toAssert = defaultMethod;                Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "                        + toAssert.toString() + " and " + method.toString());                defaultMethod = checked;            }            checkedMethods.add(checked);        }        for (LybGeekKafkaListener classLevelListener : classLevelListeners) {            MultiMethodKafkaListenerEndpoint<K, V> endpoint =                    new MultiMethodKafkaListenerEndpoint<>(checkedMethods, defaultMethod, bean);            processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);        }    }    protected void processKafkaListener(LybGeekKafkaListener kafkaListener, Method method, Object bean, String beanName) {        Method methodToUse = checkProxy(method, bean);        MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();        endpoint.setMethod(methodToUse);        processListener(endpoint, kafkaListener, bean, methodToUse, beanName);    }    private Method checkProxy(Method methodArg, Object bean) {        Method method = methodArg;        if (AopUtils.isJdkDynamicProxy(bean)) {            try {                // Found a @LybGeekKafkaListener method on the target class for this JDK proxy ->                // is it also present on the proxy itself?                method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());                Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();                for (Class<?> iface : proxiedInterfaces) {                    try {                        method = iface.getMethod(method.getName(), method.getParameterTypes());                        break;                    }                    catch (NoSuchMethodException noMethod) {                    }                }            }            catch (SecurityException ex) {                ReflectionUtils.handleReflectionException(ex);            }            catch (NoSuchMethodException ex) {                throw new IllegalStateException(String.format(                        "@LybGeekKafkaListener method '%s' found on bean target class '%s', " +                                "but not found in any interface(s) for bean JDK proxy. Either " +                                "pull the method up to an interface or switch to subclass (CGLIB) " +                                "proxies by setting proxy-target-class/proxyTargetClass " +                                "attribute to 'true'", method.getName(),                        method.getDeclaringClass().getSimpleName()), ex);            }        }        return method;    }    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, LybGeekKafkaListener kafkaListener,            Object bean, Object adminTarget, String beanName) {        String beanRef = kafkaListener.beanRef();        if (StringUtils.hasText(beanRef)) {            this.listenerScope.addListener(beanRef, bean);        }        endpoint.setBean(bean);        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);        endpoint.setId(getEndpointId(kafkaListener));        endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));        endpoint.setTopics(resolveTopics(kafkaListener));        endpoint.setTopicPattern(resolvePattern(kafkaListener));        endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));        String group = kafkaListener.containerGroup();        if (StringUtils.hasText(group)) {            Object resolvedGroup = resolveExpression(group);            if (resolvedGroup instanceof String) {                endpoint.setGroup((String) resolvedGroup);            }        }        String concurrency = kafkaListener.concurrency();        if (StringUtils.hasText(concurrency)) {            endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));        }        String autoStartup = kafkaListener.autoStartup();        if (StringUtils.hasText(autoStartup)) {            endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));        }        resolveKafkaProperties(endpoint, kafkaListener.properties());        KafkaListenerContainerFactory<?> factory = null;        String containerFactoryBeanName = resolve(kafkaListener.containerFactory());        if (StringUtils.hasText(containerFactoryBeanName)) {            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");            try {                factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);            }            catch (NoSuchBeanDefinitionException ex) {                throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget                        + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()                        + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);            }        }        endpoint.setBeanFactory(this.beanFactory);        String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");        if (StringUtils.hasText(errorHandlerBeanName)) {            endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));        }        this.registrar.registerEndpoint(endpoint, factory);        if (StringUtils.hasText(beanRef)) {            this.listenerScope.removeListener(beanRef);        }    }    private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {        if (propertyStrings.length > 0) {            Properties properties = new Properties();            for (String property : propertyStrings) {                String value = resolveExpressionAsString(property, "property");                if (value != null) {                    try {                        properties.load(new StringReader(value));                    }                    catch (IOException e) {                        this.logger.error("Failed to load property " + property + ", continuing...", e);                    }                }            }            endpoint.setConsumerProperties(properties);        }    }    private String getEndpointId(LybGeekKafkaListener kafkaListener) {        if (StringUtils.hasText(kafkaListener.id())) {            return resolveExpressionAsString(kafkaListener.id(), "id");        }        else {            return GENERATED_ID_PREFIX + this.counter.getAndIncrement();        }    }    private String getEndpointGroupId(LybGeekKafkaListener kafkaListener, String id) {        String groupId = null;        if (StringUtils.hasText(kafkaListener.groupId())) {            groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");        }        if (groupId == null && kafkaListener.idIsGroup() && StringUtils.hasText(kafkaListener.id())) {            groupId = id;        }        return groupId;    }    private TopicPartitionInitialOffset[] resolveTopicPartitions(LybGeekKafkaListener kafkaListener) {        TopicPartition[] topicPartitions = kafkaListener.topicPartitions();        List<TopicPartitionInitialOffset> result = new ArrayList<>();        if (topicPartitions.length > 0) {            for (TopicPartition topicPartition : topicPartitions) {                result.addAll(resolveTopicPartitionsList(topicPartition));            }        }        return result.toArray(new TopicPartitionInitialOffset[0]);    }    private String[] resolveTopics(LybGeekKafkaListener kafkaListener) {        String[] topics = kafkaListener.topics();        List<String> result = new ArrayList<>();        if (topics.length > 0) {            for (String topic1 : topics) {                Object topic = resolveExpression(topic1);                resolveAsString(topic, result);            }        }        return result.toArray(new String[0]);    }    private Pattern resolvePattern(LybGeekKafkaListener kafkaListener) {        Pattern pattern = null;        String text = kafkaListener.topicPattern();        if (StringUtils.hasText(text)) {            Object resolved = resolveExpression(text);            if (resolved instanceof Pattern) {                pattern = (Pattern) resolved;            }            else if (resolved instanceof String) {                pattern = Pattern.compile((String) resolved);            }            else if (resolved != null) {                throw new IllegalStateException(                        "topicPattern must resolve to a Pattern or String, not " + resolved.getClass());            }        }        return pattern;    }    private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {        Object topic = resolveExpression(topicPartition.topic());        Assert.state(topic instanceof String,                "topic in @TopicPartition must resolve to a String, not " + topic.getClass());        Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");        String[] partitions = topicPartition.partitions();        PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();        Assert.state(partitions.length > 0 || partitionOffsets.length > 0,                "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");        List<TopicPartitionInitialOffset> result = new ArrayList<>();        for (String partition : partitions) {            resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);        }        for (PartitionOffset partitionOffset : partitionOffsets) {            TopicPartitionInitialOffset topicPartitionOffset =                    new TopicPartitionInitialOffset((String) topic,                            resolvePartition(topic, partitionOffset),                            resolveInitialOffset(topic, partitionOffset),                            isRelative(topic, partitionOffset));            if (!result.contains(topicPartitionOffset)) {                result.add(topicPartitionOffset);            }            else {                throw new IllegalArgumentException(                        String.format("@TopicPartition can't have the same partition configuration twice: [%s]",                                topicPartitionOffset));            }        }        return result;    }    private Integer resolvePartition(Object topic, PartitionOffset partitionOffset) {        Object partitionValue = resolveExpression(partitionOffset.partition());        Integer partition;        if (partitionValue instanceof String) {            Assert.state(StringUtils.hasText((String) partitionValue),                    "partition in @PartitionOffset for topic '" + topic + "' cannot be empty");            partition = Integer.valueOf((String) partitionValue);        }        else if (partitionValue instanceof Integer) {            partition = (Integer) partitionValue;        }        else {            throw new IllegalArgumentException(String.format(                    "@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",                    topic, partitionOffset.partition(), partitionValue.getClass()));        }        return partition;    }    private Long resolveInitialOffset(Object topic, PartitionOffset partitionOffset) {        Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());        Long initialOffset;        if (initialOffsetValue instanceof String) {            Assert.state(StringUtils.hasText((String) initialOffsetValue),                    "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");            initialOffset = Long.valueOf((String) initialOffsetValue);        }        else if (initialOffsetValue instanceof Long) {            initialOffset = (Long) initialOffsetValue;        }        else {            throw new IllegalArgumentException(String.format(                    "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",                    topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));        }        return initialOffset;    }    private boolean isRelative(Object topic, PartitionOffset partitionOffset) {        Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());        Boolean relativeToCurrent;        if (relativeToCurrentValue instanceof String) {            relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);        }        else if (relativeToCurrentValue instanceof Boolean) {            relativeToCurrent = (Boolean) relativeToCurrentValue;        }        else {            throw new IllegalArgumentException(String.format(                    "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",                    topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));        }        return relativeToCurrent;    }    @SuppressWarnings("unchecked")    private void resolveAsString(Object resolvedValue, List<String> result) {        if (resolvedValue instanceof String[]) {            for (Object object : (String[]) resolvedValue) {                resolveAsString(object, result);            }        }        else if (resolvedValue instanceof String) {            result.add((String) resolvedValue);        }        else if (resolvedValue instanceof Iterable) {            for (Object object : (Iterable<Object>) resolvedValue) {                resolveAsString(object, result);            }        }        else {            throw new IllegalArgumentException(String.format(                    "@LybGeekKafkaListener can't resolve '%s' as a String", resolvedValue));        }    }    @SuppressWarnings("unchecked")    private void resolvePartitionAsInteger(String topic, Object resolvedValue,            List<TopicPartitionInitialOffset> result) {        if (resolvedValue instanceof String[]) {            for (Object object : (String[]) resolvedValue) {                resolvePartitionAsInteger(topic, object, result);            }        }        else if (resolvedValue instanceof String) {            Assert.state(StringUtils.hasText((String) resolvedValue),                    "partition in @TopicPartition for topic '" + topic + "' cannot be empty");            result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));        }        else if (resolvedValue instanceof Integer[]) {            for (Integer partition : (Integer[]) resolvedValue) {                result.add(new TopicPartitionInitialOffset(topic, partition));            }        }        else if (resolvedValue instanceof Integer) {            result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));        }        else if (resolvedValue instanceof Iterable) {            for (Object object : (Iterable<Object>) resolvedValue) {                resolvePartitionAsInteger(topic, object, result);            }        }        else {            throw new IllegalArgumentException(String.format(                    "@LybGeekKafkaListener for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));        }    }    private String resolveExpressionAsString(String value, String attribute) {        Object resolved = resolveExpression(value);        if (resolved instanceof String) {            return (String) resolved;        }        else if (resolved != null) {            throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "                    + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");        }        return null;    }    private Integer resolveExpressionAsInteger(String value, String attribute) {        Object resolved = resolveExpression(value);        Integer result = null;        if (resolved instanceof String) {            result = Integer.parseInt((String) resolved);        }        else if (resolved instanceof Number) {            result = ((Number) resolved).intValue();        }        else if (resolved != null) {            throw new IllegalStateException(                    "The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. "                            + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");        }        return result;    }    private Boolean resolveExpressionAsBoolean(String value, String attribute) {        Object resolved = resolveExpression(value);        Boolean result = null;        if (resolved instanceof Boolean) {            result = (Boolean) resolved;        }        else if (resolved instanceof String) {            result = Boolean.parseBoolean((String) resolved);        }        else if (resolved != null) {            throw new IllegalStateException(                    "The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. "                            + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");        }        return result;    }    private Object resolveExpression(String value) {        return this.resolver.evaluate(resolve(value), this.expressionContext);    }    /**     * Resolve the specified value if possible.     * @param value the value to resolve     * @return the resolved value     * @see ConfigurableBeanFactory#resolveEmbeddedValue     */    private String resolve(String value) {        if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {            return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);        }        return value;    }    private void addFormatters(FormatterRegistry registry) {        for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {            registry.addConverter(converter);        }        for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {            registry.addConverter(converter);        }        for (Formatter<?> formatter : getBeansOfType(Formatter.class)) {            registry.addFormatter(formatter);        }    }    private <T> Collection<T> getBeansOfType(Class<T> type) {        if (LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ListableBeanFactory) {            return ((ListableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory)                    .getBeansOfType(type)                    .values();        }        else {            return Collections.emptySet();        }    }    /**     * An {@link MessageHandlerMethodFactory} adapter that offers a configurable underlying     * instance to use. Useful if the factory to use is determined once the endpoints     * have been registered but not created yet.     * @see KafkaListenerEndpointRegistrar#setMessageHandlerMethodFactory     */    private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {        private final DefaultFormattingConversionService defaultFormattingConversionService =                new DefaultFormattingConversionService();        private MessageHandlerMethodFactory messageHandlerMethodFactory;        public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {            this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;        }        @Override        public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {            return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);        }        private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {            if (this.messageHandlerMethodFactory == null) {                this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();            }            return this.messageHandlerMethodFactory;        }        private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {            DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();            Validator validator = LybGeekKafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();            if (validator != null) {                defaultFactory.setValidator(validator);            }            defaultFactory.setBeanFactory(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory);            ConfigurableBeanFactory cbf =                    LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory ?                            (ConfigurableBeanFactory) LybGeekKafkaListenerAnnotationBeanPostProcessor.this.beanFactory :                            null;            this.defaultFormattingConversionService.addConverter(                    new BytesToStringConverter(LybGeekKafkaListenerAnnotationBeanPostProcessor.this.charset));            defaultFactory.setConversionService(this.defaultFormattingConversionService);            List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();            // Annotation-based argument resolution            argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));            argumentResolvers.add(new HeadersMethodArgumentResolver());            // Type-based argument resolution            final GenericMessageConverter messageConverter =                    new GenericMessageConverter(this.defaultFormattingConversionService);            argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));            argumentResolvers.add(new PayloadArgumentResolver(messageConverter, validator) {                @Override                public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {                    Object resolved = super.resolveArgument(parameter, message);                    /*                     * Replace KafkaNull list elements with null.                     */                    if (resolved instanceof List) {                        List<?> list = ((List<?>) resolved);                        for (int i = 0; i < list.size(); i++) {                            if (list.get(i) instanceof KafkaNull) {                                list.set(i, null);                            }                        }                    }                    return resolved;                }                @Override                protected boolean isEmptyPayload(Object payload) {                    return payload == null || payload instanceof KafkaNull;                }            });            defaultFactory.setArgumentResolvers(argumentResolvers);            defaultFactory.afterPropertiesSet();            return defaultFactory;        }    }    private static class BytesToStringConverter implements Converter<byte[], String> {        private final Charset charset;        BytesToStringConverter(Charset charset) {            this.charset = charset;        }        @Override        public String convert(byte[] source) {            return new String(source, this.charset);        }    }    private static class ListenerScope implements Scope {        private final Map<String, Object> listeners = new HashMap<>();        ListenerScope() {            super();        }        public void addListener(String key, Object bean) {            this.listeners.put(key, bean);        }        public void removeListener(String key) {            this.listeners.remove(key);        }        @Override        public Object get(String name, ObjectFactory<?> objectFactory) {            return this.listeners.get(name);        }        @Override        public Object remove(String name) {            return null;        }        @Override        public void registerDestructionCallback(String name, Runnable callback) {        }        @Override        public Object resolveContextualObject(String key) {            return this.listeners.get(key);        }        @Override        public String getConversationId() {            return null;        }    }}

业务侧如何使用

示例

@LybGeekKafkaListener(id = "createUser",topics = Constant.USER_TOPIC)public class UserComsumer extends BaseComusmeListener {    @Autowired    private UserService userService;    @Override    public boolean isRepeateConsume(KafkaComsumePayLoad kafkaComsumePayLoad) {        User user = JSON.parseObject(kafkaComsumePayLoad.getData(),User.class);        return userService.isExistUserByUsername(user.getUsername());    }    @Override    public boolean doBiz(KafkaComsumePayLoad kafkaComsumerPayLoad) {        User user = JSON.parseObject(kafkaComsumerPayLoad.getData(),User.class);        return userService.save(user);    }}

总结

有时候我们在宣导一些事情时,往往会发现即使我们已经说了N遍了,事情仍然会出现纰漏。这时候我们可以考虑把我们想宣导的东西工具化,通过工具来规范。比如有些业务,可能一些开发没考虑全面,我们就可以基于业务,把一些核心的场景抽象成方法,然后开发人员基于这些抽象方法,做具体实现。

demo链接

https://github.com/lyb-geek/springboot-learning/tree/master/springboot-kafka-template

原文:https://juejin.cn/post/7101110793864740877


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/15921.html