/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.internals.ProducerBatch;
import org.apache.kafka.clients.producer.internals.ProducerIdAndEpoch;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.PrimitiveRef;
import org.slf4j.Logger;

public class TransactionManager {
    // Same value (-1) that inFlightRequestCorrelationId holds when no transactional request is in flight.
    private static final int NO_INFLIGHT_REQUEST_CORRELATION_ID = -1;
    // Same value (-1) used as the fallback when no sequence has been acknowledged for a partition.
    private static final int NO_LAST_ACKED_SEQUENCE_NUMBER = -1;
    private final Logger log;
    // null for a non-transactional producer (see isTransactional()).
    private final String transactionalId;
    // Sent to the broker in the InitProducerId request.
    private final int transactionTimeoutMs;
    // Per-partition sequence / in-flight batch bookkeeping.
    private final TopicPartitionBookkeeper topicPartitionBookkeeper;
    // Offsets staged by txnOffsetCommitHandler(...) for inclusion in TxnOffsetCommit requests.
    private final Map<TopicPartition, TxnOffsetCommitRequest.CommittedOffset> pendingTxnOffsetCommits;
    // Partitions flagged through markSequenceUnresolved(...).
    private final Set<TopicPartition> partitionsWithUnresolvedSequences;
    // Transactional requests waiting to be sent, ordered by handler priority (comparator set in the constructor).
    private final PriorityQueue<TxnRequestHandler> pendingRequests;
    // Partitions registered via maybeAddPartitionToTransaction(...) that are not yet part of a request.
    private final Set<TopicPartition> newPartitionsInTransaction;
    // Partitions moved out of newPartitionsInTransaction when an AddPartitionsToTxn request is built.
    private final Set<TopicPartition> pendingPartitionsInTransaction;
    // Partitions consulted by isPartitionAdded(...) / isSendToPartitionAllowed(...); cleared in completeTransaction().
    private final Set<TopicPartition> partitionsInTransaction;
    // Result that close() fails if it is still set; also consulted by handleCachedTransactionRequestResult(...).
    private TransactionalRequestResult pendingResult;
    private final long retryBackoffMs;
    // NOTE(review): not referenced in the visible part of the class; presumably the backoff for
    // AddPartitionsToTxn retries - confirm against the request handlers.
    private static final long ADD_PARTITIONS_RETRY_BACKOFF_MS = 20L;
    // -1 means "no transactional request in flight" (see hasInFlightTransactionalRequest()).
    private int inFlightRequestCorrelationId = -1;
    // Coordinator nodes; set to null when a lookup for the respective type is enqueued.
    private Node transactionCoordinator;
    private Node consumerGroupCoordinator;
    // The fields below are volatile and are read by several unsynchronized accessors
    // (e.g. hasFatalError(), lastError(), producerIdAndEpoch()).
    private volatile State currentState = State.UNINITIALIZED;
    private volatile RuntimeException lastError = null;
    private volatile ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(-1L, -1);
    private volatile boolean transactionStarted = false;

    public TransactionManager(LogContext logContext, String transactionalId, int transactionTimeoutMs, long retryBackoffMs) {
        this.transactionalId = transactionalId;
        this.log = logContext.logger(TransactionManager.class);
        this.transactionTimeoutMs = transactionTimeoutMs;
        this.transactionCoordinator = null;
        this.consumerGroupCoordinator = null;
        this.newPartitionsInTransaction = new HashSet<TopicPartition>();
        this.pendingPartitionsInTransaction = new HashSet<TopicPartition>();
        this.partitionsInTransaction = new HashSet<TopicPartition>();
        this.pendingRequests = new PriorityQueue<TxnRequestHandler>(10, Comparator.comparingInt(o -> o.priority().priority));
        this.pendingTxnOffsetCommits = new HashMap<TopicPartition, TxnOffsetCommitRequest.CommittedOffset>();
        this.partitionsWithUnresolvedSequences = new HashSet<TopicPartition>();
        this.retryBackoffMs = retryBackoffMs;
        this.topicPartitionBookkeeper = new TopicPartitionBookkeeper();
    }

    /**
     * Creates a non-transactional manager: a fresh {@code LogContext}, a {@code null} transactional id,
     * a transaction timeout of 0 and a retry backoff of 100 ms.
     */
    TransactionManager() {
        this(new LogContext(), null, 0, 100L);
    }

    /**
     * Starts initialization of the transactional producer by enqueuing an InitProducerId request.
     * The work is routed through {@code handleCachedTransactionRequestResult} with
     * {@code State.INITIALIZING} as the target state.
     *
     * @return the result object associated with the InitProducerId request
     */
    public synchronized TransactionalRequestResult initializeTransactions() {
        Supplier<TransactionalRequestResult> startInitialization = () -> {
            transitionTo(State.INITIALIZING);
            // Drop any previously assigned producer id / epoch before asking for a new one.
            setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
            InitProducerIdRequestData initData = new InitProducerIdRequestData()
                    .setTransactionalId(transactionalId)
                    .setTransactionTimeoutMs(transactionTimeoutMs);
            InitProducerIdHandler initHandler = new InitProducerIdHandler(new InitProducerIdRequest.Builder(initData));
            enqueueRequest(initHandler);
            return initHandler.result;
        };
        return handleCachedTransactionRequestResult(startInitialization, State.INITIALIZING);
    }

    /**
     * Moves the manager into {@code State.IN_TRANSACTION}.
     *
     * @throws IllegalStateException if the producer is not transactional
     * @throws KafkaException        if the manager is in an error state or the transition is invalid
     */
    public synchronized void beginTransaction() {
        ensureTransactional();
        maybeFailWithError();
        transitionTo(State.IN_TRANSACTION);
    }

    /**
     * Begins committing the current transaction: fails if an error is recorded, transitions to
     * {@code State.COMMITTING_TRANSACTION} and enqueues the requests that complete the transaction.
     *
     * @return the result object associated with the EndTxn request
     */
    public synchronized TransactionalRequestResult beginCommit() {
        Supplier<TransactionalRequestResult> startCommit = () -> {
            maybeFailWithError();
            transitionTo(State.COMMITTING_TRANSACTION);
            return beginCompletingTransaction(TransactionResult.COMMIT);
        };
        return handleCachedTransactionRequestResult(startCommit, State.COMMITTING_TRANSACTION);
    }

    /**
     * Begins aborting the current transaction. An abortable error does not prevent the abort;
     * any other recorded error is rethrown via {@code maybeFailWithError()}.
     *
     * @return the result object associated with the EndTxn request
     */
    public synchronized TransactionalRequestResult beginAbort() {
        Supplier<TransactionalRequestResult> startAbort = () -> {
            boolean inAbortableError = currentState == State.ABORTABLE_ERROR;
            if (!inAbortableError) {
                maybeFailWithError();
            }
            transitionTo(State.ABORTING_TRANSACTION);
            // Partitions that were never sent to the coordinator are simply forgotten on abort.
            newPartitionsInTransaction.clear();
            return beginCompletingTransaction(TransactionResult.ABORT);
        };
        return handleCachedTransactionRequestResult(startAbort, State.ABORTING_TRANSACTION);
    }

