/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.eventhubs.checkpointstore.jedis;

import com.azure.core.exception.AzureException;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.core.util.serializer.JsonSerializer;
import com.azure.core.util.serializer.JsonSerializerProviders;
import com.azure.core.util.serializer.TypeReference;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;

public final class JedisCheckpointStore
implements CheckpointStore {
    private static final String PARTITION_ID_KEY = "partitionId";
    private static final ClientLogger LOGGER = new ClientLogger(JedisCheckpointStore.class);
    static final JsonSerializer DEFAULT_SERIALIZER = JsonSerializerProviders.createInstance((boolean)true);
    static final byte[] CHECKPOINT = "checkpoint".getBytes(StandardCharsets.UTF_8);
    static final byte[] PARTITION_OWNERSHIP = "partitionOwnership".getBytes(StandardCharsets.UTF_8);
    private final JedisPool jedisPool;

    public JedisCheckpointStore(JedisPool jedisPool) {
        if (jedisPool == null) {
            throw LOGGER.logExceptionAsError(Exceptions.propagate((Throwable)new IllegalArgumentException("JedisPool object supplied to constructor is null.")));
        }
        this.jedisPool = jedisPool;
    }

    public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {
        return Flux.fromIterable(requestedPartitionOwnerships).handle((partitionOwnership, sink) -> {
            String partitionId = partitionOwnership.getPartitionId();
            String fullyQualifiedNamespace = partitionOwnership.getFullyQualifiedNamespace();
            String eventHubName = partitionOwnership.getEventHubName();
            String consumerGroup = partitionOwnership.getConsumerGroup();
            byte[] key = JedisCheckpointStore.keyBuilder(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId);
            byte[] serializedOwnership = DEFAULT_SERIALIZER.serializeToBytes(partitionOwnership);
            try (Jedis jedis = this.jedisPool.getResource();){
                jedis.watch((byte[][])new byte[][]{key});
                List keyInformation = jedis.hmget(key, (byte[][])new byte[][]{PARTITION_OWNERSHIP});
                long lastModifiedTimeSeconds = Long.parseLong((String)jedis.time().get(0));
                partitionOwnership.setLastModifiedTime(Long.valueOf(lastModifiedTimeSeconds));
                partitionOwnership.setETag("");
                if (keyInformation == null || keyInformation.isEmpty() || keyInformation.get(0) == null) {
                    try {
                        long result = jedis.hsetnx(key, PARTITION_OWNERSHIP, serializedOwnership);
                        if (result == 1L) {
                            sink.next(partitionOwnership);
                        } else {
                            JedisCheckpointStore.addEventHubInformation(LOGGER.atVerbose(), fullyQualifiedNamespace, eventHubName, consumerGroup).addKeyValue(PARTITION_ID_KEY, partitionId).log("Unable to create new partition ownership entry.");
                            sink.error((Throwable)new AzureException("Unable to claim partition: " + partitionId + " Partition ownership created already."));
                        }
                    }
                    finally {
                        jedis.unwatch();
                    }
                    return;
                }
                Transaction transaction = jedis.multi();
                transaction.hset(key, PARTITION_OWNERSHIP, serializedOwnership);
                List executionResponse = transaction.exec();
                if (executionResponse == null) {
                    sink.error((Throwable)JedisCheckpointStore.createClaimPartitionException(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, "Transaction was aborted."));
                } else if (executionResponse.isEmpty()) {
                    sink.error((Throwable)JedisCheckpointStore.createClaimPartitionException(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, "No command results in transaction result."));
                } else if (executionResponse.get(0) == null) {
                    sink.error((Throwable)JedisCheckpointStore.createClaimPartitionException(fullyQualifiedNamespace, eventHubName, consumerGroup, partitionId, "Executing update command resulted in null."));
                } else {
                    JedisCheckpointStore.addEventHubInformation(LOGGER.atVerbose(), fullyQualifiedNamespace, eventHubName, consumerGroup).addKeyValue(PARTITION_ID_KEY, partitionId).log("Claimed partition.");
                    sink.next(partitionOwnership);
                }
            }
        });
    }

    public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        byte[] prefix = JedisCheckpointStore.prefixBuilder(fullyQualifiedNamespace, eventHubName, consumerGroup);
        try (Jedis jedis = this.jedisPool.getResource();){
            ArrayList<Checkpoint> listStoredCheckpoints = new ArrayList<Checkpoint>();
            Set members = jedis.smembers(prefix);
            if (members.isEmpty()) {
                Flux flux = Flux.fromIterable(listStoredCheckpoints);
                return flux;
            }
            for (byte[] member : members) {
                List checkpointJsonList = jedis.hmget(member, (byte[][])new byte[][]{CHECKPOINT});
                if (!checkpointJsonList.isEmpty()) {
                    byte[] checkpointJson = (byte[])checkpointJsonList.get(0);
                    if (checkpointJson == null) {
                        JedisCheckpointStore.addEventHubInformation(LOGGER.atVerbose(), fullyQualifiedNamespace, eventHubName, consumerGroup).log("No checkpoint persists yet.");
                        continue;
                    }
                    Checkpoint checkpoint = (Checkpoint)DEFAULT_SERIALIZER.deserializeFromBytes(checkpointJson, TypeReference.createInstance(Checkpoint.class));
                    listStoredCheckpoints.add(checkpoint);
                    continue;
                }
                JedisCheckpointStore.addEventHubInformation(LOGGER.atVerbose(), fullyQualifiedNamespace, eventHubName, consumerGroup).log("No checkpoint persists yet.");
            }
            Flux flux = Flux.fromIterable(listStoredCheckpoints);
            return flux;
        }
    }

    public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        byte[] prefix = JedisCheckpointStore.prefixBuilder(fullyQualifiedNamespace, eventHubName, consumerGroup);
        try (Jedis jedis = this.jedisPool.getResource();){
            Set members = jedis.smembers(prefix);
            ArrayList<PartitionOwnership> listStoredOwnerships = new ArrayList<PartitionOwnership>();
            if (members.isEmpty()) {
                Flux flux = Flux.fromIterable(listStoredOwnerships);
                return flux;
            }
            for (byte[] member : members) {
                List partitionOwnershipJsonList = jedis.hmget(member, (byte[][])new byte[][]{PARTITION_OWNERSHIP});
                if (partitionOwnershipJsonList.isEmpty()) continue;
                byte[] partitionOwnershipJson = (byte[])partitionOwnershipJsonList.get(0);
                if (partitionOwnershipJson == null) {
                    JedisCheckpointStore.addEventHubInformation(LOGGER.atVerbose(), fullyQualifiedNamespace, eventHubName, consumerGroup).log("No partition ownership records exist for this checkpoint yet.");
                    continue;
                }
                PartitionOwnership partitionOwnership = (PartitionOwnership)DEFAULT_SERIALIZER.deserializeFromBytes(partitionOwnershipJson, TypeReference.createInstance(PartitionOwnership.class));
                listStoredOwnerships.add(partitionOwnership);
            }
            Flux flux = Flux.fromIterable(listStoredOwnerships);
            return flux;
        }
    }

    public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
        if (Objects.isNull(checkpoint)) {
            return Mono.error((Throwable)new NullPointerException("'checkpoint' cannot be null."));
        }
        if (!JedisCheckpointStore.isCheckpointValid(checkpoint).booleanValue()) {
            return FluxUtil.monoError((LoggingEventBuilder)JedisCheckpointStore.addEventHubInformation(LOGGER.atError(), checkpoint.getFullyQualifiedNamespace(), checkpoint.getEventHubName(), checkpoint.getConsumerGroup()).addKeyValue(PARTITION_ID_KEY, checkpoint.getPartitionId()), (RuntimeException)new IllegalArgumentException("Checkpoint is either null, or both the offset and the sequence number are null."));
        }
        return Mono.fromRunnable(() -> {
            byte[] prefix = JedisCheckpointStore.prefixBuilder(checkpoint.getFullyQualifiedNamespace(), checkpoint.getEventHubName(), checkpoint.getConsumerGroup());
            byte[] key = JedisCheckpointStore.keyBuilder(checkpoint.getFullyQualifiedNamespace(), checkpoint.getEventHubName(), checkpoint.getConsumerGroup(), checkpoint.getPartitionId());
            try (Jedis jedis = this.jedisPool.getResource();){
                if (!jedis.exists(prefix) || !jedis.exists(key)) {
                    jedis.sadd(prefix, (byte[][])new byte[][]{key});
                    jedis.hset(key, CHECKPOINT, DEFAULT_SERIALIZER.serializeToBytes((Object)checkpoint));
                } else {
                    jedis.hset(key, CHECKPOINT, DEFAULT_SERIALIZER.serializeToBytes((Object)checkpoint));
                }
            }
        });
    }

    static byte[] prefixBuilder(String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        return (fullyQualifiedNamespace + "/" + eventHubName + "/" + consumerGroup).getBytes(StandardCharsets.UTF_8);
    }

    static byte[] keyBuilder(String fullyQualifiedNamespace, String eventHubName, String consumerGroup, String partitionId) {
        return (fullyQualifiedNamespace + "/" + eventHubName + "/" + consumerGroup + "/" + partitionId).getBytes(StandardCharsets.UTF_8);
    }

    private static Boolean isCheckpointValid(Checkpoint checkpoint) {
        return checkpoint.getOffset() != null || checkpoint.getSequenceNumber() != null;
    }

    private static LoggingEventBuilder addEventHubInformation(LoggingEventBuilder builder, String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
        return builder.addKeyValue("hostName", fullyQualifiedNamespace).addKeyValue("entityName", eventHubName).addKeyValue("consumerGroup", consumerGroup);
    }

    private static AzureException createClaimPartitionException(String fullyQualifiedNamespace, String eventHubName, String consumerGroup, String partitionId, String message) {
        AzureException exception = new AzureException("Unable to claim partition: " + partitionId + ". " + message);
        JedisCheckpointStore.addEventHubInformation(LOGGER.atInfo(), fullyQualifiedNamespace, eventHubName, consumerGroup).addKeyValue(PARTITION_ID_KEY, partitionId).log("Unable to claim partition. ", new Object[]{exception});
        return exception;
    }
}

