/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.commit;

import io.smallrye.common.annotation.Experimental;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.kafka.IncomingKafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import io.smallrye.reactive.messaging.kafka.commit.StateStore;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaLogging;
import io.vertx.mutiny.core.Vertx;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.jboss.logging.Logger;

@Experimental(value="Experimental API")
public abstract class KafkaCheckpointCommit
extends ContextHolder
implements KafkaCommitHandler {
    protected KafkaLogging log = (KafkaLogging)Logger.getMessageLogger(KafkaLogging.class, (String)"io.smallrye.reactive.messaging.kafka");
    protected final Map<TopicPartition, ProcessingState<?>> processingStateMap = new HashMap();
    protected final KafkaConnectorIncomingConfiguration config;
    protected final KafkaConsumer<?, ?> consumer;
    protected final BiConsumer<Throwable, Boolean> reportFailure;

    public KafkaCheckpointCommit(Vertx vertx, KafkaConnectorIncomingConfiguration config, KafkaConsumer<?, ?> consumer, BiConsumer<Throwable, Boolean> reportFailure, int defaultTimeout) {
        super(vertx, defaultTimeout);
        this.config = config;
        this.consumer = consumer;
        this.reportFailure = reportFailure;
    }

    @Override
    public <K, V> Uni<IncomingKafkaRecord<K, V>> received(IncomingKafkaRecord<K, V> record) {
        return Uni.createFrom().item(record).emitOn(this::runOnContext).onItem().transform(r -> {
            TopicPartition tp = new TopicPartition(record.getTopic(), record.getPartition());
            r.injectMetadata(new StateStore(tp, record.getOffset(), () -> this.processingStateMap.get(tp)));
            return r;
        });
    }

    @Override
    public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record) {
        TopicPartition tp = new TopicPartition(record.getTopic(), record.getPartition());
        ProcessingState newState = StateStore.getProcessingState(record);
        boolean persist = StateStore.isPersist(record);
        if (newState != null) {
            return Uni.createFrom().item(newState).emitOn(this::runOnContext).onItem().invoke(s -> this.processingStateMap.put(tp, (ProcessingState<?>)s)).chain(s -> persist ? this.persistProcessingState(tp, newState) : Uni.createFrom().voidItem()).emitOn(this::runOnContext).emitOn(arg_0 -> record.runOnMessageContext(arg_0)).replaceWithVoid();
        }
        return Uni.createFrom().voidItem();
    }

    @Override
    public void terminate(boolean graceful) {
        this.consumer.getAssignments().chain(this::persistStateFor).emitOn(this::runOnContext).invoke(this.processingStateMap::clear).await().atMost(Duration.ofMillis(this.getTimeoutInMillis()));
    }

    @Override
    public void partitionsAssigned(Collection<TopicPartition> partitions) {
        List states = (List)Multi.createFrom().iterable(partitions).emitOn(this::runOnContext).onItem().transformToUniAndConcatenate(tp -> this.fetchProcessingState((TopicPartition)tp).map(s -> Tuple2.of((Object)tp, (Object)s))).emitOn(this::runOnContext).onItem().invoke(t -> this.processingStateMap.put((TopicPartition)t.getItem1(), (ProcessingState)t.getItem2())).collect().asList().await().atMost(Duration.ofMillis(this.getTimeoutInMillis()));
        Consumer<?, ?> kafkaConsumer = this.consumer.unwrap();
        for (Tuple2 tuple : states) {
            ProcessingState state = (ProcessingState)tuple.getItem2();
            kafkaConsumer.seek((TopicPartition)tuple.getItem1(), state != null ? state.getOffset() : 0L);
        }
    }

    @Override
    public void partitionsRevoked(Collection<TopicPartition> partitions) {
        this.persistStateFor(partitions).await().atMost(Duration.ofMillis(this.getTimeoutInMillis()));
    }

    private Uni<List<Void>> persistStateFor(Collection<TopicPartition> partitions) {
        return Multi.createFrom().iterable(partitions).emitOn(this::runOnContext).onItem().transform(tp -> Tuple2.of((Object)tp, this.processingStateMap.get(tp))).skip().where(t -> t.getItem2() == null).onItem().transformToUniAndConcatenate(t -> this.persistProcessingState((TopicPartition)t.getItem1(), (ProcessingState)t.getItem2())).collect().asList();
    }

    protected abstract Uni<ProcessingState<?>> fetchProcessingState(TopicPartition var1);

    protected abstract Uni<Void> persistProcessingState(TopicPartition var1, ProcessingState<?> var2);
}

