/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.ProducerIdManager;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.KafkaException;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerIdManagerImpl
implements ProducerIdManager {
    private static final Logger log = LoggerFactory.getLogger(ProducerIdManagerImpl.class);
    private static final Long currentVersion = 1L;
    public static final Long PID_BLOCK_SIZE = 1000L;
    public static final String KOP_PID_BLOCK_ZNODE = "/kop_latest_producer_id_block";
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private final int brokerId;
    private final MetadataStoreExtended metadataStore;
    private ProducerIdBlock currentProducerIdBlock;
    private Long nextProducerId = -1L;
    private CompletableFuture<Void> newProducerIdBlockFuture;

    public ProducerIdManagerImpl(int brokerId, MetadataStoreExtended metadataStore) {
        this.brokerId = brokerId;
        this.metadataStore = metadataStore;
    }

    public static byte[] generateProducerIdBlockJson(ProducerIdBlock producerIdBlock) throws JsonProcessingException {
        HashMap dataMap = Maps.newHashMap();
        dataMap.put("version", currentVersion);
        dataMap.put("broker", producerIdBlock.brokerId);
        dataMap.put("block_start", producerIdBlock.blockStartId);
        dataMap.put("block_end", producerIdBlock.blockEndId);
        return objectMapper.writeValueAsBytes((Object)dataMap);
    }

    public static ProducerIdBlock parseProducerIdBlockData(byte[] bytes) throws IOException {
        JsonNode jsonNode = objectMapper.readTree(bytes);
        return ProducerIdBlock.builder().brokerId(jsonNode.get("broker").asInt()).blockStartId(jsonNode.get("block_start").asLong()).blockEndId(jsonNode.get("block_end").asLong()).build();
    }

    public synchronized CompletableFuture<Void> getNewProducerIdBlock() {
        if (this.newProducerIdBlockFuture != null && !this.newProducerIdBlockFuture.isDone()) {
            return this.newProducerIdBlockFuture;
        }
        this.newProducerIdBlockFuture = new CompletableFuture();
        ((CompletableFuture)this.getCurrentDataAndVersion().thenAccept(currentDataAndVersionOpt -> {
            ProducerIdManagerImpl producerIdManagerImpl = this;
            synchronized (producerIdManagerImpl) {
                ProducerIdBlock nextProducerIdBlock;
                if (currentDataAndVersionOpt.isPresent() && ((DataAndVersion)currentDataAndVersionOpt.get()).getData() != null) {
                    DataAndVersion dataAndVersion = (DataAndVersion)currentDataAndVersionOpt.get();
                    try {
                        ProducerIdBlock currProducerIdBlock = ProducerIdManagerImpl.parseProducerIdBlockData(dataAndVersion.getData());
                        if (currProducerIdBlock.blockEndId > Long.MAX_VALUE - PID_BLOCK_SIZE) {
                            log.error("Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit (current block end producerId is {})", (Object)currProducerIdBlock.blockEndId);
                            this.newProducerIdBlockFuture.completeExceptionally(new KafkaException("Have exhausted all producerIds."));
                            return;
                        }
                        nextProducerIdBlock = ProducerIdBlock.builder().brokerId(this.brokerId).blockStartId(currProducerIdBlock.blockEndId + 1L).blockEndId(currProducerIdBlock.blockEndId + PID_BLOCK_SIZE).build();
                    }
                    catch (IOException e) {
                        this.newProducerIdBlockFuture.completeExceptionally(new KafkaException("Get producerId failed.", (Throwable)e));
                        return;
                    }
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("There is no producerId block yet, creating the first block");
                    }
                    nextProducerIdBlock = ProducerIdBlock.builder().brokerId(this.brokerId).blockStartId(0L).blockEndId(PID_BLOCK_SIZE - 1L).build();
                }
                try {
                    byte[] newProducerIdBlockData = ProducerIdManagerImpl.generateProducerIdBlockJson(nextProducerIdBlock);
                    ((CompletableFuture)this.conditionalUpdateData(newProducerIdBlockData, currentDataAndVersionOpt.orElse(DataAndVersion.DEFAULT_VERSION).getVersion()).thenAccept(version -> {
                        ProducerIdManagerImpl producerIdManagerImpl = this;
                        synchronized (producerIdManagerImpl) {
                            this.currentProducerIdBlock = nextProducerIdBlock;
                            this.nextProducerId = nextProducerIdBlock.blockStartId;
                            this.newProducerIdBlockFuture.complete(null);
                        }
                    })).exceptionally(ex -> {
                        ProducerIdManagerImpl producerIdManagerImpl = this;
                        synchronized (producerIdManagerImpl) {
                            this.newProducerIdBlockFuture.completeExceptionally((Throwable)ex);
                        }
                        return null;
                    });
                }
                catch (JsonProcessingException e) {
                    this.newProducerIdBlockFuture.completeExceptionally(e);
                }
            }
        })).exceptionally(ex -> {
            ProducerIdManagerImpl producerIdManagerImpl = this;
            synchronized (producerIdManagerImpl) {
                this.newProducerIdBlockFuture.completeExceptionally((Throwable)ex);
            }
            return null;
        });
        return this.newProducerIdBlockFuture;
    }

    @Override
    public CompletableFuture<Void> initialize() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        ((CompletableFuture)this.getNewProducerIdBlock().thenAccept(__ -> {
            ProducerIdManagerImpl producerIdManagerImpl = this;
            synchronized (producerIdManagerImpl) {
                this.nextProducerId = this.currentProducerIdBlock.blockStartId;
            }
            future.complete(null);
        })).exceptionally(throwable -> {
            future.completeExceptionally((Throwable)throwable);
            return null;
        });
        return future;
    }

    @Override
    public synchronized CompletableFuture<Long> generateProducerId() {
        CompletableFuture<Long> nextProducerIdFuture = new CompletableFuture<Long>();
        if (this.nextProducerId > this.currentProducerIdBlock.blockEndId) {
            ((CompletableFuture)this.getNewProducerIdBlock().thenAccept(__ -> {
                ProducerIdManagerImpl producerIdManagerImpl = this;
                synchronized (producerIdManagerImpl) {
                    if (this.nextProducerId > this.currentProducerIdBlock.blockEndId) {
                        IllegalStateException ex = new IllegalStateException("New ProducerIdBlock exhausted. Try again.");
                        nextProducerIdFuture.completeExceptionally(ex);
                    } else {
                        this.nextProducerId = this.nextProducerId + 1L;
                        nextProducerIdFuture.complete(this.nextProducerId - 1L);
                    }
                }
            })).exceptionally(ex -> {
                nextProducerIdFuture.completeExceptionally((Throwable)ex);
                return null;
            });
        } else {
            this.nextProducerId = this.nextProducerId + 1L;
            nextProducerIdFuture.complete(this.nextProducerId - 1L);
        }
        return nextProducerIdFuture;
    }

    private CompletableFuture<Long> conditionalUpdateData(byte[] data, long expectVersion) {
        CompletableFuture<Long> updateFuture = new CompletableFuture<Long>();
        ((CompletableFuture)this.metadataStore.put(KOP_PID_BLOCK_ZNODE, data, Optional.of(expectVersion)).thenAccept(stat -> {
            if (log.isDebugEnabled()) {
                log.debug("ConditionalUpdateData Expect version: {}, stat version: {}", (Object)expectVersion, (Object)stat.getVersion());
            }
            updateFuture.complete(stat.getVersion());
        })).exceptionally(ex -> {
            if (ex.getCause() instanceof MetadataStoreException.BadVersionException) {
                ((CompletableFuture)this.checkProducerIdBlockMetadata(data).thenAccept(updateFuture::complete)).exceptionally(e -> {
                    updateFuture.completeExceptionally((Throwable)e);
                    return null;
                });
            } else if (ex.getCause() instanceof MetadataStoreException.NotFoundException) {
                log.error("Update of path {} with data {} and expected version {} failed due to {}", new Object[]{KOP_PID_BLOCK_ZNODE, this.getProducerIdBlockStr(data), expectVersion, "NoNode for path /kop_latest_producer_id_block"});
                updateFuture.completeExceptionally(new Exception("NoNode for path /kop_latest_producer_id_block"));
            } else {
                log.error("Update of path {} with data {} and expected version {} exception: {}", new Object[]{KOP_PID_BLOCK_ZNODE, this.getProducerIdBlockStr(data), expectVersion, ex.getCause()});
                updateFuture.completeExceptionally(new Exception("Error to update data.", ex.getCause()));
            }
            return null;
        });
        return updateFuture;
    }

    private CompletableFuture<Long> checkProducerIdBlockMetadata(byte[] expectedData) {
        CompletableFuture<Long> future = new CompletableFuture<Long>();
        try {
            ProducerIdBlock expectedPidBlock = ProducerIdManagerImpl.parseProducerIdBlockData(expectedData);
            ((CompletableFuture)this.getCurrentDataAndVersion().thenAccept(dataAndVersionOpt -> {
                if (dataAndVersionOpt.isPresent()) {
                    DataAndVersion dataAndVersion = (DataAndVersion)dataAndVersionOpt.get();
                    byte[] data = dataAndVersion.getData();
                    try {
                        ProducerIdBlock producerIdBlock = ProducerIdManagerImpl.parseProducerIdBlockData(data);
                        if (expectedPidBlock.equals(producerIdBlock)) {
                            long version = dataAndVersion.getVersion();
                            future.complete(version);
                            return;
                        }
                    }
                    catch (IOException e) {
                        future.completeExceptionally(e);
                        return;
                    }
                    future.complete(((DataAndVersion)dataAndVersionOpt.get()).getVersion());
                } else {
                    future.completeExceptionally(new Exception("ProducerId is not present !"));
                }
            })).exceptionally(ex -> {
                future.completeExceptionally((Throwable)ex);
                return null;
            });
        }
        catch (IOException e) {
            log.warn("Error while checking for producerId block Zk data on path {}: expected data {}", new Object[]{KOP_PID_BLOCK_ZNODE, this.getProducerIdBlockStr(expectedData), e});
            future.completeExceptionally(e);
        }
        return future;
    }

    private CompletableFuture<Optional<DataAndVersion>> getCurrentDataAndVersion() {
        CompletableFuture<Optional<DataAndVersion>> future = new CompletableFuture<Optional<DataAndVersion>>();
        ((CompletableFuture)this.metadataStore.get(KOP_PID_BLOCK_ZNODE).thenAccept(resultOpt -> {
            if (resultOpt.isPresent()) {
                GetResult getResult = (GetResult)resultOpt.get();
                future.complete(Optional.of(new DataAndVersion(getResult.getValue(), getResult.getStat().getVersion())));
            } else {
                future.complete(Optional.empty());
            }
        })).exceptionally(ex -> {
            future.completeExceptionally((Throwable)ex);
            return null;
        });
        return future;
    }

    private String getProducerIdBlockStr(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    @Override
    public void shutdown() {
        log.info("Shutdown complete: last producerId assigned {}", (Object)this.nextProducerId);
    }

    public static class ProducerIdBlock {
        private Integer brokerId;
        private Long blockStartId;
        private Long blockEndId;

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ProducerIdBlock that = (ProducerIdBlock)o;
            return Objects.equals(this.brokerId, that.brokerId) && Objects.equals(this.blockStartId, that.blockStartId) && Objects.equals(this.blockEndId, that.blockEndId);
        }

        public int hashCode() {
            return Objects.hash(this.brokerId, this.blockStartId, this.blockEndId);
        }

        public static ProducerIdBlockBuilder builder() {
            return new ProducerIdBlockBuilder();
        }

        public Integer getBrokerId() {
            return this.brokerId;
        }

        public Long getBlockStartId() {
            return this.blockStartId;
        }

        public Long getBlockEndId() {
            return this.blockEndId;
        }

        public void setBrokerId(Integer brokerId) {
            this.brokerId = brokerId;
        }

        public void setBlockStartId(Long blockStartId) {
            this.blockStartId = blockStartId;
        }

        public void setBlockEndId(Long blockEndId) {
            this.blockEndId = blockEndId;
        }

        public ProducerIdBlock(Integer brokerId, Long blockStartId, Long blockEndId) {
            this.brokerId = brokerId;
            this.blockStartId = blockStartId;
            this.blockEndId = blockEndId;
        }

        public String toString() {
            return "ProducerIdManagerImpl.ProducerIdBlock(brokerId=" + this.getBrokerId() + ", blockStartId=" + this.getBlockStartId() + ", blockEndId=" + this.getBlockEndId() + ")";
        }

        public static class ProducerIdBlockBuilder {
            private Integer brokerId;
            private Long blockStartId;
            private Long blockEndId;

            ProducerIdBlockBuilder() {
            }

            public ProducerIdBlockBuilder brokerId(Integer brokerId) {
                this.brokerId = brokerId;
                return this;
            }

            public ProducerIdBlockBuilder blockStartId(Long blockStartId) {
                this.blockStartId = blockStartId;
                return this;
            }

            public ProducerIdBlockBuilder blockEndId(Long blockEndId) {
                this.blockEndId = blockEndId;
                return this;
            }

            public ProducerIdBlock build() {
                return new ProducerIdBlock(this.brokerId, this.blockStartId, this.blockEndId);
            }

            public String toString() {
                return "ProducerIdManagerImpl.ProducerIdBlock.ProducerIdBlockBuilder(brokerId=" + this.brokerId + ", blockStartId=" + this.blockStartId + ", blockEndId=" + this.blockEndId + ")";
            }
        }
    }

    private static class DataAndVersion {
        private byte[] data;
        private long version;
        public static final DataAndVersion DEFAULT_VERSION = new DataAndVersion(null, -1L);

        public byte[] getData() {
            return this.data;
        }

        public long getVersion() {
            return this.version;
        }

        public void setData(byte[] data) {
            this.data = data;
        }

        public void setVersion(long version) {
            this.version = version;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof DataAndVersion)) {
                return false;
            }
            DataAndVersion other = (DataAndVersion)o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.getVersion() != other.getVersion()) {
                return false;
            }
            return Arrays.equals(this.getData(), other.getData());
        }

        protected boolean canEqual(Object other) {
            return other instanceof DataAndVersion;
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            long $version = this.getVersion();
            result = result * 59 + (int)($version >>> 32 ^ $version);
            result = result * 59 + Arrays.hashCode(this.getData());
            return result;
        }

        public String toString() {
            return "ProducerIdManagerImpl.DataAndVersion(data=" + Arrays.toString(this.getData()) + ", version=" + this.getVersion() + ")";
        }

        public DataAndVersion(byte[] data, long version) {
            this.data = data;
            this.version = version;
        }
    }
}

