/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import io.streamnative.pulsar.handlers.kop.coordinator.transaction.ProducerIdManager;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarStorageProducerIdManagerImpl
implements ProducerIdManager {
    private static final Logger log = LoggerFactory.getLogger(PulsarStorageProducerIdManagerImpl.class);
    static final int BLOCK_SIZE = 1000;
    private final AtomicLong nextId = new AtomicLong(0L);
    private final String topic;
    private final int blockSize;
    private final PulsarClient pulsarClient;
    private CompletableFuture<Reader<byte[]>> reader;
    private CompletableFuture<Void> currentReadHandle;
    private final ConcurrentLinkedQueue<Long> availableIdsLocally = new ConcurrentLinkedQueue();

    private synchronized CompletableFuture<Reader<byte[]>> ensureReaderHandle() {
        if (this.reader == null) {
            this.reader = this.pulsarClient.newReader().topic(this.topic).startMessageId(MessageId.earliest).readCompacted(true).createAsync();
        }
        return this.reader;
    }

    private CompletableFuture<Void> readNextMessageIfAvailable(Reader<byte[]> reader) {
        return reader.hasMessageAvailableAsync().thenCompose(hasMessageAvailable -> {
            if (hasMessageAvailable == null || !hasMessageAvailable.booleanValue()) {
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture opMessage = reader.readNextAsync();
            return opMessage.thenCompose(msg -> {
                byte[] value = (byte[])msg.getValue();
                long newId = (Long)Schema.INT64.decode(value);
                if (log.isDebugEnabled()) {
                    log.debug("{} Read {} from {}", new Object[]{this, newId, msg.getMessageId()});
                }
                this.nextId.set(newId);
                return this.readNextMessageIfAvailable(reader);
            });
        });
    }

    private synchronized CompletableFuture<Void> ensureLatestData(boolean beforeWrite) {
        if (this.currentReadHandle != null) {
            if (beforeWrite) {
                if (log.isDebugEnabled()) {
                    log.debug("A read was already pending, starting a new one in order to ensure consistency");
                }
                return this.currentReadHandle.thenCompose(___ -> this.ensureLatestData(false));
            }
            return this.currentReadHandle;
        }
        CompletableFuture<Reader<byte[]>> readerHandle = this.ensureReaderHandle();
        CompletionStage newReadHandle = readerHandle.thenCompose(this::readNextMessageIfAvailable);
        this.currentReadHandle = newReadHandle;
        return ((CompletableFuture)newReadHandle).thenApply(arg_0 -> this.lambda$ensureLatestData$3((CompletableFuture)newReadHandle, arg_0));
    }

    private synchronized void endReadLoop(CompletableFuture<?> handle) {
        if (handle == this.currentReadHandle) {
            this.currentReadHandle = null;
        }
    }

    @Override
    public synchronized CompletableFuture<Long> generateProducerId() {
        Long booked = this.availableIdsLocally.poll();
        if (booked != null) {
            if (log.isDebugEnabled()) {
                log.debug("Returning pre-allocated id {} for {}", (Object)booked, (Object)this.topic);
            }
            return CompletableFuture.completedFuture(booked);
        }
        if (log.isDebugEnabled()) {
            log.debug("Allocating new block of ids for {}", (Object)this.topic);
        }
        CompletableFuture producerHandle = this.pulsarClient.newProducer().enableBatching(false).topic(this.topic).accessMode(ProducerAccessMode.WaitForExclusive).blockIfQueueFull(true).createAsync();
        return producerHandle.thenCompose(opProducer -> {
            CompletionStage dummy = this.ensureLatestData(true).thenCompose(___ -> {
                long start = this.nextId.get();
                List<Long> block = PulsarStorageProducerIdManagerImpl.generateBlock(start + (long)this.blockSize, this.blockSize);
                long nextAvailableId = start + (long)this.blockSize;
                byte[] serialized = Schema.INT64.encode((Object)nextAvailableId);
                return opProducer.newMessage().key("").value((Object)serialized).sendAsync().thenApply(msgId -> {
                    if (log.isDebugEnabled()) {
                        log.debug("{} written {} as {}", new Object[]{this, nextAvailableId, msgId});
                    }
                    this.nextId.set(nextAvailableId);
                    this.availableIdsLocally.addAll(block);
                    long result = (Long)this.availableIdsLocally.remove();
                    if (log.isDebugEnabled()) {
                        log.debug("Returning allocated id {} for {}, new range: {}-{}", new Object[]{result, this.topic, start, nextAvailableId});
                    }
                    return result;
                });
            });
            return ((CompletableFuture)dummy).whenComplete((___, err) -> opProducer.closeAsync().whenComplete((____, errorClose) -> {
                if (errorClose != null) {
                    log.error("Error closing producer for {}", (Object)this.topic, errorClose);
                }
            }));
        });
    }

    private static List<Long> generateBlock(long start, int blockSize) {
        return LongStream.range(start, start + (long)blockSize).boxed().collect(Collectors.toList());
    }

    public PulsarStorageProducerIdManagerImpl(String topicName, PulsarClient pulsarClient, int blockSize) {
        this.topic = topicName;
        this.pulsarClient = pulsarClient;
        this.blockSize = blockSize;
    }

    public PulsarStorageProducerIdManagerImpl(String topicName, PulsarClient pulsarClient) {
        this(topicName, pulsarClient, 1000);
    }

    @Override
    public CompletableFuture<Void> initialize() {
        return this.ensureLatestData(false);
    }

    @Override
    public synchronized void shutdown() {
        if (this.reader != null) {
            this.reader.whenComplete((r, e) -> {
                if (r != null) {
                    r.closeAsync().whenComplete((___, err) -> {
                        if (err != null) {
                            log.error("Error closing reader for {}", (Object)this.topic, err);
                        }
                    });
                }
            });
        }
    }

    private /* synthetic */ Void lambda$ensureLatestData$3(CompletableFuture newReadHandle, Void __) {
        this.endReadLoop(newReadHandle);
        return null;
    }
}

