/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.io.netty.util.HashedWheelTimer;
import org.apache.pulsar.shade.io.netty.util.Timer;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionMetadataStoreService {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionMetadataStoreService.class);
    private final Map<TransactionCoordinatorID, TransactionMetadataStore> stores;
    private final TransactionMetadataStoreProvider transactionMetadataStoreProvider;
    private final PulsarService pulsarService;
    private final TransactionBufferClient tbClient;
    private final TransactionTimeoutTrackerFactory timeoutTrackerFactory;
    private static final long endTransactionRetryIntervalTime = 1000L;
    private final Timer transactionOpRetryTimer;
    private final ConcurrentLongHashMap<Semaphore> tcLoadSemaphores;
    private final ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> pendingConnectRequests;
    private final ExecutorService internalPinnedExecutor;
    private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;
    private final ThreadFactory threadFactory = new DefaultThreadFactory("transaction-coordinator-thread-factory");

    public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider, PulsarService pulsarService, TransactionBufferClient tbClient, HashedWheelTimer timer) {
        this.pulsarService = pulsarService;
        this.stores = new ConcurrentHashMap<TransactionCoordinatorID, TransactionMetadataStore>();
        this.transactionMetadataStoreProvider = transactionMetadataStoreProvider;
        this.tbClient = tbClient;
        this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, timer);
        this.transactionOpRetryTimer = timer;
        this.tcLoadSemaphores = new ConcurrentLongHashMap();
        this.pendingConnectRequests = new ConcurrentLongHashMap();
        this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(this.threadFactory);
    }

    @Deprecated
    public void start() {
        this.pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener(){

            @Override
            public void onLoad(NamespaceBundle bundle) {
                TransactionMetadataStoreService.this.pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete((topics, ex) -> {
                    if (ex == null) {
                        for (String topic : topics) {
                            TopicName name = TopicName.get(topic);
                            if (!TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName().equals(TopicName.get(name.getPartitionedTopicName()).getLocalName()) || !name.isPartitioned()) continue;
                            TransactionMetadataStoreService.this.handleTcClientConnect(TransactionCoordinatorID.get(name.getPartitionIndex()));
                        }
                    } else {
                        LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.", (Object)bundle, ex);
                    }
                });
            }

            @Override
            public void unLoad(NamespaceBundle bundle) {
                TransactionMetadataStoreService.this.pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle).whenComplete((topics, ex) -> {
                    if (ex == null) {
                        for (String topic : topics) {
                            TopicName name = TopicName.get(topic);
                            if (!TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName().equals(TopicName.get(name.getPartitionedTopicName()).getLocalName()) || !name.isPartitioned()) continue;
                            TransactionMetadataStoreService.this.removeTransactionMetadataStore(TransactionCoordinatorID.get(name.getPartitionIndex()));
                        }
                    } else {
                        LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.", (Object)bundle, ex);
                    }
                });
            }

            @Override
            public boolean test(NamespaceBundle namespaceBundle) {
                return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE);
            }
        });
    }

    public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        this.internalPinnedExecutor.execute(() -> {
            if (this.stores.get(tcId) != null) {
                completableFuture.complete(null);
            } else {
                this.pulsarService.getBrokerService().checkTopicNsOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition((int)tcId.getId()).toString()).thenRun(() -> this.internalPinnedExecutor.execute(() -> {
                    Semaphore tcLoadSemaphore = this.tcLoadSemaphores.computeIfAbsent(tcId.getId(), id -> new Semaphore(1));
                    Deque deque = this.pendingConnectRequests.computeIfAbsent(tcId.getId(), id -> new ConcurrentLinkedDeque());
                    if (tcLoadSemaphore.tryAcquire()) {
                        if (this.stores.get(tcId) != null) {
                            completableFuture.complete(null);
                        }
                        ((CompletableFuture)this.openTransactionMetadataStore(tcId).thenAccept(store -> this.internalPinnedExecutor.execute(() -> {
                            block2: {
                                this.stores.put(tcId, (TransactionMetadataStore)store);
                                LOG.info("Added new transaction meta store {}", (Object)tcId);
                                long endTime = System.currentTimeMillis() + 30000L;
                                while (System.currentTimeMillis() < endTime) {
                                    CompletableFuture future = (CompletableFuture)deque.poll();
                                    if (future != null) {
                                        future.complete(null);
                                        continue;
                                    }
                                    break block2;
                                }
                                deque.clear();
                            }
                            completableFuture.complete(null);
                            tcLoadSemaphore.release();
                        }))).exceptionally(e -> {
                            this.internalPinnedExecutor.execute(() -> {
                                block2: {
                                    completableFuture.completeExceptionally(e.getCause());
                                    tcLoadSemaphore.release();
                                    long endTime = System.currentTimeMillis() + 30000L;
                                    while (System.currentTimeMillis() < endTime) {
                                        CompletableFuture future = (CompletableFuture)deque.poll();
                                        if (future != null) {
                                            future.completeExceptionally((Throwable)e);
                                            continue;
                                        }
                                        break block2;
                                    }
                                    deque.clear();
                                }
                                LOG.error("Add transaction metadata store with id {} error", (Object)tcId.getId(), e);
                            });
                            return null;
                        });
                    } else {
                        deque.add(completableFuture);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Handle tc client connect added into pending queue! tcId : {}", (Object)tcId.toString());
                        }
                    }
                }));
            }
        });
        return completableFuture;
    }

    public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) {
        return this.pulsarService.getBrokerService().getManagedLedgerConfig(MLTransactionLogImpl.getMLTransactionLogName(tcId)).thenCompose(v -> {
            TransactionTimeoutTracker timeoutTracker = this.timeoutTrackerFactory.newTracker(tcId);
            TransactionRecoverTrackerImpl recoverTracker = new TransactionRecoverTrackerImpl(this, timeoutTracker, tcId.getId());
            return this.transactionMetadataStoreProvider.openStore(tcId, this.pulsarService.getManagedLedgerFactory(), (ManagedLedgerConfig)v, timeoutTracker, recoverTracker);
        });
    }

    public CompletableFuture<Void> removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
        Semaphore tcLoadSemaphore = this.tcLoadSemaphores.computeIfAbsent(tcId.getId(), id -> new Semaphore(1));
        if (tcLoadSemaphore.tryAcquire()) {
            TransactionMetadataStore metadataStore = this.stores.remove(tcId);
            if (metadataStore != null) {
                metadataStore.closeAsync().whenComplete((v, ex) -> {
                    if (ex != null) {
                        LOG.error("Close transaction metadata store with id " + tcId, ex);
                    } else {
                        LOG.info("Removed and closed transaction meta store {}", (Object)tcId);
                    }
                });
            }
            tcLoadSemaphore.release();
            return CompletableFuture.completedFuture(null);
        }
        return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException("Could not remove TransactionMetadataStore, it is doing other operations!"));
    }

    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.newTransaction(timeoutInMills);
    }

    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.addProducedPartitionToTxn(txnId, partitions);
    }

    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnId, List<TransactionSubscription> partitions) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.addAckedPartitionToTxn(txnId, partitions);
    }

    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnId) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.getTxnMeta(txnId);
    }

    public long getLowWaterMark(TxnID txnID) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnID);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return 0L;
        }
        return store.getLowWaterMark();
    }

    public CompletableFuture<Void> updateTxnStatus(TxnID txnId, TxnStatus newStatus, TxnStatus expectedStatus, boolean isTimeout) {
        TransactionCoordinatorID tcId = this.getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = this.stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.updateTxnStatus(txnId, newStatus, expectedStatus, isTimeout);
    }

    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        return this.endTransaction(txnID, txnAction, isTimeout, completableFuture);
    }

    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout, CompletableFuture<Void> completableFuture) {
        TxnStatus newStatus;
        switch (txnAction) {
            case 0: {
                newStatus = TxnStatus.COMMITTING;
                break;
            }
            case 1: {
                newStatus = TxnStatus.ABORTING;
                break;
            }
            default: {
                TransactionCoordinatorException.UnsupportedTxnActionException exception = new TransactionCoordinatorException.UnsupportedTxnActionException(txnID, txnAction);
                LOG.error(exception.getMessage());
                completableFuture.completeExceptionally(exception);
                return completableFuture;
            }
        }
        ((CompletableFuture)this.getTxnMeta(txnID).thenAccept(txnMeta -> {
            TxnStatus txnStatus = txnMeta.status();
            if (txnStatus == TxnStatus.OPEN) {
                ((CompletableFuture)this.updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout).thenAccept(v -> ((CompletableFuture)this.endTxnInTransactionBuffer(txnID, txnAction).thenAccept(a -> completableFuture.complete(null))).exceptionally(e -> {
                    if (TransactionMetadataStoreService.isRetryableException(e.getCause())) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                        }
                        this.transactionOpRetryTimer.newTimeout(timeout -> this.endTransaction(txnID, txnAction, isTimeout, completableFuture), 1000L, TimeUnit.MILLISECONDS);
                        return null;
                    }
                    LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                    completableFuture.completeExceptionally(e.getCause());
                    return null;
                }))).exceptionally(e -> {
                    if (TransactionMetadataStoreService.isRetryableException(e.getCause())) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("EndTransaction UpdateTxnStatus op retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                        }
                        this.transactionOpRetryTimer.newTimeout(timeout -> this.endTransaction(txnID, txnAction, isTimeout, completableFuture), 1000L, TimeUnit.MILLISECONDS);
                        return null;
                    }
                    LOG.error("EndTransaction UpdateTxnStatus fail! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                    completableFuture.completeExceptionally(e.getCause());
                    return null;
                });
            } else if (txnStatus == TxnStatus.COMMITTING && txnAction == TxnAction.COMMIT.getValue() || txnStatus == TxnStatus.ABORTING && txnAction == TxnAction.ABORT.getValue()) {
                ((CompletableFuture)this.endTxnInTransactionBuffer(txnID, txnAction).thenAccept(k -> completableFuture.complete(null))).exceptionally(e -> {
                    if (TransactionMetadataStoreService.isRetryableException(e.getCause())) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                        }
                        this.transactionOpRetryTimer.newTimeout(timeout -> this.endTransaction(txnID, txnAction, isTimeout, completableFuture), 1000L, TimeUnit.MILLISECONDS);
                        return null;
                    }
                    LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                    completableFuture.completeExceptionally(e.getCause());
                    return null;
                });
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", (Object)txnID, (Object)txnAction);
                }
                completableFuture.completeExceptionally(new CoordinatorException.InvalidTxnStatusException(txnID, newStatus, txnStatus));
            }
        })).exceptionally(e -> {
            if (TransactionMetadataStoreService.isRetryableException(e.getCause())) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("End transaction op retry! TxnId : {}, TxnAction : {}", new Object[]{txnID, txnAction, e});
                }
                this.transactionOpRetryTimer.newTimeout(timeout -> this.endTransaction(txnID, txnAction, isTimeout, completableFuture), 1000L, TimeUnit.MILLISECONDS);
                return null;
            }
            completableFuture.completeExceptionally(e.getCause());
            return null;
        });
        return completableFuture;
    }

    public void handleOpFail(Throwable e, TransactionCoordinatorID tcId) {
        if (e.getCause() instanceof ManagedLedgerException.ManagedLedgerFencedException || e instanceof ManagedLedgerException.ManagedLedgerFencedException) {
            this.removeTransactionMetadataStore(tcId);
        }
    }

    public void endTransactionForTimeout(TxnID txnID) {
        ((CompletableFuture)this.getTxnMeta(txnID).thenCompose(txnMeta -> {
            if (txnMeta.status() == TxnStatus.OPEN) {
                return this.endTransaction(txnID, 1, true);
            }
            return null;
        })).exceptionally(e -> {
            if (TransactionMetadataStoreService.isRetryableException(e.getCause())) {
                this.endTransaction(txnID, 1, true);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Transaction have been handle complete, don't need to handle by transaction timeout! TxnId : {}", (Object)txnID);
            }
            return null;
        });
    }

    private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction) {
        CompletableFuture resultFuture = new CompletableFuture();
        ArrayList completableFutureList = new ArrayList();
        this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally((Throwable)throwable);
                return;
            }
            long lowWaterMark = this.getLowWaterMark(txnID);
            txnMeta.ackedPartitions().forEach(tbSub -> {
                CompletableFuture<Object> actionFuture = new CompletableFuture();
                if (0 == txnAction) {
                    actionFuture = this.tbClient.commitTxnOnSubscription(tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else if (1 == txnAction) {
                    actionFuture = this.tbClient.abortTxnOnSubscription(tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else {
                    actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
                }
                completableFutureList.add(actionFuture);
            });
            txnMeta.producedPartitions().forEach(partition -> {
                CompletableFuture<Object> actionFuture = new CompletableFuture();
                if (0 == txnAction) {
                    actionFuture = this.tbClient.commitTxnOnTopic((String)partition, txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else if (1 == txnAction) {
                    actionFuture = this.tbClient.abortTxnOnTopic((String)partition, txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                } else {
                    actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
                }
                completableFutureList.add(actionFuture);
            });
            try {
                FutureUtil.waitForAll(completableFutureList).whenComplete((ignored, waitThrowable) -> {
                    if (waitThrowable != null) {
                        resultFuture.completeExceptionally((Throwable)waitThrowable);
                        return;
                    }
                    resultFuture.complete(null);
                });
            }
            catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
        return resultFuture.thenCompose(future -> this.endTxnInTransactionMetadataStore(txnID, txnAction));
    }

    private static boolean isRetryableException(Throwable e) {
        return (e instanceof CoordinatorException.TransactionMetadataStoreStateException || e instanceof TransactionBufferClientException.RequestTimeoutException || e instanceof ManagedLedgerException || e instanceof PulsarClientException.BrokerPersistenceException || e instanceof PulsarClientException.LookupException || e instanceof TransactionBufferClientException.ReachMaxPendingOpsException || e instanceof PulsarClientException.ConnectException) && !(e instanceof ManagedLedgerException.ManagedLedgerFencedException);
    }

    private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) {
        if (TxnAction.COMMIT.getValue() == txnAction) {
            return this.updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false);
        }
        if (TxnAction.ABORT.getValue() == txnAction) {
            return this.updateTxnStatus(txnID, TxnStatus.ABORTED, TxnStatus.ABORTING, false);
        }
        return FutureUtil.failedFuture(new CoordinatorException.InvalidTxnStatusException("Unsupported txnAction " + txnAction));
    }

    private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnId) {
        return new TransactionCoordinatorID(txnId.getMostSigBits());
    }

    @VisibleForTesting
    public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
        return Collections.unmodifiableMap(this.stores);
    }

    public void close() {
        this.internalPinnedExecutor.shutdown();
    }
}

