/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.eventhandling.tokenstore;

import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.axonframework.common.AxonException;
import org.axonframework.extensions.kafka.eventhandling.tokenstore.TokenStoreInitializationException;
import org.axonframework.extensions.kafka.eventhandling.tokenstore.TokenUpdate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TokenStoreState {
    private static final Logger logger = LoggerFactory.getLogger(TokenStoreState.class);
    private static final String SEQUENCE_ERROR_FORMAT = "%d is not after %d for processor %s with segment %d";
    private static final String KEY_FORMAT = "%s::%d";
    private final Map<String, Map<Integer, TokenUpdate>> state = new ConcurrentHashMap<String, Map<Integer, TokenUpdate>>();
    private final Map<UUID, CompletableFuture<Boolean>> writeResult = new ConcurrentHashMap<UUID, CompletableFuture<Boolean>>();
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private final AtomicReference<CompletableFuture<Boolean>> isReady = new AtomicReference(new CompletableFuture());
    private final AtomicReference<Producer<String, TokenUpdate>> producer = new AtomicReference<Object>(null);
    private final Executor executor;
    private final String topic;
    private final TemporalAmount claimTimeout;
    private final Map<String, Object> consumerConfiguration;
    private final Map<String, Object> producerConfiguration;
    private final long writeTimeOutMillis;
    private final java.util.function.Consumer<Executor> shutdownAction;

    TokenStoreState(Executor executor, String topic, TemporalAmount claimTimeout, Map<String, Object> consumerConfiguration, Map<String, Object> producerConfiguration, Duration writeTimeOut, java.util.function.Consumer<Executor> shutdownAction) {
        this.executor = executor;
        this.topic = topic;
        this.claimTimeout = claimTimeout;
        this.consumerConfiguration = consumerConfiguration;
        this.producerConfiguration = producerConfiguration;
        this.writeTimeOutMillis = writeTimeOut.toMillis();
        this.shutdownAction = shutdownAction;
    }

    Future<Boolean> send(TokenUpdate update) {
        String key = String.format(KEY_FORMAT, update.getProcessorName(), update.getSegment());
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        this.writeResult.put(update.getId(), future);
        try {
            this.producer.updateAndGet(p -> {
                if (p == null) {
                    return new KafkaProducer(this.producerConfiguration);
                }
                return p;
            }).send(new ProducerRecord(this.topic, (Object)key, (Object)update)).get(this.writeTimeOutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            this.writeResult.remove(update.getId());
            future.complete(false);
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException | TimeoutException e) {
            logger.warn("error sending token update '{}[{}]' to Kafka", new Object[]{update.getProcessorName(), update.getSegment(), e});
            this.writeResult.remove(update.getId());
            future.complete(false);
        }
        return future;
    }

    Optional<TokenUpdate> getCurrent(String processorName, int segment) {
        this.waitTillReady();
        return Optional.ofNullable(this.state.get(processorName)).map(s -> (TokenUpdate)s.get(segment));
    }

    int[] fetchSegments(String processorName) {
        this.waitTillReady();
        return Optional.ofNullable(this.state.get(processorName)).map(this::toPrimitiveIntArray).orElse(new int[0]);
    }

    Collection<TokenUpdate> fetchAll(String processorName) {
        this.waitTillReady();
        return Optional.ofNullable(this.state.get(processorName)).map(Map::values).orElse(Collections.emptyList());
    }

    void start() {
        if (this.isRunning.compareAndSet(false, true)) {
            this.executor.execute(this::startConsumer);
            this.producer.set((Producer<String, TokenUpdate>)new KafkaProducer(this.producerConfiguration));
        }
    }

    void close() {
        this.isRunning.set(false);
        this.producer.updateAndGet(p -> {
            if (p != null) {
                p.close();
            }
            return null;
        });
        this.shutdownAction.accept(this.executor);
    }

    private int[] toPrimitiveIntArray(Map<Integer, TokenUpdate> segments) {
        Iterator<Integer> integers = segments.keySet().iterator();
        int[] ints = new int[segments.size()];
        for (int i = 0; i < ints.length; ++i) {
            ints[i] = integers.next();
        }
        return ints;
    }

    private void waitTillReady() {
        try {
            boolean result = this.isReady.get().get();
            if (!result) {
                this.waitTillReady();
            }
        }
        catch (InterruptedException e) {
            logger.warn("interrupted while waiting until token store state was ready", (Throwable)e);
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            logger.warn("Error waiting till token store state was ready", (Throwable)e);
            throw new TokenStoreInitializationException("Token store is not ready yet");
        }
    }

    private void startConsumer() {
        block16: {
            try (KafkaConsumer consumer = new KafkaConsumer(this.consumerConfiguration);){
                Map topicInfo = consumer.listTopics();
                if (!topicInfo.containsKey(this.topic)) {
                    this.createTopic();
                    topicInfo = consumer.listTopics();
                }
                List partitions = ((List)topicInfo.get(this.topic)).stream().map(p -> new TopicPartition(p.topic(), p.partition())).collect(Collectors.toList());
                consumer.assign(partitions);
                Map endOffsets = consumer.endOffsets(partitions);
                while (this.isRunning.get()) {
                    ConsumerRecords records = consumer.poll(Duration.ofMillis(100L));
                    logger.debug("received: {} records", (Object)records.count());
                    records.forEach(this::update);
                    if (endOffsets.isEmpty()) continue;
                    ArrayList toRemove = new ArrayList();
                    endOffsets.forEach((arg_0, arg_1) -> TokenStoreState.lambda$startConsumer$4((Consumer)consumer, toRemove, arg_0, arg_1));
                    toRemove.forEach(endOffsets::remove);
                    if (!endOffsets.isEmpty()) continue;
                    this.isReady.get().complete(true);
                }
            }
            catch (Exception e) {
                this.isReady.getAndUpdate(c -> {
                    c.complete(false);
                    return new CompletableFuture();
                });
                logger.warn("Error consuming to update Kafka token store", (Throwable)e);
                if (!this.isRunning.get()) break block16;
                logger.info("Restarting consumer, the model is kept, so errors updating state are expected");
                this.executor.execute(this::startConsumer);
            }
        }
    }

    private void createTopic() {
        try (AdminClient client = AdminClient.create(this.consumerConfiguration);){
            short replicationFactor = (short)Math.min(3, ((Collection)client.describeCluster().nodes().get()).size());
            NewTopic newTopic = new NewTopic(this.topic, 1, replicationFactor);
            HashMap<String, String> topicConfig = new HashMap<String, String>();
            topicConfig.put("cleanup.policy", "compact");
            topicConfig.put("min.compaction.lag.ms", String.valueOf(2L * Duration.from(this.claimTimeout).getSeconds()));
            topicConfig.put("max.compaction.lag.ms", String.valueOf(8L * Duration.from(this.claimTimeout).getSeconds()));
            newTopic.configs(topicConfig);
            client.createTopics(Collections.singletonList(newTopic));
        }
        catch (InterruptedException | ExecutionException e) {
            logger.warn("error creating topic for Kafka token store", (Throwable)e);
            Thread.currentThread().interrupt();
        }
    }

    private void update(ConsumerRecord<String, TokenUpdate> consumerRecord) {
        boolean result;
        TokenUpdate update;
        if (consumerRecord.value() != null) {
            update = (TokenUpdate)consumerRecord.value();
            result = this.updateAddition(update);
        } else {
            update = new TokenUpdate(consumerRecord.headers(), null);
            result = this.updateDeletion(update);
        }
        this.writeResult.computeIfPresent(update.getId(), (k, f) -> {
            f.complete(result);
            return null;
        });
    }

    private boolean updateAddition(TokenUpdate tokenUpdate) {
        try {
            this.updateState(tokenUpdate);
            return true;
        }
        catch (TokenStoreUpdateException e) {
            logger.info("failed to update state for '{}[{}]'", new Object[]{tokenUpdate.getProcessorName(), tokenUpdate.getSegment(), e});
            return false;
        }
    }

    private boolean updateDeletion(TokenUpdate tokenUpdate) {
        AtomicBoolean result = new AtomicBoolean(false);
        this.state.computeIfPresent(tokenUpdate.getProcessorName(), (k, v) -> {
            result.set(v.remove(tokenUpdate.getSegment()) != null);
            return v;
        });
        return result.get();
    }

    private void updateState(TokenUpdate tokenUpdate) {
        Map innerMap = this.state.computeIfAbsent(tokenUpdate.getProcessorName(), x -> new ConcurrentHashMap());
        innerMap.merge(tokenUpdate.getSegment(), tokenUpdate, this::mergeFunction);
    }

    private TokenUpdate mergeFunction(TokenUpdate oldUpdate, TokenUpdate newUpdate) {
        if (newUpdate.getSequenceNumber() > oldUpdate.getSequenceNumber()) {
            return newUpdate;
        }
        throw new TokenStoreUpdateException(String.format(SEQUENCE_ERROR_FORMAT, newUpdate.getSequenceNumber(), oldUpdate.getSequenceNumber(), newUpdate.getProcessorName(), newUpdate.getSegment()));
    }

    private static /* synthetic */ void lambda$startConsumer$4(Consumer consumer, List toRemove, TopicPartition tp, Long o) {
        long current = consumer.position(tp);
        if (current >= o) {
            toRemove.add(tp);
        }
    }

    private static class TokenStoreUpdateException
    extends AxonException {
        private TokenStoreUpdateException(String message) {
            super(message);
        }
    }
}

