/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import io.streamnative.pulsar.handlers.kop.KopBrokerLookupManager;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.ResponseContext;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMarkerChannelManager;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionMarkerRequestCompletionHandler
implements Consumer<ResponseContext> {
    private static final Logger log = LoggerFactory.getLogger(TransactionMarkerRequestCompletionHandler.class);
    private final TransactionStateManager txnStateManager;
    private final TransactionMarkerChannelManager txnMarkerChannelManager;
    private final List<TransactionMarkerChannelManager.TxnIdAndMarkerEntry> txnIdAndMarkerEntries;
    private final String namespacePrefixForUserTopics;

    @Override
    public void accept(ResponseContext responseContext) {
        WriteTxnMarkersResponse writeTxnMarkerResponse = (WriteTxnMarkersResponse)responseContext.getResponse();
        if (log.isDebugEnabled()) {
            log.debug("Received WriteTxnMarker response {} from node {} with correlation id {}", new Object[]{responseContext.getResponseDescription(), responseContext.getRemoteAddress(), responseContext.getCorrelationId()});
        }
        this.txnIdAndMarkerEntries.forEach(txnIdAndMarker -> {
            String transactionalId = txnIdAndMarker.getTransactionalId();
            WriteTxnMarkersRequest.TxnMarkerEntry txnMarker = txnIdAndMarker.getEntry();
            Map errors = (Map)writeTxnMarkerResponse.errorsByProducerId().get(txnMarker.producerId());
            if (errors == null) {
                throw new IllegalStateException("WriteTxnMarkerResponse does not contain expected error map for producer id " + txnMarker.producerId());
            }
            Either<Errors, Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> errorsAndData = this.txnStateManager.getTransactionState(transactionalId);
            if (errorsAndData.isLeft()) {
                switch (errorsAndData.getLeft()) {
                    case NOT_COORDINATOR: {
                        log.info("I am no longer the coordinator for {}; cancel sending transaction markers {} to the brokers", (Object)transactionalId, (Object)txnMarker);
                        this.txnMarkerChannelManager.removeMarkersForTxnId(transactionalId);
                        break;
                    }
                    case COORDINATOR_LOAD_IN_PROGRESS: {
                        log.info("I am loading the transaction partition that contains {} which means the current markers have to be obsoleted; cancel sending transaction markers {} to the brokers", (Object)transactionalId, (Object)txnMarker);
                        this.txnMarkerChannelManager.removeMarkersForTxnId(transactionalId);
                        break;
                    }
                    default: {
                        throw new IllegalStateException("Unhandled error " + errorsAndData.getLeft() + " when fetching current transaction state");
                    }
                }
                return;
            }
            if (!errorsAndData.getRight().isPresent()) {
                throw new IllegalStateException("The coordinator still owns the transaction partition for " + transactionalId + ", but there is no metadata in the cache; this is not expected");
            }
            this.tryAddTxnMarker(transactionalId, txnMarker, errors, errorsAndData.getRight().get());
        });
    }

    private void tryAddTxnMarker(String transactionalId, WriteTxnMarkersRequest.TxnMarkerEntry txnMarker, Map<TopicPartition, Errors> errors, TransactionStateManager.CoordinatorEpochAndTxnMetadata epochAndMetadata) {
        AbortSendingRetryPartitions abortSendOrRetryPartitions = this.hasAbortSendOrRetryPartitions(transactionalId, txnMarker, epochAndMetadata, errors);
        if (abortSendOrRetryPartitions.abortSending.get()) {
            return;
        }
        if (abortSendOrRetryPartitions.retryPartitions.isEmpty()) {
            this.txnMarkerChannelManager.maybeWriteTxnCompletion(transactionalId);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Re-enqueuing {} transaction markers for transactional id {} under coordinator epoch {}", new Object[]{txnMarker.transactionResult(), transactionalId, txnMarker.coordinatorEpoch()});
        }
        this.txnMarkerChannelManager.addTxnMarkersToBrokerQueue(transactionalId, txnMarker.producerId(), txnMarker.producerEpoch(), txnMarker.transactionResult(), txnMarker.coordinatorEpoch(), abortSendOrRetryPartitions.retryPartitions, this.namespacePrefixForUserTopics);
    }

    private AbortSendingRetryPartitions hasAbortSendOrRetryPartitions(String transactionalId, WriteTxnMarkersRequest.TxnMarkerEntry txnMarker, TransactionStateManager.CoordinatorEpochAndTxnMetadata epochAndMetadata, Map<TopicPartition, Errors> errors) {
        TransactionMetadata txnMetadata = epochAndMetadata.getTransactionMetadata();
        AbortSendingRetryPartitions abortSendingAndRetryPartitions = new AbortSendingRetryPartitions();
        if (epochAndMetadata.getCoordinatorEpoch().intValue() != txnMarker.coordinatorEpoch()) {
            log.info("Transaction coordinator epoch for {} has changed from {} to {}; cancel sending transaction markers {} to the brokers", new Object[]{transactionalId, txnMarker.coordinatorEpoch(), epochAndMetadata.getCoordinatorEpoch(), txnMarker});
            this.txnMarkerChannelManager.removeMarkersForTxnId(transactionalId);
            abortSendingAndRetryPartitions.abortSending.set(true);
        } else {
            txnMetadata.inLock(() -> {
                block8: for (Map.Entry errorsEntry : errors.entrySet()) {
                    TopicPartition topicPartition = (TopicPartition)errorsEntry.getKey();
                    Errors error = (Errors)errorsEntry.getValue();
                    switch (error) {
                        case NONE: {
                            txnMetadata.removePartition(topicPartition);
                            continue block8;
                        }
                        case CORRUPT_MESSAGE: 
                        case MESSAGE_TOO_LARGE: 
                        case RECORD_LIST_TOO_LARGE: 
                        case INVALID_REQUIRED_ACKS: {
                            throw new IllegalStateException("Received fatal error " + error.exceptionName() + " while sending txn marker for " + transactionalId);
                        }
                        case UNKNOWN_TOPIC_OR_PARTITION: 
                        case NOT_ENOUGH_REPLICAS: 
                        case NOT_ENOUGH_REPLICAS_AFTER_APPEND: 
                        case REQUEST_TIMED_OUT: 
                        case KAFKA_STORAGE_ERROR: {
                            log.info("Sending {}'s transaction marker for partition {} has failed with error {}, retrying with current coordinator epoch {}", new Object[]{transactionalId, topicPartition, error.exceptionName(), epochAndMetadata.getCoordinatorEpoch()});
                            abortSendingAndRetryPartitions.retryPartitions.add(topicPartition);
                            continue block8;
                        }
                        case LEADER_NOT_AVAILABLE: 
                        case NOT_LEADER_OR_FOLLOWER: {
                            log.info("Sending {}'s transaction marker for partition {} has failed with error {}, retrying with current coordinator epoch {} and invalidating cache", new Object[]{transactionalId, topicPartition, error.exceptionName(), epochAndMetadata.getCoordinatorEpoch()});
                            KopBrokerLookupManager.removeTopicManagerCache(KopTopic.toString(topicPartition, this.namespacePrefixForUserTopics));
                            abortSendingAndRetryPartitions.retryPartitions.add(topicPartition);
                            continue block8;
                        }
                        case INVALID_PRODUCER_EPOCH: 
                        case TRANSACTION_COORDINATOR_FENCED: {
                            log.info("Sending {}'s transaction marker for partition {} has permanently failed with error {} with the current coordinator epoch {}; cancel sending any more transaction markers {} to the brokers", new Object[]{transactionalId, topicPartition, error.exceptionName(), epochAndMetadata.getCoordinatorEpoch(), txnMarker});
                            this.txnMarkerChannelManager.removeMarkersForTxnId(transactionalId);
                            abortSendingAndRetryPartitions.abortSending.set(true);
                            continue block8;
                        }
                        case UNSUPPORTED_FOR_MESSAGE_FORMAT: 
                        case UNSUPPORTED_VERSION: {
                            log.info("Sending {}'s transaction marker from partition {} has failed with  {}. This partition will be removed from the set of partitions waiting for completion", new Object[]{transactionalId, topicPartition, error.name()});
                            txnMetadata.removePartition(topicPartition);
                            continue block8;
                        }
                    }
                    throw new IllegalStateException(String.format("Unexpected error %s while sending txn marker for %s", error.exceptionName(), transactionalId));
                }
                return null;
            });
        }
        return abortSendingAndRetryPartitions;
    }

    public TransactionMarkerRequestCompletionHandler(TransactionStateManager txnStateManager, TransactionMarkerChannelManager txnMarkerChannelManager, List<TransactionMarkerChannelManager.TxnIdAndMarkerEntry> txnIdAndMarkerEntries, String namespacePrefixForUserTopics) {
        this.txnStateManager = txnStateManager;
        this.txnMarkerChannelManager = txnMarkerChannelManager;
        this.txnIdAndMarkerEntries = txnIdAndMarkerEntries;
        this.namespacePrefixForUserTopics = namespacePrefixForUserTopics;
    }

    private static class AbortSendingRetryPartitions {
        private AtomicBoolean abortSending = new AtomicBoolean(false);
        private Set<TopicPartition> retryPartitions = new HashSet<TopicPartition>();

        private AbortSendingRetryPartitions() {
        }
    }
}