    /**
     * Enqueues the requests needed to end the transaction: an AddPartitionsToTxn request when
     * partitions are still waiting to be added, followed by the EndTxn request itself.
     *
     * @param transactionResult whether the transaction is being committed or aborted
     * @return the result object of the EndTxn handler
     */
    private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
        boolean hasUnsentPartitions = !newPartitionsInTransaction.isEmpty();
        if (hasUnsentPartitions) {
            enqueueRequest(addPartitionsToTransactionHandler());
        }
        EndTxnHandler endTxnHandler = new EndTxnHandler(
                new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, transactionResult));
        enqueueRequest(endTxnHandler);
        return endTxnHandler.result;
    }

    /**
     * Enqueues an AddOffsetsToTxn request so that the given consumer offsets become part of the
     * current transaction.
     *
     * @param offsets         offsets to add, keyed by partition
     * @param consumerGroupId consumer group the offsets belong to
     * @return the result object of the AddOffsetsToTxn handler
     * @throws IllegalStateException if the producer is not transactional
     * @throws KafkaException        if the manager is in an error state or is not currently in
     *                               {@code State.IN_TRANSACTION}
     */
    public synchronized TransactionalRequestResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
        ensureTransactional();
        maybeFailWithError();
        if (currentState != State.IN_TRANSACTION) {
            // The previous wording contained a dangling "either" with no second alternative.
            throw new KafkaException("Cannot send offsets to transaction because the producer is not in an active transaction");
        }
        log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, consumerGroupId);
        AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
        AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
        enqueueRequest(handler);
        return handler.result;
    }

    /**
     * Registers a partition for addition to the transaction unless it is already added or
     * already waiting to be added.
     *
     * @param topicPartition the partition about to be written to
     */
    public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
        boolean alreadyTracked = isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition);
        if (!alreadyTracked) {
            log.debug("Begin adding new partition {} to transaction", topicPartition);
            topicPartitionBookkeeper.addPartition(topicPartition);
            newPartitionsInTransaction.add(topicPartition);
        }
    }

    /**
     * @return the exception recorded by the last transition into an error state, or {@code null}
     *         when the last transition was to a non-error state
     */
    RuntimeException lastError() {
        return lastError;
    }

    /**
     * Verifies that a send may proceed.
     *
     * @throws KafkaException        if the manager is in an error state (the recorded error is the cause)
     * @throws IllegalStateException if the producer is transactional and either has no valid producer
     *                               id yet or is not in {@code State.IN_TRANSACTION}
     */
    public synchronized void failIfNotReadyForSend() {
        if (hasError()) {
            throw new KafkaException("Cannot perform send because at least one previous transactional or idempotent request has failed with errors.", lastError);
        }
        if (!isTransactional()) {
            // The remaining checks only apply to transactional producers.
            return;
        }
        if (!hasProducerId()) {
            throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.");
        }
        if (currentState != State.IN_TRANSACTION) {
            throw new IllegalStateException("Cannot call send in state " + currentState);
        }
    }

    /**
     * @param tp the partition a batch is destined for
     * @return {@code false} after a fatal error; otherwise {@code true} for non-transactional
     *         producers, and for transactional producers only when the partition is part of the
     *         transaction
     */
    synchronized boolean isSendToPartitionAllowed(TopicPartition tp) {
        if (hasFatalError()) {
            return false;
        }
        if (!isTransactional()) {
            return true;
        }
        return partitionsInTransaction.contains(tp);
    }

    /** @return the configured transactional id, or {@code null} if none was supplied */
    public String transactionalId() {
        return transactionalId;
    }

    /** @return whether the current producer id and epoch report themselves as valid */
    public boolean hasProducerId() {
        return producerIdAndEpoch.isValid();
    }

    /** @return {@code true} when a (non-null) transactional id was configured */
    public boolean isTransactional() {
        return transactionalId != null;
    }

    /** @return {@code true} when at least one partition is new or pending addition to the transaction */
    synchronized boolean hasPartitionsToAdd() {
        return !(newPartitionsInTransaction.isEmpty() && pendingPartitionsInTransaction.isEmpty());
    }

    /** @return {@code true} while the transaction is being committed or aborted */
    synchronized boolean isCompleting() {
        return currentState == State.COMMITTING_TRANSACTION
                || currentState == State.ABORTING_TRANSACTION;
    }

    /** @return {@code true} when the manager is in the abortable-error or fatal-error state */
    synchronized boolean hasError() {
        return currentState == State.ABORTABLE_ERROR
                || currentState == State.FATAL_ERROR;
    }

    /** @return {@code true} while the manager is in {@code State.ABORTING_TRANSACTION} */
    synchronized boolean isAborting() {
        return State.ABORTING_TRANSACTION == currentState;
    }

    /**
     * Records an abortable error, unless an abort is already in progress, in which case the
     * exception is only logged at debug level.
     *
     * @param exception the error that triggered the transition
     */
    synchronized void transitionToAbortableError(RuntimeException exception) {
        if (currentState != State.ABORTING_TRANSACTION) {
            transitionTo(State.ABORTABLE_ERROR, exception);
            return;
        }
        log.debug("Skipping transition to abortable error state since the transaction is already being aborted. Underlying exception: ", exception);
    }

    /**
     * Moves the manager into {@code State.FATAL_ERROR} and records the given exception.
     *
     * @param exception the error that triggered the transition
     */
    synchronized void transitionToFatalError(RuntimeException exception) {
        transitionTo(State.FATAL_ERROR, exception);
    }

    /** @return {@code true} when the partition is in the set of partitions already in the transaction */
    synchronized boolean isPartitionAdded(TopicPartition partition) {
        return partitionsInTransaction.contains(partition);
    }

    /** @return {@code true} when the partition is registered as new or is pending addition */
    synchronized boolean isPartitionPendingAdd(TopicPartition partition) {
        if (newPartitionsInTransaction.contains(partition)) {
            return true;
        }
        return pendingPartitionsInTransaction.contains(partition);
    }

    /** @return the current producer id / epoch pair (read from a volatile field, no lock taken) */
    ProducerIdAndEpoch producerIdAndEpoch() {
        return producerIdAndEpoch;
    }

    /**
     * @param producerId the id to compare against
     * @return {@code true} when the current producer id equals {@code producerId}
     */
    boolean hasProducerId(long producerId) {
        return producerId == producerIdAndEpoch.producerId;
    }

    /**
     * Compares the given id and epoch against a single snapshot of the current pair.
     *
     * @return {@code true} when both the producer id and the epoch match
     */
    boolean hasProducerIdAndEpoch(long producerId, short producerEpoch) {
        // Read the volatile field once so both comparisons see the same pair.
        ProducerIdAndEpoch snapshot = producerIdAndEpoch;
        if (snapshot.producerId != producerId) {
            return false;
        }
        return snapshot.epoch == producerEpoch;
    }

    /**
     * Logs and stores the new producer id / epoch pair.
     *
     * @param producerIdAndEpoch the pair to install
     */
    void setProducerIdAndEpoch(ProducerIdAndEpoch producerIdAndEpoch) {
        log.info("ProducerId set to {} with epoch {}", producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
        this.producerIdAndEpoch = producerIdAndEpoch;
    }

    /**
     * Resets the producer id to {@code ProducerIdAndEpoch.NONE} and clears the per-partition
     * bookkeeping and the set of partitions with unresolved sequences.
     *
     * @throws IllegalStateException if the producer is transactional
     */
    synchronized void resetProducerId() {
        if (isTransactional()) {
            throw new IllegalStateException("Cannot reset producer state for a transactional producer. You must either abort the ongoing transaction or reinitialize the transactional producer instead");
        }
        setProducerIdAndEpoch(ProducerIdAndEpoch.NONE);
        topicPartitionBookkeeper.reset();
        partitionsWithUnresolvedSequences.clear();
    }

    /** Resets the producer id when the unresolved-sequence check says a reset is required. */
    synchronized void resetProducerIdIfNeeded() {
        boolean resetRequired = shouldResetProducerStateAfterResolvingSequences();
        if (resetRequired) {
            resetProducerId();
        }
    }

    /**
     * Returns the next sequence number for the partition. For non-transactional producers the
     * partition is first registered with the bookkeeper via {@code addPartition}.
     *
     * @param topicPartition the partition to look up
     * @return the partition's next sequence number
     */
    synchronized Integer sequenceNumber(TopicPartition topicPartition) {
        boolean registerOnDemand = !isTransactional();
        if (registerOnDemand) {
            topicPartitionBookkeeper.addPartition(topicPartition);
        }
        return topicPartitionBookkeeper.getPartition(topicPartition).nextSequence;
    }

    /**
     * Advances the partition's next sequence number by {@code increment}, using
     * {@code DefaultRecordBatch.incrementSequence} to compute the new value.
     *
     * @param topicPartition the partition whose sequence is advanced
     * @param increment      how far to advance
     */
    synchronized void incrementSequenceNumber(TopicPartition topicPartition, int increment) {
        int advanced = DefaultRecordBatch.incrementSequence(sequenceNumber(topicPartition), increment);
        topicPartitionBookkeeper.getPartition(topicPartition).nextSequence = advanced;
    }

    /**
     * Starts tracking a batch as in flight for its partition.
     *
     * @param batch the batch being sent; must already have a sequence assigned
     * @throws IllegalStateException if the batch has no sequence
     */
    synchronized void addInFlightBatch(ProducerBatch batch) {
        if (batch.hasSequence()) {
            topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence.add(batch);
            return;
        }
        throw new IllegalStateException("Can't track batch for partition " + batch.topicPartition + " when sequence is not set.");
    }

    /**
     * @param topicPartition the partition to inspect
     * @return the base sequence of the first in-flight batch for the partition, or -1 when the
     *         partition has no in-flight batches
     */
    synchronized int firstInFlightSequence(TopicPartition topicPartition) {
        if (hasInflightBatches(topicPartition)) {
            SortedSet<?> inflight = topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence;
            if (!inflight.isEmpty()) {
                return ((ProducerBatch) inflight.first()).baseSequence();
            }
        }
        return -1;
    }

    /**
     * @param topicPartition the partition to inspect
     * @return the first in-flight batch for the partition, or {@code null} when there is none
     */
    synchronized ProducerBatch nextBatchBySequence(TopicPartition topicPartition) {
        SortedSet<?> inflight = topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence;
        if (inflight.isEmpty()) {
            return null;
        }
        return (ProducerBatch) inflight.first();
    }

    /**
     * Stops tracking the batch as in flight. Does nothing when the batch's partition has no
     * in-flight batches.
     *
     * @param batch the batch to remove
     */
    synchronized void removeInFlightBatch(ProducerBatch batch) {
        if (!hasInflightBatches(batch.topicPartition)) {
            return;
        }
        topicPartitionBookkeeper.getPartition(batch.topicPartition).inflightBatchesBySequence.remove(batch);
    }

    /**
     * Raises the partition's last acknowledged sequence to {@code sequence} when that is larger
     * than the stored value (-1 is assumed when nothing is stored).
     */
    private void maybeUpdateLastAckedSequence(TopicPartition topicPartition, int sequence) {
        int previouslyAcked = lastAckedSequence(topicPartition).orElse(-1);
        if (sequence > previouslyAcked) {
            topicPartitionBookkeeper.getPartition(topicPartition).lastAckedSequence = sequence;
        }
    }

    /** @return the last acknowledged sequence for the partition, as reported by the bookkeeper */
    synchronized OptionalInt lastAckedSequence(TopicPartition topicPartition) {
        return topicPartitionBookkeeper.lastAckedSequence(topicPartition);
    }

    /** @return the last acknowledged offset for the partition, as reported by the bookkeeper */
    synchronized OptionalLong lastAckedOffset(TopicPartition topicPartition) {
        return topicPartitionBookkeeper.lastAckedOffset(topicPartition);
    }

    /**
     * Records the offset of the last record of an acknowledged batch when it exceeds the stored
     * value. Responses whose base offset is -1 are ignored.
     *
     * @param response the partition response for the batch
     * @param batch    the acknowledged batch
     */
    private void updateLastAckedOffset(ProduceResponse.PartitionResponse response, ProducerBatch batch) {
        if (response.baseOffset == -1L) {
            return;
        }
        TopicPartition partition = batch.topicPartition;
        long batchLastOffset = response.baseOffset + batch.recordCount - 1L;
        OptionalLong knownOffset = lastAckedOffset(partition);
        // Non-transactional producers register the partition lazily on the first acknowledged offset.
        if (!knownOffset.isPresent() && !isTransactional()) {
            topicPartitionBookkeeper.addPartition(partition);
        }
        if (batchLastOffset <= knownOffset.orElse(-1L)) {
            log.trace("Partition {} keeps lastOffset at {}", partition, batchLastOffset);
            return;
        }
        topicPartitionBookkeeper.getPartition(partition).lastAckedOffset = batchLastOffset;
    }

    /**
     * Processes a successfully acknowledged batch: updates the last acknowledged sequence and
     * offset for its partition and stops tracking it as in flight. Batches written under a
     * different producer id or epoch are ignored.
     *
     * @param batch    the completed batch
     * @param response the partition response for the batch
     */
    public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
        boolean sameProducer = hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch());
        if (!sameProducer) {
            log.debug("Ignoring completed batch {} with producer id {}, epoch {}, and sequence number {} since the producerId has been reset internally",
                    batch, batch.producerId(), batch.producerEpoch(), batch.baseSequence());
            return;
        }
        int lastSequenceInBatch = batch.baseSequence() + batch.recordCount - 1;
        maybeUpdateLastAckedSequence(batch.topicPartition, lastSequenceInBatch);
        log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}",
                batch.producerId(), batch.topicPartition, lastAckedSequence(batch.topicPartition).orElse(-1));
        updateLastAckedOffset(response, batch);
        removeInFlightBatch(batch);
    }

    /**
     * Maps an exception to an error state: cluster/transactional-id authorization failures,
     * producer fencing and unsupported versions are fatal; anything else is an abortable error,
     * but only for transactional producers.
     */
    private void maybeTransitionToErrorState(RuntimeException exception) {
        boolean fatal = exception instanceof ClusterAuthorizationException
                || exception instanceof TransactionalIdAuthorizationException
                || exception instanceof ProducerFencedException
                || exception instanceof UnsupportedVersionException;
        if (fatal) {
            transitionToFatalError(exception);
            return;
        }
        if (isTransactional()) {
            transitionToAbortableError(exception);
        }
    }

    /**
     * Processes a failed batch. The exception is first mapped to an error state. Batches from a
     * different producer id or epoch are then ignored. An out-of-order-sequence failure on a
     * non-transactional producer resets the producer id; every other failure removes the batch
     * from the in-flight set and optionally rewinds sequence numbers.
     *
     * @param batch                 the failed batch
     * @param exception             the failure
     * @param adjustSequenceNumbers whether sequence numbers should be rewound for the failed batch
     */
    public synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) {
        maybeTransitionToErrorState(exception);
        if (!hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
            log.debug("Ignoring failed batch {} with producer id {}, epoch {}, and sequence number {} since the producerId has been reset internally",
                    batch, batch.producerId(), batch.producerEpoch(), batch.baseSequence(), exception);
            return;
        }
        boolean resetRequired = exception instanceof OutOfOrderSequenceException && !isTransactional();
        if (!resetRequired) {
            removeInFlightBatch(batch);
            if (adjustSequenceNumbers) {
                adjustSequencesDueToFailedBatch(batch);
            }
            return;
        }
        log.error("The broker returned {} for topic-partition {} with producerId {}, epoch {}, and sequence number {}",
                exception, batch.topicPartition, batch.producerId(), batch.producerEpoch(), batch.baseSequence());
        resetProducerId();
    }

    /**
     * Rewinds sequence numbers after a batch failed: the partition's next sequence and the base
     * sequence of every in-flight batch at or after the failed one are reduced by the failed
     * batch's record count. Does nothing for partitions unknown to the bookkeeper.
     *
     * @throws IllegalStateException if any adjusted sequence would become negative
     */
    private void adjustSequencesDueToFailedBatch(ProducerBatch batch) {
        TopicPartition partition = batch.topicPartition;
        if (!topicPartitionBookkeeper.contains(partition)) {
            return;
        }
        log.debug("producerId: {}, send to partition {} failed fatally. Reducing future sequence numbers by {}",
                batch.producerId(), partition, batch.recordCount);
        int reducedNextSequence = sequenceNumber(partition) - batch.recordCount;
        if (reducedNextSequence < 0) {
            throw new IllegalStateException("Sequence number for partition " + batch.topicPartition + " is going to become negative: " + reducedNextSequence);
        }
        setNextSequence(partition, reducedNextSequence);
        topicPartitionBookkeeper.getPartition(partition).resetSequenceNumbers(tracked -> {
            // Batches sequenced before the failed one are unaffected.
            if (tracked.baseSequence() < batch.baseSequence()) {
                return;
            }
            int shifted = tracked.baseSequence() - batch.recordCount;
            if (shifted < 0) {
                throw new IllegalStateException("Sequence number for batch with sequence " + tracked.baseSequence() + " for partition " + batch.topicPartition + " is going to become negative: " + shifted);
            }
            log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}",
                    tracked.baseSequence(), batch.topicPartition, shifted);
            ProducerIdAndEpoch sameProducer = new ProducerIdAndEpoch(tracked.producerId(), tracked.producerEpoch());
            tracked.resetProducerState(sameProducer, shifted, tracked.isTransactional());
        });
    }

    /**
     * Renumbers the partition's in-flight batches starting from sequence 0, sets the partition's
     * next sequence to the value following the last renumbered batch and resets its last
     * acknowledged sequence to -1.
     */
    private void startSequencesAtBeginning(TopicPartition topicPartition) {
        // Mutable counter so the lambda can carry the running sequence across batches.
        PrimitiveRef.IntRef running = PrimitiveRef.ofInt(0);
        topicPartitionBookkeeper.getPartition(topicPartition).resetSequenceNumbers(tracked -> {
            log.info("Resetting sequence number of batch with current sequence {} for partition {} to {}",
                    tracked.baseSequence(), tracked.topicPartition, running.value);
            ProducerIdAndEpoch sameProducer = new ProducerIdAndEpoch(tracked.producerId(), tracked.producerEpoch());
            tracked.resetProducerState(sameProducer, running.value, tracked.isTransactional());
            running.value += tracked.recordCount;
        });
        setNextSequence(topicPartition, running.value);
        topicPartitionBookkeeper.getPartition(topicPartition).lastAckedSequence = -1;
    }

    /** @return {@code true} when the bookkeeper knows the partition and it has in-flight batches */
    private boolean hasInflightBatches(TopicPartition topicPartition) {
        if (!topicPartitionBookkeeper.contains(topicPartition)) {
            return false;
        }
        return !topicPartitionBookkeeper.getPartition(topicPartition).inflightBatchesBySequence.isEmpty();
    }

    /** @return {@code true} when at least one partition is marked as having an unresolved sequence */
    synchronized boolean hasUnresolvedSequences() {
        boolean noneUnresolved = partitionsWithUnresolvedSequences.isEmpty();
        return !noneUnresolved;
    }

    /** @return {@code true} when the given partition is marked as having an unresolved sequence */
    synchronized boolean hasUnresolvedSequence(TopicPartition topicPartition) {
        return partitionsWithUnresolvedSequences.contains(topicPartition);
    }

    /**
     * Marks the partition as having an unresolved sequence.
     *
     * @param topicPartition the partition to mark
     */
    synchronized void markSequenceUnresolved(TopicPartition topicPartition) {
        log.debug("Marking partition {} unresolved", topicPartition);
        partitionsWithUnresolvedSequences.add(topicPartition);
    }

    /**
     * Walks the partitions with unresolved sequences. Partitions that still have in-flight
     * batches are left alone; partitions whose next sequence directly follows the last
     * acknowledged one are unmarked; the first partition where that is not the case makes this
     * method return {@code true}. Always {@code false} for transactional producers.
     */
    private boolean shouldResetProducerStateAfterResolvingSequences() {
        if (isTransactional()) {
            return false;
        }
        for (Iterator<TopicPartition> unresolved = partitionsWithUnresolvedSequences.iterator(); unresolved.hasNext(); ) {
            TopicPartition partition = unresolved.next();
            if (hasInflightBatches(partition)) {
                // Still waiting on responses; cannot decide yet.
                continue;
            }
            if (!isNextSequence(partition, sequenceNumber(partition))) {
                log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. Going to reset producer state.",
                        partition, lastAckedSequence(partition).orElse(-1), sequenceNumber(partition));
                return true;
            }
            // Sequence is consistent again, so the partition is resolved.
            unresolved.remove();
        }
        return false;
    }

    /** @return {@code true} when {@code sequence} is exactly one past the last acknowledged sequence (-1 if none) */
    private boolean isNextSequence(TopicPartition topicPartition, int sequence) {
        int lastAcked = lastAckedSequence(topicPartition).orElse(-1);
        return sequence - lastAcked == 1;
    }

    /** Overwrites the partition's next sequence number in the bookkeeper. */
    private void setNextSequence(TopicPartition topicPartition, int sequence) {
        topicPartitionBookkeeper.getPartition(topicPartition).nextSequence = sequence;
    }

    /**
     * Selects the next transactional request to send.
     *
     * @param hasIncompleteBatches whether the caller still has incomplete batches; an EndTxn
     *                             request at the head of the queue is held back while this is true
     * @return the handler to send next, or {@code null} when the queue is empty, the head is a
     *         held-back EndTxn, or the head was failed because the manager is in an error state
     */
    synchronized TxnRequestHandler nextRequestHandler(boolean hasIncompleteBatches) {
        TxnRequestHandler nextRequestHandler;
        // Newly registered partitions are turned into a queued AddPartitionsToTxn request first.
        if (!this.newPartitionsInTransaction.isEmpty()) {
            this.enqueueRequest(this.addPartitionsToTransactionHandler());
        }
        if ((nextRequestHandler = this.pendingRequests.peek()) == null) {
            return null;
        }
        // Leave EndTxn in the queue (peek, not poll) until the outstanding batches are done.
        if (nextRequestHandler.isEndTxn() && hasIncompleteBatches) {
            return null;
        }
        this.pendingRequests.poll();
        // In an error state the dequeued request is failed instead of being sent.
        if (this.maybeTerminateRequestWithError(nextRequestHandler)) {
            this.log.trace("Not sending transactional request {} because we are in an error state", nextRequestHandler.requestBuilder());
            return null;
        }
        // EndTxn for a transaction that never started: complete it locally and move on to the
        // next queued request (which may be null).
        if (nextRequestHandler.isEndTxn() && !this.transactionStarted) {
            nextRequestHandler.result.done();
            if (this.currentState != State.FATAL_ERROR) {
                this.log.debug("Not sending EndTxn for completed transaction since no partitions or offsets were successfully added");
                this.completeTransaction();
            }
            nextRequestHandler = this.pendingRequests.poll();
        }
        if (nextRequestHandler != null) {
            this.log.trace("Request {} dequeued for sending", nextRequestHandler.requestBuilder());
        }
        return nextRequestHandler;
    }

    /**
     * Marks the request as a retry and puts it back on the pending queue.
     *
     * @param request the handler to retry
     */
    synchronized void retry(TxnRequestHandler request) {
        request.setRetry();
        enqueueRequest(request);
    }

    /**
     * Fails every pending request with the given authentication error.
     *
     * @param e the authentication failure
     */
    synchronized void authenticationFailed(AuthenticationException e) {
        Iterator<TxnRequestHandler> pending = pendingRequests.iterator();
        while (pending.hasNext()) {
            pending.next().fatalError(e);
        }
    }

    /**
     * Fails all pending requests, and the pending result if one is set, with a
     * {@code KafkaException} saying that the producer was closed forcefully.
     */
    synchronized void close() {
        KafkaException closedForcefully = new KafkaException("The producer closed forcefully");
        pendingRequests.forEach(pending -> pending.fatalError(closedForcefully));
        if (pendingResult == null) {
            return;
        }
        pendingResult.setError(closedForcefully);
        pendingResult.done();
    }

    /**
     * @param type which coordinator is wanted
     * @return the currently known coordinator node for that type (possibly {@code null})
     * @throws IllegalStateException for any type other than GROUP or TRANSACTION
     */
    Node coordinator(FindCoordinatorRequest.CoordinatorType type) {
        switch (type) {
            case GROUP:
                return consumerGroupCoordinator;
            case TRANSACTION:
                return transactionCoordinator;
            default:
                throw new IllegalStateException("Received an invalid coordinator type: " + type);
        }
    }

    /**
     * Starts a coordinator lookup using the coordinator type and key of the given request.
     *
     * @param request the handler whose coordinator must be (re)discovered
     */
    void lookupCoordinator(TxnRequestHandler request) {
        lookupCoordinator(request.coordinatorType(), request.coordinatorKey());
    }

    /**
     * Records the correlation id of the transactional request that is now in flight.
     *
     * @param correlationId the id to store
     */
    void setInFlightCorrelationId(int correlationId) {
        inFlightRequestCorrelationId = correlationId;
    }

    /** Resets the in-flight correlation id to -1, the "nothing in flight" value. */
    private void clearInFlightCorrelationId() {
        inFlightRequestCorrelationId = -1;
    }

    /** @return {@code true} when a correlation id other than -1 is recorded as in flight */
    boolean hasInFlightTransactionalRequest() {
        return -1 != inFlightRequestCorrelationId;
    }

    /** @return {@code true} when the manager is in {@code State.FATAL_ERROR} */
    boolean hasFatalError() {
        return State.FATAL_ERROR == currentState;
    }

    /** @return {@code true} when the manager is in {@code State.ABORTABLE_ERROR} */
    boolean hasAbortableError() {
        return State.ABORTABLE_ERROR == currentState;
    }

    /** @return {@code true} when the partition is in the set of partitions already in the transaction */
    synchronized boolean transactionContainsPartition(TopicPartition topicPartition) {
        return partitionsInTransaction.contains(topicPartition);
    }

    /** @return {@code true} when at least one transactional offset commit is staged */
    synchronized boolean hasPendingOffsetCommits() {
        boolean nothingStaged = pendingTxnOffsetCommits.isEmpty();
        return !nothingStaged;
    }

    /** @return {@code true} when the pending request queue is not empty */
    synchronized boolean hasPendingRequests() {
        boolean queueEmpty = pendingRequests.isEmpty();
        return !queueEmpty;
    }

    /**
     * @return {@code true} when the manager is in a transaction, is completing one (commit or
     *         abort), or is in the abortable-error state
     */
    synchronized boolean hasOngoingTransaction() {
        if (currentState == State.IN_TRANSACTION) {
            return true;
        }
        return isCompleting() || hasAbortableError();
    }

    /**
     * Decides whether a failed batch may be retried based on the error in its partition response.
     * <ul>
     *   <li>Batches from a different producer id or epoch are never retried here.</li>
     *   <li>OUT_OF_ORDER_SEQUENCE_NUMBER is retriable when the partition has no unresolved
     *       sequence and the batch either had its sequence reset or is not the next expected
     *       sequence.</li>
     *   <li>UNKNOWN_PRODUCER_ID is retriable when the response carries no log start offset (-1),
     *       when the batch's sequence was already reset, or when the last acknowledged offset is
     *       below the log start offset; in that last case the partition's sequences are restarted
     *       at 0 first.</li>
     * </ul>
     *
     * @param response the partition response for the batch
     * @param batch    the failed batch
     * @return whether the batch can be retried
     */
    synchronized boolean canRetry(ProduceResponse.PartitionResponse response, ProducerBatch batch) {
        if (!hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
            return false;
        }
        Errors error = response.error;
        if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER
                && !hasUnresolvedSequence(batch.topicPartition)
                && (batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence()))) {
            return true;
        }
        if (error != Errors.UNKNOWN_PRODUCER_ID) {
            return false;
        }
        if (response.logStartOffset == -1L || batch.sequenceHasBeenReset()) {
            return true;
        }
        if (lastAckedOffset(batch.topicPartition).orElse(-1L) < response.logStartOffset) {
            startSequencesAtBeginning(batch.topicPartition);
            return true;
        }
        return false;
    }

    /** @return {@code true} for a transactional producer whose state is {@code State.READY} */
    synchronized boolean isReady() {
        if (!isTransactional()) {
            return false;
        }
        return currentState == State.READY;
    }

    /** Transitions to a state without an associated error. */
    private void transitionTo(State target) {
        transitionTo(target, null);
    }

    /**
     * Performs a validated state transition and updates the recorded error: transitions into
     * FATAL_ERROR or ABORTABLE_ERROR store {@code error}, all other transitions clear it.
     *
     * @param target the state to move to
     * @param error  the cause; required when {@code target} is an error state
     * @throws KafkaException           if the transition from the current state is not valid
     * @throws IllegalArgumentException if {@code target} is an error state and {@code error} is null
     */
    private void transitionTo(State target, RuntimeException error) {
        if (!currentState.isTransitionValid(currentState, target)) {
            String idPrefix = transactionalId == null ? "" : "TransactionalId " + transactionalId + ": ";
            throw new KafkaException(idPrefix + "Invalid transition attempted from state " + currentState.name() + " to state " + target.name());
        }
        boolean errorTarget = target == State.FATAL_ERROR || target == State.ABORTABLE_ERROR;
        if (errorTarget && error == null) {
            throw new IllegalArgumentException("Cannot transition to " + target + " with a null exception");
        }
        lastError = errorTarget ? error : null;
        if (lastError == null) {
            log.debug("Transition from state {} to {}", currentState, target);
        } else {
            // The exception is passed last so the logger treats it as the throwable.
            log.debug("Transition from state {} to error state {}", currentState, target, lastError);
        }
        currentState = target;
    }

    /**
     * Guards transactional entry points.
     *
     * @throws IllegalStateException if no transactional id is configured
     */
    private void ensureTransactional() {
        if (isTransactional()) {
            return;
        }
        throw new IllegalStateException("Transactional method invoked on a non-transactional producer.");
    }

    /**
     * Throws when the manager is in an error state: a fresh {@code ProducerFencedException} if the
     * recorded error is one, otherwise a {@code KafkaException} wrapping the recorded error.
     */
    private void maybeFailWithError() {
        if (!hasError()) {
            return;
        }
        if (lastError instanceof ProducerFencedException) {
            throw new ProducerFencedException("The producer has been rejected from the broker because it tried to use an old epoch with the transactionalId");
        }
        throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError);
    }

    /**
     * Fails the request with the recorded error when the manager is in an error state. The one
     * exception is a FindCoordinator request during an abortable error, which is allowed through.
     *
     * @param requestHandler the request about to be sent
     * @return {@code true} if the request was failed and must not be sent
     */
    private boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
        if (!hasError()) {
            return false;
        }
        if (hasAbortableError() && requestHandler instanceof FindCoordinatorHandler) {
            return false;
        }
        requestHandler.fail(lastError);
        return true;
    }

    /** Logs the request's builder and adds the handler to the pending request queue. */
    private void enqueueRequest(TxnRequestHandler requestHandler) {
        log.debug("Enqueuing transactional request {}", requestHandler.requestBuilder());
        pendingRequests.add(requestHandler);
    }

    /**
     * Forgets the currently known coordinator of the given type and enqueues a FindCoordinator
     * request for it.
     *
     * @param type           the coordinator type to look up
     * @param coordinatorKey the key sent in the FindCoordinator request
     * @throws IllegalStateException for any type other than GROUP or TRANSACTION
     */
    private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, String coordinatorKey) {
        switch (type) {
            case GROUP:
                consumerGroupCoordinator = null;
                break;
            case TRANSACTION:
                transactionCoordinator = null;
                break;
            default:
                throw new IllegalStateException("Invalid coordinator type: " + type);
        }
        FindCoordinatorRequest.Builder findCoordinator = new FindCoordinatorRequest.Builder(
                new FindCoordinatorRequestData()
                        .setKeyType(type.id())
                        .setKey(coordinatorKey));
        enqueueRequest(new FindCoordinatorHandler(findCoordinator));
    }

    /**
     * Returns the manager to {@code State.READY} and clears all per-transaction state: the
     * recorded error, the started flag and the three partition sets.
     */
    private void completeTransaction() {
        // The transition runs first so an invalid transition leaves the state untouched.
        transitionTo(State.READY);
        lastError = null;
        transactionStarted = false;
        partitionsInTransaction.clear();
        pendingPartitionsInTransaction.clear();
        newPartitionsInTransaction.clear();
    }

    /**
     * Moves all newly registered partitions into the pending set and builds an AddPartitionsToTxn
     * handler that covers every pending partition.
     *
     * @return a handler for an AddPartitionsToTxn request carrying a copy of the pending partitions
     */
    private TxnRequestHandler addPartitionsToTransactionHandler() {
        pendingPartitionsInTransaction.addAll(newPartitionsInTransaction);
        newPartitionsInTransaction.clear();
        // The request gets its own copy so later changes to the pending set do not affect it.
        ArrayList<TopicPartition> partitionsToAdd = new ArrayList<>(pendingPartitionsInTransaction);
        AddPartitionsToTxnRequest.Builder requestBuilder = new AddPartitionsToTxnRequest.Builder(
            transactionalId, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, partitionsToAdd);
        return new AddPartitionsToTxnHandler(requestBuilder);
    }

    /**
     * Records the given offsets as pending transactional offset commits and builds the handler
     * for a TxnOffsetCommit request covering all pending commits.
     *
     * @param result the result object the handler completes
     * @param offsets offsets to commit, keyed by partition
     * @param consumerGroupId the consumer group the offsets belong to
     * @return a handler for the TxnOffsetCommit request
     */
    private TxnOffsetCommitHandler txnOffsetCommitHandler(TransactionalRequestResult result, Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
        offsets.forEach((partition, position) ->
            pendingTxnOffsetCommits.put(partition, new TxnOffsetCommitRequest.CommittedOffset(position.offset(), position.metadata(), position.leaderEpoch())));
        // The request is built from every pending commit, not only the ones added above.
        TxnOffsetCommitRequestData requestData = new TxnOffsetCommitRequestData()
            .setTransactionalId(transactionalId)
            .setGroupId(consumerGroupId)
            .setProducerId(producerIdAndEpoch.producerId)
            .setProducerEpoch(producerIdAndEpoch.epoch)
            .setTopics(TxnOffsetCommitRequest.getTopics(pendingTxnOffsetCommits));
        return new TxnOffsetCommitHandler(result, new TxnOffsetCommitRequest.Builder(requestData));
    }

    /**
     * Returns the cached pending result when one exists and the manager is already in
     * {@code targetState}; otherwise obtains a fresh result from the supplier and caches it.
     *
     * <p>A cached result that has already completed is returned one last time and then dropped
     * from the cache.
     *
     * @param transactionalRequestResultSupplier produces a new result when no cached one applies
     * @param targetState the state in which the cached result may be reused
     * @return the cached or newly supplied result
     */
    private TransactionalRequestResult handleCachedTransactionRequestResult(Supplier<TransactionalRequestResult> transactionalRequestResultSupplier, State targetState) {
        ensureTransactional();
        if (pendingResult == null || currentState != targetState) {
            pendingResult = transactionalRequestResultSupplier.get();
            return pendingResult;
        }
        TransactionalRequestResult cached = pendingResult;
        if (cached.isCompleted()) {
            pendingResult = null;
        }
        return cached;
    }

    private class TxnOffsetCommitHandler
    extends TxnRequestHandler {
        private final TxnOffsetCommitRequest.Builder builder;

        private TxnOffsetCommitHandler(TransactionalRequestResult result, TxnOffsetCommitRequest.Builder builder) {
            super(result);
            this.builder = builder;
        }

        TxnOffsetCommitRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override
        Priority priority() {
            return Priority.ADD_PARTITIONS_OR_OFFSETS;
        }

        @Override
        FindCoordinatorRequest.CoordinatorType coordinatorType() {
            return FindCoordinatorRequest.CoordinatorType.GROUP;
        }

        @Override
        String coordinatorKey() {
            return this.builder.data.groupId();
        }

        @Override
        public void handleResponse(AbstractResponse response) {
            TxnOffsetCommitResponse txnOffsetCommitResponse = (TxnOffsetCommitResponse)response;
            boolean coordinatorReloaded = false;
            Map<TopicPartition, Errors> errors2 = txnOffsetCommitResponse.errors();
            TransactionManager.this.log.debug("Received TxnOffsetCommit response for consumer group {}: {}", (Object)this.builder.data.groupId(), errors2);
            for (Map.Entry<TopicPartition, Errors> entry : errors2.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                Errors error = entry.getValue();
                if (error == Errors.NONE) {
                    TransactionManager.this.pendingTxnOffsetCommits.remove(topicPartition);
                    continue;
                }
                if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) {
                    if (coordinatorReloaded) continue;
                    coordinatorReloaded = true;
                    TransactionManager.this.lookupCoordinator(FindCoordinatorRequest.CoordinatorType.GROUP, this.builder.data.groupId());
                    continue;
                }
                if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION || error == Errors.COORDINATOR_LOAD_IN_PROGRESS) continue;
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    this.abortableError(GroupAuthorizationException.forGroupId(this.builder.data.groupId()));
                    break;
                }
                if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED || error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
                    this.fatalError(error.exception());
                    break;
                }
                this.fatalError(new KafkaException("Unexpected error in TxnOffsetCommitResponse: " + error.message()));
                break;
            }
            if (this.result.isCompleted()) {
                TransactionManager.this.pendingTxnOffsetCommits.clear();
            } else if (TransactionManager.this.pendingTxnOffsetCommits.isEmpty()) {
                this.result.done();
            } else {
                this.reenqueue();
            }
        }
    }

    private class AddOffsetsToTxnHandler
    extends TxnRequestHandler {
        private final AddOffsetsToTxnRequest.Builder builder;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        private AddOffsetsToTxnHandler(AddOffsetsToTxnRequest.Builder builder, Map<TopicPartition, OffsetAndMetadata> offsets) {
            super("AddOffsetsToTxn");
            this.builder = builder;
            this.offsets = offsets;
        }

        AddOffsetsToTxnRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override
        Priority priority() {
            return Priority.ADD_PARTITIONS_OR_OFFSETS;
        }

        @Override
        public void handleResponse(AbstractResponse response) {
            AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse)response;
            Errors error = addOffsetsToTxnResponse.error();
            if (error == Errors.NONE) {
                TransactionManager.this.log.debug("Successfully added partition for consumer group {} to transaction", (Object)this.builder.consumerGroupId());
                TransactionManager.this.pendingRequests.add(TransactionManager.this.txnOffsetCommitHandler(this.result, this.offsets, this.builder.consumerGroupId()));
                TransactionManager.this.transactionStarted = true;
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                TransactionManager.this.lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionManager.this.transactionalId);
                this.reenqueue();
            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                this.reenqueue();
            } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
                this.fatalError(error.exception());
            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                this.fatalError(error.exception());
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                this.abortableError(GroupAuthorizationException.forGroupId(this.builder.consumerGroupId()));
            } else {
                this.fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
            }
        }
    }

    private class EndTxnHandler
    extends TxnRequestHandler {
        private final EndTxnRequest.Builder builder;

        private EndTxnHandler(EndTxnRequest.Builder builder) {
            super("EndTxn(" + (Object)((Object)builder.result()) + ")");
            this.builder = builder;
        }

        EndTxnRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override
        Priority priority() {
            return Priority.END_TXN;
        }

        @Override
        boolean isEndTxn() {
            return true;
        }

        @Override
        public void handleResponse(AbstractResponse response) {
            EndTxnResponse endTxnResponse = (EndTxnResponse)response;
            Errors error = endTxnResponse.error();
            if (error == Errors.NONE) {
                TransactionManager.this.completeTransaction();
                this.result.done();
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                TransactionManager.this.lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionManager.this.transactionalId);
                this.reenqueue();
            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                this.reenqueue();
            } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
                this.fatalError(error.exception());
            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                this.fatalError(error.exception());
            } else if (error == Errors.INVALID_TXN_STATE) {
                this.fatalError(error.exception());
            } else {
                this.fatalError(new KafkaException("Unhandled error in EndTxnResponse: " + error.message()));
            }
        }
    }

    private class FindCoordinatorHandler
    extends TxnRequestHandler {
        private final FindCoordinatorRequest.Builder builder;

        private FindCoordinatorHandler(FindCoordinatorRequest.Builder builder) {
            super("FindCoordinator");
            this.builder = builder;
        }

        FindCoordinatorRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override
        Priority priority() {
            return Priority.FIND_COORDINATOR;
        }

        @Override
        FindCoordinatorRequest.CoordinatorType coordinatorType() {
            return null;
        }

        @Override
        String coordinatorKey() {
            return null;
        }

        @Override
        public void handleResponse(AbstractResponse response) {
            FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse)response;
            Errors error = findCoordinatorResponse.error();
            FindCoordinatorRequest.CoordinatorType coordinatorType = FindCoordinatorRequest.CoordinatorType.forId(this.builder.data().keyType());
            if (error == Errors.NONE) {
                Node node = findCoordinatorResponse.node();
                switch (coordinatorType) {
                    case GROUP: {
                        TransactionManager.this.consumerGroupCoordinator = node;
                        break;
                    }
                    case TRANSACTION: {
                        TransactionManager.this.transactionCoordinator = node;
                    }
                }
                this.result.done();
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
                this.reenqueue();
            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                this.fatalError(error.exception());
            } else if (findCoordinatorResponse.error() == Errors.GROUP_AUTHORIZATION_FAILED) {
                this.abortableError(GroupAuthorizationException.forGroupId(this.builder.data().key()));
            } else {
                this.fatalError(new KafkaException(String.format("Could not find a coordinator with type %s with key %s due tounexpected error: %s", new Object[]{coordinatorType, this.builder.data().key(), findCoordinatorResponse.data().errorMessage()})));
            }
        }
    }

    private class AddPartitionsToTxnHandler
    extends TxnRequestHandler {
        private final AddPartitionsToTxnRequest.Builder builder;
        private long retryBackoffMs;

        private AddPartitionsToTxnHandler(AddPartitionsToTxnRequest.Builder builder) {
            super("AddPartitionsToTxn");
            this.builder = builder;
            this.retryBackoffMs = TransactionManager.this.retryBackoffMs;
        }

        AddPartitionsToTxnRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override
        Priority priority() {
            return Priority.ADD_PARTITIONS_OR_OFFSETS;
        }

        @Override
        public void handleResponse(AbstractResponse response) {
            AddPartitionsToTxnResponse addPartitionsToTxnResponse = (AddPartitionsToTxnResponse)response;
            Map<TopicPartition, Errors> errors2 = addPartitionsToTxnResponse.errors();
            boolean hasPartitionErrors = false;
            HashSet<String> unauthorizedTopics = new HashSet<String>();
            this.retryBackoffMs = TransactionManager.this.retryBackoffMs;
            for (Map.Entry<TopicPartition, Errors> topicPartitionErrorEntry : errors2.entrySet()) {
                TopicPartition topicPartition = topicPartitionErrorEntry.getKey();
                Errors error = topicPartitionErrorEntry.getValue();
                if (error == Errors.NONE) continue;
                if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
                    TransactionManager.this.lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionManager.this.transactionalId);
                    this.reenqueue();
                    return;
                }
                if (error == Errors.CONCURRENT_TRANSACTIONS) {
                    this.maybeOverrideRetryBackoffMs();
                    this.reenqueue();
                    return;
                }
                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                    this.reenqueue();
                    return;
                }
                if (error == Errors.INVALID_PRODUCER_EPOCH) {
                    this.fatalError(error.exception());
                    return;
                }
                if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                    this.fatalError(error.exception());
                    return;
                }
                if (error == Errors.INVALID_PRODUCER_ID_MAPPING || error == Errors.INVALID_TXN_STATE) {
                    this.fatalError(new KafkaException(error.exception()));
                    return;
                }
                if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                    unauthorizedTopics.add(topicPartition.topic());
                    continue;
                }
                if (error == Errors.OPERATION_NOT_ATTEMPTED) {
                    TransactionManager.this.log.debug("Did not attempt to add partition {} to transaction because other partitions in the batch had errors.", (Object)topicPartition);
                    hasPartitionErrors = true;
                    continue;
                }
                TransactionManager.this.log.error("Could not add partition {} due to unexpected error {}", (Object)topicPartition, (Object)error);
                hasPartitionErrors = true;
            }
            Set<TopicPartition> partitions = errors2.keySet();
            TransactionManager.this.pendingPartitionsInTransaction.removeAll(partitions);
            if (!unauthorizedTopics.isEmpty()) {
                this.abortableError(new TopicAuthorizationException(unauthorizedTopics));
            } else if (hasPartitionErrors) {
                this.abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors2));
            } else {
                TransactionManager.this.log.debug("Successfully added partitions {} to transaction", partitions);
                TransactionManager.this.partitionsInTransaction.addAll(partitions);
                TransactionManager.this.transactionStarted = true;
                this.result.done();
            }
        }

        @Override
        public long retryBackoffMs() {
            return Math.min(TransactionManager.this.retryBackoffMs, this.retryBackoffMs);
        }

        private void maybeOverrideRetryBackoffMs() {
            if (TransactionManager.this.partitionsInTransaction.isEmpty()) {
                this.retryBackoffMs = 20L;
            }
        }
    }

    private class InitProducerIdHandler
    extends TxnRequestHandler {
        private final InitProducerIdRequest.Builder builder;

        private InitProducerIdHandler(InitProducerIdRequest.Builder builder) {
            super("InitProducerId");
            this.builder = builder;
        }

        InitProducerIdRequest.Builder requestBuilder() {
            return this.builder;
        }

        @Override
        Priority priority() {
            return Priority.INIT_PRODUCER_ID;
        }

        @Override
        public void handleResponse(AbstractResponse response) {
            InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse)response;
            Errors error = initProducerIdResponse.error();
            if (error == Errors.NONE) {
                ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data.producerId(), initProducerIdResponse.data.producerEpoch());
                TransactionManager.this.setProducerIdAndEpoch(producerIdAndEpoch);
                TransactionManager.this.transitionTo(State.READY);
                TransactionManager.this.lastError = null;
                this.result.done();
            } else if (error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_NOT_AVAILABLE) {
                TransactionManager.this.lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, TransactionManager.this.transactionalId);
                this.reenqueue();
            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
                this.reenqueue();
            } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
                this.fatalError(error.exception());
            } else {
                this.fatalError(new KafkaException("Unexpected error in InitProducerIdResponse; " + error.message()));
            }
        }
    }

    /**
     * Base class for the handlers of transactional requests. Each handler owns a
     * {@code TransactionalRequestResult}, knows which coordinator its request targets and
     * interprets the matching response.
     */
    abstract class TxnRequestHandler implements RequestCompletionHandler {
        protected final TransactionalRequestResult result;
        private boolean isRetry;

        TxnRequestHandler(TransactionalRequestResult result) {
            this.result = result;
        }

        TxnRequestHandler(String operation) {
            this(new TransactionalRequestResult(operation));
        }

        /** Records the error on the result, moves the manager to the fatal error state and completes the result. */
        void fatalError(RuntimeException e) {
            result.setError(e);
            TransactionManager.this.transitionToFatalError(e);
            result.done();
        }

        /** Records the error on the result, moves the manager to the abortable error state and completes the result. */
        void abortableError(RuntimeException e) {
            result.setError(e);
            TransactionManager.this.transitionToAbortableError(e);
            result.done();
        }

        /** Records the error on the result and completes it without changing the manager state. */
        void fail(RuntimeException e) {
            result.setError(e);
            result.done();
        }

        /** Marks this handler as a retry and enqueues it again while holding the manager's monitor. */
        void reenqueue() {
            synchronized (TransactionManager.this) {
                isRetry = true;
                TransactionManager.this.enqueueRequest(this);
            }
        }

        long retryBackoffMs() {
            return TransactionManager.this.retryBackoffMs;
        }

        /**
         * Entry point invoked with the client response for this handler's request.
         *
         * <p>A response whose correlation id differs from the recorded in-flight id is a fatal
         * error. Otherwise the in-flight id is cleared and the response is either retried (on
         * disconnect, after a coordinator lookup when this request needs a coordinator), failed
         * fatally (version mismatch or no response body), or passed to
         * {@link #handleResponse(AbstractResponse)} while holding the manager's monitor.
         *
         * @param response the completed client response
         */
        @Override
        public void onComplete(ClientResponse response) {
            if (response.requestHeader().correlationId() != TransactionManager.this.inFlightRequestCorrelationId) {
                fatalError(new RuntimeException("Detected more than one in-flight transactional request."));
                return;
            }
            TransactionManager.this.clearInFlightCorrelationId();

            if (response.wasDisconnected()) {
                TransactionManager.this.log.debug("Disconnected from {}. Will retry.", response.destination());
                if (needsCoordinator()) {
                    TransactionManager.this.lookupCoordinator(coordinatorType(), coordinatorKey());
                }
                reenqueue();
                return;
            }
            if (response.versionMismatch() != null) {
                fatalError(response.versionMismatch());
                return;
            }
            if (!response.hasResponse()) {
                fatalError(new KafkaException("Could not execute transactional request for unknown reasons"));
                return;
            }
            TransactionManager.this.log.trace("Received transactional response {} for request {}", response.responseBody(), requestBuilder());
            synchronized (TransactionManager.this) {
                handleResponse(response.responseBody());
            }
        }

        /** @return {@code true} when {@link #coordinatorType()} is non-null */
        boolean needsCoordinator() {
            return coordinatorType() != null;
        }

        /** @return the coordinator type this request targets; {@code TRANSACTION} unless overridden */
        FindCoordinatorRequest.CoordinatorType coordinatorType() {
            return FindCoordinatorRequest.CoordinatorType.TRANSACTION;
        }

        /** @return the key used to look up the coordinator; the transactional id unless overridden */
        String coordinatorKey() {
            return TransactionManager.this.transactionalId;
        }

        void setRetry() {
            isRetry = true;
        }

        boolean isRetry() {
            return isRetry;
        }

        boolean isEndTxn() {
            return false;
        }

        /** @return the builder of the request this handler sends */
        abstract AbstractRequest.Builder<?> requestBuilder();

        /** Interprets the response body for this handler's request. */
        abstract void handleResponse(AbstractResponse response);

        /** @return the priority of this request */
        abstract Priority priority();
    }

    private static enum Priority {
        FIND_COORDINATOR(0),
        INIT_PRODUCER_ID(1),
        ADD_PARTITIONS_OR_OFFSETS(2),
        END_TXN(3);

        final int priority;

        private Priority(int priority) {
            this.priority = priority;
        }
    }

    private static enum State {
        UNINITIALIZED,
        INITIALIZING,
        READY,
        IN_TRANSACTION,
        COMMITTING_TRANSACTION,
        ABORTING_TRANSACTION,
        ABORTABLE_ERROR,
        FATAL_ERROR;


        private boolean isTransitionValid(State source, State target) {
            switch (target) {
                case INITIALIZING: {
                    return source == UNINITIALIZED;
                }
                case READY: {
                    return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
                }
                case IN_TRANSACTION: {
                    return source == READY;
                }
                case COMMITTING_TRANSACTION: {
                    return source == IN_TRANSACTION;
                }
                case ABORTING_TRANSACTION: {
                    return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
                }
                case ABORTABLE_ERROR: {
                    return source == IN_TRANSACTION || source == COMMITTING_TRANSACTION || source == ABORTABLE_ERROR;
                }
            }
            return true;
        }
    }

    /**
     * Per-partition bookkeeping: the next sequence number, the last acknowledged sequence and
     * offset ({@code -1} when none is recorded), and the in-flight batches ordered by base sequence.
     */
    private static class TopicPartitionEntry {
        private int nextSequence = 0;
        private int lastAckedSequence = -1;
        private SortedSet<ProducerBatch> inflightBatchesBySequence = new TreeSet<>(Comparator.comparingInt(ProducerBatch::baseSequence));
        private long lastAckedOffset = -1L;

        TopicPartitionEntry() {
        }

        /**
         * Applies {@code resetSequence} to every in-flight batch, in the current order, and
         * replaces the in-flight set with a freshly sorted one containing the same batches.
         * NOTE(review): the rebuild is presumably needed because the callback changes the base
         * sequence the set is ordered by; confirm against the callers.
         *
         * @param resetSequence callback invoked once per in-flight batch
         */
        public void resetSequenceNumbers(Consumer<ProducerBatch> resetSequence) {
            TreeSet<ProducerBatch> reordered = new TreeSet<>(Comparator.comparingInt(ProducerBatch::baseSequence));
            inflightBatchesBySequence.forEach(batch -> {
                resetSequence.accept(batch);
                reordered.add(batch);
            });
            inflightBatchesBySequence = reordered;
        }
    }

    /**
     * Registry of {@code TopicPartitionEntry} objects keyed by partition.
     */
    private static class TopicPartitionBookkeeper {
        private final Map<TopicPartition, TopicPartitionEntry> topicPartitionBookkeeping = new HashMap<>();

        private TopicPartitionBookkeeper() {
        }

        /**
         * @param topic the partition to look up
         * @return the entry registered for the partition
         * @throws IllegalStateException if the partition was never added
         */
        public TopicPartitionEntry getPartition(TopicPartition topic) {
            TopicPartitionEntry found = topicPartitionBookkeeping.get(topic);
            if (found != null) {
                return found;
            }
            throw new IllegalStateException("Trying to get the sequence number for " + topic + ", but the sequence number was never set for this partition.");
        }

        /** Registers a fresh entry for the partition unless one is already present. */
        public void addPartition(TopicPartition topic) {
            if (topicPartitionBookkeeping.containsKey(topic)) {
                return;
            }
            topicPartitionBookkeeping.put(topic, new TopicPartitionEntry());
        }

        boolean contains(TopicPartition partition) {
            return topicPartitionBookkeeping.containsKey(partition);
        }

        /** Removes every registered partition. */
        public void reset() {
            topicPartitionBookkeeping.clear();
        }

        /** @return the last acknowledged offset, or empty when the partition is unknown or the value is {@code -1} */
        OptionalLong lastAckedOffset(TopicPartition partition) {
            TopicPartitionEntry found = topicPartitionBookkeeping.get(partition);
            if (found == null || found.lastAckedOffset == -1L) {
                return OptionalLong.empty();
            }
            return OptionalLong.of(found.lastAckedOffset);
        }

        /** @return the last acknowledged sequence, or empty when the partition is unknown or the value is {@code -1} */
        OptionalInt lastAckedSequence(TopicPartition partition) {
            TopicPartitionEntry found = topicPartitionBookkeeping.get(partition);
            if (found == null || found.lastAckedSequence == -1) {
                return OptionalInt.empty();
            }
            return OptionalInt.of(found.lastAckedSequence);
        }
    }
}

