/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.smallrye.reactivemessaging.kafka;

import io.quarkus.redis.client.RedisClientName;
import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.tuples.Tuple2;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingStateCodec;
import io.smallrye.reactive.messaging.kafka.commit.VertxJsonProcessingStateCodec;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.vertx.mutiny.core.Vertx;
import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Default;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import org.apache.kafka.common.TopicPartition;

public class RedisStateStore
implements CheckpointStateStore {
    public static final String REDIS_CHECKPOINT_NAME = "quarkus-redis";
    private final ReactiveRedisDataSource redis;
    private final String consumerGroupId;
    private final ProcessingStateCodec stateCodec;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public RedisStateStore(ReactiveRedisDataSource redis, String consumerGroupId, ProcessingStateCodec stateCodec) {
        this.redis = redis;
        this.consumerGroupId = consumerGroupId;
        this.stateCodec = stateCodec;
    }

    public void close() {
        this.closed.set(true);
    }

    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(Collection<TopicPartition> partitions) {
        if (partitions.isEmpty() || this.closed.get()) {
            return Uni.createFrom().item(Collections.emptyMap());
        }
        List tps = partitions.stream().map(tp -> Tuple2.of((Object)tp, (Object)this.getKey((TopicPartition)tp))).collect(Collectors.toList());
        return this.redis.value(byte[].class).mget((Object[])((String[])tps.stream().map(Tuple2::getItem2).toArray(String[]::new))).map(response -> response.entrySet().stream().filter(e -> e.getValue() != null).collect(Collectors.toMap(e -> this.getTpFromKey((String)e.getKey()), e -> ProcessingState.getOrEmpty((ProcessingState)this.stateCodec.decode((byte[])e.getValue())))));
    }

    private String getKey(TopicPartition partition) {
        return this.consumerGroupId + ":" + partition.topic() + ":" + partition.partition();
    }

    private TopicPartition getTpFromKey(String key) {
        String[] parts = key.split(":");
        return new TopicPartition(parts[1], Integer.parseInt(parts[2]));
    }

    public Uni<Void> persistProcessingState(Map<TopicPartition, ProcessingState<?>> states) {
        if (states.isEmpty() || this.closed.get()) {
            return Uni.createFrom().voidItem();
        }
        String[] keys = (String[])states.keySet().stream().map(this::getKey).toArray(String[]::new);
        return this.redis.withTransaction(r -> r.value(byte[].class).mget((Object[])keys), (current, r) -> {
            Map<String, byte[]> map = states.entrySet().stream().filter(toPersist -> {
                String key = this.getKey((TopicPartition)toPersist.getKey());
                ProcessingState newState = (ProcessingState)toPersist.getValue();
                if (!current.containsKey(key)) {
                    return true;
                }
                ProcessingState currentState = this.stateCodec.decode((byte[])current.get(key));
                return ProcessingState.isEmptyOrNull((ProcessingState)currentState) || !ProcessingState.isEmptyOrNull((ProcessingState)newState) && newState.getOffset() >= currentState.getOffset();
            }).collect(Collectors.toMap(e -> this.getKey((TopicPartition)e.getKey()), e -> this.stateCodec.encode((ProcessingState)e.getValue())));
            if (map.isEmpty()) {
                return Uni.createFrom().voidItem();
            }
            return r.value(byte[].class).mset(map);
        }, keys).replaceWithVoid();
    }

    @ApplicationScoped
    @Identifier(value="quarkus-redis")
    public static class Factory
    implements CheckpointStateStore.Factory {
        @Inject
        @Any
        Instance<ReactiveRedisDataSource> redisDataSource;
        @Inject
        Instance<ProcessingStateCodec.Factory> stateCodecFactory;

        public CheckpointStateStore create(KafkaConnectorIncomingConfiguration config, Vertx vertx, KafkaConsumer<?, ?> consumer, Class<?> stateType) {
            String consumerGroupId = (String)consumer.configuration().get("group.id");
            String clientName = config.config().getOptionalValue("checkpoint.quarkus-redis.client-name", String.class).orElse(null);
            ReactiveRedisDataSource rds = clientName != null ? (ReactiveRedisDataSource)this.redisDataSource.select(new Annotation[]{RedisClientName.Literal.of((String)clientName)}).get() : (ReactiveRedisDataSource)this.redisDataSource.select(new Annotation[]{Default.Literal.INSTANCE}).get();
            ProcessingStateCodec stateCodec = ((ProcessingStateCodec.Factory)CDIUtils.getInstanceById(this.stateCodecFactory, (String)config.getChannel(), () -> {
                if (this.stateCodecFactory.isUnsatisfied()) {
                    return VertxJsonProcessingStateCodec.FACTORY;
                }
                return (ProcessingStateCodec.Factory)this.stateCodecFactory.get();
            })).create(stateType);
            return new RedisStateStore(rds, consumerGroupId, stateCodec);
        }
    }
}

