/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.storage;

import com.google.common.collect.Maps;
import io.streamnative.pulsar.handlers.kop.storage.AbortedTxn;
import io.streamnative.pulsar.handlers.kop.storage.CompletedTxn;
import io.streamnative.pulsar.handlers.kop.storage.PartitionLog;
import io.streamnative.pulsar.handlers.kop.storage.ProducerAppendInfo;
import io.streamnative.pulsar.handlers.kop.storage.ProducerStateEntry;
import io.streamnative.pulsar.handlers.kop.storage.TxnMetadata;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.kafka.common.requests.FetchResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerStateManager {
    private static final Logger log = LoggerFactory.getLogger(ProducerStateManager.class);
    private final String topicPartition;
    private final Map<Long, ProducerStateEntry> producers = Maps.newConcurrentMap();
    private final TreeMap<Long, TxnMetadata> ongoingTxns = Maps.newTreeMap();
    private final List<AbortedTxn> abortedIndexList = new ArrayList<AbortedTxn>();

    public ProducerStateManager(String topicPartition) {
        this.topicPartition = topicPartition;
    }

    public ProducerAppendInfo prepareUpdate(Long producerId, PartitionLog.AppendOrigin origin) {
        ProducerStateEntry currentEntry = this.lastEntry(producerId).orElse(ProducerStateEntry.empty(producerId));
        return new ProducerAppendInfo(this.topicPartition, producerId, currentEntry, origin);
    }

    public long lastStableOffset(CompletedTxn completedTxn) {
        for (TxnMetadata txnMetadata : this.ongoingTxns.values()) {
            if (completedTxn.producerId().equals(txnMetadata.producerId())) continue;
            return txnMetadata.firstOffset();
        }
        return completedTxn.lastOffset() + 1L;
    }

    public Optional<Long> firstUndecidedOffset() {
        Map.Entry<Long, TxnMetadata> entry = this.ongoingTxns.firstEntry();
        if (entry == null) {
            return Optional.empty();
        }
        return Optional.of(entry.getValue().firstOffset());
    }

    public Optional<ProducerStateEntry> lastEntry(Long producerId) {
        if (!this.producers.containsKey(producerId)) {
            return Optional.empty();
        }
        return Optional.of(this.producers.get(producerId));
    }

    public void update(ProducerAppendInfo appendInfo) {
        if (log.isDebugEnabled()) {
            log.debug("Updated producer {} state to {}", (Object)appendInfo.producerId(), (Object)appendInfo);
        }
        if (appendInfo.producerId() == -1L) {
            throw new IllegalArgumentException(String.format("Invalid producer id %s passed to update for %s", appendInfo.producerId(), this.topicPartition));
        }
        ProducerStateEntry updatedEntry = appendInfo.toEntry();
        this.producers.compute(appendInfo.producerId(), (pid, stateEntry) -> {
            if (stateEntry == null) {
                stateEntry = updatedEntry;
            } else {
                stateEntry.update(updatedEntry);
            }
            return stateEntry;
        });
        for (TxnMetadata txn : appendInfo.startedTransactions()) {
            this.ongoingTxns.put(txn.firstOffset(), txn);
        }
    }

    public void updateTxnIndex(CompletedTxn completedTxn, long lastStableOffset) {
        if (completedTxn.isAborted().booleanValue()) {
            this.abortedIndexList.add(new AbortedTxn(completedTxn.producerId(), completedTxn.firstOffset(), completedTxn.lastOffset(), lastStableOffset));
        }
    }

    public void completeTxn(CompletedTxn completedTxn) {
        TxnMetadata txnMetadata = this.ongoingTxns.remove(completedTxn.firstOffset());
        if (txnMetadata == null) {
            String msg = String.format("Attempted to complete transaction %s on partition %s which was not started.", completedTxn, this.topicPartition);
            throw new IllegalArgumentException(msg);
        }
    }

    public List<FetchResponse.AbortedTransaction> getAbortedIndexList(long fetchOffset) {
        ArrayList<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        for (AbortedTxn abortedTxn : this.abortedIndexList) {
            if (abortedTxn.lastOffset() < fetchOffset) continue;
            abortedTransactions.add(new FetchResponse.AbortedTransaction(abortedTxn.producerId().longValue(), abortedTxn.firstOffset().longValue()));
        }
        return abortedTransactions;
    }
}

