/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.eventhandling.consumer.subscribable;

import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.HierarchicalStreamDriver;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.Consumer;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.RuntimeErrorHandler;
import org.axonframework.extensions.kafka.eventhandling.consumer.TopicSubscriber;
import org.axonframework.extensions.kafka.eventhandling.consumer.TopicSubscriberBuilder;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.serialization.Serializer;
import org.axonframework.serialization.xml.CompactDriver;
import org.axonframework.serialization.xml.XStreamSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SubscribableMessageSource} that polls Kafka through a {@link Fetcher} and pushes the converted
 * {@link EventMessage}s to every subscribed event processor.
 * <p>
 * On {@link #start()} the source creates {@code consumerCount} Kafka {@link Consumer}s for the configured group id,
 * subscribes each of them to the configured topics and registers them with the {@link Fetcher}. When built with
 * {@link Builder#autoStart()}, consumption is started on {@link #subscribe(java.util.function.Consumer)} and closed
 * again once the last event processor unsubscribes.
 *
 * @param <K> the key type of the Kafka records being consumed
 * @param <V> the value type of the Kafka records being consumed
 */
public class SubscribableKafkaMessageSource<K, V> implements SubscribableMessageSource<EventMessage<?>> {

    private static final Logger logger = LoggerFactory.getLogger(SubscribableKafkaMessageSource.class);

    private final TopicSubscriber subscriber;
    private final String groupId;
    private final ConsumerFactory<K, V> consumerFactory;
    private final Fetcher<K, V, EventMessage<?>> fetcher;
    private final KafkaMessageConverter<K, V> messageConverter;
    private final boolean autoStart;
    private final int consumerCount;

    private final Set<java.util.function.Consumer<List<? extends EventMessage<?>>>> eventProcessors =
            new CopyOnWriteArraySet<>();
    /** Close handles returned by the {@link Fetcher}, keyed by consumer index. */
    private final Map<Integer, Registration> fetcherRegistrations = new ConcurrentHashMap<>();
    /** {@code true} from the moment {@link #start()} is entered until {@link #close()} has closed the fetchers. */
    private final AtomicBoolean inProgress = new AtomicBoolean(false);

    /**
     * Instantiates a source from the given {@code builder}, after having it validate its own state.
     *
     * @param builder the {@link Builder} carrying the configuration of this source
     * @throws AxonConfigurationException if {@link Builder#validate()} rejects the configuration
     */
    protected SubscribableKafkaMessageSource(Builder<K, V> builder) {
        builder.validate();
        this.subscriber = builder.getSubscriber();
        this.groupId = builder.groupId;
        this.consumerFactory = builder.consumerFactory;
        this.fetcher = builder.fetcher;
        this.messageConverter = builder.messageConverter;
        this.autoStart = builder.autoStart;
        this.consumerCount = builder.consumerCount;
    }

    /**
     * Creates a new {@link Builder}. The group id, {@link ConsumerFactory} and {@link Fetcher} must be set on it
     * before calling {@link Builder#build()}.
     *
     * @param <K> the key type of the Kafka records being consumed
     * @param <V> the value type of the Kafka records being consumed
     * @return a fresh {@link Builder}
     */
    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    /**
     * Registers the given {@code eventProcessor} to receive every batch of events read from Kafka. If auto start is
     * enabled, {@link #start()} is invoked as part of the subscription.
     *
     * @param eventProcessor the consumer of event batches
     * @return a {@link Registration} that removes the {@code eventProcessor} again; cancelling it yields {@code true}
     * if the processor was still subscribed and {@code false} otherwise. With auto start enabled, removing the last
     * processor also invokes {@link #close()}
     */
    @Override
    public Registration subscribe(java.util.function.Consumer<List<? extends EventMessage<?>>> eventProcessor) {
        if (eventProcessors.add(eventProcessor)) {
            logger.debug("Event Processor [{}] subscribed successfully", eventProcessor);
        } else {
            logger.info("Event Processor [{}] not added. It was already subscribed", eventProcessor);
        }

        if (autoStart) {
            logger.info("Starting event consumption as auto start is enabled");
            start();
        }

        return () -> {
            if (eventProcessors.remove(eventProcessor)) {
                logger.debug("Event Processor [{}] unsubscribed successfully", eventProcessor);
                if (eventProcessors.isEmpty() && autoStart) {
                    logger.info("Closing event consumption as auto start is enabled");
                    close();
                }
                return true;
            }
            logger.info("Event Processor [{}] not removed. It was already unsubscribed", eventProcessor);
            return false;
        };
    }

    /**
     * Starts {@code consumerCount} consumers. Does nothing when this source has already been started and not been
     * closed since.
     * <p>
     * If creating any of the consumers fails, the fetchers registered so far are closed and the started-flag is
     * reset before the failure is rethrown, so that a later call can retry instead of silently returning.
     */
    public void start() {
        if (inProgress.getAndSet(true)) {
            return;
        }

        try {
            for (int consumerIndex = 0; consumerIndex < consumerCount; consumerIndex++) {
                addConsumer(consumerIndex);
            }
        } catch (RuntimeException | Error startFailure) {
            // Roll back: without this the flag would stay set and every future start() would be a no-op.
            try {
                closeFetchers();
            } catch (RuntimeException closeFailure) {
                startFailure.addSuppressed(closeFailure);
            } finally {
                inProgress.set(false);
            }
            throw startFailure;
        }
    }

    /**
     * Creates one consumer, subscribes it to the configured topics and hands it to the {@link Fetcher}. The close
     * handle returned by the fetcher is stored under {@code consumerIndex}, replacing any previous entry.
     */
    private void addConsumer(int consumerIndex) {
        Consumer<K, V> consumer = consumerFactory.createConsumer(groupId);
        subscriber.subscribeTopics(consumer);

        Registration closeConsumer = fetcher.poll(
                consumer,
                // Convert each record; records the converter yields no message for are dropped.
                consumerRecords -> StreamSupport.stream(consumerRecords.spliterator(), false)
                                                .map(messageConverter::readKafkaMessage)
                                                .filter(Optional::isPresent)
                                                .map(Optional::get)
                                                .collect(Collectors.toList()),
                eventMessages -> eventProcessors.forEach(eventProcessor -> eventProcessor.accept(eventMessages)),
                restartOnError(consumerIndex)
        );
        fetcherRegistrations.put(consumerIndex, closeConsumer);
    }

    /**
     * Builds the error handler passed to the {@link Fetcher}: it logs the failure and creates a replacement consumer
     * under the same {@code consumerIndex}.
     */
    private RuntimeErrorHandler restartOnError(int consumerIndex) {
        return e -> {
            logger.warn("Consumer had a fatal exception, starting a new one", (Throwable) e);
            addConsumer(consumerIndex);
        };
    }

    /**
     * Closes all fetcher registrations and marks this source as stopped, allowing {@link #start()} to be invoked
     * again. Returns without changing any state when there are no registrations to close.
     */
    public void close() {
        if (fetcherRegistrations.isEmpty()) {
            logger.debug("No Event Processors have been subscribed who's Consumers should be closed");
            return;
        }
        closeFetchers();
        inProgress.set(false);
    }

    /** Closes every stored fetcher registration and empties the registration map. */
    private void closeFetchers() {
        fetcherRegistrations.values().forEach(Registration::close);
        fetcherRegistrations.clear();
    }

    /**
     * Builder for a {@link SubscribableKafkaMessageSource}.
     * <p>
     * The group id, {@link ConsumerFactory} and {@link Fetcher} are mandatory. Auto start defaults to {@code false}
     * and the consumer count to {@code 1}. When no {@link KafkaMessageConverter} is given, a
     * {@link DefaultKafkaMessageConverter} is built from the configured {@link Serializer}, or from a default
     * {@link XStreamSerializer} if none was configured.
     *
     * @param <K> the key type of the Kafka records being consumed
     * @param <V> the value type of the Kafka records being consumed
     */
    public static class Builder<K, V> extends TopicSubscriberBuilder<Builder<K, V>> {

        private String groupId;
        private ConsumerFactory<K, V> consumerFactory;
        private Fetcher<K, V, EventMessage<?>> fetcher;
        private KafkaMessageConverter<K, V> messageConverter;
        private boolean autoStart = false;
        private int consumerCount = 1;
        private Supplier<Serializer> serializer;

        /**
         * Sets the {@link Serializer} used to build the default {@link KafkaMessageConverter}. It is only consulted
         * when no converter is set through {@link #messageConverter(KafkaMessageConverter)}.
         *
         * @param serializer the serializer to use, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder<K, V> serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "The Serializer may not be null");
            this.serializer = () -> serializer;
            return this;
        }

        /**
         * Sets the consumer group id passed to the {@link ConsumerFactory} when creating consumers.
         *
         * @param groupId the group id, may not be {@code null} or empty
         * @return this builder, for method chaining
         */
        public Builder<K, V> groupId(String groupId) {
            BuilderUtils.assertThat(
                    groupId,
                    name -> Objects.nonNull(name) && !"".equals(name),
                    "The groupId may not be null or empty"
            );
            this.groupId = groupId;
            return this;
        }

        /**
         * Sets the {@link ConsumerFactory} used to create Kafka consumers.
         *
         * @param consumerFactory the factory to use, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder<K, V> consumerFactory(ConsumerFactory<K, V> consumerFactory) {
            BuilderUtils.assertNonNull(consumerFactory, "ConsumerFactory may not be null");
            this.consumerFactory = consumerFactory;
            return this;
        }

        /**
         * Sets a {@link DefaultConsumerFactory} created from the given {@code consumerConfiguration} as the
         * {@link ConsumerFactory}.
         *
         * @param consumerConfiguration the configuration handed to the {@link DefaultConsumerFactory}
         * @return this builder, for method chaining
         */
        public Builder<K, V> consumerFactory(Map<String, Object> consumerConfiguration) {
            this.consumerFactory = new DefaultConsumerFactory<>(consumerConfiguration);
            return this;
        }

        /**
         * Sets the {@link Fetcher} that polls the created consumers.
         *
         * @param fetcher the fetcher to use, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder<K, V> fetcher(Fetcher<K, V, EventMessage<?>> fetcher) {
            BuilderUtils.assertNonNull(fetcher, "Fetcher may not be null");
            this.fetcher = fetcher;
            return this;
        }

        /**
         * Sets the {@link KafkaMessageConverter} used to turn consumed records into {@link EventMessage}s.
         *
         * @param messageConverter the converter to use, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder<K, V> messageConverter(KafkaMessageConverter<K, V> messageConverter) {
            BuilderUtils.assertNonNull(messageConverter, "MessageConverter may not be null");
            this.messageConverter = messageConverter;
            return this;
        }

        /**
         * Enables auto start: consumption starts on subscription and is closed when the last event processor
         * unsubscribes.
         *
         * @return this builder, for method chaining
         */
        public Builder<K, V> autoStart() {
            this.autoStart = true;
            return this;
        }

        /**
         * Sets the number of consumers created on start. Defaults to {@code 1}.
         *
         * @param consumerCount the number of consumers, must be greater than zero
         * @return this builder, for method chaining
         */
        public Builder<K, V> consumerCount(int consumerCount) {
            // Passed as an Integer (not Object) so the predicate can compare it numerically.
            BuilderUtils.assertThat(
                    consumerCount,
                    count -> count > 0,
                    "The consumer count must be a positive, none zero number"
            );
            this.consumerCount = consumerCount;
            return this;
        }

        /**
         * Builds a {@link SubscribableKafkaMessageSource} from this builder's configuration.
         *
         * @return the configured source
         * @throws AxonConfigurationException if a mandatory component is missing
         */
        public SubscribableKafkaMessageSource<K, V> build() {
            return new SubscribableKafkaMessageSource<>(this);
        }

        /**
         * Verifies that the group id, {@link ConsumerFactory} and {@link Fetcher} are set and, if no
         * {@link KafkaMessageConverter} was configured, creates a {@link DefaultKafkaMessageConverter}.
         * <p>
         * The serializer is only resolved when that default converter has to be built, so the warning about the
         * default {@link XStreamSerializer} is logged only when that serializer is actually going to be used.
         *
         * @throws AxonConfigurationException if a mandatory component is missing
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(
                    this.groupId, "The Consumer Group Id is a hard requirement and should be provided"
            );
            BuilderUtils.assertNonNull(
                    this.consumerFactory, "The ConsumerFactory is a hard requirement and should be provided"
            );
            BuilderUtils.assertNonNull(this.fetcher, "The Fetcher is a hard requirement and should be provided");

            if (this.messageConverter != null) {
                // A converter was supplied, so no serializer is needed.
                return;
            }

            if (this.serializer == null) {
                logger.warn(
                        "The default XStreamSerializer is used, whereas it is strongly recommended to configure the security context of the XStream instance.",
                        new AxonConfigurationException(
                                "A default XStreamSerializer is used, without specifying the security context"
                        )
                );
                this.serializer = () -> XStreamSerializer.builder()
                                                         .xStream(new XStream((HierarchicalStreamDriver) new CompactDriver()))
                                                         .build();
            }

            // NOTE(review): the default converter is not parameterized on K/V at this call site; the cast assumes
            // the caller's K and V match the key/value types the default converter produces - confirm.
            @SuppressWarnings("unchecked")
            KafkaMessageConverter<K, V> defaultConverter =
                    (KafkaMessageConverter<K, V>) DefaultKafkaMessageConverter.builder()
                                                                              .serializer(this.serializer.get())
                                                                              .build();
            this.messageConverter = defaultConverter;
        }

        @Override
        protected Builder<K, V> self() {
            return this;
        }
    }
}

