/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.SystemTopicClient;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionCoordinator;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionLogKey;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionLogValue;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionStateManager {
    private static final Logger log = LoggerFactory.getLogger(TransactionStateManager.class);
    // Coordinator configuration: supplies the transaction log topic name, its partition count
    // and the expiration/timeout settings read by the methods of this class.
    private final TransactionConfig transactionConfig;
    // Used to build the producers and readers of the transaction log topic partitions.
    private final SystemTopicClient txnTopicClient;
    // Read/write lock taken around accesses to the partition sets and the metadata cache.
    private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
    // Set to true by shutdown(); the log replay loop checks it before each read.
    private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
    // Number of partitions of the transaction log topic, read once from the configuration.
    private final int transactionTopicPartitionCount;
    // Partitions whose transaction log is currently being replayed into the cache.
    @VisibleForTesting
    protected final Set<Integer> loadingPartitions = Sets.newHashSet();
    // Partitions scheduled for unloading whose cache entry has not been dropped yet.
    @VisibleForTesting
    protected final Set<Integer> leavingPartitions = Sets.newHashSet();
    // Lazily created producer per transaction log partition (see getProducer).
    private final Map<Integer, CompletableFuture<Producer<ByteBuffer>>> txnLogProducerMap = Maps.newHashMap();
    // Lazily created reader per transaction log partition (see getReader).
    private final Map<Integer, CompletableFuture<Reader<ByteBuffer>>> txnLogReaderMap = Maps.newHashMap();
    // Transaction metadata keyed by transaction log partition, then by transactional id.
    @VisibleForTesting
    protected final Map<Integer, Map<String, TransactionMetadata>> transactionMetadataCache = Maps.newConcurrentMap();
    // Executor for the periodic expiration task and for the asynchronous log callbacks.
    private final ScheduledExecutorService scheduler;
    // Clock used for timeout and expiration checks.
    private final Time time;

    public TransactionStateManager(TransactionConfig transactionConfig, SystemTopicClient txnTopicClient, ScheduledExecutorService scheduler, Time time) {
        this.transactionConfig = transactionConfig;
        this.txnTopicClient = txnTopicClient;
        this.scheduler = scheduler;
        this.transactionTopicPartitionCount = transactionConfig.getTransactionLogNumPartitions();
        this.time = time;
    }

    /**
     * Collects the ongoing transactions whose timeout has elapsed.
     *
     * <p>Runs under the state read lock. Partitions listed in {@code leavingPartitions} are skipped,
     * as are transactions that have a pending state transition or are not in the ONGOING state.
     *
     * @return one entry (transactional id, producer id, producer epoch) per timed-out transaction
     */
    protected List<TransactionalIdAndProducerIdEpoch> timedOutTransactions() {
        final long nowMs = this.time.milliseconds();
        return CoreUtils.inReadLock(this.stateLock, () -> {
            List<TransactionalIdAndProducerIdEpoch> timedOut = new ArrayList<>();
            for (Map.Entry<Integer, Map<String, TransactionMetadata>> partitionEntry : this.transactionMetadataCache.entrySet()) {
                if (this.leavingPartitions.contains(partitionEntry.getKey())) {
                    continue;
                }
                for (Map.Entry<String, TransactionMetadata> txnEntry : partitionEntry.getValue().entrySet()) {
                    TransactionMetadata metadata = txnEntry.getValue();
                    if (metadata.pendingTransitionInProgress()) {
                        continue;
                    }
                    if (!metadata.getState().equals(TransactionState.ONGOING)) {
                        continue;
                    }
                    long deadlineMs = metadata.getTxnStartTimestamp() + (long) metadata.getTxnTimeoutMs();
                    if (deadlineMs < nowMs) {
                        timedOut.add(new TransactionalIdAndProducerIdEpoch(txnEntry.getKey(), metadata.getProducerId(), metadata.getProducerEpoch()));
                    }
                }
            }
            return timedOut;
        });
    }

    /**
     * Starts the background work of this manager.
     *
     * @param enableTransactionalIdExpiration when true, schedules the periodic removal of expired transactional ids
     */
    public void startup(boolean enableTransactionalIdExpiration) {
        if (!enableTransactionalIdExpiration) {
            return;
        }
        this.enableTransactionalIdExpiration();
    }

    /**
     * Schedules {@link #removeExpiredTransactionalIds()} at a fixed rate, using the configured
     * interval both as initial delay and as period.
     *
     * <p>An exception escaping a task scheduled with {@code scheduleAtFixedRate} suppresses every
     * subsequent execution, so failures are caught and logged here to keep the expiration running.
     */
    private void enableTransactionalIdExpiration() {
        long intervalMs = this.transactionConfig.getRemoveExpiredTransactionalIdsIntervalMs();
        this.scheduler.scheduleAtFixedRate(() -> {
            try {
                this.removeExpiredTransactionalIds();
            } catch (RuntimeException ex) {
                log.error("Failed to remove expired transactional ids.", ex);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Runs the expiration pass over every partition currently present in the metadata cache.
     *
     * <p>Executed under the state read lock.
     *
     * @return a future that completes once the per-partition expiration futures have all completed
     */
    @VisibleForTesting
    protected CompletableFuture<Void> removeExpiredTransactionalIds() {
        return CoreUtils.inReadLock(this.stateLock, () -> {
            List<CompletableFuture<Void>> partitionFutures = new ArrayList<>();
            for (Map.Entry<Integer, Map<String, TransactionMetadata>> cacheEntry : this.transactionMetadataCache.entrySet()) {
                TopicPartition txnLogPartition = new TopicPartition(this.transactionConfig.getTransactionMetadataTopicName(), cacheEntry.getKey());
                partitionFutures.add(this.removeExpiredTransactionalIds(txnLogPartition, cacheEntry.getValue()));
            }
            return FutureUtils.collect(partitionFutures).thenAccept(ignored -> {});
        });
    }

    /**
     * Writes a tombstone for every expirable transactional id of one transaction log partition.
     *
     * <p>Under the state read lock, each cached transaction is inspected while holding its own lock:
     * if it has no pending state and {@link #shouldExpire} accepts it, the metadata is moved to a
     * pending DEAD transition and a tombstone write is started.
     *
     * @param transactionPartition  transaction log partition the cache entry belongs to
     * @param txnMetadataCacheEntry cached metadata of that partition, keyed by transactional id
     * @return a future completing when the tombstone writes are done; failures are logged and swallowed
     */
    private CompletableFuture<Void> removeExpiredTransactionalIds(TopicPartition transactionPartition, Map<String, TransactionMetadata> txnMetadataCacheEntry) {
        return CoreUtils.inReadLock(this.stateLock, () -> {
            final long nowMs = this.time.milliseconds();
            final List<CompletableFuture<Void>> tombstoneWrites = new ArrayList<>();
            for (TransactionMetadata candidate : txnMetadataCacheEntry.values()) {
                final String transactionalId = candidate.getTransactionalId();
                candidate.inLock(() -> {
                    boolean noPendingTransition = !candidate.getPendingState().isPresent();
                    if (noPendingTransition && this.shouldExpire(candidate, nowMs)) {
                        byte[] tombstoneKey = new TransactionLogKey(candidate.getTransactionalId()).toBytes();
                        TransactionMetadata.TxnTransitMetadata deadTransition = candidate.prepareDead();
                        TransactionalIdCoordinatorEpochAndMetadata expired = new TransactionalIdCoordinatorEpochAndMetadata(transactionalId, deadTransition);
                        tombstoneWrites.add(this.writeTombstoneForExpiredTransactionalIds(transactionPartition, expired, tombstoneKey));
                    }
                    return null;
                });
            }
            CompletableFuture<Void> allWrites = FutureUtils.collect(tombstoneWrites).thenAccept(ignored -> {});
            return allWrites.exceptionally(ex -> {
                log.error("Error to remove tombstones for expired transactional Ids.", ex);
                return null;
            });
        });
    }

    /**
     * Appends the tombstone of an expired transactional id and, once the append finished, removes
     * the id from the cache or rolls back its pending DEAD transition.
     *
     * <p>The cache entry is removed only when the append succeeded, the pending state is still DEAD
     * and the producer epoch is unchanged; otherwise the pending state is cleared. If the partition
     * or the transactional id is no longer cached when the append completes, nothing is changed.
     *
     * @param transactionPartition transaction log partition receiving the tombstone
     * @param expiredForPartition  transactional id together with its prepared DEAD transition
     * @param tombstone            serialized log key used as the tombstone key
     * @return the append future; it keeps the outcome of the append itself
     */
    private CompletableFuture<Void> writeTombstoneForExpiredTransactionalIds(TopicPartition transactionPartition, TransactionalIdCoordinatorEpochAndMetadata expiredForPartition, byte[] tombstone) {
        return CoreUtils.inReadLock(this.stateLock, () -> this.appendTombstone(transactionPartition.partition(), tombstone).whenComplete((ignored, ex) -> CoreUtils.inReadLock(this.stateLock, () -> {
            Map<String, TransactionMetadata> partitionCacheEntry = this.transactionMetadataCache.get(transactionPartition.partition());
            if (partitionCacheEntry == null) {
                return null;
            }
            String transactionalId = expiredForPartition.getTransactionalId();
            TransactionMetadata txnMetadata = partitionCacheEntry.get(transactionalId);
            if (txnMetadata == null) {
                // The id disappeared from the cache while the tombstone was being written
                // (for example the partition was reloaded); there is nothing left to clean up.
                log.info("Expired transactionalId: {} is no longer cached for {}, skipping cache removal.", transactionalId, transactionPartition);
                return null;
            }
            txnMetadata.inLock(() -> {
                Optional<TransactionState> pendingState = txnMetadata.getPendingState();
                short expectedProducerEpoch = expiredForPartition.getTransitMetadata().getProducerEpoch();
                boolean stillExpiring = pendingState.isPresent()
                        && pendingState.get().equals(TransactionState.DEAD)
                        && txnMetadata.getProducerEpoch() == expectedProducerEpoch;
                if (stillExpiring && ex == null) {
                    partitionCacheEntry.remove(transactionalId);
                } else {
                    log.warn("Failed to remove expired transactionalId: {} from cache. Tombstone append error: {}, pendingState: {}, producerEpoch: {}, expected producerEpoch: {}", new Object[]{transactionalId, ex, pendingState, txnMetadata.getProducerEpoch(), expectedProducerEpoch});
                    txnMetadata.setPendingState(Optional.empty());
                }
                return null;
            });
            return null;
        })));
    }

    /**
     * Sends a message with the given key and a null value to one transaction log partition.
     *
     * @param partition transaction log partition to write to
     * @param tombstone serialized log key of the transactional id being removed
     * @return a future completing after the message was sent (and logged at debug level)
     */
    protected CompletableFuture<Void> appendTombstone(int partition, byte[] tombstone) {
        CompletableFuture<MessageId> sendFuture = this.getProducer(partition).thenComposeAsync(
                producer -> producer.newMessage().keyBytes(tombstone).value(null).sendAsync(),
                this.scheduler);
        return sendFuture.thenAcceptAsync(messageId -> {
            if (!log.isDebugEnabled()) {
                return;
            }
            log.debug("Append tombstone success, msgId: [{}], tombstone: [{}]", messageId, TransactionLogKey.decode(ByteBuffer.wrap(tombstone), (short) 0));
        }, this.scheduler);
    }

    /**
     * Tells whether a transactional id may be expired at the given time.
     *
     * @param txnMetadata   metadata of the transactional id
     * @param currentTimeMs current time in milliseconds
     * @return true when the state allows expiration and the last update is at least the configured
     *         expiration interval old
     */
    private boolean shouldExpire(TransactionMetadata txnMetadata, Long currentTimeMs) {
        if (!txnMetadata.getState().isExpirationAllowed()) {
            return false;
        }
        long lastUpdateMs = txnMetadata.getTxnLastUpdateTimestamp();
        return lastUpdateMs <= currentTimeMs - this.transactionConfig.getTransactionalIdExpirationMs();
    }

    /**
     * Persists a metadata transition to the transaction log and then applies it to the cache.
     *
     * <p>Under the state read lock the cached state is looked up; a lookup error, a missing entry or
     * a coordinator epoch mismatch fails the callback immediately. Otherwise the new metadata is
     * stored while holding the metadata lock and the cache is updated once the store completes.
     *
     * @param transactionalId  transactional id being updated
     * @param coordinatorEpoch coordinator epoch the caller observed
     * @param newMetadata      transition to persist
     * @param responseCallback receives completion or the resulting error
     * @param retryOnError     decides whether a failed append keeps the pending state for a retry
     */
    public void appendTransactionToLog(String transactionalId, int coordinatorEpoch, TransactionMetadata.TxnTransitMetadata newMetadata, ResponseCallback responseCallback, RetryOnError retryOnError) {
        final String txnLogTopic = this.transactionConfig.getTransactionMetadataTopicName();
        final TopicPartition topicPartition = new TopicPartition(txnLogTopic, this.partitionFor(transactionalId));
        CoreUtils.inReadLock(this.stateLock, () -> {
            Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> lookup = this.getTransactionState(transactionalId);
            if (lookup.isLeft()) {
                responseCallback.fail(lookup.getLeft());
                return null;
            }
            Optional<CoordinatorEpochAndTxnMetadata> cached = lookup.getRight();
            if (!cached.isPresent()) {
                responseCallback.fail(Errors.NOT_COORDINATOR);
                return null;
            }
            CoordinatorEpochAndTxnMetadata epochAndMetadata = cached.get();
            TransactionMetadata metadata = epochAndMetadata.getTransactionMetadata();
            metadata.inLock(() -> {
                if (epochAndMetadata.getCoordinatorEpoch() != coordinatorEpoch) {
                    responseCallback.fail(Errors.NOT_COORDINATOR);
                    return null;
                }
                CompletableFuture<Void> cacheUpdate = this.storeTxnLog(transactionalId, newMetadata).thenAccept(messageId -> {
                    // The store succeeded, so the append status handed to the cache update is always NONE.
                    HashMap<TopicPartition, ProduceResponse.PartitionResponse> appendStatus = new HashMap<>();
                    appendStatus.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.NONE));
                    this.updateCacheCallback(transactionalId, newMetadata, topicPartition, coordinatorEpoch, appendStatus, responseCallback, retryOnError);
                    if (log.isDebugEnabled()) {
                        log.debug("Appending new metadata {} for transaction id {} to the local transaction log with messageId {}", newMetadata, transactionalId, messageId);
                    }
                });
                cacheUpdate.exceptionally(ex -> {
                    log.error("Store transactional log failed, transactionalId : {}, metadata: [{}].", transactionalId, newMetadata, ex);
                    responseCallback.fail(Errors.forException(ex));
                    return null;
                });
                return null;
            });
            return null;
        });
    }

    /**
     * Applies the outcome of a transaction log append to the cache and notifies the caller.
     *
     * @param transactionalId  transactional id that was appended
     * @param newMetadata      transition that was appended
     * @param topicPartition   transaction log partition the append targeted
     * @param coordinatorEpoch coordinator epoch observed before the append
     * @param responseStatus   append status; must contain exactly the entry for {@code topicPartition}
     * @param responseCallback completed on success, failed with the resulting error otherwise
     * @param retryOnError     decides whether a failed append keeps the pending state
     * @throws IllegalStateException if {@code responseStatus} is not a single entry for {@code topicPartition}
     */
    private void updateCacheCallback(String transactionalId, TransactionMetadata.TxnTransitMetadata newMetadata, TopicPartition topicPartition, int coordinatorEpoch, Map<TopicPartition, ProduceResponse.PartitionResponse> responseStatus, ResponseCallback responseCallback, RetryOnError retryOnError) {
        boolean onlyExpectedPartition = responseStatus.size() == 1 && responseStatus.containsKey(topicPartition);
        if (!onlyExpectedPartition) {
            throw new IllegalStateException(String.format("Append status %s should only have one partition %s", responseStatus, topicPartition));
        }
        Errors outcome = this.statusCheck(transactionalId, newMetadata, responseStatus.get(topicPartition));
        if (outcome != Errors.NONE) {
            // Append failed: possibly reset the pending state, then report the translated error.
            this.invalidStatus(transactionalId, newMetadata, coordinatorEpoch, outcome, retryOnError);
            responseCallback.fail(outcome);
            return;
        }
        outcome = this.validStatus(transactionalId, newMetadata, outcome, coordinatorEpoch);
        if (outcome == Errors.NONE) {
            responseCallback.complete();
        } else {
            responseCallback.fail(outcome);
        }
    }

    /**
     * Translates the error of a transaction log append into the error reported to the client.
     *
     * @param transactionalId transactional id that was appended (used for logging)
     * @param newMetadata     transition that was appended (used for logging)
     * @param status          append status of the transaction log partition
     * @return NONE on success; COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR or UNKNOWN_SERVER_ERROR otherwise
     */
    private Errors statusCheck(String transactionalId, TransactionMetadata.TxnTransitMetadata newMetadata, ProduceResponse.PartitionResponse status) {
        if (status.error == Errors.NONE) {
            return Errors.NONE;
        }
        if (log.isDebugEnabled()) {
            log.debug("Appending {}'s new metadata {} failed due to {}", transactionalId, newMetadata, status.error.exceptionName());
        }
        Errors appendError = status.error;
        boolean coordinatorUnavailable = appendError == Errors.UNKNOWN_TOPIC_OR_PARTITION
                || appendError == Errors.NOT_ENOUGH_REPLICAS
                || appendError == Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND
                || appendError == Errors.REQUEST_TIMED_OUT;
        if (coordinatorUnavailable) {
            return Errors.COORDINATOR_NOT_AVAILABLE;
        }
        if (appendError == Errors.KAFKA_STORAGE_ERROR) {
            return Errors.NOT_COORDINATOR;
        }
        return Errors.UNKNOWN_SERVER_ERROR;
    }

    /**
     * Completes a metadata transition in the cache after its log append succeeded.
     *
     * @param transactionalId  transactional id that was appended
     * @param newMetadata      transition to apply to the cached metadata
     * @param errors           error to return when the transition is applied successfully
     * @param coordinatorEpoch coordinator epoch observed before the append
     * @return {@code errors} on success; the lookup error, NOT_COORDINATOR when the entry is gone or
     *         its epoch changed, or UNKNOWN_SERVER_ERROR when the transition is rejected
     */
    private Errors validStatus(String transactionalId, TransactionMetadata.TxnTransitMetadata newMetadata, Errors errors, int coordinatorEpoch) {
        Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> errorsAndData = this.getTransactionState(transactionalId);
        if (errorsAndData.isLeft()) {
            log.info("Accessing the cached transaction metadata for {} returns {} error; aborting transition to the new metadata and setting the error in the callback", transactionalId, errorsAndData.getLeft());
            return errorsAndData.getLeft();
        }
        if (!errorsAndData.getRight().isPresent()) {
            log.info("The cached coordinator metadata does not exist in the cache anymore for {} after appended its new metadata {} to the transaction log (txn topic partition {}) while it was {} before appending; aborting transition to the new metadata and returning {} in the callback", new Object[]{transactionalId, newMetadata, this.partitionFor(transactionalId), coordinatorEpoch, Errors.NOT_COORDINATOR});
            return Errors.NOT_COORDINATOR;
        }
        CoordinatorEpochAndTxnMetadata epochAndMetadata = errorsAndData.getRight().get();
        TransactionMetadata metadata = epochAndMetadata.transactionMetadata;
        return metadata.inLock(() -> {
            if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
                // Log the epoch found in the cache (not the caller's) and the error actually returned.
                log.info("The cached coordinator epoch for {} has changed to {} after appended its new metadata {} to the transaction log (txn topic partition {}) while it was {} before appending; aborting transition to the new metadata and returning {} in the callback", new Object[]{transactionalId, epochAndMetadata.coordinatorEpoch, newMetadata, this.partitionFor(transactionalId), coordinatorEpoch, Errors.NOT_COORDINATOR});
                return Errors.NOT_COORDINATOR;
            }
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Updating {}'s transaction state to {} with coordinator epoch {} for {} successed", new Object[]{transactionalId, newMetadata, coordinatorEpoch, transactionalId});
                }
                metadata.completeTransitionTo(newMetadata);
                return errors;
            }
            catch (IllegalStateException ex) {
                log.error("Failed to complete transition.", ex);
                return Errors.UNKNOWN_SERVER_ERROR;
            }
        });
    }

    /**
     * Handles a failed transaction log append: logs the outcome and, when the caller will not
     * retry and the coordinator epoch is unchanged, clears the pending state of the cached metadata.
     *
     * @param transactionalId  transactional id whose append failed
     * @param newMetadata      transition that could not be appended
     * @param coordinatorEpoch coordinator epoch observed before the append
     * @param errors           error that will be returned in the callback
     * @param retryOnError     when it accepts {@code errors}, the pending state is kept for a retry
     */
    private void invalidStatus(String transactionalId, TransactionMetadata.TxnTransitMetadata newMetadata, int coordinatorEpoch, Errors errors, RetryOnError retryOnError) {
        Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> errorsAndData = this.getTransactionState(transactionalId);
        if (errorsAndData.isLeft()) {
            log.info("TransactionalId {} append transaction log for {} transition failed due to {}, aborting state transition and returning the error in the callback since retrieving metadata returned {}", new Object[]{transactionalId, newMetadata, errors, errorsAndData.getLeft()});
            return;
        }
        if (!errorsAndData.getRight().isPresent()) {
            log.info("TransactionalId {} append transaction log for {} transition failed due to {}, aborting state transition and returning the error in the callback since metadata is not available in the cache anymore", new Object[]{transactionalId, newMetadata, errors});
            return;
        }
        CoordinatorEpochAndTxnMetadata epochAndMetadata = errorsAndData.getRight().get();
        TransactionMetadata metadata = epochAndMetadata.transactionMetadata;
        metadata.inLock(() -> {
            if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
                log.info("TransactionalId {} append transaction log for {} transition failed due to {}, aborting state transition and returning the error in the callback since the coordinator epoch has changed from {} to {}", new Object[]{metadata.getTransactionalId(), newMetadata, errors, epochAndMetadata.coordinatorEpoch, coordinatorEpoch});
            } else if (retryOnError.retry(errors)) {
                log.info("TransactionalId {} append transaction log for {} transition failed due to {}, not resetting pending state {} but just returning the error in the callback to let the caller retry", new Object[]{metadata.getTransactionalId(), newMetadata, errors, metadata.getPendingState()});
            } else {
                // The error handed back to the caller is `errors`; the lookup result is a Right
                // in this branch, so it carries no error to report.
                log.info("TransactionalId {} append transaction log for {} transition failed due to {}, resetting pending state from {}, aborting state transition and returning {} in the callback", new Object[]{metadata.getTransactionalId(), newMetadata, errors, metadata.getPendingState(), errors});
                metadata.setPendingState(Optional.empty());
            }
            return null;
        });
    }

    /**
     * Looks up the cached state of a transactional id without creating it.
     *
     * @param transactionalId transactional id to look up
     * @return the lookup error, or the cached epoch and metadata if the id is cached
     */
    public Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> getTransactionState(String transactionalId) {
        Optional<TransactionMetadata> nothingToCreate = Optional.empty();
        return this.getAndMaybeAddTransactionState(transactionalId, nothingToCreate);
    }

    /**
     * Returns the cached state of the metadata's transactional id, caching {@code metadata} first
     * when the id is not cached yet.
     *
     * @param metadata metadata to cache when its transactional id is unknown
     * @return the lookup error, or the epoch and metadata now associated with the id
     * @throws IllegalStateException if the lookup succeeds but yields no metadata
     */
    public Either<Errors, CoordinatorEpochAndTxnMetadata> putTransactionStateIfNotExists(TransactionMetadata metadata) {
        Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> lookup =
                this.getAndMaybeAddTransactionState(metadata.getTransactionalId(), Optional.of(metadata));
        return lookup.map(cached -> cached.orElseThrow(
                () -> new IllegalStateException("Unexpected empty transaction metadata returned while putting " + metadata)));
    }

    /**
     * Checks a requested transaction timeout against the configured maximum.
     *
     * @param txnTimeoutMs requested timeout in milliseconds
     * @return true when the timeout is positive and not larger than the configured maximum
     */
    public boolean validateTransactionTimeoutMs(int txnTimeoutMs) {
        boolean withinMaximum = (long) txnTimeoutMs <= this.transactionConfig.getTransactionMaxTimeoutMs();
        return withinMaximum && txnTimeoutMs > 0;
    }

    /**
     * Looks up the cached metadata of a transactional id, optionally caching new metadata when the
     * id is unknown.
     *
     * <p>Runs under the state read lock, which several threads may hold at once. The insert
     * therefore goes through {@code putIfAbsent}: when another caller cached metadata for the same
     * id first, that instance is returned instead of the one passed in.
     *
     * @param transactionalId       transactional id to look up
     * @param createdTxnMetadataOpt metadata to cache when the id is not cached yet; empty for a pure lookup
     * @return COORDINATOR_LOAD_IN_PROGRESS while the partition is loading, NOT_COORDINATOR while it
     *         is leaving or not cached, otherwise the cached metadata (if any) paired with epoch -1
     */
    private Either<Errors, Optional<CoordinatorEpochAndTxnMetadata>> getAndMaybeAddTransactionState(String transactionalId, Optional<TransactionMetadata> createdTxnMetadataOpt) {
        return CoreUtils.inReadLock(this.stateLock, () -> {
            int partitionId = this.partitionFor(transactionalId);
            if (this.loadingPartitions.contains(partitionId)) {
                return Either.left(Errors.COORDINATOR_LOAD_IN_PROGRESS);
            }
            if (this.leavingPartitions.contains(partitionId)) {
                return Either.left(Errors.NOT_COORDINATOR);
            }
            Map<String, TransactionMetadata> metadataMap = this.transactionMetadataCache.get(partitionId);
            if (metadataMap == null) {
                return Either.left(Errors.NOT_COORDINATOR);
            }
            TransactionMetadata cached = metadataMap.get(transactionalId);
            if (cached == null && createdTxnMetadataOpt.isPresent()) {
                TransactionMetadata created = createdTxnMetadataOpt.get();
                // Keep whichever instance reached the cache first so all callers share one object.
                TransactionMetadata winner = metadataMap.putIfAbsent(transactionalId, created);
                cached = winner != null ? winner : created;
            }
            Optional<TransactionMetadata> txnMetadata = Optional.ofNullable(cached);
            return Either.right(txnMetadata.map(metadata -> new CoordinatorEpochAndTxnMetadata(-1, metadata)));
        });
    }

    /**
     * Maps a transactional id to its transaction log partition.
     *
     * @param transactionalId transactional id to map
     * @return the partition computed by {@code TransactionCoordinator.partitionFor} for the
     *         partition count read at construction time
     */
    public int partitionFor(String transactionalId) {
        final int partitionCount = this.transactionTopicPartitionCount;
        return TransactionCoordinator.partitionFor(transactionalId, partitionCount);
    }

    /**
     * Loads the transaction metadata of one transaction log partition into the cache.
     *
     * <p>Under the write lock the partition is marked as loading and its cache entry is reset to an
     * empty map. A placeholder record is then written to find the end of the log, the log is
     * replayed up to that record, and the loaded transactions are completed.
     *
     * @param partitionId    transaction log partition to load
     * @param sendTxnMarkers invoked for loaded transactions found in a PREPARE_* state
     * @return a failed future if the partition is already loading; otherwise a future that completes
     *         when loading ends. A loading failure is logged and the future completes normally.
     */
    public CompletableFuture<Void> loadTransactionsForTxnTopicPartition(int partitionId, SendTxnMarkersCallback sendTxnMarkers) {
        TopicPartition topicPartition = new TopicPartition(this.transactionConfig.getTransactionMetadataTopicName(), partitionId);
        boolean alreadyLoading = CoreUtils.inWriteLock(this.stateLock, () -> {
            if (this.leavingPartitions.remove(partitionId)) {
                log.warn("Leaving partition: {} should have been removed.", (Object)partitionId);
            }
            boolean partitionAlreadyLoading = !this.loadingPartitions.add(partitionId);
            this.addLoadedTransactionsToCache(topicPartition.partition(), Maps.newConcurrentMap());
            return partitionAlreadyLoading;
        });
        if (alreadyLoading) {
            log.error("Partition {} is already loading", (Object)partitionId);
            return FutureUtil.failedFuture((Throwable)new IllegalStateException("Partition " + partitionId + " is already loading"));
        }
        log.info("Partition {} start loading", (Object)partitionId);
        long startTimeMs = SystemTime.SYSTEM.milliseconds();
        // The placeholder record marks the position up to which the log has to be replayed.
        CompletableFuture<MessageId> placeholderFuture = this.getProducer(topicPartition.partition())
                .thenComposeAsync(producer -> producer.newMessage().value(null).sendAsync(), (Executor)this.scheduler);
        return placeholderFuture.thenComposeAsync(lastMsgId -> {
            if (log.isDebugEnabled()) {
                log.debug("Successfully write a placeholder record into {} @ {}", topicPartition, lastMsgId);
            }
            return this.getReader(topicPartition.partition()).thenComposeAsync(reader -> this.loadTransactionMetadata(topicPartition.partition(), reader, lastMsgId), (Executor)this.scheduler);
        }, (Executor)this.scheduler).thenAcceptAsync(ignored -> this.completeLoadedTransactions(topicPartition, startTimeMs, sendTxnMarkers), (Executor)this.scheduler).exceptionally(ex -> {
            // Pass the throwable as the last argument so the stack trace is logged too.
            log.error("Error to load transactions exceptions : [{}]", ex.getMessage(), ex);
            // loadingPartitions is a plain HashSet guarded by stateLock everywhere else.
            CoreUtils.inWriteLock(this.stateLock, () -> {
                this.loadingPartitions.remove(partitionId);
                return null;
            });
            return null;
        });
    }

    /**
     * Replays one transaction log partition up to {@code lastMessageId} and installs the result
     * in the metadata cache.
     *
     * <p>The map filled during the replay becomes the live cache entry of the partition, which is
     * later read and modified by threads that only hold the shared read lock. It is therefore
     * created as a concurrent map.
     *
     * @param partition     transaction log partition being loaded
     * @param reader        reader positioned at the start of the partition
     * @param lastMessageId message id at which the replay stops
     * @return a future completed when the replay reaches {@code lastMessageId}, or failed if
     *         reading or decoding fails or the manager is shutting down
     */
    private CompletableFuture<Void> loadTransactionMetadata(int partition, Reader<ByteBuffer> reader, MessageId lastMessageId) {
        if (log.isDebugEnabled()) {
            log.debug("Start load transaction metadata for partition {} till messageId {}", (Object)partition, (Object)lastMessageId);
        }
        CompletableFuture<Void> loadFuture = new CompletableFuture<Void>();
        Map<String, TransactionMetadata> transactionMetadataMap = Maps.newConcurrentMap();
        this.loadNextTransaction(partition, reader, lastMessageId, loadFuture, transactionMetadataMap);
        return loadFuture;
    }

    /**
     * Reads the next record of a transaction log partition and folds it into the map being loaded,
     * then schedules itself for the following record.
     *
     * <p>The replay ends successfully when a record at or past {@code lastMessageId} is read; the
     * accumulated map is then installed in the cache. Records without a key are skipped, and a
     * record whose value decodes to no metadata removes its transactional id from the map.
     *
     * @param partition              transaction log partition being loaded
     * @param reader                 reader of that partition
     * @param lastMessageId          message id at which the replay stops
     * @param loadFuture             completed (or failed) when the replay ends
     * @param transactionMetadataMap metadata accumulated so far, keyed by transactional id
     */
    private void loadNextTransaction(int partition, Reader<ByteBuffer> reader, MessageId lastMessageId, CompletableFuture<Void> loadFuture, Map<String, TransactionMetadata> transactionMetadataMap) {
        if (this.shuttingDown.get()) {
            loadFuture.completeExceptionally(new IllegalStateException("Transaction metadata manager is shutting down."));
            return;
        }
        reader.readNextAsync().whenCompleteAsync((message, throwable) -> {
            if (throwable != null) {
                log.error("Failed to load transaction log.", throwable);
                loadFuture.completeExceptionally((Throwable)throwable);
                // The read failed, so `message` is null; stop instead of dereferencing it below.
                return;
            }
            if (message.getMessageId().compareTo(lastMessageId) >= 0) {
                this.addLoadedTransactionsToCache(partition, transactionMetadataMap);
                loadFuture.complete(null);
                return;
            }
            if (!message.hasKey()) {
                this.loadNextTransaction(partition, reader, lastMessageId, loadFuture, transactionMetadataMap);
                return;
            }
            try {
                TransactionLogKey logKey = TransactionLogKey.decode(ByteBuffer.wrap(message.getKeyBytes()), (short)0);
                String transactionId = logKey.getTransactionId();
                TransactionMetadata transactionMetadata = TransactionLogValue.readTxnRecordValue(transactionId, message.getValue());
                if (transactionMetadata == null) {
                    transactionMetadataMap.remove(transactionId);
                } else {
                    transactionMetadataMap.put(logKey.getTransactionId(), transactionMetadata);
                }
                this.loadNextTransaction(partition, reader, lastMessageId, loadFuture, transactionMetadataMap);
            }
            catch (BufferUnderflowException | SchemaException ex) {
                log.error("Failed to decode transaction log with message {} for partition {}.", new Object[]{message.getMessageId(), partition, ex});
                loadFuture.completeExceptionally(ex);
            }
        }, (Executor)this.scheduler);
    }

    /**
     * Installs the metadata map of a transaction log partition in the cache, replacing any
     * previous entry. A warning is logged when the replaced entry was not empty.
     *
     * @param txnTopicPartition  transaction log partition the map belongs to
     * @param loadedTransactions metadata keyed by transactional id
     */
    @VisibleForTesting
    protected void addLoadedTransactionsToCache(int txnTopicPartition, Map<String, TransactionMetadata> loadedTransactions) {
        Map<String, TransactionMetadata> replaced = this.transactionMetadataCache.put(txnTopicPartition, loadedTransactions);
        if (replaced == null || replaced.isEmpty()) {
            return;
        }
        log.warn("Unloaded transaction metadata {} from {} as part of loading metadata.", replaced, txnTopicPartition);
    }

    /**
     * Finishes the loading of a transaction log partition.
     *
     * <p>Under the write lock, if the partition is still marked as loading, every loaded
     * transaction in a PREPARE_ABORT or PREPARE_COMMIT state gets its completion prepared, the
     * partition leaves the loading set, and the markers for those transactions are sent. The
     * partition is removed from the loading set before the markers are sent.
     *
     * <p>If the cache entry of the partition was unloaded in the meantime, the partition is treated
     * as having no loaded transactions.
     *
     * @param topicPartition         transaction log partition that was loaded
     * @param startTimeMs            time at which loading started, used for the duration log
     * @param sendTxnMarkersCallback invoked for each transaction whose completion was prepared
     */
    private void completeLoadedTransactions(TopicPartition topicPartition, long startTimeMs, SendTxnMarkersCallback sendTxnMarkersCallback) {
        int partition = topicPartition.partition();
        Map<String, TransactionMetadata> loadedTransactions = this.transactionMetadataCache.get(partition);
        long totalLoadingTimeMs = SystemTime.SYSTEM.milliseconds() - startTimeMs;
        int loadedCount = loadedTransactions == null ? 0 : loadedTransactions.size();
        log.info("Finished loading transaction metadata {} from {} in {} milliseconds", new Object[]{loadedCount, topicPartition, totalLoadingTimeMs});
        CoreUtils.inWriteLock(this.stateLock, () -> {
            if (!this.loadingPartitions.contains(partition)) {
                return null;
            }
            List<TransactionalIdAndTransitMetadata> transactionsPendingForCompletion = new ArrayList<>();
            if (loadedTransactions != null) {
                for (Map.Entry<String, TransactionMetadata> entry : loadedTransactions.entrySet()) {
                    TransactionMetadata txnMetadata = entry.getValue();
                    txnMetadata.inLock(() -> {
                        TransactionResult result;
                        switch (txnMetadata.getState()) {
                            case PREPARE_ABORT: {
                                result = TransactionResult.ABORT;
                                break;
                            }
                            case PREPARE_COMMIT: {
                                result = TransactionResult.COMMIT;
                                break;
                            }
                            default: {
                                return null;
                            }
                        }
                        transactionsPendingForCompletion.add(new TransactionalIdAndTransitMetadata(entry.getKey(), result, txnMetadata, txnMetadata.prepareComplete(SystemTime.SYSTEM.milliseconds())));
                        return null;
                    });
                }
            }
            // Leave the loading set first so the marker handling is not rejected with
            // COORDINATOR_LOAD_IN_PROGRESS when it looks up the transaction state.
            this.loadingPartitions.remove(partition);
            transactionsPendingForCompletion.forEach(pendingTxn -> sendTxnMarkersCallback.send(pendingTxn.result, pendingTxn.txnMetadata, pendingTxn.transitMetadata));
            return null;
        });
        log.info("Completed loading transaction metadata from {}", (Object)topicPartition);
    }

    /**
     * Unloads a transaction log partition: marks it as leaving, then asynchronously drops its
     * cached metadata and closes its producer and reader.
     *
     * <p>The asynchronous step does nothing when the partition is no longer marked as leaving by
     * the time it runs. A partition without a cache entry is still cleaned up and removed from the
     * leaving set.
     *
     * @param partition transaction log partition to unload
     */
    public void removeTransactionsForTxnTopicPartition(int partition) {
        TopicPartition topicPartition = new TopicPartition(this.transactionConfig.getTransactionMetadataTopicName(), partition);
        log.info("Scheduling unloading transaction metadata from {}", (Object)topicPartition);
        CoreUtils.inWriteLock(this.stateLock, () -> {
            this.loadingPartitions.remove(partition);
            this.leavingPartitions.add(partition);
            return null;
        });
        this.scheduler.submit(() -> CoreUtils.inWriteLock(this.stateLock, () -> {
            if (this.leavingPartitions.contains(partition)) {
                Map<String, TransactionMetadata> unloaded = this.transactionMetadataCache.remove(partition);
                // The partition may never have been loaded; without this check the NPE would
                // skip the cleanup below and leave the partition stuck in leavingPartitions.
                if (unloaded != null) {
                    unloaded.forEach((txnId, metadata) -> log.info("Unloaded transaction metadata {} for {} following local partition deletion", metadata, (Object)topicPartition));
                }
                CompletableFuture<Producer<ByteBuffer>> producer = this.txnLogProducerMap.remove(partition);
                CompletableFuture<Reader<ByteBuffer>> reader = this.txnLogReaderMap.remove(partition);
                if (producer != null) {
                    // Compose (not apply) so a failure of closeAsync itself reaches the handler.
                    producer.thenComposeAsync(Producer::closeAsync, (Executor)this.scheduler).whenCompleteAsync((ignore, t) -> {
                        if (t != null) {
                            log.error("Failed to close producer when remove partition {}.", topicPartition, t);
                        }
                    }, (Executor)this.scheduler);
                }
                if (reader != null) {
                    reader.thenComposeAsync(Reader::closeAsync, (Executor)this.scheduler).whenCompleteAsync((ignore, t) -> {
                        if (t != null) {
                            log.error("Failed to close reader when remove partition {}.", topicPartition, t);
                        }
                    }, (Executor)this.scheduler);
                }
                this.leavingPartitions.remove(partition);
            }
            return null;
        }));
    }

    /**
     * Returns the producer of a transaction log partition, creating it on first use.
     *
     * @param partition transaction log partition
     * @return the (possibly still pending) producer for topic {@code <txn topic>-partition-<partition>}
     */
    private CompletableFuture<Producer<ByteBuffer>> getProducer(Integer partition) {
        return this.txnLogProducerMap.computeIfAbsent(partition, ignoredKey -> {
            String baseTopic = this.transactionConfig.getTransactionMetadataTopicName();
            String partitionedTopic = baseTopic + "-partition-" + partition;
            return this.txnTopicClient.newProducerBuilder()
                    .clone()
                    .topic(partitionedTopic)
                    .createAsync();
        });
    }

    /**
     * Appends one transaction-log record for the given transactional id.
     *
     * <p>The key is the serialized {@link TransactionLogKey}, the value the serialized
     * {@link TransactionLogValue}; the record is sent through the producer of the partition that
     * {@code partitionFor(transactionalId)} selects.
     *
     * @param transactionalId    the transactional id used as record key and for partition selection
     * @param txnTransitMetadata the transit metadata serialized as record value
     * @return a future completed with the id of the sent message
     */
    protected CompletableFuture<MessageId> storeTxnLog(String transactionalId, TransactionMetadata.TxnTransitMetadata txnTransitMetadata) {
        // Serialize key and value up front, before the producer is looked up.
        final byte[] serializedKey = new TransactionLogKey(transactionalId).toBytes();
        final ByteBuffer serializedValue = new TransactionLogValue(txnTransitMetadata).toByteBuffer();
        CompletableFuture<Producer<ByteBuffer>> producerFuture = this.getProducer(this.partitionFor(transactionalId));
        return producerFuture.thenCompose(txnLogProducer -> txnLogProducer.newMessage()
                .keyBytes(serializedKey)
                .value((Object)serializedValue)
                .sendAsync());
    }

    /**
     * Returns the cached reader future for a transaction-log partition, creating it on first use.
     *
     * <p>The reader targets the topic {@code <metadata topic name>-partition-<partition>}, starts at
     * {@code MessageId.earliest}, has compacted reading enabled, and is created asynchronously from a
     * clone of the client's reader builder.
     *
     * @param partition the transaction-log partition index
     * @return the future of the reader registered for {@code partition}
     */
    private CompletableFuture<Reader<ByteBuffer>> getReader(Integer partition) {
        return this.txnLogReaderMap.computeIfAbsent(partition, ignored -> {
            String baseTopic = this.transactionConfig.getTransactionMetadataTopicName();
            String partitionTopic = baseTopic + "-partition-" + partition;
            return this.txnTopicClient.newReaderBuilder()
                    .clone()
                    .topic(partitionTopic)
                    .startMessageId(MessageId.earliest)
                    .readCompacted(true)
                    .createAsync();
        });
    }

    /**
     * Shuts this manager down.
     *
     * <p>Marks the manager as shutting down, clears the loading-partition set and the metadata cache,
     * starts an asynchronous close of every cached transaction-log producer and reader, empties both
     * client maps, and finally shuts down the scheduler. The close operations are only initiated here;
     * this method does not wait for them to finish.
     */
    public void shutdown() {
        this.shuttingDown.set(true);
        this.loadingPartitions.clear();
        this.transactionMetadataCache.clear();

        List<CompletableFuture<Void>> txnLogProducerCloses = this.txnLogProducerMap.values().stream()
                .map(producerFuture -> producerFuture.thenComposeAsync(Producer::closeAsync, this.scheduler))
                .collect(Collectors.toList());
        this.txnLogProducerMap.clear();

        List<CompletableFuture<Void>> txnLogReaderCloses = this.txnLogReaderMap.values().stream()
                .map(readerFuture -> readerFuture.thenComposeAsync(Reader::closeAsync, this.scheduler))
                .collect(Collectors.toList());
        // The reader map must be emptied here; previously the producer map was cleared a second time
        // and the reader map kept references to readers that were being closed.
        this.txnLogReaderMap.clear();

        final int producerCount = txnLogProducerCloses.size();
        FutureUtil.waitForAll(txnLogProducerCloses).whenCompleteAsync((ignore, t) -> {
            if (t != null) {
                log.error("Error when close all the {} txnLogProducers in TransactionStateManager", producerCount, t);
            } else if (log.isDebugEnabled()) {
                // Only report success when the aggregated close actually succeeded.
                log.debug("Closed all the {} txnLogProducers in TransactionStateManager", producerCount);
            }
        }, this.scheduler);

        final int readerCount = txnLogReaderCloses.size();
        FutureUtil.waitForAll(txnLogReaderCloses).whenCompleteAsync((ignore, t) -> {
            if (t != null) {
                log.error("Error when close all the {} txnLogReaders in TransactionStateManager", readerCount, t);
            } else if (log.isDebugEnabled()) {
                log.debug("Closed all the {} txnLogReaders in TransactionStateManager.", readerCount);
            }
        }, this.scheduler);

        // NOTE(review): the scheduler is shut down right after the callbacks above are registered, so
        // callbacks dispatched to it after this point may be rejected - confirm whether shutdown should
        // wait for the close futures first.
        this.scheduler.shutdown();
        log.info("Shutdown transaction state manager complete.");
    }

    /**
     * Mutable holder pairing a transactional id with its transit metadata.
     *
     * <p>Two holders are equal when both fields are equal (a {@code null} field only matches
     * {@code null}); the hash code and string form are derived from the same two fields.
     */
    private static class TransactionalIdCoordinatorEpochAndMetadata {
        private String transactionalId;
        private TransactionMetadata.TxnTransitMetadata transitMetadata;

        public TransactionalIdCoordinatorEpochAndMetadata(String transactionalId, TransactionMetadata.TxnTransitMetadata transitMetadata) {
            this.transactionalId = transactionalId;
            this.transitMetadata = transitMetadata;
        }

        public String getTransactionalId() {
            return this.transactionalId;
        }

        public void setTransactionalId(String transactionalId) {
            this.transactionalId = transactionalId;
        }

        public TransactionMetadata.TxnTransitMetadata getTransitMetadata() {
            return this.transitMetadata;
        }

        public void setTransitMetadata(TransactionMetadata.TxnTransitMetadata transitMetadata) {
            this.transitMetadata = transitMetadata;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TransactionalIdCoordinatorEpochAndMetadata)) {
                return false;
            }
            TransactionalIdCoordinatorEpochAndMetadata that = (TransactionalIdCoordinatorEpochAndMetadata) o;
            // Fields are compared in declaration order; evaluation stops at the first mismatch.
            return that.canEqual(this)
                    && nullSafeEquals(this.getTransactionalId(), that.getTransactionalId())
                    && nullSafeEquals(this.getTransitMetadata(), that.getTransitMetadata());
        }

        /** Tells whether {@code other} is of a type this class may be compared with. */
        protected boolean canEqual(Object other) {
            return other instanceof TransactionalIdCoordinatorEpochAndMetadata;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            int hash = 1;
            hash = hash * prime + nullSafeHash(this.getTransactionalId());
            hash = hash * prime + nullSafeHash(this.getTransitMetadata());
            return hash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TransactionStateManager.TransactionalIdCoordinatorEpochAndMetadata(transactionalId=");
            text.append(this.getTransactionalId());
            text.append(", transitMetadata=").append(this.getTransitMetadata());
            text.append(")");
            return text.toString();
        }

        /** Equality where {@code null} matches only {@code null}; otherwise delegates to {@code left.equals(right)}. */
        private static boolean nullSafeEquals(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash contribution of one field: 43 for {@code null}, otherwise the value's own hash code. */
        private static int nullSafeHash(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * Callback with a no-argument completion hook and a failure hook that receives a Kafka
     * {@link Errors} code.
     *
     * <p>NOTE(review): presumably reports the outcome of an asynchronous operation of this manager;
     * the callers are outside this part of the file - confirm.
     */
    public static interface ResponseCallback {
        /** Completion hook; presumably invoked when the operation succeeded - confirm with callers. */
        public void complete();

        /**
         * Failure hook.
         *
         * @param var1 the error code handed to the callback
         */
        public void fail(Errors var1);
    }

    /**
     * Predicate over a Kafka {@link Errors} code.
     *
     * <p>NOTE(review): judging by the name, the result decides whether a failed operation is retried;
     * the call sites are outside this part of the file - confirm.
     */
    public static interface RetryOnError {
        /**
         * @param var1 the error code to evaluate
         * @return the decision for {@code var1}; presumably {@code true} means "retry" - confirm
         */
        public boolean retry(Errors var1);
    }

    /**
     * Mutable holder pairing a coordinator epoch with a {@link TransactionMetadata} instance.
     *
     * <p>Equality, hash code and string form are all derived from both fields; a {@code null} field
     * only equals another {@code null} field.
     */
    public static class CoordinatorEpochAndTxnMetadata {
        private Integer coordinatorEpoch;
        private TransactionMetadata transactionMetadata;

        public CoordinatorEpochAndTxnMetadata(Integer coordinatorEpoch, TransactionMetadata transactionMetadata) {
            this.coordinatorEpoch = coordinatorEpoch;
            this.transactionMetadata = transactionMetadata;
        }

        public Integer getCoordinatorEpoch() {
            return this.coordinatorEpoch;
        }

        public void setCoordinatorEpoch(Integer coordinatorEpoch) {
            this.coordinatorEpoch = coordinatorEpoch;
        }

        public TransactionMetadata getTransactionMetadata() {
            return this.transactionMetadata;
        }

        public void setTransactionMetadata(TransactionMetadata transactionMetadata) {
            this.transactionMetadata = transactionMetadata;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof CoordinatorEpochAndTxnMetadata)) {
                return false;
            }
            CoordinatorEpochAndTxnMetadata that = (CoordinatorEpochAndTxnMetadata) o;
            if (!that.canEqual(this)) {
                return false;
            }

            Integer myEpoch = this.getCoordinatorEpoch();
            Integer theirEpoch = that.getCoordinatorEpoch();
            boolean sameEpoch = myEpoch == null ? theirEpoch == null : myEpoch.equals(theirEpoch);
            if (!sameEpoch) {
                return false;
            }

            TransactionMetadata myMetadata = this.getTransactionMetadata();
            TransactionMetadata theirMetadata = that.getTransactionMetadata();
            return myMetadata == null ? theirMetadata == null : myMetadata.equals(theirMetadata);
        }

        /** Tells whether {@code other} is of a type this class may be compared with. */
        protected boolean canEqual(Object other) {
            return other instanceof CoordinatorEpochAndTxnMetadata;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            // A null field contributes the constant 43.
            Integer epoch = this.getCoordinatorEpoch();
            int hash = prime + (epoch == null ? 43 : epoch.hashCode());
            TransactionMetadata metadata = this.getTransactionMetadata();
            return hash * prime + (metadata == null ? 43 : metadata.hashCode());
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TransactionStateManager.CoordinatorEpochAndTxnMetadata(coordinatorEpoch=");
            text.append(this.getCoordinatorEpoch());
            text.append(", transactionMetadata=").append(this.getTransactionMetadata());
            text.append(")");
            return text.toString();
        }
    }

    /**
     * Callback that receives a {@link TransactionResult} together with a {@link TransactionMetadata}
     * and its {@link TransactionMetadata.TxnTransitMetadata}.
     *
     * <p>NOTE(review): judging by the name, implementations send transaction markers for the given
     * transaction; the call sites are outside this part of the file - confirm.
     */
    static interface SendTxnMarkersCallback {
        /**
         * @param var1 the transaction result
         * @param var2 the transaction metadata
         * @param var3 the transit metadata
         */
        public void send(TransactionResult var1, TransactionMetadata var2, TransactionMetadata.TxnTransitMetadata var3);
    }

    /**
     * Plain holder grouping a transactional id with a transaction result, the transaction metadata
     * and its transit metadata. The class declares no accessors; its fields are only reachable from
     * the enclosing class.
     */
    private static class TransactionalIdAndTransitMetadata {
        /** Set once at construction. */
        private final String transactionalId;
        private TransactionResult result;
        private TransactionMetadata txnMetadata;
        private TransactionMetadata.TxnTransitMetadata transitMetadata;

        /**
         * Stores the four values as given; no validation or copying is performed.
         *
         * @param transactionalId the transactional id
         * @param result          the transaction result
         * @param txnMetadata     the transaction metadata
         * @param transitMetadata the transit metadata
         */
        public TransactionalIdAndTransitMetadata(
                String transactionalId,
                TransactionResult result,
                TransactionMetadata txnMetadata,
                TransactionMetadata.TxnTransitMetadata transitMetadata) {
            this.transitMetadata = transitMetadata;
            this.txnMetadata = txnMetadata;
            this.result = result;
            this.transactionalId = transactionalId;
        }
    }

    /**
     * Mutable holder grouping a transactional id with a producer id and a producer epoch.
     *
     * <p>Equality, hash code and string form are derived from all three fields; a {@code null} field
     * only equals another {@code null} field.
     */
    protected static class TransactionalIdAndProducerIdEpoch {
        private String transactionalId;
        private Long producerId;
        private Short producerEpoch;

        public TransactionalIdAndProducerIdEpoch(String transactionalId, Long producerId, Short producerEpoch) {
            this.transactionalId = transactionalId;
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
        }

        public String getTransactionalId() {
            return this.transactionalId;
        }

        public void setTransactionalId(String transactionalId) {
            this.transactionalId = transactionalId;
        }

        public Long getProducerId() {
            return this.producerId;
        }

        public void setProducerId(Long producerId) {
            this.producerId = producerId;
        }

        public Short getProducerEpoch() {
            return this.producerEpoch;
        }

        public void setProducerEpoch(Short producerEpoch) {
            this.producerEpoch = producerEpoch;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TransactionalIdAndProducerIdEpoch)) {
                return false;
            }
            TransactionalIdAndProducerIdEpoch that = (TransactionalIdAndProducerIdEpoch) o;
            // Comparison order: producer id, producer epoch, then transactional id.
            return that.canEqual(this)
                    && nullSafeEquals(this.getProducerId(), that.getProducerId())
                    && nullSafeEquals(this.getProducerEpoch(), that.getProducerEpoch())
                    && nullSafeEquals(this.getTransactionalId(), that.getTransactionalId());
        }

        /** Tells whether {@code other} is of a type this class may be compared with. */
        protected boolean canEqual(Object other) {
            return other instanceof TransactionalIdAndProducerIdEpoch;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            int hash = 1;
            hash = hash * prime + nullSafeHash(this.getProducerId());
            hash = hash * prime + nullSafeHash(this.getProducerEpoch());
            hash = hash * prime + nullSafeHash(this.getTransactionalId());
            return hash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TransactionStateManager.TransactionalIdAndProducerIdEpoch(transactionalId=");
            text.append(this.getTransactionalId());
            text.append(", producerId=").append(this.getProducerId());
            text.append(", producerEpoch=").append(this.getProducerEpoch());
            text.append(")");
            return text.toString();
        }

        /** Equality where {@code null} matches only {@code null}; otherwise delegates to {@code left.equals(right)}. */
        private static boolean nullSafeEquals(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash contribution of one field: 43 for {@code null}, otherwise the value's own hash code. */
        private static int nullSafeHash(Object value) {
            return value == null ? 43 : value.hashCode();
        }
    }

    /**
     * Cache entry holding a coordinator epoch together with a map from transactional id to
     * {@link TransactionMetadata}.
     */
    private static class TxnMetadataCacheEntry {
        private Integer coordinatorEpoch;
        private Map<String, TransactionMetadata> metadataPerTransactionalId;

        /**
         * Stores both values as given; the map is kept by reference, not copied.
         *
         * @param coordinatorEpoch           the coordinator epoch of this entry
         * @param metadataPerTransactionalId the transaction metadata keyed by transactional id
         */
        public TxnMetadataCacheEntry(Integer coordinatorEpoch, Map<String, TransactionMetadata> metadataPerTransactionalId) {
            this.coordinatorEpoch = coordinatorEpoch;
            this.metadataPerTransactionalId = metadataPerTransactionalId;
        }

        /** Renders the epoch and the number of map entries; the map contents are not printed. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TxnMetadataCacheEntry{coordinatorEpoch=");
            text.append(this.coordinatorEpoch);
            text.append(", numTransactionalEntries=").append(this.metadataPerTransactionalId.size());
            text.append("}");
            return text.toString();
        }
    }
}

