/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.storage;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.streamnative.pulsar.handlers.kop.DelayedFetch;
import io.streamnative.pulsar.handlers.kop.DelayedProduceAndFetch;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.MessageFetchContext;
import io.streamnative.pulsar.handlers.kop.RequestStats;
import io.streamnative.pulsar.handlers.kop.storage.AppendRecordsContext;
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
import io.streamnative.pulsar.handlers.kop.storage.PartitionLogManager;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperation;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationKey;
import io.streamnative.pulsar.handlers.kop.utils.delayed.DelayedOperationPurgatory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicaManager {
    private static final Logger log = LoggerFactory.getLogger(ReplicaManager.class);
    private final PartitionLogManager logManager;
    private final DelayedOperationPurgatory<DelayedOperation> producePurgatory;
    private final DelayedOperationPurgatory<DelayedOperation> fetchPurgatory;
    private final String metadataNamespace;

    public ReplicaManager(KafkaServiceConfiguration kafkaConfig, RequestStats requestStats, Time time, ImmutableMap<String, EntryFilterWithClassLoader> entryfilterMap, DelayedOperationPurgatory<DelayedOperation> producePurgatory, DelayedOperationPurgatory<DelayedOperation> fetchPurgatory) {
        this.logManager = new PartitionLogManager(kafkaConfig, requestStats, entryfilterMap, time);
        this.producePurgatory = producePurgatory;
        this.fetchPurgatory = fetchPurgatory;
        this.metadataNamespace = kafkaConfig.getKafkaMetadataNamespace();
    }

    public PartitionLog getPartitionLog(TopicPartition topicPartition, String namespacePrefix) {
        return this.logManager.getLog(topicPartition, namespacePrefix);
    }

    public void removePartitionLog(String topicName) {
        PartitionLog partitionLog = this.logManager.removeLog(topicName);
        if (log.isDebugEnabled() && partitionLog != null) {
            log.debug("PartitionLog: {} has bean removed.", (Object)partitionLog);
        }
    }

    @VisibleForTesting
    public int size() {
        return this.logManager.size();
    }

    public CompletableFuture<Map<TopicPartition, ProduceResponse.PartitionResponse>> appendRecords(long timeout, boolean internalTopicsAllowed, String namespacePrefix, Map<TopicPartition, MemoryRecords> entriesPerPartition, PartitionLog.AppendOrigin origin, AppendRecordsContext appendRecordsContext) {
        CompletableFuture<Map<TopicPartition, ProduceResponse.PartitionResponse>> completableFuture = new CompletableFuture<Map<TopicPartition, ProduceResponse.PartitionResponse>>();
        AtomicInteger topicPartitionNum = new AtomicInteger(entriesPerPartition.size());
        ConcurrentHashMap<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new ConcurrentHashMap<TopicPartition, ProduceResponse.PartitionResponse>();
        PendingProduceCallback complete = new PendingProduceCallback(topicPartitionNum, responseMap, completableFuture, entriesPerPartition);
        BiConsumer<TopicPartition, ProduceResponse.PartitionResponse> addPartitionResponse = (topicPartition, response) -> {
            responseMap.put((TopicPartition)topicPartition, (ProduceResponse.PartitionResponse)response);
            int restTopicPartitionNum = topicPartitionNum.decrementAndGet();
            if (restTopicPartitionNum < 0) {
                return;
            }
            if (restTopicPartitionNum == 0) {
                complete.run();
            }
        };
        entriesPerPartition.forEach((topicPartition, memoryRecords) -> {
            String fullPartitionName = KopTopic.toString(topicPartition, namespacePrefix);
            if (!internalTopicsAllowed && KopTopic.isInternalTopic(fullPartitionName, this.metadataNamespace)) {
                addPartitionResponse.accept((TopicPartition)topicPartition, new ProduceResponse.PartitionResponse(Errors.forException((Throwable)new InvalidTopicException(String.format("Cannot append to internal topic %s", topicPartition.topic())))));
            } else {
                PartitionLog partitionLog = this.getPartitionLog((TopicPartition)topicPartition, namespacePrefix);
                ((CompletableFuture)partitionLog.appendRecords((MemoryRecords)memoryRecords, origin, appendRecordsContext).thenAccept(offset -> addPartitionResponse.accept((TopicPartition)topicPartition, new ProduceResponse.PartitionResponse(Errors.NONE, offset.longValue(), -1L, -1L)))).exceptionally(ex -> {
                    addPartitionResponse.accept((TopicPartition)topicPartition, new ProduceResponse.PartitionResponse(Errors.forException((Throwable)ex.getCause())));
                    return null;
                });
            }
        });
        if (timeout <= 0L) {
            complete.run();
        } else {
            List<Object> delayedCreateKeys = entriesPerPartition.keySet().stream().map(DelayedOperationKey.TopicPartitionOperationKey::new).collect(Collectors.toList());
            DelayedProduceAndFetch delayedProduce = new DelayedProduceAndFetch(timeout, topicPartitionNum, complete);
            this.producePurgatory.tryCompleteElseWatch(delayedProduce, delayedCreateKeys);
        }
        return completableFuture;
    }

    public CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> fetchMessage(long timeout, int fetchMinBytes, int fetchMaxBytes, ConcurrentHashMap<TopicPartition, FetchRequest.PartitionData> fetchInfos, IsolationLevel isolationLevel, MessageFetchContext context) {
        CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> future = new CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>>();
        boolean readCommitted = context.getTc() != null && isolationLevel.equals((Object)IsolationLevel.READ_COMMITTED);
        long startTime = SystemTime.SYSTEM.hiResClockMs();
        this.readFromLocalLog(readCommitted, fetchMaxBytes, context.getMaxReadEntriesNum(), fetchInfos, context).thenAccept(readResults -> {
            MutableLong bytesReadable = new MutableLong(0L);
            MutableBoolean errorReadingData = new MutableBoolean(false);
            readResults.forEach((topicPartition, readRecordsResult) -> {
                if (readRecordsResult.errors() != Errors.NONE) {
                    errorReadingData.setTrue();
                }
                if (readRecordsResult.decodeResult() != null) {
                    bytesReadable.addAndGet((long)readRecordsResult.decodeResult().getRecords().sizeInBytes());
                }
            });
            long now = SystemTime.SYSTEM.hiResClockMs();
            long currentWait = now - startTime;
            long remainingMaxWait = timeout - currentWait;
            long maxWait = Math.min(remainingMaxWait, timeout);
            if (maxWait <= 0L || fetchInfos.isEmpty() || bytesReadable.longValue() >= (long)fetchMinBytes || errorReadingData.booleanValue()) {
                future.complete((Map<TopicPartition, PartitionLog.ReadRecordsResult>)readResults);
                return;
            }
            List<Object> delayedFetchKeys = fetchInfos.keySet().stream().map(DelayedOperationKey.TopicPartitionOperationKey::new).collect(Collectors.toList());
            DelayedFetch delayedFetch = new DelayedFetch(maxWait, fetchMaxBytes, bytesReadable.getValue(), readCommitted, context, this, (Map<TopicPartition, FetchRequest.PartitionData>)fetchInfos, (Map<TopicPartition, PartitionLog.ReadRecordsResult>)readResults, future);
            this.fetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys);
        });
        return future;
    }

    public CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> readFromLocalLog(boolean readCommitted, int fetchMaxBytes, int maxReadEntriesNum, Map<TopicPartition, FetchRequest.PartitionData> readPartitionInfo, MessageFetchContext context) {
        AtomicLong limitBytes = new AtomicLong(fetchMaxBytes);
        CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>> resultFuture = new CompletableFuture<Map<TopicPartition, PartitionLog.ReadRecordsResult>>();
        ConcurrentHashMap result = new ConcurrentHashMap();
        AtomicInteger restTopicPartitionNeedRead = new AtomicInteger(readPartitionInfo.size());
        Runnable complete = () -> {
            if (restTopicPartitionNeedRead.decrementAndGet() == 0) {
                resultFuture.complete(result);
            }
        };
        readPartitionInfo.forEach((tp, fetchInfo) -> this.getPartitionLog((TopicPartition)tp, context.getNamespacePrefix()).readRecords((FetchRequest.PartitionData)fetchInfo, readCommitted, limitBytes, maxReadEntriesNum, context).thenAccept(readResult -> {
            result.put(tp, readResult);
            complete.run();
        }));
        return resultFuture;
    }

    public void tryCompleteDelayedFetch(DelayedOperationKey key) {
        int completed = this.fetchPurgatory.checkAndComplete(key);
        if (log.isDebugEnabled()) {
            log.debug("Request key {} unblocked {} fetch requests.", (Object)key.keyLabel(), (Object)completed);
        }
    }

    private static class PendingProduceCallback
    implements Runnable {
        final AtomicInteger topicPartitionNum;
        Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap;
        final CompletableFuture<Map<TopicPartition, ProduceResponse.PartitionResponse>> completableFuture;
        Map<TopicPartition, MemoryRecords> entriesPerPartition;

        @Override
        public void run() {
            this.topicPartitionNum.set(0);
            if (this.completableFuture.isDone()) {
                return;
            }
            this.entriesPerPartition.keySet().forEach(topicPartition -> {
                if (!this.responseMap.containsKey(topicPartition)) {
                    this.responseMap.put((TopicPartition)topicPartition, new ProduceResponse.PartitionResponse(Errors.REQUEST_TIMED_OUT));
                }
            });
            if (log.isDebugEnabled()) {
                log.debug("Complete handle appendRecords.");
            }
            this.completableFuture.complete(this.responseMap);
            this.responseMap = null;
            this.entriesPerPartition = null;
        }

        public PendingProduceCallback(AtomicInteger topicPartitionNum, Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap, CompletableFuture<Map<TopicPartition, ProduceResponse.PartitionResponse>> completableFuture, Map<TopicPartition, MemoryRecords> entriesPerPartition) {
            this.topicPartitionNum = topicPartitionNum;
            this.responseMap = responseMap;
            this.completableFuture = completableFuture;
            this.entriesPerPartition = entriesPerPartition;
        }
    }
}

