/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.eventhandling.consumer;

import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.extensions.kafka.eventhandling.consumer.EventConsumer;
import org.axonframework.extensions.kafka.eventhandling.consumer.FetchEventsTask;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.RecordConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.RuntimeErrorHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncFetcher<K, V, E>
implements Fetcher<K, V, E> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final int DEFAULT_POLL_TIMEOUT_MS = 5000;
    private final Duration pollTimeout;
    private final ExecutorService executorService;
    private final boolean requirePoolShutdown;
    private final Set<FetchEventsTask<K, V, E>> activeFetchers = ConcurrentHashMap.newKeySet();

    public static <K, V, E> Builder<K, V, E> builder() {
        return new Builder();
    }

    protected AsyncFetcher(Builder<K, V, E> builder) {
        this.pollTimeout = ((Builder)builder).pollTimeout;
        this.executorService = ((Builder)builder).executorService;
        this.requirePoolShutdown = ((Builder)builder).requirePoolShutdown;
    }

    @Override
    public Registration poll(Consumer<K, V> consumer, RecordConverter<K, V, E> recordConverter, EventConsumer<E> eventConsumer) {
        return this.poll(consumer, recordConverter, eventConsumer, e -> logger.warn("Error from fetching thread, should be handled properly", (Throwable)e));
    }

    @Override
    public Registration poll(Consumer<K, V> consumer, RecordConverter<K, V, E> recordConverter, EventConsumer<E> eventConsumer, RuntimeErrorHandler runtimeErrorHandler) {
        FetchEventsTask fetcherTask = new FetchEventsTask(consumer, this.pollTimeout, recordConverter, eventConsumer, this.activeFetchers::remove, runtimeErrorHandler);
        this.activeFetchers.add(fetcherTask);
        this.executorService.execute(fetcherTask);
        return () -> {
            fetcherTask.close();
            return true;
        };
    }

    @Override
    public void shutdown() {
        logger.info("Shutting down AsyncFetcher");
        this.activeFetchers.forEach(FetchEventsTask::close);
        if (this.requirePoolShutdown) {
            this.executorService.shutdown();
        }
    }

    public static final class Builder<K, V, E> {
        private Duration pollTimeout = Duration.ofMillis(5000L);
        private ExecutorService executorService = Executors.newCachedThreadPool((ThreadFactory)new AxonThreadFactory("AsyncFetcher"));
        private boolean requirePoolShutdown = true;

        public Builder<K, V, E> pollTimeout(long timeoutMillis) {
            BuilderUtils.assertThat((Object)timeoutMillis, timeout -> timeout > 0L, (String)("The poll timeout may not be negative [" + timeoutMillis + "]"));
            this.pollTimeout = Duration.ofMillis(timeoutMillis);
            return this;
        }

        public Builder<K, V, E> executorService(ExecutorService executorService) {
            BuilderUtils.assertNonNull((Object)executorService, (String)"ExecutorService may not be null");
            this.requirePoolShutdown = false;
            this.executorService = executorService;
            return this;
        }

        public AsyncFetcher<K, V, E> build() {
            return new AsyncFetcher(this);
        }
    }
}

