/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.extensions.kafka.eventhandling.tokenstore;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.AbstractTokenEntry;
import org.axonframework.eventhandling.tokenstore.GenericTokenEntry;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.UnableToInitializeTokenException;
import org.axonframework.extensions.kafka.eventhandling.tokenstore.TokenStoreState;
import org.axonframework.extensions.kafka.eventhandling.tokenstore.TokenUpdate;
import org.axonframework.extensions.kafka.eventhandling.tokenstore.TokenUpdateDeserializer;
import org.axonframework.extensions.kafka.eventhandling.tokenstore.TokenUpdateSerializer;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTokenStore
implements TokenStore,
Lifecycle {
    private static final Logger logger = LoggerFactory.getLogger(KafkaTokenStore.class);
    private static final String DEFAULT_TOPIC = "__axon_token_store_updates";
    private static final String TOKEN_STORE_CLIENT_ID = "token_store_client";
    private static final String NOT_FOUND_MSG = "Unable to claim token '%s[%s]', It has not been initialized yet";
    private final String nodeId;
    private final TemporalAmount claimTimeout;
    private final TokenStoreState tokenStoreState;
    private final Serializer serializer;
    private final long readTimeOutMillis;

    public static Builder builder() {
        return new Builder();
    }

    protected KafkaTokenStore(Builder builder) {
        builder.validate();
        this.nodeId = builder.nodeId;
        this.claimTimeout = builder.claimTimeout;
        this.tokenStoreState = new TokenStoreState((Executor)builder.executorSupplier.get(), builder.topic, builder.claimTimeout, builder.consumerConfiguration, builder.producerConfiguration, builder.writeTimeout, builder.shutdownAction);
        this.serializer = builder.serializer;
        this.readTimeOutMillis = builder.readTimeOut.toMillis();
    }

    public void storeToken(@Nullable TrackingToken trackingToken, @Nonnull String processorName, int segment) throws UnableToClaimTokenException {
        Optional<TokenUpdate> current = this.tokenStoreState.getCurrent(processorName, segment).map(this::updatableToken);
        if (!current.isPresent()) {
            throw new UnableToClaimTokenException(String.format(NOT_FOUND_MSG, processorName, segment));
        }
        GenericTokenEntry tokenEntry = new GenericTokenEntry(trackingToken, this.serializer, byte[].class, processorName, segment);
        if (!tokenEntry.claim(this.nodeId, this.claimTimeout)) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It is owned by '%s'", processorName, segment, tokenEntry.getOwner()));
        }
        TokenUpdate update = new TokenUpdate((AbstractTokenEntry<byte[]>)tokenEntry, current.get().getSequenceNumber() + 1L);
        this.send(update);
    }

    public TrackingToken fetchToken(@Nonnull String processorName, int segment) throws UnableToClaimTokenException {
        Optional<TokenUpdate> current = this.tokenStoreState.getCurrent(processorName, segment);
        if (!current.isPresent()) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It has not been initialized yet", processorName, segment));
        }
        AbstractTokenEntry<byte[]> tokenEntry = current.get().toTokenEntry();
        if (!tokenEntry.claim(this.nodeId, this.claimTimeout)) {
            throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]'. It is owned by '%s'", processorName, segment, tokenEntry.getOwner()));
        }
        TokenUpdate update = new TokenUpdate(tokenEntry, current.get().getSequenceNumber() + 1L);
        this.send(update);
        return tokenEntry.getToken(this.serializer);
    }

    public boolean requiresExplicitSegmentInitialization() {
        return true;
    }

    public void releaseClaim(@Nonnull String processorName, int segment) {
        Optional<TokenUpdate> current = this.tokenStoreState.getCurrent(processorName, segment).map(this::updatableToken);
        if (!current.isPresent()) {
            throw new UnableToClaimTokenException(String.format(NOT_FOUND_MSG, processorName, segment));
        }
        this.send(this.tokenUpdateToReleaseUpdate(current.get()));
    }

    public void deleteToken(@Nonnull String processorName, int segment) throws UnableToClaimTokenException {
        Optional<TokenUpdate> current = this.tokenStoreState.getCurrent(processorName, segment).map(this::deletableToken);
        if (!current.isPresent()) {
            throw new UnableToClaimTokenException(String.format(NOT_FOUND_MSG, processorName, segment));
        }
        this.delete(current.get());
    }

    public void initializeTokenSegments(@Nonnull String processorName, int segmentCount) throws UnableToClaimTokenException {
        this.initializeTokenSegments(processorName, segmentCount, null);
    }

    public void initializeTokenSegments(@Nonnull String processorName, int segmentCount, @Nullable TrackingToken initialToken) throws UnableToClaimTokenException {
        int[] currentSegments = this.fetchSegments(processorName);
        if (currentSegments.length > 0) {
            throw new UnableToClaimTokenException(String.format("Unable to initialize tokens for '%s', already %d segments exist", processorName, currentSegments.length));
        }
        List<FutureWithContext> futures = IntStream.range(0, segmentCount).mapToObj(segment -> new GenericTokenEntry(initialToken, this.serializer, byte[].class, processorName, segment)).map(tokenEntry -> new TokenUpdate((AbstractTokenEntry<byte[]>)tokenEntry, 0L)).map(u -> {
            Future<Boolean> future = this.tokenStoreState.send((TokenUpdate)u);
            return new FutureWithContext(future, u.getProcessorName(), u.getSegment());
        }).collect(Collectors.toList());
        futures.forEach(this::handleFuture);
    }

    public void initializeSegment(@Nullable TrackingToken token, @Nonnull String processorName, int segment) throws UnableToInitializeTokenException {
        Optional<TokenUpdate> current = this.tokenStoreState.getCurrent(processorName, segment);
        if (current.isPresent()) {
            throw new UnableToInitializeTokenException(String.format("Unable to initialize token '%s[%s]', one already exist", processorName, segment));
        }
        GenericTokenEntry tokenEntry = new GenericTokenEntry(token, this.serializer, byte[].class, processorName, segment);
        this.send(new TokenUpdate((AbstractTokenEntry<byte[]>)tokenEntry, 0L));
    }

    public int[] fetchSegments(@Nonnull String processorName) {
        return this.tokenStoreState.fetchSegments(processorName);
    }

    public List<Segment> fetchAvailableSegments(@Nonnull String processorName) {
        int[] allSegments = this.fetchSegments(processorName);
        return this.tokenStoreState.fetchAll(processorName).stream().filter(this::isAvailable).map(update -> Segment.computeSegment((int)update.getSegment(), (int[])allSegments)).collect(Collectors.toList());
    }

    public void start() {
        this.tokenStoreState.start();
    }

    public void close() {
        this.tokenStoreState.close();
    }

    public void registerLifecycleHandlers(@Nonnull Lifecycle.LifecycleRegistry lifecycle) {
        lifecycle.onStart(-134217728, this::start);
        lifecycle.onShutdown(-134217728, this::close);
    }

    private void send(TokenUpdate update) {
        Future<Boolean> future = this.tokenStoreState.send(update);
        this.handleFuture(new FutureWithContext(future, update.getProcessorName(), update.getSegment()));
    }

    private void delete(TokenUpdate update) {
        Future<Boolean> future = this.tokenStoreState.send(new TokenUpdate(update, true));
        this.handleFuture(new FutureWithContext(future, update.getProcessorName(), update.getSegment()));
    }

    private void handleFuture(FutureWithContext fwc) {
        boolean result = false;
        try {
            result = fwc.future.get(this.readTimeOutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            logger.warn("interrupted while waiting for send to '{}[{}]' to return", new Object[]{fwc.processorName, fwc.segment, e});
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            logger.warn("Error sending token '{}[{}]' to token state store", new Object[]{fwc.processorName, fwc.segment, e});
            throw new UnableToClaimTokenException(String.format("Unable to process update '%s[%s]', error sending token", fwc.processorName, fwc.segment));
        }
        catch (TimeoutException e) {
            throw new UnableToClaimTokenException(String.format("Unable to process update '%s[%s]', timed out writing to store state", fwc.processorName, fwc.segment));
        }
        if (!result) {
            throw new UnableToClaimTokenException(String.format("Unable to process update '%s[%s]', concurrent write invalidated this one", fwc.processorName, fwc.segment));
        }
    }

    private boolean isAvailable(TokenUpdate update) {
        try {
            this.updatableToken(update);
        }
        catch (UnableToClaimTokenException e) {
            logger.debug("token not available", (Throwable)e);
            return false;
        }
        return true;
    }

    private TokenUpdate updatableToken(TokenUpdate update) {
        if (update.getOwner() == null || update.getOwner().equals(this.nodeId)) {
            return update;
        }
        if (update.getTimestamp().isBefore(AbstractTokenEntry.clock.instant().minus(this.claimTimeout))) {
            return update;
        }
        throw new UnableToClaimTokenException(String.format("Unable to claim token '%s[%s]', not the owner, and claim timeout is not expired yet", update.getProcessorName(), update.getSegment()));
    }

    private TokenUpdate deletableToken(TokenUpdate update) {
        if (this.nodeId.equals(update.getOwner())) {
            return update;
        }
        throw new UnableToClaimTokenException(String.format("Unable to remove token '%s[%s]'. It is not owned by %s", update.getProcessorName(), update.getSegment(), this.nodeId));
    }

    private TokenUpdate tokenUpdateToReleaseUpdate(TokenUpdate update) {
        AbstractTokenEntry<byte[]> tokenEntry = update.toTokenEntry();
        tokenEntry.releaseClaim(update.getOwner());
        return new TokenUpdate(tokenEntry, update.getSequenceNumber() + 1L);
    }

    public static class Builder {
        private String topic = "__axon_token_store_updates";
        private Serializer serializer;
        private TemporalAmount claimTimeout = Duration.ofSeconds(10L);
        private String nodeId = ManagementFactory.getRuntimeMXBean().getName();
        private Supplier<Executor> executorSupplier = Executors::newSingleThreadExecutor;
        private Consumer<Executor> shutdownAction = executor -> {
            if (executor instanceof ExecutorService) {
                ((ExecutorService)executor).shutdown();
            }
        };
        private Map<String, Object> consumerConfiguration;
        private Map<String, Object> producerConfiguration;
        private Duration readTimeOut = Duration.ofSeconds(5L);
        private Duration writeTimeout = Duration.ofSeconds(3L);

        public Builder topic(String topic) {
            BuilderUtils.assertNonEmpty((String)topic, (String)"The topic may not be null");
            this.topic = topic;
            return this;
        }

        public Builder serializer(Serializer serializer) {
            BuilderUtils.assertNonNull((Object)serializer, (String)"Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        public Builder claimTimeout(TemporalAmount claimTimeout) {
            BuilderUtils.assertNonNull((Object)claimTimeout, (String)"The claim timeout may not be null");
            this.claimTimeout = claimTimeout;
            return this;
        }

        public Builder nodeId(String nodeId) {
            BuilderUtils.assertNonEmpty((String)nodeId, (String)"The nodeId may not be null or empty");
            this.nodeId = nodeId;
            return this;
        }

        public Builder executor(Executor executor) {
            BuilderUtils.assertNonNull((Object)executor, (String)"The executor may not be null");
            this.executorSupplier = () -> executor;
            this.shutdownAction = e -> {};
            return this;
        }

        public Builder consumerConfiguration(Map<String, Object> consumerConfiguration) {
            this.assertMinimalValidClientConfiguration(consumerConfiguration, "The consumer configuration may not be null, and needs to contain a 'bootstrap.servers' value");
            this.consumerConfiguration = this.setConsumerConfig(consumerConfiguration);
            return this;
        }

        public Builder producerConfiguration(Map<String, Object> producerConfiguration) {
            this.assertMinimalValidClientConfiguration(producerConfiguration, "The consumer configuration may not be null, and needs to contain a 'bootstrap.servers' value");
            this.producerConfiguration = this.setProducerConfig(producerConfiguration);
            return this;
        }

        public Builder readTimeOut(Duration readTimeOut) {
            BuilderUtils.assertNonNull((Object)readTimeOut, (String)"The readTimeOut may not be null");
            this.readTimeOut = readTimeOut;
            return this;
        }

        public Builder writeTimeout(Duration writeTimeout) {
            BuilderUtils.assertNonNull((Object)writeTimeout, (String)"The readTimeOut may not be null");
            this.writeTimeout = writeTimeout;
            return this;
        }

        public Builder onShutdown(Consumer<Executor> shutdownAction) {
            BuilderUtils.assertNonNull(shutdownAction, (String)"The shutdown action may not be null");
            this.shutdownAction = shutdownAction;
            return this;
        }

        public KafkaTokenStore build() {
            return new KafkaTokenStore(this);
        }

        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonEmpty((String)this.topic, (String)"The topic is a hard requirement and should be provided");
            BuilderUtils.assertNonNull((Object)this.serializer, (String)"The Serializer is a hard requirement and should be provided");
            BuilderUtils.assertNonEmpty((String)this.nodeId, (String)"The nodeId is a hard requirement and should be provided");
            this.assertMinimalValidClientConfiguration(this.consumerConfiguration, "Consumer configuration is a hard requirement and should at least contain a 'bootstrap.servers' value");
            this.assertMinimalValidClientConfiguration(this.producerConfiguration, "Producer configuration is a hard requirement and should at least contain a 'bootstrap.servers' value");
        }

        private Map<String, Object> setConsumerConfig(Map<String, Object> configuration) {
            HashMap<String, Object> result = new HashMap<String, Object>(configuration);
            result.remove("group.id");
            result.put("client.id", KafkaTokenStore.TOKEN_STORE_CLIENT_ID);
            result.put("enable.auto.commit", false);
            result.put("auto.offset.reset", "earliest");
            result.put("key.deserializer", StringDeserializer.class);
            result.put("value.deserializer", TokenUpdateDeserializer.class);
            return result;
        }

        private Map<String, Object> setProducerConfig(Map<String, Object> configuration) {
            HashMap<String, Object> result = new HashMap<String, Object>(configuration);
            result.put("client.id", KafkaTokenStore.TOKEN_STORE_CLIENT_ID);
            result.put("linger.ms", 0);
            result.put("acks", "1");
            result.put("key.serializer", StringSerializer.class);
            result.put("value.serializer", TokenUpdateSerializer.class);
            return result;
        }

        private void assertMinimalValidClientConfiguration(Map<String, Object> configuration, String exceptionMessage) {
            BuilderUtils.assertNonNull(configuration, (String)exceptionMessage);
            Object bootstrapServer = configuration.get("bootstrap.servers");
            BuilderUtils.assertNonNull((Object)bootstrapServer, (String)exceptionMessage);
        }
    }

    private static class FutureWithContext {
        final Future<Boolean> future;
        final String processorName;
        final int segment;

        private FutureWithContext(Future<Boolean> future, String processorName, int segment) {
            this.future = future;
            this.processorName = processorName;
            this.segment = segment;
        }
    }
}

