/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.KopBrokerLookupManager;
import io.streamnative.pulsar.handlers.kop.SystemTopicClient;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.ProducerIdManager;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.ProducerIdManagerImpl;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.PulsarStorageProducerIdManagerImpl;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionConfig;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMarkerChannelManager;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionState;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.bookkeeper.common.util.MathUtils;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.Time;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionCoordinator {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(TransactionCoordinator.class);
    /** Metadata namespace prefix; in the code reviewed here it only tags the immigration/emigration log lines. */
    private final String namespacePrefixForMetadata;
    /** User-topics namespace prefix, passed to the marker channel manager whenever transaction markers are enqueued. */
    private final String namespacePrefixForUserTopics;
    /** Transaction settings; read here for the transaction-log partition count and the metadata topic name. */
    private final TransactionConfig transactionConfig;
    /** Source of new producer ids (asynchronous, see {@code generateProducerId()}). */
    private final ProducerIdManager producerIdManager;
    /** Holds the per-transactionalId metadata and appends state transitions to the transaction log. */
    private final TransactionStateManager txnManager;
    /** Receives the transaction markers to send once a commit/abort has been appended to the log. */
    private final TransactionMarkerChannelManager transactionMarkerChannelManager;
    /** Executor injected at construction. NOTE(review): not used in the part of the class reviewed here - confirm its role. */
    private final ScheduledExecutorService scheduler;
    /** Clock used to timestamp metadata transitions ({@code time.milliseconds()}). */
    private final Time time;
    /**
     * Logging-only completion hook for a coordinator-initiated rollback.
     *
     * <p>Success is logged at INFO; the three error codes that indicate the rollback was superseded
     * (invalid producer id mapping, producer fenced, concurrent transactions) are logged at DEBUG;
     * every other error is logged at WARN. Nothing else is done with the outcome.
     */
    private static final BiConsumer<TransactionStateManager.TransactionalIdAndProducerIdEpoch, Errors> onEndTransactionComplete = (idAndEpoch, outcome) -> {
        switch (outcome) {
            case NONE:
                log.info("Completed rollback of ongoing transaction for transactionalId {} due to timeout", (Object) idAndEpoch.getTransactionalId());
                break;
            case INVALID_PRODUCER_ID_MAPPING:
            case PRODUCER_FENCED:
            case CONCURRENT_TRANSACTIONS:
                if (log.isDebugEnabled()) {
                    log.debug("Rollback of ongoing transaction for transactionalId {} has been cancelled due to error {}", (Object) idAndEpoch.getTransactionalId(), outcome);
                }
                break;
            default:
                log.warn("Rollback of ongoing transaction for transactionalId {} failed due to error {}", (Object) idAndEpoch.getTransactionalId(), outcome);
                break;
        }
    };

    /**
     * Wires a coordinator from already-built collaborators; performs no work beyond storing them.
     * Use {@link #of} to build the collaborators from broker configuration.
     *
     * @param transactionConfig               transaction settings
     * @param transactionMarkerChannelManager sink for transaction markers
     * @param scheduler                       executor kept for later use by the coordinator
     * @param producerIdManager               producer id generator
     * @param txnManager                      transaction state store
     * @param time                            clock used for metadata timestamps
     * @param namespacePrefixForMetadata      metadata namespace prefix
     * @param namespacePrefixForUserTopics    user-topics namespace prefix
     */
    protected TransactionCoordinator(TransactionConfig transactionConfig, TransactionMarkerChannelManager transactionMarkerChannelManager, ScheduledExecutorService scheduler, ProducerIdManager producerIdManager, TransactionStateManager txnManager, Time time, String namespacePrefixForMetadata, String namespacePrefixForUserTopics) {
        // Assignments follow the parameter order; they are independent of one another.
        this.transactionConfig = transactionConfig;
        this.transactionMarkerChannelManager = transactionMarkerChannelManager;
        this.scheduler = scheduler;
        this.producerIdManager = producerIdManager;
        this.txnManager = txnManager;
        this.time = time;
        this.namespacePrefixForMetadata = namespacePrefixForMetadata;
        this.namespacePrefixForUserTopics = namespacePrefixForUserTopics;
    }

    /**
     * Builds a coordinator for {@code tenant} together with the state manager, producer id manager
     * and marker channel manager it depends on.
     *
     * <p>The producer id manager is Pulsar-topic backed when
     * {@code kafkaConfig.isKafkaTransactionProducerIdsStoredOnPulsar()} is true, otherwise it is
     * backed by {@code metadataStore}.
     *
     * @return a fully wired coordinator
     * @throws Exception if any collaborator fails to construct
     */
    public static TransactionCoordinator of(String tenant, KafkaServiceConfiguration kafkaConfig, TransactionConfig transactionConfig, SystemTopicClient txnTopicClient, MetadataStoreExtended metadataStore, KopBrokerLookupManager kopBrokerLookupManager, ScheduledExecutorService scheduler, Time time) throws Exception {
        final String metadataNamespace = MetadataUtils.constructMetadataNamespace(tenant, kafkaConfig);
        final String userTopicsNamespace = MetadataUtils.constructUserTopicsNamespace(tenant, kafkaConfig);
        final TransactionStateManager stateManager = new TransactionStateManager(transactionConfig, txnTopicClient, scheduler, time);

        final ProducerIdManager idManager;
        if (kafkaConfig.isKafkaTransactionProducerIdsStoredOnPulsar()) {
            idManager = new PulsarStorageProducerIdManagerImpl(transactionConfig.getTransactionProducerIdTopicName(), (PulsarClient) txnTopicClient.getPulsarClient());
        } else {
            idManager = new ProducerIdManagerImpl(transactionConfig.getBrokerId(), metadataStore);
        }

        final TransactionMarkerChannelManager markerChannelManager = new TransactionMarkerChannelManager(
                tenant,
                kafkaConfig,
                stateManager,
                kopBrokerLookupManager,
                kafkaConfig.isKopTlsEnabledWithBroker(),
                userTopicsNamespace,
                scheduler);

        return new TransactionCoordinator(
                transactionConfig,
                markerChannelManager,
                scheduler,
                idManager,
                stateManager,
                time,
                metadataNamespace,
                userTopicsNamespace);
    }

    /**
     * Called when this broker becomes coordinator for a transaction-log partition.
     *
     * <p>Removes the markers currently tracked for the partition, then asks the state manager to
     * load the partition's transactions; every transaction the loader reports is handed to the
     * marker channel manager with a coordinator epoch of {@code -1}.
     *
     * @param partition transaction-log partition index
     * @return the future returned by the state manager's load call
     */
    public CompletableFuture<Void> handleTxnImmigration(int partition) {
        log.info("Elected as the txn coordinator for partition {} for {}.", (Object) partition, (Object) this.namespacePrefixForMetadata);
        this.transactionMarkerChannelManager.removeMarkersForTxnTopicPartition(partition);
        return this.txnManager.loadTransactionsForTxnTopicPartition(
                partition,
                (result, metadata, transitMetadata) -> this.transactionMarkerChannelManager.addTxnMarkersToSend(
                        -1, result, metadata, transitMetadata, this.namespacePrefixForUserTopics));
    }

    /**
     * Called when this broker stops being coordinator for a transaction-log partition: drops the
     * partition's cached transactions first, then the markers tracked for it.
     *
     * @param partition transaction-log partition index
     */
    public void handleTxnEmigration(int partition) {
        final String resignedMessage = "Resigned as the txn coordinator for partition {} for {}.";
        log.info(resignedMessage, (Object) partition, (Object) this.namespacePrefixForMetadata);

        // Order matters: cached state goes before the pending markers.
        this.txnManager.removeTransactionsForTxnTopicPartition(partition);
        this.transactionMarkerChannelManager.removeMarkersForTxnTopicPartition(partition);
    }

    /**
     * Maps a transactional id to a transaction-log partition using the configured partition count.
     *
     * @param transactionalId id to map; must not be null
     * @return the partition index
     */
    public int partitionFor(String transactionalId) {
        final int numPartitions = this.transactionConfig.getTransactionLogNumPartitions();
        return TransactionCoordinator.partitionFor(transactionalId, numPartitions);
    }

    /**
     * Maps a transactional id to a partition: the id's {@code hashCode()} reduced with
     * {@code MathUtils.signSafeMod} by the partition count.
     *
     * @param transactionalId             id to map; must not be null
     * @param transactionLogNumPartitions number of transaction-log partitions
     * @return the partition index
     */
    public static int partitionFor(String transactionalId, int transactionLogNumPartitions) {
        final long idHash = transactionalId.hashCode();
        return MathUtils.signSafeMod(idHash, transactionLogNumPartitions);
    }

    /**
     * @return the transaction metadata topic name taken from the transaction config
     */
    public String getTopicPartitionName() {
        final String metadataTopicName = this.transactionConfig.getTransactionMetadataTopicName();
        return metadataTopicName;
    }

    /**
     * @param partitionId partition index
     * @return the configured metadata topic name with the {@code -partition-<id>} suffix appended
     */
    public String getTopicPartitionName(int partitionId) {
        final String baseTopicName = this.getTopicPartitionName();
        return TransactionCoordinator.getTopicPartitionName(baseTopicName, partitionId);
    }

    /**
     * Appends the {@code -partition-<id>} suffix to a topic name.
     *
     * @param topicPartitionName base topic name
     * @param partitionId        partition index
     * @return {@code topicPartitionName + "-partition-" + partitionId}
     */
    public static String getTopicPartitionName(String topicPartitionName, int partitionId) {
        final String partitionSuffix = "-partition-" + partitionId;
        return topicPartitionName + partitionSuffix;
    }

    /**
     * Handles an InitProducerId request and reports the outcome through {@code responseCallback}.
     *
     * <ul>
     *   <li>{@code transactionalId == null}: idempotent-only producer; a fresh producer id with
     *       epoch 0 is generated and returned.</li>
     *   <li>empty {@code transactionalId}: {@code INVALID_REQUEST}.</li>
     *   <li>timeout rejected by the state manager: {@code INVALID_TRANSACTION_TIMEOUT}.</li>
     *   <li>otherwise: the existing transaction metadata is looked up (or created with a freshly
     *       generated producer id), the next transition is prepared under the metadata lock and
     *       {@link #completeInitProducer} finishes the request.</li>
     * </ul>
     *
     * <p>Every failure path answers with the {@code -1/-1} sentinel result built by
     * {@link #initTransactionError}; the callback is always invoked, including when metadata
     * creation throws inside the asynchronous producer-id callback.
     *
     * @param transactionalId             transactional id, or null for an idempotent producer
     * @param transactionTimeoutMs        requested transaction timeout
     * @param expectedProducerIdAndEpoch  producer id/epoch the client believes it owns, if any
     * @param responseCallback            receives exactly one result
     */
    public void handleInitProducerId(String transactionalId, int transactionTimeoutMs, Optional<ProducerIdAndEpoch> expectedProducerIdAndEpoch, Consumer<InitProducerIdResult> responseCallback) {
        if (transactionalId == null) {
            this.producerIdManager.generateProducerId().whenComplete((pid, throwable) -> {
                if (throwable != null) {
                    log.error("Failed to generate producer id for idempotent producer", throwable);
                    // pid is null on failure; answer with the sentinel result instead of forwarding it.
                    responseCallback.accept(this.initTransactionError(Errors.UNKNOWN_SERVER_ERROR));
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Generate producer id {} for idempotent producer", pid);
                }
                short producerEpoch = 0;
                responseCallback.accept(new InitProducerIdResult(pid, producerEpoch, Errors.NONE));
            });
            return;
        }
        if (StringUtils.isEmpty((CharSequence) transactionalId)) {
            responseCallback.accept(this.initTransactionError(Errors.INVALID_REQUEST));
            return;
        }
        if (!this.txnManager.validateTransactionTimeoutMs(transactionTimeoutMs)) {
            responseCallback.accept(this.initTransactionError(Errors.INVALID_TRANSACTION_TIMEOUT));
            return;
        }

        final CompletableFuture<Either<Errors, TransactionStateManager.CoordinatorEpochAndTxnMetadata>> epochAndTxnMetaFuture = new CompletableFuture<>();
        this.txnManager.getTransactionState(transactionalId).match(errors -> epochAndTxnMetaFuture.complete(Either.left(errors)), optEpochAndTxnMetadata -> {
            if (optEpochAndTxnMetadata.isPresent()) {
                epochAndTxnMetaFuture.complete(Either.right(optEpochAndTxnMetadata.get()));
                return;
            }
            // No metadata yet for this transactional id: allocate a producer id and register it.
            this.producerIdManager.generateProducerId().whenComplete((pid, throwable) -> {
                if (throwable != null) {
                    log.error("Failed to generate producer id for {}", (Object) transactionalId, throwable);
                    epochAndTxnMetaFuture.complete(Either.left(Errors.UNKNOWN_SERVER_ERROR));
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("Generate producer id {} for {}", pid, (Object) transactionalId);
                }
                try {
                    TransactionMetadata newMetadata = TransactionMetadata.builder()
                            .transactionalId(transactionalId)
                            .producerId((long) pid)
                            .lastProducerId(-1L)
                            .producerEpoch((short) -1)
                            .lastProducerEpoch((short) -1)
                            .state(TransactionState.EMPTY)
                            .topicPartitions(Sets.newHashSet())
                            .txnLastUpdateTimestamp(this.time.milliseconds())
                            .build();
                    epochAndTxnMetaFuture.complete(this.txnManager.putTransactionStateIfNotExists(newMetadata));
                } catch (RuntimeException e) {
                    // Without this the exception would vanish in the discarded whenComplete stage
                    // and the client would never receive a response.
                    epochAndTxnMetaFuture.completeExceptionally(e);
                }
            });
        });

        epochAndTxnMetaFuture.whenComplete((either, lookupThrowable) -> {
            if (lookupThrowable != null) {
                log.error("Failed to load or create transaction metadata for {}", (Object) transactionalId, lookupThrowable);
                responseCallback.accept(this.initTransactionError(Errors.UNKNOWN_SERVER_ERROR));
                return;
            }
            either.match(errors -> responseCallback.accept(this.initTransactionError(errors)), epochAndTxnMetadata -> {
                int coordinatorEpoch = epochAndTxnMetadata.getCoordinatorEpoch();
                TransactionMetadata txnMetadata = epochAndTxnMetadata.getTransactionMetadata();
                txnMetadata.inLock(() -> {
                    this.prepareInitProducerIdTransit(transactionalId, transactionTimeoutMs, coordinatorEpoch, txnMetadata, expectedProducerIdAndEpoch)
                            .whenComplete((errorsOrEpochAndTxnTransitMetadata, prepareThrowable) ->
                                    this.completeInitProducer(transactionalId, coordinatorEpoch, errorsOrEpochAndTxnTransitMetadata, prepareThrowable, responseCallback));
                    return null;
                });
            });
        });
    }

    /**
     * Finishes an InitProducerId request once the metadata transition has been prepared.
     *
     * <p>A preparation exception or error is logged and reported straight to the client. If the
     * prepared transition is an epoch fence, the ongoing transaction is aborted and the client is
     * told the abort error, or {@code CONCURRENT_TRANSACTIONS} when the abort succeeded. Any other
     * transition is appended to the transaction log and the new producer id/epoch is returned on
     * success.
     *
     * @param transactionalId                  transactional id of the request
     * @param coordinatorEpoch                 coordinator epoch captured with the metadata
     * @param errorsOrEpochAndTransitMetadata  preparation result (ignored when the throwable is set)
     * @param prepareInitPidThrowable          failure of the preparation future, or null
     * @param responseCallback                 receives the final result
     */
    private void completeInitProducer(final String transactionalId, int coordinatorEpoch, Either<Errors, EpochAndTxnTransitMetadata> errorsOrEpochAndTransitMetadata, Throwable prepareInitPidThrowable, final Consumer<InitProducerIdResult> responseCallback) {
        if (prepareInitPidThrowable != null) {
            log.error("Failed to init producerId.", prepareInitPidThrowable);
            final Errors mappedError = Errors.forException(prepareInitPidThrowable);
            responseCallback.accept(this.initTransactionError(mappedError));
            return;
        }
        if (errorsOrEpochAndTransitMetadata.isLeft()) {
            log.error("Failed to init producerId: {}", (Object) errorsOrEpochAndTransitMetadata.getLeft());
            responseCallback.accept(this.initTransactionError(errorsOrEpochAndTransitMetadata.getLeft()));
            return;
        }

        final TransactionMetadata.TxnTransitMetadata transit = errorsOrEpochAndTransitMetadata.getRight().getTxnTransitMetadata();
        if (transit.getTxnState() != TransactionState.PREPARE_EPOCH_FENCE) {
            this.txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, transit, new TransactionStateManager.ResponseCallback() {

                @Override
                public void complete() {
                    log.info("Initialized transactionalId {} with producerId {} and producer epoch {} on partition {}-{}",
                            transactionalId,
                            transit.getProducerId(),
                            transit.getProducerEpoch(),
                            "__transaction_state",
                            TransactionCoordinator.this.txnManager.partitionFor(transactionalId));
                    responseCallback.accept(new InitProducerIdResult(transit.getProducerId(), transit.getProducerEpoch(), Errors.NONE));
                }

                @Override
                public void fail(Errors appendError) {
                    log.info("Returning {} error code to client for {}'s InitProducerId request", (Object) appendError, (Object) transactionalId);
                    responseCallback.accept(TransactionCoordinator.this.initTransactionError(appendError));
                }
            }, ignored -> true);
            return;
        }

        // Epoch fence: abort the ongoing transaction on behalf of the coordinator (not the client).
        this.endTransaction(transactionalId, transit.getProducerId(), transit.getProducerEpoch(), TransactionResult.ABORT, false, abortError -> {
            final Errors reported = abortError != Errors.NONE ? abortError : Errors.CONCURRENT_TRANSACTIONS;
            responseCallback.accept(this.initTransactionError(reported));
        });
    }

    /**
     * Builds a failed InitProducerId result: producer id {@code -1}, epoch {@code -1} and the
     * given error.
     *
     * @param error error to report
     * @return the failure result
     */
    private InitProducerIdResult initTransactionError(Errors error) {
        final long noProducerId = -1L;
        final short noProducerEpoch = (short) -1;
        return new InitProducerIdResult(noProducerId, noProducerEpoch, error);
    }

    /**
     * Checks whether the producer id/epoch supplied by the client is acceptable for the stored
     * metadata. It is accepted when any of these hold, checked in this order:
     * <ol>
     *   <li>the stored producer epoch is {@code -1};</li>
     *   <li>the supplied producer id equals the stored producer id;</li>
     *   <li>the supplied producer id equals the stored last producer id and
     *       {@code txnMetadata.isEpochExhausted} returns true for the supplied epoch.</li>
     * </ol>
     */
    private boolean isValidProducerId(TransactionMetadata txnMetadata, ProducerIdAndEpoch producerIdAndEpoch) {
        if (txnMetadata.getProducerEpoch() == -1) {
            return true;
        }
        if (producerIdAndEpoch.producerId == txnMetadata.getProducerId()) {
            return true;
        }
        if (producerIdAndEpoch.producerId == txnMetadata.getLastProducerId()) {
            return txnMetadata.isEpochExhausted(producerIdAndEpoch.epoch);
        }
        return false;
    }

    /**
     * Prepares the metadata transition for an InitProducerId request.
     *
     * <p>Outcome by situation:
     * <ul>
     *   <li>a transition is already pending, or state is PREPARE_ABORT/PREPARE_COMMIT:
     *       {@code CONCURRENT_TRANSACTIONS};</li>
     *   <li>the client-supplied producer id/epoch is rejected by {@link #isValidProducerId}:
     *       the fencing error;</li>
     *   <li>COMPLETE_ABORT/COMPLETE_COMMIT/EMPTY: rotate to a freshly generated producer id when
     *       the epoch is exhausted, otherwise bump the epoch;</li>
     *   <li>ONGOING: an epoch-fence transition;</li>
     *   <li>any other state: the future fails with {@link IllegalStateException}.</li>
     * </ul>
     *
     * <p>The returned future is always completed. A failed producer-id generation completes it with
     * {@code UNKNOWN_SERVER_ERROR}, and an exception while building the rotation completes it
     * exceptionally, so the caller can always answer the client.
     *
     * @return future holding either an error or the coordinator epoch with the prepared transition
     */
    private CompletableFuture<Either<Errors, EpochAndTxnTransitMetadata>> prepareInitProducerIdTransit(String transactionalId, Integer transactionTimeoutMs, Integer coordinatorEpoch, TransactionMetadata txnMetadata, Optional<ProducerIdAndEpoch> expectedProducerIdAndEpoch) {
        CompletableFuture<Either<Errors, EpochAndTxnTransitMetadata>> resultFuture = new CompletableFuture<Either<Errors, EpochAndTxnTransitMetadata>>();
        if (txnMetadata.pendingTransitionInProgress()) {
            resultFuture.complete(Either.left(Errors.CONCURRENT_TRANSACTIONS));
            return resultFuture;
        }
        if (expectedProducerIdAndEpoch.isPresent() && !this.isValidProducerId(txnMetadata, expectedProducerIdAndEpoch.get())) {
            resultFuture.complete(Either.left(this.producerEpochFenceErrors()));
            return resultFuture;
        }
        switch (txnMetadata.getState()) {
            case PREPARE_ABORT:
            case PREPARE_COMMIT: {
                resultFuture.complete(Either.left(Errors.CONCURRENT_TRANSACTIONS));
                break;
            }
            case COMPLETE_ABORT:
            case COMPLETE_COMMIT:
            case EMPTY: {
                if (txnMetadata.isProducerEpochExhausted()) {
                    this.producerIdManager.generateProducerId().whenComplete((newPid, throwable) -> {
                        if (throwable != null) {
                            // Previously only the success path was wired, leaving resultFuture pending forever.
                            log.error("Failed to generate a new producer id for {}", (Object) transactionalId, throwable);
                            resultFuture.complete(Either.left(Errors.UNKNOWN_SERVER_ERROR));
                            return;
                        }
                        try {
                            TransactionMetadata.TxnTransitMetadata rotated = txnMetadata.prepareProducerIdRotation(newPid, transactionTimeoutMs, this.time.milliseconds(), expectedProducerIdAndEpoch.isPresent());
                            resultFuture.complete(Either.right(new EpochAndTxnTransitMetadata(coordinatorEpoch, rotated)));
                        } catch (RuntimeException e) {
                            resultFuture.completeExceptionally(e);
                        }
                    });
                } else {
                    Either<Errors, TransactionMetadata.TxnTransitMetadata> incremented = txnMetadata.prepareIncrementProducerEpoch(transactionTimeoutMs, expectedProducerIdAndEpoch.map(ProducerIdAndEpoch::getEpoch), this.time.milliseconds());
                    resultFuture.complete(incremented.map(transit -> new EpochAndTxnTransitMetadata(coordinatorEpoch, transit)));
                }
                break;
            }
            case ONGOING: {
                resultFuture.complete(Either.right(new EpochAndTxnTransitMetadata(coordinatorEpoch, txnMetadata.prepareFenceProducerEpoch())));
                break;
            }
            default: {
                String errorMsg = String.format("Found transactionalId %s with state %s. This is illegal as we should never have transitioned to this state.", new Object[]{transactionalId, txnMetadata.getState()});
                resultFuture.completeExceptionally(new IllegalStateException(errorMsg));
            }
        }
        return resultFuture;
    }

    /**
     * Handles an AddPartitionsToTxn request.
     *
     * <p>Rejects a null/empty transactional id with {@code INVALID_REQUEST}, forwards any lookup
     * error, and answers {@code INVALID_PRODUCER_ID_MAPPING} when no metadata exists. Under the
     * metadata lock it then validates producer id, producer epoch and the current/pending state;
     * if the transaction is ONGOING and already contains every requested partition it answers
     * {@code NONE} without touching the log. Otherwise the add-partitions transition is appended
     * to the transaction log and the append outcome is reported.
     *
     * @param transactionalId  transactional id of the producer
     * @param producerId       producer id sent by the client
     * @param producerEpoch    producer epoch sent by the client
     * @param partitionList    partitions to add
     * @param responseCallback receives the resulting error code
     */
    public void handleAddPartitionsToTransaction(String transactionalId, long producerId, short producerEpoch, Set<TopicPartition> partitionList, final Consumer<Errors> responseCallback) {
        if (transactionalId == null || transactionalId.isEmpty()) {
            if (log.isDebugEnabled()) {
                log.debug("Returning {} error code to client for {}'s AddPartitions request", (Object) Errors.INVALID_REQUEST, (Object) transactionalId);
            }
            responseCallback.accept(Errors.INVALID_REQUEST);
            return;
        }

        final Either<Errors, Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> lookup = this.txnManager.getTransactionState(transactionalId);
        if (lookup.isLeft()) {
            responseCallback.accept(lookup.getLeft());
            return;
        }
        if (!lookup.getRight().isPresent()) {
            responseCallback.accept(Errors.INVALID_PRODUCER_ID_MAPPING);
            return;
        }

        final TransactionStateManager.CoordinatorEpochAndTxnMetadata cached = lookup.getRight().get();
        final int coordinatorEpoch = cached.getCoordinatorEpoch();
        final TransactionMetadata txnMetadata = cached.getTransactionMetadata();

        final Either<Errors, EpochAndTxnTransitMetadata> prepared = txnMetadata.inLock(() -> {
            if (txnMetadata.getProducerId() != producerId) {
                return Either.left(Errors.INVALID_PRODUCER_ID_MAPPING);
            }
            if (txnMetadata.getProducerEpoch() != producerEpoch) {
                return Either.left(this.producerEpochFenceErrors());
            }
            // A pending transition and an in-flight commit/abort are reported the same way.
            if (txnMetadata.getPendingState().isPresent()
                    || txnMetadata.getState() == TransactionState.PREPARE_COMMIT
                    || txnMetadata.getState() == TransactionState.PREPARE_ABORT) {
                return Either.left(Errors.CONCURRENT_TRANSACTIONS);
            }
            if (txnMetadata.getState() == TransactionState.ONGOING && txnMetadata.getTopicPartitions().containsAll(partitionList)) {
                // Nothing new to record: short-circuit with success.
                return Either.left(Errors.NONE);
            }
            return Either.right(new EpochAndTxnTransitMetadata(coordinatorEpoch, txnMetadata.prepareAddPartitions(new HashSet<TopicPartition>(partitionList), this.time.milliseconds())));
        });

        if (prepared.getLeft() != null) {
            responseCallback.accept(prepared.getLeft());
            return;
        }

        final EpochAndTxnTransitMetadata transit = prepared.getRight();
        this.txnManager.appendTransactionToLog(transactionalId, transit.getCoordinatorEpoch(), transit.getTxnTransitMetadata(), new TransactionStateManager.ResponseCallback() {

            @Override
            public void complete() {
                responseCallback.accept(Errors.NONE);
            }

            @Override
            public void fail(Errors appendError) {
                responseCallback.accept(appendError);
            }
        }, ignored -> true);
    }

    /**
     * Returns the error used when a producer has been fenced, emitting a debug line first when
     * debug logging is on.
     *
     * @return {@code Errors.PRODUCER_FENCED}
     */
    private Errors producerEpochFenceErrors() {
        final Errors fenced = Errors.PRODUCER_FENCED;
        if (!log.isDebugEnabled()) {
            return fenced;
        }
        log.debug("There is a newer producer with the same transactionalId which fences the current one.");
        return fenced;
    }

    /**
     * Handles a client EndTxn request by delegating to {@link #endTransaction} with the
     * from-client flag set.
     *
     * @param transactionalId   transactional id of the producer
     * @param producerId        producer id sent by the client
     * @param producerEpoch     producer epoch sent by the client
     * @param transactionResult commit or abort
     * @param responseCallback  receives the resulting error code
     */
    public void handleEndTransaction(String transactionalId, long producerId, short producerEpoch, TransactionResult transactionResult, Consumer<Errors> responseCallback) {
        final boolean isFromClient = true;
        this.endTransaction(transactionalId, producerId, producerEpoch, transactionResult, isFromClient, responseCallback);
    }

    /**
     * Ends a transaction (commit or abort), either for a client request or for a
     * coordinator-initiated abort.
     *
     * <p>Steps: validate the transactional id, look up the cached metadata, prepare the
     * PREPARE_COMMIT/PREPARE_ABORT transition under the metadata lock
     * ({@link #endTxnPreAppend}), append it to the transaction log and, on success, continue
     * with {@link #completeEndTxn}.
     *
     * <p>{@code callback} is invoked exactly once on every path. When the append fails for an
     * epoch-fence abort, the failed fence is recorded on the metadata if it is still cached
     * under the same coordinator epoch, and the append error is then reported to the caller
     * regardless of the cache state.
     *
     * @param transactionalId transactional id; null/empty yields {@code INVALID_REQUEST}
     * @param producerId      producer id of the requester
     * @param producerEpoch   producer epoch of the requester
     * @param txnMarkerResult commit or abort
     * @param isFromClient    true for client requests, false for coordinator-initiated aborts
     * @param callback        receives the resulting error code
     */
    private void endTransaction(final String transactionalId, final Long producerId, final Short producerEpoch, final TransactionResult txnMarkerResult, boolean isFromClient, final Consumer<Errors> callback) {
        if (transactionalId == null || transactionalId.isEmpty()) {
            callback.accept(Errors.INVALID_REQUEST);
            return;
        }
        Either<Errors, Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> transactionState = this.txnManager.getTransactionState(transactionalId);
        if (transactionState.isLeft()) {
            callback.accept(transactionState.getLeft());
            return;
        }
        Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata> epochAndMetadata = transactionState.getRight();
        if (!epochAndMetadata.isPresent()) {
            callback.accept(Errors.INVALID_PRODUCER_ID_MAPPING);
            return;
        }
        // Set by endTxnPreAppend when this abort resolves a pending epoch fence.
        final AtomicBoolean isEpochFence = new AtomicBoolean(false);
        final Either<Errors, TransactionMetadata.TxnTransitMetadata> preAppendResult = this.endTxnPreAppend(epochAndMetadata.get(), transactionalId, producerId, isFromClient, producerEpoch, txnMarkerResult, isEpochFence);
        if (preAppendResult.isLeft()) {
            log.error("Aborting append of {} to transaction log with coordinator and returning {} error to client for {}'s EndTransaction request", new Object[]{txnMarkerResult, preAppendResult.getLeft(), transactionalId});
            callback.accept(preAppendResult.getLeft());
            return;
        }
        final int coordinatorEpoch = epochAndMetadata.get().getCoordinatorEpoch();
        this.txnManager.appendTransactionToLog(transactionalId, coordinatorEpoch, preAppendResult.getRight(), new TransactionStateManager.ResponseCallback(){

            @Override
            public void complete() {
                TransactionCoordinator.this.completeEndTxn(transactionalId, coordinatorEpoch, producerId, producerEpoch.shortValue(), txnMarkerResult, callback);
            }

            @Override
            public void fail(Errors errors) {
                log.info("Aborting sending of transaction markers and returning {} error to client for {}'s EndTransaction request of {}, since appending {} to transaction log with coordinator epoch {} failed", new Object[]{errors, transactionalId, txnMarkerResult, preAppendResult.getRight(), coordinatorEpoch});
                if (isEpochFence.get()) {
                    Either<Errors, Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> errorsAndData = TransactionCoordinator.this.txnManager.getTransactionState(transactionalId);
                    if (errorsAndData.isLeft()) {
                        // The state lookup itself failed: there is nothing to mark, just report the append error below.
                        if (log.isDebugEnabled()) {
                            log.debug("Skip marking failed epoch fence for {} because the transaction state lookup returned {}", (Object) transactionalId, (Object) errorsAndData.getLeft());
                        }
                    } else if (!errorsAndData.getRight().isPresent()) {
                        log.warn("The coordinator still owns the transaction partition for {}, but there is no metadata in the cache; this is not expected", (Object)transactionalId);
                    } else {
                        TransactionStateManager.CoordinatorEpochAndTxnMetadata currentEpochAndMetadata = errorsAndData.getRight().get();
                        if (currentEpochAndMetadata.getCoordinatorEpoch() == coordinatorEpoch) {
                            // Remember that the epoch bump was applied in memory but never persisted.
                            currentEpochAndMetadata.getTransactionMetadata().setHasFailedEpochFence(true);
                            log.warn("The coordinator failed to write an epoch fence transition for producer {} to the transaction log with error {}. The epoch was increased to {} but not returned to the client", new Object[]{transactionalId, errors, preAppendResult.getRight().getProducerEpoch()});
                        }
                    }
                }
                // Always answer the caller; the previous early return left the request without a response.
                callback.accept(errors);
            }
        }, retryErrors -> true);
    }

    /**
     * Validates an end-transaction request under the metadata lock and, when valid, prepares the
     * transition for the current state via {@link #endTxnByStatus}.
     *
     * <p>Checks, in order: producer id must match ({@code INVALID_PRODUCER_ID_MAPPING}); a client
     * request must carry exactly the stored epoch and any request must not carry an older epoch
     * (fencing error); a pending transition other than PREPARE_EPOCH_FENCE yields
     * {@code CONCURRENT_TRANSACTIONS}.
     *
     * @return either the rejection error or the prepared transition
     */
    private Either<Errors, TransactionMetadata.TxnTransitMetadata> endTxnPreAppend(TransactionStateManager.CoordinatorEpochAndTxnMetadata epochAndMetadata, String transactionalId, long producerId, boolean isFromClient, short producerEpoch, TransactionResult txnMarkerResult, AtomicBoolean isEpochFence) {
        final TransactionMetadata metadata = epochAndMetadata.getTransactionMetadata();
        return metadata.inLock(() -> {
            if (metadata.getProducerId() != producerId) {
                return Either.left(Errors.INVALID_PRODUCER_ID_MAPPING);
            }
            if ((isFromClient && producerEpoch != metadata.getProducerEpoch())
                    || producerEpoch < metadata.getProducerEpoch()) {
                return Either.left(this.producerEpochFenceErrors());
            }
            final boolean blockedByPendingTransition = metadata.getPendingState().isPresent()
                    && metadata.getPendingState().get() != TransactionState.PREPARE_EPOCH_FENCE;
            if (blockedByPendingTransition) {
                return Either.left(Errors.CONCURRENT_TRANSACTIONS);
            }
            return this.endTxnByStatus(transactionalId, txnMarkerResult, metadata, isEpochFence, producerEpoch);
        });
    }

    /**
     * Decides the end-transaction outcome from the transaction's current state.
     *
     * <ul>
     *   <li>ONGOING: the prepared commit/abort transition.</li>
     *   <li>COMPLETE_COMMIT / COMPLETE_ABORT: {@code NONE} when the request matches the completed
     *       result, otherwise {@code INVALID_TXN_STATE}.</li>
     *   <li>PREPARE_COMMIT / PREPARE_ABORT: {@code CONCURRENT_TRANSACTIONS} when the request
     *       matches the in-flight result, otherwise {@code INVALID_TXN_STATE}.</li>
     *   <li>EMPTY: {@code INVALID_TXN_STATE}.</li>
     * </ul>
     *
     * @throws IllegalStateException for any other state
     */
    private Either<Errors, TransactionMetadata.TxnTransitMetadata> endTxnByStatus(String transactionalId, TransactionResult txnMarkerResult, TransactionMetadata txnMetadata, AtomicBoolean isEpochFence, short producerEpoch) {
        switch (txnMetadata.getState()) {
            case ONGOING:
                return Either.right(this.endTxnOnGoingResult(txnMarkerResult, txnMetadata, isEpochFence, producerEpoch));
            case COMPLETE_COMMIT:
                return Either.left(this.getPreEndTxnErrors(txnMarkerResult, TransactionResult.COMMIT, Errors.NONE, transactionalId, txnMetadata));
            case COMPLETE_ABORT:
                return Either.left(this.getPreEndTxnErrors(txnMarkerResult, TransactionResult.ABORT, Errors.NONE, transactionalId, txnMetadata));
            case PREPARE_COMMIT:
                return Either.left(this.getPreEndTxnErrors(txnMarkerResult, TransactionResult.COMMIT, Errors.CONCURRENT_TRANSACTIONS, transactionalId, txnMetadata));
            case PREPARE_ABORT:
                return Either.left(this.getPreEndTxnErrors(txnMarkerResult, TransactionResult.ABORT, Errors.CONCURRENT_TRANSACTIONS, transactionalId, txnMetadata));
            case EMPTY:
                return Either.left(this.logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.getState(), txnMarkerResult));
            default: {
                final String illegalStateMessage = String.format("Found transactionalId %s with state %s. This is illegal as we should never have transitioned to this state.", transactionalId, txnMetadata.getState());
                log.error(illegalStateMessage);
                throw new IllegalStateException(illegalStateMessage);
            }
        }
    }

    /**
     * Prepares the commit/abort transition for an ONGOING transaction.
     *
     * <p>When the request is an abort and the pending state is PREPARE_EPOCH_FENCE, the fence is
     * resolved first: {@code isEpochFence} is set, the pending state is cleared, the producer
     * epoch is set to {@code producerEpoch} and the last producer epoch is reset to {@code -1}.
     *
     * @return the PREPARE_COMMIT or PREPARE_ABORT transition
     */
    private TransactionMetadata.TxnTransitMetadata endTxnOnGoingResult(TransactionResult txnMarkerResult, TransactionMetadata txnMetadata, AtomicBoolean isEpochFence, short producerEpoch) {
        final TransactionState targetState;
        if (txnMarkerResult == TransactionResult.COMMIT) {
            targetState = TransactionState.PREPARE_COMMIT;
        } else {
            targetState = TransactionState.PREPARE_ABORT;
        }

        final boolean resolvesEpochFence = targetState == TransactionState.PREPARE_ABORT
                && txnMetadata.getPendingState().isPresent()
                && txnMetadata.getPendingState().get().equals((Object) TransactionState.PREPARE_EPOCH_FENCE);
        if (resolvesEpochFence) {
            isEpochFence.set(true);
            txnMetadata.setPendingState(Optional.empty());
            txnMetadata.setProducerEpoch(producerEpoch);
            txnMetadata.setLastProducerEpoch((short) -1);
        }

        final long nowMs = this.time.milliseconds();
        return txnMetadata.prepareAbortOrCommit(targetState, nowMs);
    }

    /**
     * Returns {@code errors} when the requested result equals {@code compareResult}; otherwise
     * logs the mismatch and returns the invalid-state error.
     */
    private Errors getPreEndTxnErrors(TransactionResult txnMarkerResult, TransactionResult compareResult, Errors errors, String transactionalId, TransactionMetadata txnMetadata) {
        final boolean matchesExpectedResult = txnMarkerResult.equals((Object) compareResult);
        return matchesExpectedResult
                ? errors
                : this.logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.getState(), txnMarkerResult);
    }

    /**
     * Second phase of ending a transaction, run after the PREPARE_COMMIT/PREPARE_ABORT transition
     * has been appended to the transaction log.
     *
     * <p>Re-reads the cached metadata, re-validates it under the metadata lock (producer id,
     * producer epoch, no pending transition, state matching the requested result) and prepares
     * the completion transition. On success the client is answered with {@code NONE} first and
     * the transaction markers are then handed to the marker channel manager. Any validation
     * failure, a changed coordinator epoch ({@code NOT_COORDINATOR}) or a failed state lookup is
     * reported to the client and no markers are sent.
     *
     * @param transactionalId  transactional id being ended
     * @param coordinatorEpoch coordinator epoch used for the log append
     * @param producerId       producer id of the requester
     * @param producerEpoch    producer epoch of the requester
     * @param txnMarkerResult  commit or abort
     * @param callback         receives the resulting error code
     * @throws IllegalStateException if the lookup succeeds but no metadata is cached, or the
     *                               metadata is in a state that should be unreachable here
     */
    private void completeEndTxn(String transactionalId, int coordinatorEpoch, long producerId, int producerEpoch, TransactionResult txnMarkerResult, Consumer<Errors> callback) {
        final Either<Errors, PreSendResult> errorsOrPreSendResult;
        Either<Errors, Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> errorsOrOptEpochAndTxnMetadata = this.txnManager.getTransactionState(transactionalId);
        if (errorsOrOptEpochAndTxnMetadata.isLeft()) {
            // The lookup itself failed: report that error instead of dereferencing the missing right side.
            log.info("Aborting sending of transaction markers after appended {} to transaction log and returning {} error to client for {}'s EndTransaction request", new Object[]{txnMarkerResult, errorsOrOptEpochAndTxnMetadata.getLeft(), transactionalId});
            callback.accept(errorsOrOptEpochAndTxnMetadata.getLeft());
            return;
        }
        if (!errorsOrOptEpochAndTxnMetadata.getRight().isPresent()) {
            String errorMsg = String.format("The coordinator still owns the transaction partition for %s, but there is no metadata in the cache; this is not expected", transactionalId);
            log.error(errorMsg);
            throw new IllegalStateException(errorMsg);
        }
        TransactionStateManager.CoordinatorEpochAndTxnMetadata epochAndTxnMetadata = errorsOrOptEpochAndTxnMetadata.getRight().get();
        if (epochAndTxnMetadata.getCoordinatorEpoch() == coordinatorEpoch) {
            TransactionMetadata txnMetadata = epochAndTxnMetadata.getTransactionMetadata();
            errorsOrPreSendResult = txnMetadata.inLock(() -> {
                if (txnMetadata.getProducerId() != producerId) {
                    return Either.left(Errors.INVALID_PRODUCER_ID_MAPPING);
                }
                if (txnMetadata.getProducerEpoch() != producerEpoch) {
                    return Either.left(this.producerEpochFenceErrors());
                }
                if (txnMetadata.getPendingState().isPresent()) {
                    return Either.left(Errors.CONCURRENT_TRANSACTIONS);
                }
                switch (txnMetadata.getState()) {
                    case COMPLETE_ABORT:
                    case COMPLETE_COMMIT:
                    case EMPTY:
                    case ONGOING: {
                        return Either.left(this.logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.getState(), txnMarkerResult));
                    }
                    case PREPARE_COMMIT: {
                        if (txnMarkerResult != TransactionResult.COMMIT) {
                            return Either.left(this.logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.getState(), txnMarkerResult));
                        }
                        TransactionMetadata.TxnTransitMetadata txnTransitMetadata = txnMetadata.prepareComplete(this.time.milliseconds());
                        return Either.right(new PreSendResult(txnMetadata, txnTransitMetadata));
                    }
                    case PREPARE_ABORT: {
                        if (txnMarkerResult != TransactionResult.ABORT) {
                            return Either.left(this.logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.getState(), txnMarkerResult));
                        }
                        TransactionMetadata.TxnTransitMetadata txnTransitMetadata = txnMetadata.prepareComplete(this.time.milliseconds());
                        return Either.right(new PreSendResult(txnMetadata, txnTransitMetadata));
                    }
                }
                String errorMsg = String.format("Found transactionalId %s with state %s. This is illegal as we should never have transitioned to this state.", new Object[]{transactionalId, txnMetadata.getState()});
                log.error(errorMsg);
                throw new IllegalStateException(errorMsg);
            });
        } else {
            if (log.isDebugEnabled()) {
                log.debug("The transaction coordinator epoch has changed to {} after {} was successfully appended to the log for {} with old epoch {}", new Object[]{epochAndTxnMetadata.getCoordinatorEpoch(), txnMarkerResult, transactionalId, coordinatorEpoch});
            }
            errorsOrPreSendResult = Either.left(Errors.NOT_COORDINATOR);
        }
        if (errorsOrPreSendResult.isLeft()) {
            // Argument order now matches the placeholders: appended result, returned error, transactional id.
            log.info("Aborting sending of transaction markers after appended {} to transaction log and returning {} error to client for {}'s EndTransaction request", new Object[]{txnMarkerResult, errorsOrPreSendResult.getLeft(), transactionalId});
            callback.accept(errorsOrPreSendResult.getLeft());
            return;
        }
        // Answer the client before the markers are enqueued.
        callback.accept(Errors.NONE);
        this.transactionMarkerChannelManager.addTxnMarkersToSend(coordinatorEpoch, txnMarkerResult, epochAndTxnMetadata.getTransactionMetadata(), errorsOrPreSendResult.getRight().getTxnTransitMetadata(), this.namespacePrefixForUserTopics);
    }

    /**
     * Writes a debug-level message noting the transaction's current state and the marker result that
     * was received for it, then hands back the error code for the caller to report.
     *
     * @param transactionalId   transactional id placed in the log message
     * @param transactionState  transaction state placed in the log message
     * @param transactionResult marker result placed in the log message
     * @return always {@link Errors#INVALID_TXN_STATE}
     */
    private Errors logInvalidStateTransitionAndReturnError(String transactionalId, TransactionState transactionState, TransactionResult transactionResult) {
        // Placeholders are filled in the order: id, state, marker result.
        Object[] messageArgs = {transactionalId, transactionState, transactionResult};
        log.debug("TransactionalId: {}'s state is {}, but received transaction marker result to send: {}", messageArgs);
        return Errors.INVALID_TXN_STATE;
    }

    /**
     * Attempts to abort every transaction that {@code txnManager.timedOutTransactions()} reports.
     *
     * <p>For each reported entry the current transaction state is looked up by transactional id. When
     * metadata is found, the checks below run through {@code txnMetadata.inLock(...)}:
     * <ul>
     *   <li>the metadata's producer id must equal the producer id of the timed-out entry, otherwise
     *       an error is logged and the entry is skipped;</li>
     *   <li>no state transition may be pending, otherwise the entry is skipped;</li>
     *   <li>if both checks pass, {@code prepareFenceProducerEpoch()} is called on the metadata.</li>
     * </ul>
     * When that produced a transit metadata value, {@code endTransaction} is called with
     * {@link TransactionResult#ABORT} using the producer id and epoch taken from the transit metadata.
     *
     * @param onComplete handler passed (wrapped) as the callback of {@code endTransaction}; it receives
     *                   the timed-out entry together with the error handed to that callback. It is not
     *                   referenced for entries that are skipped.
     */
    @VisibleForTesting
    protected void abortTimedOutTransactions(BiConsumer<TransactionStateManager.TransactionalIdAndProducerIdEpoch, Errors> onComplete) {
        for (TransactionStateManager.TransactionalIdAndProducerIdEpoch txnIdAndPidEpoch : this.txnManager.timedOutTransactions()) {
            // The value produced by the outer map(...) is discarded; the call is made for its side effects.
            // NOTE(review): nothing here handles a lookup that yields an error or no entry - presumably
            // map(...) is a no-op in those cases; confirm against Either/Optional semantics.
            this.txnManager.getTransactionState(txnIdAndPidEpoch.getTransactionalId()).map(option -> option.map(epochAndTxnMetadata -> {
                TransactionMetadata txnMetadata = epochAndTxnMetadata.getTransactionMetadata();
                Either transitMetadata = txnMetadata.inLock(() -> {
                    // The producer id in the metadata no longer matches the one captured in the
                    // timed-out entry: log it and give up on this entry.
                    if (txnMetadata.getProducerId() != txnIdAndPidEpoch.getProducerId().longValue()) {
                        log.error("Found incorrect producerId when expiring transactionalId: {}. Expected producerId: {}. Found producerId: {}", new Object[]{txnIdAndPidEpoch.getTransactionalId(), txnIdAndPidEpoch.getProducerId(), txnMetadata.getProducerId()});
                        return Either.left(Errors.INVALID_PRODUCER_ID_MAPPING);
                    }
                    // Another state transition is already in flight; do not start an abort on top of it.
                    if (txnMetadata.pendingTransitionInProgress()) {
                        if (log.isDebugEnabled()) {
                            log.debug("Skipping abort of timed out transaction {} since there is a pending state transition", (Object)txnIdAndPidEpoch.getTransactionalId());
                        }
                        return Either.left(Errors.CONCURRENT_TRANSACTIONS);
                    }
                    return Either.right(txnMetadata.prepareFenceProducerEpoch());
                });
                // Continue only when the section above produced a right-hand value.
                // NOTE(review): this relies on getRight() returning null for a Left - confirm against Either.
                if (transitMetadata.getRight() != null) {
                    TransactionMetadata.TxnTransitMetadata txnTransitMetadata = (TransactionMetadata.TxnTransitMetadata)transitMetadata.getRight();
                    // NOTE(review): the meaning of the boolean 'false' argument is defined by endTransaction,
                    // which is not visible here - presumably it marks the request as not client-initiated; confirm.
                    this.endTransaction(txnMetadata.getTransactionalId(), txnTransitMetadata.getProducerId(), txnTransitMetadata.getProducerEpoch(), TransactionResult.ABORT, false, errors -> onComplete.accept(txnIdAndPidEpoch, (Errors)errors));
                }
                // The inner map(...) needs a return value; it is never used.
                return null;
            }));
        }
    }

    /**
     * Convenience overload that aborts timed-out transactions using {@code onEndTransactionComplete}
     * as the completion handler.
     */
    @VisibleForTesting
    protected void abortTimedOutTransactions() {
        abortTimedOutTransactions(onEndTransactionComplete);
    }

    /**
     * Starts the coordinator.
     *
     * <p>In order: schedules {@link #abortTimedOutTransactions()} at a fixed rate, using the configured
     * abort-timed-out-transactions interval as both initial delay and period; starts {@code txnManager};
     * and initializes {@code producerIdManager}.
     *
     * @param enableTransactionalIdExpiration forwarded unchanged to {@code txnManager.startup}
     * @return a future that completes, after a completion message is logged, once
     *         {@code producerIdManager.initialize()} has completed
     */
    public CompletableFuture<Void> startup(boolean enableTransactionalIdExpiration) {
        log.info("Starting up transaction coordinator ...");

        // The second argument of safeRun receives anything thrown by the periodic task and logs it.
        Runnable abortTask = SafeRunnable.safeRun(
                this::abortTimedOutTransactions,
                ex -> log.error("Uncaught exception in scheduled task transaction-abort", ex));
        long abortIntervalMs = transactionConfig.getAbortTimedOutTransactionsIntervalMs();
        scheduler.scheduleAtFixedRate(abortTask, abortIntervalMs, abortIntervalMs, TimeUnit.MILLISECONDS);

        txnManager.startup(enableTransactionalIdExpiration);

        return producerIdManager.initialize()
                .thenRun(() -> log.info("Startup transaction coordinator complete."));
    }

    /**
     * Stops the coordinator's collaborators one after another: the producer id manager, the
     * transaction state manager, the transaction marker channel manager and finally the scheduler.
     * A log line is written before the first step and after the last one.
     */
    public void shutdown() {
        log.info("Shutting down transaction coordinator ...");

        // The order below is significant and is kept exactly as it was.
        producerIdManager.shutdown();
        txnManager.shutdown();
        transactionMarkerChannelManager.close();
        scheduler.shutdown();

        log.info("Shutdown transaction coordinator complete.");
    }

    /**
     * Exposes the transaction state manager held by this coordinator.
     *
     * @return the {@code txnManager} field
     */
    public TransactionStateManager getTxnManager() {
        return txnManager;
    }

    /**
     * Mutable holder for a producer id, a producer epoch and an {@link Errors} code.
     *
     * <p>{@link #equals(Object)}, {@link #hashCode()} and {@link #toString()} are all computed from the
     * three getters, so any of the components may be {@code null}.
     */
    public static class InitProducerIdResult {
        /** Factor the running hash is multiplied by before each component is added. */
        private static final int HASH_MULTIPLIER = 59;
        /** Hash contribution that stands in for a {@code null} component. */
        private static final int NULL_HASH = 43;

        private Long producerId;
        private Short producerEpoch;
        private Errors error;

        /**
         * @param producerId    initial producer id, may be {@code null}
         * @param producerEpoch initial producer epoch, may be {@code null}
         * @param error         initial error code, may be {@code null}
         */
        public InitProducerIdResult(Long producerId, Short producerEpoch, Errors error) {
            this.producerId = producerId;
            this.producerEpoch = producerEpoch;
            this.error = error;
        }

        public Long getProducerId() {
            return producerId;
        }

        public void setProducerId(Long producerId) {
            this.producerId = producerId;
        }

        public Short getProducerEpoch() {
            return producerEpoch;
        }

        public void setProducerEpoch(Short producerEpoch) {
            this.producerEpoch = producerEpoch;
        }

        public Errors getError() {
            return error;
        }

        public void setError(Errors error) {
            this.error = error;
        }

        /**
         * Tells whether {@code other} is of a type that may be considered equal to this one;
         * {@link #equals(Object)} asks the other side this question before comparing components.
         */
        protected boolean canEqual(Object other) {
            return other instanceof InitProducerIdResult;
        }

        /** Two results are equal when producer id, producer epoch and error are pairwise equal. */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof InitProducerIdResult)) {
                return false;
            }
            InitProducerIdResult that = (InitProducerIdResult) o;
            return that.canEqual(this)
                    && sameOrBothNull(getProducerId(), that.getProducerId())
                    && sameOrBothNull(getProducerEpoch(), that.getProducerEpoch())
                    && sameOrBothNull(getError(), that.getError());
        }

        /** Folds the three components, in declaration order, into a single hash. */
        @Override
        public int hashCode() {
            int hash = 1;
            hash = hash * HASH_MULTIPLIER + hashOf(getProducerId());
            hash = hash * HASH_MULTIPLIER + hashOf(getProducerEpoch());
            hash = hash * HASH_MULTIPLIER + hashOf(getError());
            return hash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TransactionCoordinator.InitProducerIdResult(producerId=");
            text.append(getProducerId());
            text.append(", producerEpoch=").append(getProducerEpoch());
            text.append(", error=").append(getError());
            return text.append(")").toString();
        }

        /** Null-tolerant equality: {@code null} only equals {@code null}. */
        private static boolean sameOrBothNull(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash of {@code value}, or {@link #NULL_HASH} when it is {@code null}. */
        private static int hashOf(Object value) {
            return value == null ? NULL_HASH : value.hashCode();
        }
    }

    /**
     * Pair of a coordinator epoch and a {@link TransactionMetadata.TxnTransitMetadata}; both are fixed
     * at construction. Equality, hashing and the string form are computed from the two getters.
     */
    private static class EpochAndTxnTransitMetadata {
        /** Factor the running hash is multiplied by before each component is added. */
        private static final int HASH_MULTIPLIER = 59;
        /** Hash contribution that stands in for a {@code null} transit metadata. */
        private static final int NULL_HASH = 43;

        private final int coordinatorEpoch;
        private final TransactionMetadata.TxnTransitMetadata txnTransitMetadata;

        /**
         * @param coordinatorEpoch   coordinator epoch to store
         * @param txnTransitMetadata transit metadata to store, may be {@code null}
         */
        public EpochAndTxnTransitMetadata(int coordinatorEpoch, TransactionMetadata.TxnTransitMetadata txnTransitMetadata) {
            this.coordinatorEpoch = coordinatorEpoch;
            this.txnTransitMetadata = txnTransitMetadata;
        }

        public int getCoordinatorEpoch() {
            return coordinatorEpoch;
        }

        public TransactionMetadata.TxnTransitMetadata getTxnTransitMetadata() {
            return txnTransitMetadata;
        }

        /**
         * Tells whether {@code other} is of a type that may be considered equal to this one;
         * {@link #equals(Object)} asks the other side this question before comparing components.
         */
        protected boolean canEqual(Object other) {
            return other instanceof EpochAndTxnTransitMetadata;
        }

        /** Equal when the epochs match and the transit metadata are equal (or both {@code null}). */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof EpochAndTxnTransitMetadata)) {
                return false;
            }
            EpochAndTxnTransitMetadata that = (EpochAndTxnTransitMetadata) o;
            if (!that.canEqual(this) || getCoordinatorEpoch() != that.getCoordinatorEpoch()) {
                return false;
            }
            Object mine = getTxnTransitMetadata();
            Object theirs = that.getTxnTransitMetadata();
            return mine == null ? theirs == null : mine.equals(theirs);
        }

        /** Folds the epoch and then the transit metadata into a single hash. */
        @Override
        public int hashCode() {
            int hash = 1;
            hash = hash * HASH_MULTIPLIER + getCoordinatorEpoch();
            Object transit = getTxnTransitMetadata();
            hash = hash * HASH_MULTIPLIER + (transit == null ? NULL_HASH : transit.hashCode());
            return hash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TransactionCoordinator.EpochAndTxnTransitMetadata(coordinatorEpoch=");
            text.append(getCoordinatorEpoch());
            text.append(", txnTransitMetadata=").append(getTxnTransitMetadata());
            return text.append(")").toString();
        }
    }

    /**
     * Mutable pair of a {@link TransactionMetadata} and the
     * {@link TransactionMetadata.TxnTransitMetadata} that goes with it. Equality, hashing and the
     * string form are computed from the two getters, so either component may be {@code null}.
     */
    private static class PreSendResult {
        /** Factor the running hash is multiplied by before each component is added. */
        private static final int HASH_MULTIPLIER = 59;
        /** Hash contribution that stands in for a {@code null} component. */
        private static final int NULL_HASH = 43;

        private TransactionMetadata transactionMetadata;
        private TransactionMetadata.TxnTransitMetadata txnTransitMetadata;

        /**
         * @param transactionMetadata transaction metadata to store, may be {@code null}
         * @param txnTransitMetadata  transit metadata to store, may be {@code null}
         */
        public PreSendResult(TransactionMetadata transactionMetadata, TransactionMetadata.TxnTransitMetadata txnTransitMetadata) {
            this.transactionMetadata = transactionMetadata;
            this.txnTransitMetadata = txnTransitMetadata;
        }

        public TransactionMetadata getTransactionMetadata() {
            return transactionMetadata;
        }

        public void setTransactionMetadata(TransactionMetadata transactionMetadata) {
            this.transactionMetadata = transactionMetadata;
        }

        public TransactionMetadata.TxnTransitMetadata getTxnTransitMetadata() {
            return txnTransitMetadata;
        }

        public void setTxnTransitMetadata(TransactionMetadata.TxnTransitMetadata txnTransitMetadata) {
            this.txnTransitMetadata = txnTransitMetadata;
        }

        /**
         * Tells whether {@code other} is of a type that may be considered equal to this one;
         * {@link #equals(Object)} asks the other side this question before comparing components.
         */
        protected boolean canEqual(Object other) {
            return other instanceof PreSendResult;
        }

        /** Equal when both components are pairwise equal (or both {@code null}). */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof PreSendResult)) {
                return false;
            }
            PreSendResult that = (PreSendResult) o;
            return that.canEqual(this)
                    && sameOrBothNull(getTransactionMetadata(), that.getTransactionMetadata())
                    && sameOrBothNull(getTxnTransitMetadata(), that.getTxnTransitMetadata());
        }

        /** Folds the transaction metadata and then the transit metadata into a single hash. */
        @Override
        public int hashCode() {
            int hash = 1;
            hash = hash * HASH_MULTIPLIER + hashOf(getTransactionMetadata());
            hash = hash * HASH_MULTIPLIER + hashOf(getTxnTransitMetadata());
            return hash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TransactionCoordinator.PreSendResult(transactionMetadata=");
            text.append(getTransactionMetadata());
            text.append(", txnTransitMetadata=").append(getTxnTransitMetadata());
            return text.append(")").toString();
        }

        /** Null-tolerant equality: {@code null} only equals {@code null}. */
        private static boolean sameOrBothNull(Object left, Object right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Hash of {@code value}, or {@link #NULL_HASH} when it is {@code null}. */
        private static int hashOf(Object value) {
            return value == null ? NULL_HASH : value.hashCode();
        }
    }
}

