/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;

import com.google.common.annotations.VisibleForTesting;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.streamnative.pulsar.handlers.kop.EndPoint;
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
import io.streamnative.pulsar.handlers.kop.KopBrokerLookupManager;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMarkerChannelHandler;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMarkerChannelInitializer;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMarkerRequestCompletionHandler;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionMetadata;
import io.streamnative.pulsar.handlers.kop.coordinator.transaction.TransactionStateManager;
import io.streamnative.pulsar.handlers.kop.scala.Either;
import io.streamnative.pulsar.handlers.kop.utils.KopTopic;
import io.streamnative.pulsar.handlers.kop.utils.ssl.SSLUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.AuthenticationUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.ChannelFutures;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionMarkerChannelManager {
    private static final Logger log = LoggerFactory.getLogger(TransactionMarkerChannelManager.class);
    // Tenant supplied at construction; exposed through getAuthenticationUsername().
    private final String tenant;
    private final KafkaServiceConfiguration kafkaConfig;
    // Event loop group created by the constructor and used by the client bootstrap below.
    private final EventLoopGroup eventLoopGroup;
    private final boolean enableTls;
    // Both TLS fields are null when enableTls is false.
    private final SslContextFactory sslContextFactory;
    private final EndPoint sslEndPoint;
    private final KopBrokerLookupManager kopBrokerLookupManager;
    // Netty client bootstrap used by getChannel() to connect to broker addresses.
    private final Bootstrap bootstrap;
    // One handler future per broker address; filled by getChannel(), pruned by channelFailed().
    // A cached future may still be pending or may have completed exceptionally.
    private final Map<InetSocketAddress, CompletableFuture<TransactionMarkerChannelHandler>> handlerMap = new ConcurrentHashMap<InetSocketAddress, CompletableFuture<TransactionMarkerChannelHandler>>();
    private TransactionStateManager txnStateManager;
    // Transactions registered by addTxnMarkersToSend() whose completion has not been written yet,
    // keyed by transactional id.
    @VisibleForTesting
    private ConcurrentHashMap<String, PendingCompleteTxn> transactionsWithPendingMarkers = new ConcurrentHashMap();
    // Markers waiting to be sent, grouped by destination broker address.
    private Map<InetSocketAddress, TxnMarkerQueue> markersQueuePerBroker = new ConcurrentHashMap<InetSocketAddress, TxnMarkerQueue>();
    // Markers for partitions whose broker lookup failed or returned no address; they are
    // looked up again on every drain.
    private TxnMarkerQueue markersQueueForUnknownBroker = new TxnMarkerQueue(null);
    // Log appends that failed with COORDINATOR_NOT_AVAILABLE; retried at the start of every drain.
    private BlockingQueue<PendingCompleteTxn> txnLogAppendRetryQueue = new LinkedBlockingQueue<PendingCompleteTxn>();
    // Set once by close(); checked by getChannel() and before scheduling the drain task.
    private volatile boolean closed;
    private final String namespacePrefixForUserTopics;
    private final ScheduledExecutorService scheduler;
    // Handle of the periodic drain task; only read and written inside synchronized methods.
    private ScheduledFuture<?> drainQueuedTransactionMarkersHandle;
    // Only assigned (and started) when authentication is enabled in kafkaConfig; otherwise null.
    private Authentication authentication;

    /**
     * Creates the manager and prepares (but does not yet use) the Netty client bootstrap
     * through which transaction markers are sent to other brokers.
     *
     * @param tenant tenant returned later by {@link #getAuthenticationUsername()}
     * @param kafkaConfig service configuration; consulted for TLS listeners and client authentication
     * @param txnStateManager source of transaction state and sink for transaction log appends
     * @param kopBrokerLookupManager used to check topic existence and resolve broker addresses
     * @param enableTls when {@code true}, an SSL context factory and the SSL end point are loaded
     * @param namespacePrefixForUserTopics namespace prefix applied when re-resolving queued markers
     * @param scheduler executor on which the periodic drain task is scheduled
     * @throws Exception if TLS setup or the authentication plugin fails to initialise
     */
    public TransactionMarkerChannelManager(String tenant, KafkaServiceConfiguration kafkaConfig, TransactionStateManager txnStateManager, KopBrokerLookupManager kopBrokerLookupManager, boolean enableTls, String namespacePrefixForUserTopics, ScheduledExecutorService scheduler) throws Exception {
        this.tenant = tenant;
        this.kafkaConfig = kafkaConfig;
        this.txnStateManager = txnStateManager;
        this.kopBrokerLookupManager = kopBrokerLookupManager;
        this.namespacePrefixForUserTopics = namespacePrefixForUserTopics;
        this.scheduler = scheduler;
        this.enableTls = enableTls;

        // TLS material is only loaded when TLS was requested.
        this.sslContextFactory = enableTls ? SSLUtils.createSslContextFactory(kafkaConfig) : null;
        this.sslEndPoint = enableTls ? EndPoint.getSslEndPoint(kafkaConfig.getKafkaListeners()) : null;

        if (kafkaConfig.isAuthenticationEnabled()) {
            this.authentication = AuthenticationUtil.create(
                    kafkaConfig.getBrokerClientAuthenticationPlugin(),
                    kafkaConfig.getBrokerClientAuthenticationParameters());
            this.authentication.start();
        }

        // The event loop group is created last so that nothing has to be torn down
        // when one of the steps above throws.
        this.eventLoopGroup = new NioEventLoopGroup();
        this.bootstrap = new Bootstrap();
        this.bootstrap
                .group(this.eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler((ChannelHandler) new TransactionMarkerChannelInitializer(kafkaConfig, enableTls, this));
    }

    /**
     * Returns the cached handler future for a broker, opening a connection on first use.
     *
     * <p>The future is cached per address, so a pending or failed future is handed out again
     * until {@link #channelFailed} removes it.
     *
     * @param socketAddress broker address to connect to
     * @return future of the channel's {@code "txnHandler"} pipeline entry; already failed when
     *         this manager has been closed
     */
    public CompletableFuture<TransactionMarkerChannelHandler> getChannel(InetSocketAddress socketAddress) {
        if (this.closed) {
            Throwable closedError = new Exception("This TransactionMarkerChannelManager is closed");
            return FutureUtil.failedFuture(closedError);
        }
        this.ensureDrainQueuedTransactionMarkersActivity();
        return this.handlerMap.computeIfAbsent(socketAddress, ignoredKey -> this.connectToBroker(socketAddress));
    }

    /** Starts a connection attempt and exposes its outcome as a handler future. */
    private CompletableFuture<TransactionMarkerChannelHandler> connectToBroker(InetSocketAddress brokerAddress) {
        CompletableFuture<TransactionMarkerChannelHandler> result = new CompletableFuture<>();
        ChannelFuture connectAttempt = this.bootstrap.connect((SocketAddress) brokerAddress);
        ChannelFutures.toCompletableFuture(connectAttempt)
                .thenAccept(channel -> {
                    TransactionMarkerChannelHandler txnHandler =
                            (TransactionMarkerChannelHandler) channel.pipeline().get("txnHandler");
                    result.complete(txnHandler);
                })
                // Also catches anything thrown by the stage above, e.g. a failed cast.
                .exceptionally(error -> {
                    result.completeExceptionally(error);
                    return null;
                });
        return result;
    }

    /**
     * Drops the cached handler future for an address after its channel failed.
     *
     * <p>The entry is removed when the cached future itself failed or was cancelled, or when its
     * current value is the very handler that reported the failure. An entry that belongs to a
     * different handler is left in place.
     *
     * @param socketAddress address whose channel failed
     * @param handler handler reporting the failure
     */
    public void channelFailed(InetSocketAddress socketAddress, TransactionMarkerChannelHandler handler) {
        log.error("channelFailed {} {}", socketAddress, handler);
        this.handlerMap.computeIfPresent(socketAddress, (address, cachedFuture) -> {
            boolean unusable = cachedFuture.isCompletedExceptionally() || cachedFuture.isCancelled();
            if (unusable) {
                return null;
            }
            // getNow yields null while the future is still pending.
            if (cachedFuture.getNow(null) != handler) {
                return cachedFuture;
            }
            log.error("channelFailed removing {} {}", socketAddress, handler);
            return null;
        });
    }

    /**
     * Registers a transaction as awaiting markers and queues a marker for each of its partitions.
     *
     * @param coordinatorEpoch coordinator epoch under which the markers are sent
     * @param txnResult result the markers carry
     * @param txnMetadata metadata of the transaction being completed
     * @param newMetadata metadata to transit to once all markers have been written
     * @param namespacePrefix namespace prefix used to map Kafka topics to Pulsar topics
     */
    public void addTxnMarkersToSend(Integer coordinatorEpoch, TransactionResult txnResult, TransactionMetadata txnMetadata, TransactionMetadata.TxnTransitMetadata newMetadata, String namespacePrefix) {
        this.ensureDrainQueuedTransactionMarkersActivity();

        String txnId = txnMetadata.getTransactionalId();
        this.transactionsWithPendingMarkers.put(
                txnId, new PendingCompleteTxn(txnId, coordinatorEpoch, txnMetadata, newMetadata));

        this.addTxnMarkersToBrokerQueue(
                txnId,
                txnMetadata.getProducerId(),
                txnMetadata.getProducerEpoch(),
                txnResult,
                coordinatorEpoch,
                txnMetadata.getTopicPartitions(),
                namespacePrefix);

        // Covers the case where nothing is left to write for this transaction.
        this.maybeWriteTxnCompletion(txnId);
    }

    /**
     * Tells whether the transaction still lists any topic partition, read under the metadata lock.
     *
     * @param txnMetadata metadata to inspect
     * @return {@code true} when at least one partition remains
     */
    private boolean hasPendingMarkersToWrite(TransactionMetadata txnMetadata) {
        AtomicBoolean pending = new AtomicBoolean(true);
        txnMetadata.inLock(() -> {
            boolean noneLeft = txnMetadata.getTopicPartitions().isEmpty();
            pending.set(!noneLeft);
            return null;
        });
        return pending.get();
    }

    /**
     * Writes the transaction completion once no partition of the transaction is pending any more.
     *
     * <p>The conditional remove ensures the completion is written by only one caller.
     *
     * @param transactionalId id of the transaction to check
     */
    public void maybeWriteTxnCompletion(String transactionalId) {
        this.ensureDrainQueuedTransactionMarkersActivity();

        PendingCompleteTxn pending = this.transactionsWithPendingMarkers.get(transactionalId);
        if (pending == null) {
            return;
        }
        if (this.hasPendingMarkersToWrite(pending.txnMetadata)) {
            return;
        }
        if (this.transactionsWithPendingMarkers.remove(transactionalId, pending)) {
            this.writeTxnCompletion(pending);
        }
    }

    /**
     * Resolves the broker of every given partition and queues one marker entry per broker.
     *
     * <p>For each partition the Pulsar topic is first checked for existence:
     * <ul>
     *   <li>a missing topic is skipped (see {@link #skipMarkersForMissingTopic});</li>
     *   <li>a partition whose existence check or broker lookup fails, or yields no address, goes
     *       to the unknown-broker queue and is resolved again on the next drain;</li>
     *   <li>all other partitions are grouped by broker address.</li>
     * </ul>
     * The marker entries are queued only after every per-partition lookup has finished.
     *
     * @param transactionalId id of the transaction the markers belong to
     * @param producerId producer id written into the marker
     * @param producerEpoch producer epoch written into the marker
     * @param result transaction result the marker carries
     * @param coordinatorEpoch coordinator epoch written into the marker
     * @param topicPartitions partitions that need a marker
     * @param namespacePrefixForUserTopics namespace prefix used to build the Pulsar topic name
     */
    public void addTxnMarkersToBrokerQueue(String transactionalId, Long producerId, Short producerEpoch, TransactionResult result, Integer coordinatorEpoch, Set<TopicPartition> topicPartitions, String namespacePrefixForUserTopics) {
        this.ensureDrainQueuedTransactionMarkersActivity();
        Integer txnTopicPartition = this.txnStateManager.partitionFor(transactionalId);
        Map<InetSocketAddress, List<TopicPartition>> partitionsByBroker = new ConcurrentHashMap<>();
        List<TopicPartition> unknownBrokerPartitions = new CopyOnWriteArrayList<>();
        List<CompletableFuture<Void>> lookupFutures = new ArrayList<>();
        for (TopicPartition topicPartition : topicPartitions) {
            String pulsarTopic = new KopTopic(topicPartition.topic(), namespacePrefixForUserTopics).getPartitionName(topicPartition.partition());
            CompletableFuture<Void> lookupDone = new CompletableFuture<>();
            lookupFutures.add(lookupDone);
            this.kopBrokerLookupManager.isTopicExists(pulsarTopic).thenAccept(topicExists -> {
                if (!topicExists) {
                    this.skipMarkersForMissingTopic(transactionalId, coordinatorEpoch, topicPartition);
                    lookupDone.complete(null);
                    return;
                }
                CompletableFuture<Optional<InetSocketAddress>> addressFuture = this.kopBrokerLookupManager.findBroker(pulsarTopic, this.sslEndPoint);
                addressFuture.whenComplete((address, throwable) -> {
                    if (throwable != null) {
                        log.warn("Failed to find broker for topic partition {}", topicPartition, throwable);
                        unknownBrokerPartitions.add(topicPartition);
                        lookupDone.complete(null);
                        return;
                    }
                    if (!address.isPresent()) {
                        log.warn("No address for broker for topic partition {}", topicPartition);
                        unknownBrokerPartitions.add(topicPartition);
                        lookupDone.complete(null);
                        return;
                    }
                    // The list is mutated inside compute() so concurrent lookups that resolve
                    // to the same broker cannot race on it.
                    partitionsByBroker.compute(address.get(), (key, partitions) -> {
                        List<TopicPartition> updated = partitions == null ? new ArrayList<>() : partitions;
                        updated.add(topicPartition);
                        return updated;
                    });
                    lookupDone.complete(null);
                });
            }).exceptionally(error -> {
                // Without this the per-partition future would stay pending forever and the
                // aggregated stage below would never run for any partition of the transaction.
                log.warn("Failed to check existence of topic {} for topic partition {}", pulsarTopic, topicPartition, error);
                if (!lookupDone.isDone()) {
                    unknownBrokerPartitions.add(topicPartition);
                    lookupDone.complete(null);
                }
                return null;
            });
        }
        FutureUtil.waitForAll(lookupFutures).whenComplete((ignored, throwable) -> {
            partitionsByBroker.forEach((address, partitions) -> {
                WriteTxnMarkersRequest.TxnMarkerEntry entry = new WriteTxnMarkersRequest.TxnMarkerEntry(producerId.longValue(), producerEpoch.shortValue(), coordinatorEpoch.intValue(), result, partitions);
                TxnMarkerQueue markerQueue = this.markersQueuePerBroker.computeIfAbsent(address, key -> new TxnMarkerQueue(address));
                markerQueue.addMarkers(txnTopicPartition, new TxnIdAndMarkerEntry(transactionalId, entry));
            });
            if (!unknownBrokerPartitions.isEmpty()) {
                WriteTxnMarkersRequest.TxnMarkerEntry entry = new WriteTxnMarkersRequest.TxnMarkerEntry(producerId.longValue(), producerEpoch.shortValue(), coordinatorEpoch.intValue(), result, unknownBrokerPartitions);
                this.markersQueueForUnknownBroker.addMarkers(txnTopicPartition, new TxnIdAndMarkerEntry(transactionalId, entry));
            }
        });
    }

    /**
     * Handles a partition whose topic no longer exists: either cancels marker sending for the
     * transaction (state unavailable or coordinator epoch changed) or removes just this partition
     * from the transaction metadata and re-checks whether the transaction can complete.
     */
    private void skipMarkersForMissingTopic(String transactionalId, Integer coordinatorEpoch, TopicPartition topicPartition) {
        Either<Errors, Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> transactionState = this.txnStateManager.getTransactionState(transactionalId);
        if (transactionState.isLeft()) {
            log.info("Encountered {} trying to fetch transaction metadata for {} with coordinator epoch {}; cancel sending markers to its partition leaders", new Object[]{transactionState.getLeft(), transactionalId, coordinatorEpoch});
            this.transactionsWithPendingMarkers.remove(transactionalId);
            return;
        }
        Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata> epochAndTxnMetadata = transactionState.getRight();
        if (!epochAndTxnMetadata.isPresent()) {
            log.error("The coordinator still owns the transaction partition for {}, but there is no metadata in the cache; this is not expected", (Object)transactionalId);
            return;
        }
        if (!coordinatorEpoch.equals(epochAndTxnMetadata.get().getCoordinatorEpoch())) {
            log.info("The cached metadata has changed to {} (old coordinator epoch is {}) since preparing to send markers; cancel sending markers to its partition leaders", epochAndTxnMetadata, (Object)coordinatorEpoch);
            this.transactionsWithPendingMarkers.remove(transactionalId);
            return;
        }
        log.info("Couldn't find leader endpoint for partitions {} while trying to send transaction markers for {}, these partitions are likely deleted already and hence can be skipped", (Object)topicPartition, (Object)transactionalId);
        TransactionMetadata txnMetadata = epochAndTxnMetadata.get().getTransactionMetadata();
        txnMetadata.inLock(() -> {
            // Only the missing partition is dropped; the remaining partitions still need
            // their markers written before the transaction may complete.
            txnMetadata.removePartition(topicPartition);
            return null;
        });
        this.maybeWriteTxnCompletion(transactionalId);
    }

    /**
     * Appends the final transaction state to the transaction log after all markers were sent.
     *
     * <p>The append is skipped when this broker is no longer the coordinator, when the
     * transaction partition is being loaded, or when the cached coordinator epoch differs from
     * the one the markers were sent with.
     *
     * @param pendingCompleteTxn transaction whose markers have all been written
     * @throws IllegalStateException on any other error while fetching the transaction state, or
     *         when the coordinator owns the partition but has no metadata for the transaction
     */
    private void writeTxnCompletion(PendingCompleteTxn pendingCompleteTxn) {
        String transactionalId = pendingCompleteTxn.transactionalId;
        TransactionMetadata txnMetadata = pendingCompleteTxn.txnMetadata;
        TransactionMetadata.TxnTransitMetadata newMetadata = pendingCompleteTxn.newMetadata;
        int coordinatorEpoch = pendingCompleteTxn.coordinatorEpoch;
        if (log.isDebugEnabled()) {
            log.debug("Completed sending transaction markers for {}; begin transition to {}", (Object)transactionalId, (Object)newMetadata.getTxnState());
        }

        Either<Errors, Optional<TransactionStateManager.CoordinatorEpochAndTxnMetadata>> errorsAndData = this.txnStateManager.getTransactionState(transactionalId);
        if (errorsAndData.isLeft()) {
            Errors error = errorsAndData.getLeft();
            switch (error) {
                case NOT_COORDINATOR: {
                    log.info("No longer the coordinator for {} with coordinator epoch {}; cancel appending {} to transaction log", new Object[]{transactionalId, coordinatorEpoch, newMetadata});
                    return;
                }
                case COORDINATOR_LOAD_IN_PROGRESS: {
                    log.info("Loading the transaction partition that contains {} while my current coordinator epoch is {}; so cancel appending {} to transaction log since the loading process will continue the remaining work", new Object[]{transactionalId, coordinatorEpoch, newMetadata});
                    return;
                }
                default: {
                    // Exception messages are not SLF4J templates, so the error has to be
                    // formatted into the message explicitly.
                    throw new IllegalStateException(String.format("Unhandled error %s when fetching current transaction state", error), error.exception());
                }
            }
        }

        if (!errorsAndData.getRight().isPresent()) {
            String errorMsg = String.format("The coordinator still owns the transaction partition for %s, but there is no metadata in the cache; this is not expected", transactionalId);
            throw new IllegalStateException(errorMsg);
        }

        TransactionStateManager.CoordinatorEpochAndTxnMetadata epochAndMetadata = errorsAndData.getRight().get();
        if (epochAndMetadata.getCoordinatorEpoch() == coordinatorEpoch) {
            log.debug("Sending {}'s transaction markers for {} with coordinator epoch {} succeeded, trying to append complete transaction log now", new Object[]{transactionalId, txnMetadata, coordinatorEpoch});
            this.tryAppendToLog(new PendingCompleteTxn(transactionalId, coordinatorEpoch, txnMetadata, newMetadata));
        } else {
            log.info("The cached metadata {} has changed to {} after completed sending the markers with coordinator epoch {}; abort transiting the metadata to {} as it may have been updated by another process", new Object[]{txnMetadata, epochAndMetadata, coordinatorEpoch, newMetadata});
        }
    }

    /**
     * Appends the new transaction metadata to the transaction log and reacts to the outcome.
     *
     * <p>On {@code COORDINATOR_NOT_AVAILABLE} the append is put on the retry queue; on
     * {@code NOT_COORDINATOR} and {@code COORDINATOR_LOAD_IN_PROGRESS} it is skipped; any other
     * error makes the callback throw {@link IllegalStateException}.
     *
     * @param txnLogAppend the completion to append
     */
    private void tryAppendToLog(final PendingCompleteTxn txnLogAppend) {
        final String txnId = txnLogAppend.transactionalId;
        TransactionStateManager.ResponseCallback appendCallback = new TransactionStateManager.ResponseCallback(){

            @Override
            public void complete() {
                if (!log.isDebugEnabled()) {
                    return;
                }
                log.debug("Completed transaction for {} with coordinator epoch {}, final state after commit: {}", new Object[]{txnLogAppend.transactionalId, txnLogAppend.coordinatorEpoch, txnLogAppend.txnMetadata.getState()});
            }

            @Override
            public void fail(Errors errors) {
                if (errors == Errors.NOT_COORDINATOR) {
                    log.info("No longer the coordinator for transactionalId: {} while trying to append to transaction log, skip writing to transaction log", (Object)txnLogAppend.transactionalId);
                } else if (errors == Errors.COORDINATOR_NOT_AVAILABLE) {
                    log.info("Not available to append {}: possible causes include {}, {}, {} and {}; retry appending", new Object[]{txnLogAppend, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NOT_ENOUGH_REPLICAS, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.REQUEST_TIMED_OUT});
                    // Picked up again by the next drain.
                    TransactionMarkerChannelManager.this.txnLogAppendRetryQueue.add(txnLogAppend);
                } else if (errors == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                    log.info("Coordinator is loading the partition {} and hence cannot complete append of {}; skip writing to transaction log as the loading process should complete it", (Object)TransactionMarkerChannelManager.this.txnStateManager.partitionFor(txnLogAppend.transactionalId), (Object)txnLogAppend);
                } else {
                    String errorMsg = String.format("Unexpected error %s while appending to transaction log for %s", errors.exceptionName(), txnLogAppend.transactionalId);
                    log.error(errorMsg);
                    throw new IllegalStateException(errorMsg);
                }
            }
        };
        this.txnStateManager.appendTransactionToLog(txnId, txnLogAppend.coordinatorEpoch, txnLogAppend.newMetadata, appendCallback, null);
    }

    /**
     * Discards every queued marker that belongs to the given transaction topic partition and
     * forgets the transactions those markers were queued for.
     *
     * @param txnTopicPartitionId partition of the transaction topic whose markers are dropped
     */
    public void removeMarkersForTxnTopicPartition(Integer txnTopicPartitionId) {
        this.ensureDrainQueuedTransactionMarkersActivity();

        BlockingQueue<TxnIdAndMarkerEntry> removedUnknown = this.markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId);
        if (removedUnknown != null) {
            for (TxnIdAndMarkerEntry removed : removedUnknown) {
                this.removeMarkersForTxnId(removed.getTransactionalId());
            }
        }

        for (TxnMarkerQueue brokerQueue : this.markersQueuePerBroker.values()) {
            BlockingQueue<TxnIdAndMarkerEntry> removedForBroker = brokerQueue.removeMarkersForTxnTopicPartition(txnTopicPartitionId);
            if (removedForBroker == null) {
                continue;
            }
            for (TxnIdAndMarkerEntry removed : removedForBroker) {
                this.removeMarkersForTxnId(removed.getTransactionalId());
            }
        }
    }

    /**
     * Forgets the pending completion registered for a transaction, if any.
     *
     * @param transactionalId id of the transaction to forget
     */
    public void removeMarkersForTxnId(String transactionalId) {
        this.transactionsWithPendingMarkers.remove(transactionalId);
    }

    /**
     * Re-attempts every transaction log append currently waiting on the retry queue.
     *
     * <p>The queue is drained into a local list first, so appends that fail again and are
     * re-queued during this pass are only retried on the next call.
     */
    private void retryLogAppends() {
        // Typed list: with a raw ArrayList the loop variable below would be Object.
        List<PendingCompleteTxn> txnLogAppendRetries = new ArrayList<>();
        this.txnLogAppendRetryQueue.drainTo(txnLogAppendRetries);
        for (PendingCompleteTxn pendingCompleteTxn : txnLogAppendRetries) {
            if (log.isDebugEnabled()) {
                log.debug("Retry appending {} transaction log", (Object)pendingCompleteTxn);
            }
            this.tryAppendToLog(pendingCompleteTxn);
        }
    }

    /**
     * One pass of the periodic drain task.
     *
     * <ol>
     *   <li>Retries transaction log appends that previously failed.</li>
     *   <li>Looks up the broker again for every marker on the unknown-broker queue.</li>
     *   <li>Sends the markers queued per broker over that broker's channel.</li>
     * </ol>
     * When the channel to a broker cannot be obtained, the failed channel future is evicted and
     * the drained markers are queued again instead of being lost.
     */
    private void drainQueuedTransactionMarkers() {
        this.retryLogAppends();

        List<TxnIdAndMarkerEntry> unresolvedEntries = new ArrayList<>();
        this.markersQueueForUnknownBroker.forEachTxnTopicPartition((__, queue) -> queue.drainTo(unresolvedEntries));
        for (TxnIdAndMarkerEntry unresolved : unresolvedEntries) {
            this.requeueMarkers(unresolved);
        }

        for (TxnMarkerQueue txnMarkerQueue : this.markersQueuePerBroker.values()) {
            // A fresh list per broker: the asynchronous callback and the completion handler
            // keep a reference to it, so it must not be cleared or refilled for the next broker.
            List<TxnIdAndMarkerEntry> brokerEntries = new ArrayList<>();
            txnMarkerQueue.forEachTxnTopicPartition((__, queue) -> queue.drainTo(brokerEntries));
            if (brokerEntries.isEmpty()) {
                continue;
            }
            InetSocketAddress brokerAddress = txnMarkerQueue.address;
            CompletableFuture<TransactionMarkerChannelHandler> channelFuture = this.getChannel(brokerAddress);
            channelFuture.whenComplete((channelHandler, throwable) -> {
                if (throwable != null || channelHandler == null) {
                    if (this.closed) {
                        // Nothing will drain the queues any more; do not re-queue.
                        return;
                    }
                    log.warn("Failed to get channel to {}; re-queueing {} transaction marker entries", brokerAddress, brokerEntries.size(), throwable);
                    // Evict the unusable future so that a later drain opens a new connection.
                    this.handlerMap.remove(brokerAddress, channelFuture);
                    brokerEntries.forEach(this::requeueMarkers);
                    return;
                }
                List<WriteTxnMarkersRequest.TxnMarkerEntry> sendEntries = new ArrayList<>(brokerEntries.size());
                for (TxnIdAndMarkerEntry queued : brokerEntries) {
                    sendEntries.add(queued.entry);
                }
                channelHandler.enqueueWriteTxnMarkers(sendEntries, new TransactionMarkerRequestCompletionHandler(this.txnStateManager, this, brokerEntries, this.namespacePrefixForUserTopics));
            });
        }
    }

    /** Runs broker resolution again for all partitions of an already drained marker entry. */
    private void requeueMarkers(TxnIdAndMarkerEntry txnIdAndMarker) {
        WriteTxnMarkersRequest.TxnMarkerEntry entry = txnIdAndMarker.getEntry();
        this.addTxnMarkersToBrokerQueue(
                txnIdAndMarker.getTransactionalId(),
                entry.producerId(),
                entry.producerEpoch(),
                entry.transactionResult(),
                entry.coordinatorEpoch(),
                new HashSet<TopicPartition>(entry.partitions()),
                this.namespacePrefixForUserTopics);
    }

    /**
     * Starts the periodic drain task (every 100 ms) unless it is already scheduled or this
     * manager is closed.
     *
     * <p>Runtime exceptions from a drain pass are caught and logged. A periodic task that
     * throws is never run again by the executor, and since the handle stays non-null this
     * method would not reschedule it either.
     */
    private synchronized void ensureDrainQueuedTransactionMarkersActivity() {
        if (this.drainQueuedTransactionMarkersHandle != null || this.closed) {
            return;
        }
        this.drainQueuedTransactionMarkersHandle = this.scheduler.scheduleWithFixedDelay(() -> {
            try {
                this.drainQueuedTransactionMarkers();
            } catch (RuntimeException e) {
                log.error("Failed to drain queued transaction markers; will retry on the next run", (Throwable)e);
            }
        }, 100L, 100L, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the periodic drain task if one was scheduled; a pass that is already running is
     * not interrupted.
     */
    private synchronized void stopDrainQueuedTransactionMarkersHandleActivity() {
        ScheduledFuture<?> scheduledDrain = this.drainQueuedTransactionMarkersHandle;
        if (scheduledDrain == null) {
            return;
        }
        scheduledDrain.cancel(false);
    }

    /**
     * Shuts this manager down: marks it closed, cancels the drain task, closes every cached
     * channel handler, closes the authentication plugin and finally releases the event loop
     * group that the constructor created.
     *
     * <p>Failures while closing individual handlers or the authentication plugin are logged
     * and do not prevent the remaining resources from being released.
     */
    public void close() {
        this.closed = true;
        this.stopDrainQueuedTransactionMarkersHandleActivity();
        this.handlerMap.forEach((address, handler) -> {
            try {
                TransactionMarkerChannelHandler transactionMarkerChannelHandler = handler.get();
                if (transactionMarkerChannelHandler != null) {
                    transactionMarkerChannelHandler.close();
                }
            }
            catch (InterruptedException err) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
                log.info("Cannot close TransactionMarkerChannelHandler for {}", address, (Object)err);
            }
            catch (ExecutionException err) {
                log.info("Cannot close TransactionMarkerChannelHandler for {}", address, (Object)err);
            }
        });
        if (this.authentication != null) {
            try {
                this.authentication.close();
            }
            catch (IOException e) {
                log.error("Transaction marker authentication close failed.", (Throwable)e);
            }
        }
        // The group is owned by this manager; without this its threads outlive the manager.
        this.eventLoopGroup.shutdownGracefully();
    }

    /**
     * Returns the tenant this manager was constructed with.
     * NOTE(review): per the method name the tenant doubles as the authentication username;
     * confirm against the callers.
     *
     * @return the tenant name
     */
    public String getAuthenticationUsername() {
        return this.tenant;
    }

    /**
     * Returns the service configuration this manager was constructed with.
     *
     * @return the Kafka service configuration
     */
    public KafkaServiceConfiguration getKafkaConfig() {
        return this.kafkaConfig;
    }

    /**
     * Returns the live (not copied) map of transactions that still await marker completion,
     * keyed by transactional id. Changes made through the returned map affect this manager.
     *
     * @return the internal pending-transactions map
     */
    public ConcurrentHashMap<String, PendingCompleteTxn> getTransactionsWithPendingMarkers() {
        return this.transactionsWithPendingMarkers;
    }

    /**
     * Returns the client authentication created by the constructor.
     *
     * @return the authentication, or {@code null} when authentication is not enabled in the
     *         configuration
     */
    public Authentication getAuthentication() {
        return this.authentication;
    }

    /**
     * Markers waiting to be sent to one broker, bucketed by transaction topic partition.
     * The address is {@code null} for the queue that holds markers without a known broker.
     */
    private static class TxnMarkerQueue {
        private final InetSocketAddress address;
        private final Map<Integer, BlockingQueue<TxnIdAndMarkerEntry>> markersPerPartition = new ConcurrentHashMap<Integer, BlockingQueue<TxnIdAndMarkerEntry>>();

        public TxnMarkerQueue(InetSocketAddress address) {
            this.address = address;
        }

        /**
         * Detaches and returns the queue of the given transaction topic partition.
         *
         * @return the removed queue, or {@code null} when none existed
         */
        public BlockingQueue<TxnIdAndMarkerEntry> removeMarkersForTxnTopicPartition(Integer partition) {
            return this.markersPerPartition.remove(partition);
        }

        /** Appends a marker to the partition's queue, creating the queue on first use. */
        public void addMarkers(Integer txnTopicPartition, TxnIdAndMarkerEntry txnIdAndMarker) {
            this.markersPerPartition
                    .computeIfAbsent(txnTopicPartition, ignored -> new BlockingArrayQueue<TxnIdAndMarkerEntry>())
                    .add(txnIdAndMarker);
        }

        /** Invokes {@code f} for every partition whose queue currently holds at least one marker. */
        public void forEachTxnTopicPartition(BiConsumer<Integer, BlockingQueue<TxnIdAndMarkerEntry>> f) {
            this.markersPerPartition.forEach((partition, queue) -> {
                if (!queue.isEmpty()) {
                    f.accept(partition, queue);
                }
            });
        }
    }

    /**
     * Immutable description of a transaction that waits for its markers to be written before
     * its new metadata is appended to the transaction log.
     */
    protected static class PendingCompleteTxn {
        private final String transactionalId;
        private final Integer coordinatorEpoch;
        private final TransactionMetadata txnMetadata;
        private final TransactionMetadata.TxnTransitMetadata newMetadata;

        public PendingCompleteTxn(String transactionalId, Integer coordinatorEpoch, TransactionMetadata txnMetadata, TransactionMetadata.TxnTransitMetadata newMetadata) {
            this.transactionalId = transactionalId;
            this.coordinatorEpoch = coordinatorEpoch;
            this.txnMetadata = txnMetadata;
            this.newMetadata = newMetadata;
        }

        /** @return id of the transaction */
        public String getTransactionalId() {
            return this.transactionalId;
        }

        /** @return coordinator epoch the markers were sent with */
        public Integer getCoordinatorEpoch() {
            return this.coordinatorEpoch;
        }

        /** @return current metadata of the transaction */
        public TransactionMetadata getTxnMetadata() {
            return this.txnMetadata;
        }

        /** @return metadata to transit to once all markers are written */
        public TransactionMetadata.TxnTransitMetadata getNewMetadata() {
            return this.newMetadata;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TransactionMarkerChannelManager.PendingCompleteTxn(");
            text.append("transactionalId=").append(this.getTransactionalId());
            text.append(", coordinatorEpoch=").append(this.getCoordinatorEpoch());
            text.append(", txnMetadata=").append(this.getTxnMetadata());
            text.append(", newMetadata=").append(this.getNewMetadata());
            return text.append(")").toString();
        }
    }

    /**
     * Value object pairing a transactional id with the marker entry queued for it.
     * Equality and hash code are derived from both fields.
     */
    protected static class TxnIdAndMarkerEntry {
        private final String transactionalId;
        private final WriteTxnMarkersRequest.TxnMarkerEntry entry;

        public TxnIdAndMarkerEntry(String transactionalId, WriteTxnMarkersRequest.TxnMarkerEntry entry) {
            this.transactionalId = transactionalId;
            this.entry = entry;
        }

        /** @return id of the transaction the marker belongs to */
        public String getTransactionalId() {
            return this.transactionalId;
        }

        /** @return the marker entry to send */
        public WriteTxnMarkersRequest.TxnMarkerEntry getEntry() {
            return this.entry;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof TxnIdAndMarkerEntry)) {
                return false;
            }
            TxnIdAndMarkerEntry that = (TxnIdAndMarkerEntry)o;
            // Gives subclasses a way to opt out of equality with this type.
            if (!that.canEqual(this)) {
                return false;
            }
            return java.util.Objects.equals(this.getTransactionalId(), that.getTransactionalId())
                    && java.util.Objects.equals(this.getEntry(), that.getEntry());
        }

        /** Whether {@code other} is of a type that may be considered equal to this one. */
        protected boolean canEqual(Object other) {
            return other instanceof TxnIdAndMarkerEntry;
        }

        @Override
        public int hashCode() {
            final int prime = 59;
            String id = this.getTransactionalId();
            WriteTxnMarkersRequest.TxnMarkerEntry markerEntry = this.getEntry();
            int hash = 1;
            // A null field contributes the constant 43.
            hash = hash * prime + (id == null ? 43 : id.hashCode());
            hash = hash * prime + (markerEntry == null ? 43 : markerEntry.hashCode());
            return hash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TransactionMarkerChannelManager.TxnIdAndMarkerEntry(");
            text.append("transactionalId=").append(this.getTransactionalId());
            text.append(", entry=").append(this.getEntry());
            return text.append(")").toString();
        }
    }
}

