/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.DefaultCheckpointMetadata;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.smallrye.reactive.messaging.kafka.impl.TopicPartitions;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.mutiny.core.Vertx;
import java.lang.annotation.Annotation;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;

@Experimental(value="Experimental API")
public class KafkaCheckpointCommit
extends ContextHolder
implements KafkaCommitHandler {
    private final Map<TopicPartition, CheckpointState<?>> checkpointStateMap = new ConcurrentHashMap();
    private volatile long timerId = -1L;
    private final int autoCommitInterval;
    private final KafkaConsumer<?, ?> consumer;
    private final CheckpointStateStore stateStore;
    private final BiConsumer<Throwable, Boolean> reportFailure;
    private final String consumerId;
    private final int unsyncedStateMaxAge;

    public KafkaCheckpointCommit(Vertx vertx, KafkaConsumer<?, ?> consumer, CheckpointStateStore stateStore, BiConsumer<Throwable, Boolean> reportFailure, int autoCommitInterval, int unsynchedStateMaxAge, int defaultTimeout) {
        super(vertx, defaultTimeout);
        this.consumer = consumer;
        this.consumerId = (String)consumer.configuration().get("client.id");
        this.stateStore = stateStore;
        this.reportFailure = reportFailure;
        this.autoCommitInterval = autoCommitInterval;
        this.unsyncedStateMaxAge = unsynchedStateMaxAge;
        if (unsynchedStateMaxAge <= 0) {
            KafkaLogging.log.disableCheckpointCommitStrategyHealthCheck(this.consumerId);
        } else {
            KafkaLogging.log.setCheckpointCommitStrategyUnsyncedStateMaxAge(this.consumerId, unsynchedStateMaxAge);
        }
    }

    private void stopFlushAndCheckHealthTimer() {
        if (this.timerId != -1L) {
            this.vertx.cancelTimer(this.timerId);
            this.timerId = -1L;
        }
    }

    private void startFlushAndCheckHealthTimer() {
        if (!this.checkpointStateMap.isEmpty()) {
            this.timerId = this.vertx.setTimer((long)this.autoCommitInterval, x -> this.runOnContext(this::flushAndCheckHealth));
        }
    }

    private void flushAndCheckHealth() {
        this.persistProcessingState(this.checkpointStateMap).onItemOrFailure().invoke(() -> {
            this.startFlushAndCheckHealthTimer();
            this.checkHealth();
        }).subscribe().with(unused -> {}, unused -> {});
    }

    private void checkHealth() {
        if (this.unsyncedStateMaxAge > 0) {
            for (Map.Entry<TopicPartition, CheckpointState<?>> state : this.checkpointStateMap.entrySet()) {
                TopicPartition tp = state.getKey();
                CheckpointState<?> checkpointState = state.getValue();
                long elapsed = checkpointState.millisSinceLastPersistedOffset();
                boolean waitedTooLong = elapsed > (long)this.unsyncedStateMaxAge;
                if (!waitedTooLong) continue;
                LastStateStoredTooLongAgoException exception = new LastStateStoredTooLongAgoException(tp, elapsed / 1000L, checkpointState.getProcessingState().getOffset(), checkpointState.persistedAt.getOffset());
                KafkaLogging.log.warnf((Throwable)((Object)exception), exception.getMessage(), new Object[0]);
                this.reportFailure.accept((Throwable)((Object)exception), true);
            }
        }
    }

    @Override
    public <K, V> Uni<IncomingKafkaRecord<K, V>> received(IncomingKafkaRecord<K, V> record) {
        TopicPartition tp = TopicPartitions.getTopicPartition(record);
        CheckpointState<?> state = this.checkpointStateMap.get(tp);
        if (state != null) {
            state.receivedRecord();
            record.injectMetadata(new DefaultCheckpointMetadata(tp, record.getOffset(), state));
        }
        if (this.timerId < 0L) {
            this.startFlushAndCheckHealthTimer();
        }
        return Uni.createFrom().item(record);
    }

    @Override
    public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record) {
        return Uni.createFrom().completionStage(VertxContext.runOnEventLoopContext((Context)this.context.getDelegate(), f -> {
            TopicPartition tp = TopicPartitions.getTopicPartition(record);
            CheckpointState<?> checkpointState = this.checkpointStateMap.get(tp);
            DefaultCheckpointMetadata metadata = DefaultCheckpointMetadata.fromMessage(record);
            if (metadata != null && metadata.getCheckpointState().equals(checkpointState)) {
                ProcessingState newState = metadata.getNext();
                checkpointState.processedRecord();
                if (!ProcessingState.isEmptyOrNull(newState) && metadata.isPersistOnAck()) {
                    this.persistProcessingState(Map.of(tp, checkpointState)).onFailure().recoverWithNull().emitOn(arg_0 -> ((IncomingKafkaRecord)record).runOnMessageContext(arg_0)).subscribe().with(unused -> f.complete(null), f::completeExceptionally);
                    return;
                }
            } else {
                KafkaLogging.log.acknowledgementFromRevokedTopicPartition(record.getOffset(), tp, this.consumerId, this.checkpointStateMap.keySet());
            }
            record.runOnMessageContext(() -> f.complete(null));
        }));
    }

    @Override
    public void terminate(boolean graceful) {
        long stillUnprocessed;
        if (graceful && (stillUnprocessed = this.waitForProcessing()) > 0L) {
            KafkaLogging.log.messageStillUnprocessedAfterTimeout(stillUnprocessed);
        }
        this.removeFromState(this.checkpointStateMap.keySet()).chain(this::persistProcessingState).runSubscriptionOn(this::runOnContext).await().atMost(Duration.ofMillis(this.getTimeoutInMillis()));
        this.stateStore.close();
    }

    private long waitForProcessing() {
        int attempt = this.autoCommitInterval / 100;
        for (int i = 0; i < attempt; ++i) {
            long sum = this.checkpointStateMap.values().stream().map(CheckpointState::getUnprocessedRecords).mapToLong(l -> l).sum();
            if (sum == 0L) {
                return sum;
            }
            KafkaLogging.log.waitingForMessageProcessing(sum);
            try {
                Thread.sleep(100L);
                continue;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return this.checkpointStateMap.values().stream().map(CheckpointState::getUnprocessedRecords).mapToLong(l -> l).sum();
    }

    @Override
    public void partitionsAssigned(Collection<TopicPartition> partitions) {
        this.stopFlushAndCheckHealthTimer();
        if (partitions.isEmpty()) {
            return;
        }
        Map fetchedStates = (Map)Uni.createFrom().deferred(() -> this.stateStore.fetchProcessingState(partitions).onItem().invoke(fetched -> KafkaLogging.log.checkpointPartitionsAssigned(this.consumerId, partitions, fetched.toString())).onFailure().invoke(f -> KafkaLogging.log.failedCheckpointPartitionsAssigned(this.consumerId, partitions, (Throwable)f)).invoke(fetched -> {
            for (TopicPartition tp : partitions) {
                ProcessingState state = (ProcessingState)fetched.get(tp);
                this.checkpointStateMap.put(tp, new CheckpointState(tp, state));
            }
            this.startFlushAndCheckHealthTimer();
        })).runSubscriptionOn(this::runOnContext).await().atMost(Duration.ofMillis(this.getTimeoutInMillis()));
        Consumer<?, ?> kafkaConsumer = this.consumer.unwrap();
        for (Map.Entry entry : fetchedStates.entrySet()) {
            ProcessingState state = (ProcessingState)entry.getValue();
            kafkaConsumer.seek((TopicPartition)entry.getKey(), state != null ? state.getOffset() : 0L);
        }
    }

    private Uni<Map<TopicPartition, CheckpointState<?>>> removeFromState(Collection<TopicPartition> partitions) {
        return Uni.createFrom().emitter(e -> {
            this.stopFlushAndCheckHealthTimer();
            HashMap toRemove = new HashMap(this.checkpointStateMap);
            this.checkpointStateMap.keySet().removeAll(partitions);
            toRemove.keySet().removeAll(partitions);
            e.complete(toRemove);
        });
    }

    @Override
    public void partitionsRevoked(Collection<TopicPartition> partitions) {
        this.removeFromState(partitions).invoke(revoked -> KafkaLogging.log.checkpointPartitionsRevoked(this.consumerId, partitions, revoked.toString())).chain(this::persistProcessingState).invoke(this::startFlushAndCheckHealthTimer).runSubscriptionOn(this::runOnContext).await().atMost(Duration.ofMillis(this.getTimeoutInMillis()));
    }

    Uni<Void> persistProcessingState(Map<TopicPartition, CheckpointState<?>> stateMap) {
        HashMap map = new HashMap();
        for (Map.Entry<TopicPartition, CheckpointState<?>> entry : stateMap.entrySet()) {
            CheckpointState<?> checkpointState = entry.getValue();
            if (!checkpointState.hasUnsyncedOffset()) continue;
            map.put(entry.getKey(), checkpointState.getProcessingState());
        }
        if (map.isEmpty()) {
            return Uni.createFrom().voidItem();
        }
        return this.stateStore.persistProcessingState(map).onItem().invoke(() -> map.forEach((tp, state) -> this.checkpointStateMap.computeIfPresent((TopicPartition)tp, (t, s) -> s.withPersistedAt(OffsetPersistedAt.persisted(state.getOffset()))))).onItem().invoke(() -> KafkaLogging.log.checkpointPersistedState(this.consumerId, this.checkpointStateMap.toString())).onFailure().invoke(t -> KafkaLogging.log.checkpointFailedPersistingState(this.consumerId, this.checkpointStateMap.toString(), (Throwable)t));
    }

    public static class CheckpointState<T> {
        private final TopicPartition topicPartition;
        private final long createdTimestamp;
        private final AtomicLong received;
        private final AtomicLong processed;
        private volatile ProcessingState<T> processingState;
        private volatile OffsetPersistedAt persistedAt;

        private CheckpointState(TopicPartition topicPartition, ProcessingState<T> processingState, OffsetPersistedAt persistedAt) {
            this.topicPartition = topicPartition;
            this.createdTimestamp = System.currentTimeMillis();
            this.processingState = processingState;
            this.persistedAt = persistedAt;
            this.processed = new AtomicLong(0L);
            this.received = new AtomicLong(0L);
        }

        public CheckpointState(TopicPartition topicPartition) {
            this(topicPartition, ProcessingState.EMPTY_STATE);
        }

        public CheckpointState(TopicPartition topicPartition, ProcessingState<T> processingState) {
            this(topicPartition, ProcessingState.getOrEmpty(processingState), OffsetPersistedAt.NOT_PERSISTED);
        }

        public CheckpointState<T> withPersistedAt(OffsetPersistedAt offsetPersistedAt) {
            this.persistedAt = offsetPersistedAt;
            return this;
        }

        public synchronized ProcessingState<T> transformState(Supplier<ProcessingState<T>> initialStateSupplier, Function<ProcessingState<T>, ProcessingState<T>> transform) {
            ProcessingState<T> previousState = ProcessingState.isEmptyOrNull(this.processingState) ? initialStateSupplier.get() : this.processingState;
            this.processingState = transform.apply(previousState);
            return previousState;
        }

        public TopicPartition getTopicPartition() {
            return this.topicPartition;
        }

        public ProcessingState<T> getProcessingState() {
            return this.processingState;
        }

        public OffsetPersistedAt getPersistedAt() {
            return this.persistedAt;
        }

        public void receivedRecord() {
            this.received.incrementAndGet();
        }

        public void processedRecord() {
            this.processed.incrementAndGet();
        }

        public long getUnprocessedRecords() {
            return this.received.get() - this.processed.get();
        }

        public long millisSinceLastPersistedOffset() {
            if (this.persistedAt.notPersisted() && this.received.get() > 0L) {
                return System.currentTimeMillis() - this.createdTimestamp;
            }
            if (this.hasUnsyncedOffset()) {
                return System.currentTimeMillis() - this.persistedAt.getPersistedAt();
            }
            return -1L;
        }

        public boolean hasUnsyncedOffset() {
            return !ProcessingState.isEmptyOrNull(this.processingState) && this.processingState.getOffset() > this.persistedAt.getOffset();
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            CheckpointState that = (CheckpointState)o;
            return this.createdTimestamp == that.createdTimestamp && this.topicPartition.equals((Object)that.topicPartition);
        }

        public int hashCode() {
            return Objects.hash(this.topicPartition, this.createdTimestamp);
        }

        public String toString() {
            return "CheckpointState{topicPartition=" + this.topicPartition + ", createdTimestamp=" + this.createdTimestamp + ", received=" + this.received + ", processed=" + this.processed + ", processingState=" + this.processingState + ", persistedAt=" + this.persistedAt + "}";
        }
    }

    public static class LastStateStoredTooLongAgoException
    extends NoStackTraceThrowable {
        public LastStateStoredTooLongAgoException(TopicPartition topic, long time, long currentStateOffset, long lastStoredOffset) {
            super(String.format("Latest processing state for topic-partition `%s` persisted %d seconds ago. At the moment latest registered local processing state is for offset %d. The last offset for which a state is successfully persisted was %d.", topic, time, currentStateOffset, lastStoredOffset));
        }
    }

    private static class OffsetPersistedAt {
        private final long offset;
        private final long persistedAt;
        public static OffsetPersistedAt NOT_PERSISTED = new OffsetPersistedAt(-1L, -1L);

        public static OffsetPersistedAt persisted(long offset) {
            return new OffsetPersistedAt(offset, System.currentTimeMillis());
        }

        private OffsetPersistedAt(long offset, long persistedAt) {
            this.offset = offset;
            this.persistedAt = persistedAt;
        }

        public boolean notPersisted() {
            return NOT_PERSISTED.equals(this);
        }

        public long getOffset() {
            return this.offset;
        }

        public long getPersistedAt() {
            return this.persistedAt;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            OffsetPersistedAt that = (OffsetPersistedAt)o;
            return this.offset == that.offset && this.persistedAt == that.persistedAt;
        }

        public int hashCode() {
            return Objects.hash(this.offset, this.persistedAt);
        }

        public String toString() {
            return "OffsetPersistedAt{offset=" + this.offset + ", persistedAt=" + this.persistedAt + "}";
        }
    }

    @ApplicationScoped
    @Identifier(value="checkpoint")
    public static class Factory
    implements KafkaCommitHandler.Factory {
        Instance<CheckpointStateStore.Factory> stateStoreFactory;

        @Inject
        public Factory(@Any Instance<CheckpointStateStore.Factory> stateStoreFactory) {
            this.stateStoreFactory = stateStoreFactory;
        }

        @Override
        public KafkaCommitHandler create(KafkaConnectorIncomingConfiguration config, Vertx vertx, KafkaConsumer<?, ?> consumer, BiConsumer<Throwable, Boolean> reportFailure) {
            String groupId = (String)consumer.configuration().get("group.id");
            int defaultTimeout = config.config().getOptionalValue("default.api.timeout.ms", Integer.class).orElse(60000);
            int autoCommitInterval = config.config().getOptionalValue("auto.commit.interval.ms", Integer.class).orElse(5000);
            KafkaLogging.log.settingCommitInterval(groupId, autoCommitInterval);
            String stateStoreIdentifier = config.getCheckpointStateStore().orElseGet(() -> {
                KafkaLogging.log.checkpointDefaultStateStore();
                return "file";
            });
            CheckpointStateStore.Factory factory = (CheckpointStateStore.Factory)this.stateStoreFactory.select(new Annotation[]{Identifier.Literal.of((String)stateStoreIdentifier)}).get();
            Class stateType = config.getCheckpointStateType().map(fqcn -> {
                try {
                    return Utils.loadClass((String)fqcn, Object.class);
                }
                catch (ClassNotFoundException e) {
                    KafkaLogging.log.checkpointStateTypeNotFound(config.getChannel(), (String)fqcn);
                    return null;
                }
            }).orElse(null);
            CheckpointStateStore stateStore = factory.create(config, vertx, consumer, stateType);
            return new KafkaCheckpointCommit(vertx, consumer, stateStore, reportFailure, autoCommitInterval, config.getCheckpointUnsyncedStateMaxAgeMs(), defaultTimeout);
        }
    }
}

