/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Broker-side registry of {@link TransactionMetadataStore} instances, keyed by
 * {@link TransactionCoordinatorID}.
 *
 * <p>The service opens a store when a transaction-coordinator client connects, forwards
 * transaction operations to the store selected by the transaction id, and drives the
 * end-of-transaction sequence (status update, notification of the involved topics and
 * subscriptions through the {@link TransactionBufferClient}, final status update).
 *
 * <p>Store loading is coordinated through a per-coordinator {@link Semaphore} with a single
 * permit, and the loading steps are submitted to {@link #internalPinnedExecutor}, which is a
 * single-thread executor.
 */
public class TransactionMetadataStoreService {
    private static final Logger LOG = LoggerFactory.getLogger(TransactionMetadataStoreService.class);

    /** Delay, in milliseconds, before a failed end-transaction attempt is retried. */
    private static final long endTransactionRetryIntervalTime = 1000L;

    /** Time budget, in milliseconds, for answering queued connect requests after a load attempt. */
    private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;

    /** Stores that are currently loaded on this broker. */
    private final Map<TransactionCoordinatorID, TransactionMetadataStore> stores;
    private final TransactionMetadataStoreProvider transactionMetadataStoreProvider;
    private final PulsarService pulsarService;
    private final TransactionBufferClient tbClient;
    private final TransactionTimeoutTrackerFactory timeoutTrackerFactory;

    /** Timer used to schedule end-transaction retries. */
    private final Timer transactionOpRetryTimer;

    /** One single-permit semaphore per coordinator id, held while a store is being loaded or removed. */
    private final ConcurrentLongHashMap<Semaphore> tcLoadSemaphores;

    /** Connect requests that arrived while the store of the same coordinator id was being loaded. */
    private final ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> pendingConnectRequests;

    private final ThreadFactory threadFactory = new DefaultThreadFactory("transaction-coordinator-thread-factory");

    /** Single-thread executor on which the store loading steps run. */
    private final ExecutorService internalPinnedExecutor;

    /**
     * Creates the service.
     *
     * @param transactionMetadataStoreProvider provider used to open metadata stores
     * @param pulsarService owning broker service, used for configuration and ownership checks
     * @param tbClient client used to commit/abort transactions on topics and subscriptions
     * @param timer timer shared by the timeout tracker factory and the end-transaction retries
     */
    public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider,
                                           PulsarService pulsarService, TransactionBufferClient tbClient,
                                           HashedWheelTimer timer) {
        this.pulsarService = pulsarService;
        this.stores = new ConcurrentHashMap<>();
        this.transactionMetadataStoreProvider = transactionMetadataStoreProvider;
        this.tbClient = tbClient;
        this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, timer);
        this.transactionOpRetryTimer = timer;
        // The explicit type witness is required: without it the chained call infers Object.
        this.tcLoadSemaphores = ConcurrentLongHashMap.<Semaphore>newBuilder().build();
        this.pendingConnectRequests =
                ConcurrentLongHashMap.<ConcurrentLinkedDeque<CompletableFuture<Void>>>newBuilder().build();
        this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(this.threadFactory);
    }

    /**
     * Handles a connect request from a transaction-coordinator client.
     *
     * <p>If the store for {@code tcId} is already loaded the returned future completes
     * immediately. Otherwise topic ownership of the matching partition of
     * {@link SystemTopicNames#TRANSACTION_COORDINATOR_ASSIGN} is checked and the store is
     * loaded, or the request is queued behind a load that is already in progress.
     *
     * @param tcId id of the transaction coordinator the client connects to
     * @return a future completed once the store is available, or completed exceptionally when
     *         the ownership check or the store load fails
     */
    public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tcId) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        internalPinnedExecutor.execute(() -> {
            if (stores.get(tcId) != null) {
                completableFuture.complete(null);
                return;
            }
            pulsarService.getBrokerService()
                    .checkTopicNsOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
                            .getPartition((int) tcId.getId()).toString())
                    .thenRun(() -> internalPinnedExecutor.execute(
                            () -> loadStoreOrEnqueue(tcId, completableFuture)))
                    .exceptionally(ex -> {
                        completableFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex));
                        return null;
                    });
        });
        return completableFuture;
    }

    /**
     * Loads the store for {@code tcId} when no load is in progress; otherwise queues the request.
     *
     * @param tcId coordinator whose store is requested
     * @param connectFuture future of the connect request being served
     */
    private void loadStoreOrEnqueue(TransactionCoordinatorID tcId, CompletableFuture<Void> connectFuture) {
        Semaphore tcLoadSemaphore = tcLoadSemaphores.computeIfAbsent(tcId.getId(), id -> new Semaphore(1));
        Deque<CompletableFuture<Void>> deque =
                pendingConnectRequests.computeIfAbsent(tcId.getId(), id -> new ConcurrentLinkedDeque<>());

        if (!tcLoadSemaphore.tryAcquire()) {
            // Another request holds the permit; it will answer this one when it finishes.
            deque.add(connectFuture);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString());
            }
            return;
        }

        // Re-check under the permit: the store may have been added since the first lookup.
        if (stores.get(tcId) != null) {
            connectFuture.complete(null);
            tcLoadSemaphore.release();
            return;
        }

        openTransactionMetadataStore(tcId)
                .thenAccept(store -> internalPinnedExecutor.execute(() -> {
                    stores.put(tcId, store);
                    LOG.info("Added new transaction meta store {}", tcId);
                    drainPendingConnectRequests(deque, null);
                    connectFuture.complete(null);
                    tcLoadSemaphore.release();
                }))
                .exceptionally(e -> {
                    internalPinnedExecutor.execute(() -> {
                        // Unwrap instead of calling getCause(): a failure without a cause would make
                        // completeExceptionally(null) throw and the permit would never be released.
                        Throwable realCause = FutureUtil.unwrapCompletionException(e);
                        connectFuture.completeExceptionally(realCause);
                        // Release before answering the queue so that reconnecting clients can retry.
                        tcLoadSemaphore.release();
                        drainPendingConnectRequests(deque, realCause);
                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
                    });
                    return null;
                });
    }

    /**
     * Answers the queued connect requests, for at most {@link #HANDLE_PENDING_CONNECT_TIME_OUT}
     * milliseconds.
     *
     * <p>If the time budget is exhausted before the queue is empty, the remaining entries are
     * removed from the queue without being completed.
     *
     * @param deque queued connect requests of one coordinator
     * @param failure cause to fail the requests with, or {@code null} to complete them normally
     */
    private static void drainPendingConnectRequests(Deque<CompletableFuture<Void>> deque, Throwable failure) {
        long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT;
        while (System.currentTimeMillis() < endTime) {
            CompletableFuture<Void> pending = deque.poll();
            if (pending == null) {
                return;
            }
            if (failure == null) {
                pending.complete(null);
            } else {
                pending.completeExceptionally(failure);
            }
        }
        deque.clear();
    }

    /**
     * Opens the metadata store of the given coordinator.
     *
     * <p>The batched-writer settings are read from the broker configuration, the managed-ledger
     * configuration of the coordinator's transaction log is resolved, and the store is then
     * opened through the configured {@link TransactionMetadataStoreProvider}.
     *
     * @param tcId coordinator whose store should be opened
     * @return a future holding the opened store
     */
    public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) {
        Timer brokerClientSharedTimer = pulsarService.getBrokerClientSharedTimer();
        ServiceConfiguration serviceConfiguration = pulsarService.getConfiguration();

        TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
        txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionLogBatchedWriteEnabled());
        txnLogBufferedWriterConfig.setBatchedWriteMaxRecords(
                serviceConfiguration.getTransactionLogBatchedWriteMaxRecords());
        txnLogBufferedWriterConfig.setBatchedWriteMaxSize(
                serviceConfiguration.getTransactionLogBatchedWriteMaxSize());
        txnLogBufferedWriterConfig.setBatchedWriteMaxDelayInMillis(
                serviceConfiguration.getTransactionLogBatchedWriteMaxDelayInMillis());

        return pulsarService.getBrokerService()
                .getManagedLedgerConfig(MLTransactionLogImpl.getMLTransactionLogName(tcId))
                .thenCompose(v -> {
                    TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
                    TransactionRecoverTracker recoverTracker =
                            new TransactionRecoverTrackerImpl(this, timeoutTracker, tcId.getId());
                    return transactionMetadataStoreProvider.openStore(tcId,
                            pulsarService.getManagedLedgerFactory(), v, timeoutTracker, recoverTracker,
                            pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(),
                            txnLogBufferedWriterConfig, brokerClientSharedTimer);
                });
    }

    /**
     * Removes the store of the given coordinator from this service and starts closing it.
     *
     * <p>The returned future does not wait for the asynchronous close; its outcome is only logged.
     *
     * @param tcId coordinator whose store should be removed
     * @return a completed future when the removal was performed, or a future failed with
     *         {@link BrokerServiceException.ServiceUnitNotReadyException} when the coordinator's
     *         load semaphore is currently held
     */
    public CompletableFuture<Void> removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
        Semaphore tcLoadSemaphore = tcLoadSemaphores.computeIfAbsent(tcId.getId(), id -> new Semaphore(1));
        if (!tcLoadSemaphore.tryAcquire()) {
            return FutureUtil.failedFuture(new BrokerServiceException.ServiceUnitNotReadyException(
                    "Could not remove TransactionMetadataStore, it is doing other operations!"));
        }
        try {
            TransactionMetadataStore metadataStore = stores.remove(tcId);
            if (metadataStore != null) {
                metadataStore.closeAsync().whenComplete((v, ex) -> {
                    if (ex != null) {
                        LOG.error("Close transaction metadata store with id " + tcId, ex);
                    } else {
                        LOG.info("Removed and closed transaction meta store {}", tcId);
                    }
                });
            }
        } finally {
            // Always hand the permit back, even if closeAsync() throws synchronously.
            tcLoadSemaphore.release();
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Starts a new transaction on the given coordinator.
     *
     * @param tcId coordinator that should own the transaction
     * @param timeoutInMills transaction timeout, passed through to the store
     * @return the new transaction id, or a future failed with
     *         {@link CoordinatorException.CoordinatorNotFoundException} when no store is loaded
     */
    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.newTransaction(timeoutInMills);
    }

    /**
     * Records produced partitions on a transaction.
     *
     * @param txnId transaction to update
     * @param partitions partitions to add
     * @return completion of the store operation, or a future failed with
     *         {@link CoordinatorException.CoordinatorNotFoundException} when no store is loaded
     */
    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.addProducedPartitionToTxn(txnId, partitions);
    }

    /**
     * Records acknowledged subscriptions on a transaction.
     *
     * @param txnId transaction to update
     * @param partitions subscriptions to add
     * @return completion of the store operation, or a future failed with
     *         {@link CoordinatorException.CoordinatorNotFoundException} when no store is loaded
     */
    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnId, List<TransactionSubscription> partitions) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.addAckedPartitionToTxn(txnId, partitions);
    }

    /**
     * Looks up the metadata of a transaction.
     *
     * @param txnId transaction to look up
     * @return the transaction metadata, or a future failed with
     *         {@link CoordinatorException.CoordinatorNotFoundException} when no store is loaded
     */
    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnId) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.getTxnMeta(txnId);
    }

    /**
     * Returns the low water mark of the store that owns the transaction.
     *
     * @param txnID transaction whose coordinator is queried
     * @return the store's low water mark, or {@code 0} when no store is loaded for the coordinator
     */
    public long getLowWaterMark(TxnID txnID) {
        TransactionMetadataStore store = stores.get(getTcIdFromTxnId(txnID));
        return store == null ? 0L : store.getLowWaterMark();
    }

    /**
     * Updates the status of a transaction in its metadata store.
     *
     * @param txnId transaction to update
     * @param newStatus status to move to
     * @param expectedStatus status the transaction is expected to be in
     * @param isTimeout whether the update is caused by a transaction timeout
     * @return completion of the store operation, or a future failed with
     *         {@link CoordinatorException.CoordinatorNotFoundException} when no store is loaded
     */
    public CompletableFuture<Void> updateTxnStatus(TxnID txnId, TxnStatus newStatus, TxnStatus expectedStatus,
                                                   boolean isTimeout) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorException.CoordinatorNotFoundException(tcId));
        }
        return store.updateTxnStatus(txnId, newStatus, expectedStatus, isTimeout);
    }

    /**
     * Ends a transaction.
     *
     * @param txnID transaction to end
     * @param txnAction numeric value of the {@link TxnAction} to apply
     * @param isTimeout whether the transaction is ended because it timed out
     * @return a future completed when the transaction has been ended, or failed with a
     *         non-retryable error
     */
    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolean isTimeout) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        endTransaction(txnID, txnAction, isTimeout, future);
        return future;
    }

    /**
     * Ends a transaction and reports the outcome through the supplied future.
     *
     * <p>An OPEN transaction is first moved to COMMITTING/ABORTING; a transaction in any other
     * state must already be in the state matching {@code txnAction}. Failures classified as
     * retryable by {@link #isRetryableException(Throwable)} cause the whole sequence to be
     * scheduled again after {@link #endTransactionRetryIntervalTime} milliseconds; other
     * failures complete {@code future} exceptionally.
     *
     * @param txnID transaction to end
     * @param txnAction numeric value of the {@link TxnAction} to apply
     * @param isTimeout whether the transaction is ended because it timed out
     * @param future future to complete with the final outcome
     */
    public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout, CompletableFuture<Void> future) {
        final TxnStatus newStatus;
        if (txnAction == TxnAction.COMMIT.getValue()) {
            newStatus = TxnStatus.COMMITTING;
        } else if (txnAction == TxnAction.ABORT.getValue()) {
            newStatus = TxnStatus.ABORTING;
        } else {
            TransactionCoordinatorException.UnsupportedTxnActionException exception =
                    new TransactionCoordinatorException.UnsupportedTxnActionException(txnID, txnAction);
            LOG.error(exception.getMessage());
            future.completeExceptionally(exception);
            return;
        }

        getTxnMeta(txnID).thenCompose(txnMeta -> {
            if (txnMeta.status() == TxnStatus.OPEN) {
                return updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout)
                        .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction));
            }
            return fakeAsyncCheckTxnStatus(txnMeta.status(), txnAction, txnID, newStatus)
                    .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction));
        }).whenComplete((__, ex) -> {
            if (ex == null) {
                future.complete(null);
                return;
            }
            if (!isRetryableException(ex)) {
                LOG.error("End transaction fail! TxnId : {}, TxnAction : {}", txnID, txnAction, ex);
                future.completeExceptionally(ex);
                return;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, TxnAction : {}", txnID, txnAction, ex);
            }
            transactionOpRetryTimer.newTimeout(
                    timeout -> endTransaction(txnID, txnAction, isTimeout, future),
                    endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
        });
    }

    /**
     * Checks that a transaction which is no longer OPEN is in the state matching the requested
     * action (COMMITTING for commit, ABORTING for abort).
     *
     * @param txnStatus current status of the transaction
     * @param txnAction numeric value of the requested {@link TxnAction}
     * @param txnID transaction being ended
     * @param expectStatus status reported as expected when the check fails
     * @return a completed stage when the states match, otherwise a stage failed with
     *         {@link CoordinatorException.InvalidTxnStatusException}
     */
    private CompletionStage<Void> fakeAsyncCheckTxnStatus(TxnStatus txnStatus, int txnAction, TxnID txnID,
                                                          TxnStatus expectStatus) {
        boolean isLegal;
        // Enum case labels are written unqualified; qualified labels compile only on Java 21+.
        switch (txnStatus) {
            case COMMITTING:
                isLegal = txnAction == TxnAction.COMMIT.getValue();
                break;
            case ABORTING:
                isLegal = txnAction == TxnAction.ABORT.getValue();
                break;
            default:
                isLegal = false;
                break;
        }
        if (isLegal) {
            return CompletableFuture.completedFuture(null);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, txnAction);
        }
        return FutureUtil.failedFuture(
                new CoordinatorException.InvalidTxnStatusException(txnID, expectStatus, txnStatus));
    }

    /**
     * Reacts to a failed store operation: a fenced managed ledger causes the coordinator's store
     * to be removed; every other failure is ignored here.
     *
     * @param e failure reported by the operation
     * @param tcId coordinator the operation was executed on
     */
    public void handleOpFail(Throwable e, TransactionCoordinatorID tcId) {
        if (e instanceof ManagedLedgerException.ManagedLedgerFencedException) {
            removeTransactionMetadataStore(tcId);
        }
    }

    /**
     * Aborts a transaction because it timed out, provided it is still OPEN.
     *
     * <p>If looking up or ending the transaction fails with a retryable error the abort is
     * attempted again; other failures are only logged at debug level.
     *
     * @param txnID transaction that timed out
     */
    public void endTransactionForTimeout(TxnID txnID) {
        getTxnMeta(txnID).thenCompose(txnMeta -> {
            if (txnMeta.status() == TxnStatus.OPEN) {
                return endTransaction(txnID, TxnAction.ABORT.getValue(), true);
            }
            // Nothing to do. Return a completed future rather than null: a null result makes
            // thenCompose fail with a NullPointerException.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Transaction have been handle complete, don't need to handle by transaction timeout! "
                        + "TxnId : {}", txnID);
            }
            return CompletableFuture.<Void>completedFuture(null);
        }).exceptionally(e -> {
            if (isRetryableException(e)) {
                endTransaction(txnID, TxnAction.ABORT.getValue(), true);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Transaction have been handle complete, don't need to handle by transaction timeout! "
                        + "TxnId : {}", txnID);
            }
            return null;
        });
    }

    /**
     * Commits or aborts the transaction on every acknowledged subscription and every produced
     * partition, then writes the final status to the metadata store.
     *
     * @param txnID transaction being ended
     * @param txnAction numeric value of the {@link TxnAction} to apply
     * @return completion of the final status update
     */
    private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction) {
        return getTxnMeta(txnID).thenCompose(txnMeta -> {
            long lowWaterMark = getLowWaterMark(txnID);
            boolean isCommit = txnAction == TxnAction.COMMIT.getValue();
            boolean isAbort = txnAction == TxnAction.ABORT.getValue();

            Stream<CompletableFuture<?>> onSubFutureStream = txnMeta.ackedPartitions().stream().map(tbSub -> {
                if (isCommit) {
                    return tbClient.commitTxnOnSubscription(tbSub.getTopic(), tbSub.getSubscription(),
                            txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                }
                if (isAbort) {
                    return tbClient.abortTxnOnSubscription(tbSub.getTopic(), tbSub.getSubscription(),
                            txnID.getMostSigBits(), txnID.getLeastSigBits(), lowWaterMark);
                }
                return FutureUtil.failedFuture(new IllegalStateException("Unsupported txnAction " + txnAction));
            });

            Stream<CompletableFuture<?>> onTopicFutureStream = txnMeta.producedPartitions().stream().map(partition -> {
                if (isCommit) {
                    return tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(),
                            txnID.getLeastSigBits(), lowWaterMark);
                }
                if (isAbort) {
                    return tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(),
                            txnID.getLeastSigBits(), lowWaterMark);
                }
                return FutureUtil.failedFuture(new IllegalStateException("Unsupported txnAction " + txnAction));
            });

            List<CompletableFuture<?>> allFutures =
                    Stream.concat(onSubFutureStream, onTopicFutureStream).collect(Collectors.toList());
            return FutureUtil.waitForAll(allFutures)
                    .thenCompose(__ -> endTxnInTransactionMetadataStore(txnID, txnAction));
        });
    }

    /**
     * Tells whether a failed end-transaction attempt should be retried.
     *
     * @param ex failure, possibly wrapped by the future machinery
     * @return {@code true} for the retryable failure types listed below, except a fenced managed
     *         ledger, which is never retried
     */
    private static boolean isRetryableException(Throwable ex) {
        Throwable realCause = FutureUtil.unwrapCompletionException(ex);
        // ManagedLedgerFencedException is excluded even though ManagedLedgerException is retryable.
        if (realCause instanceof ManagedLedgerException.ManagedLedgerFencedException) {
            return false;
        }
        return realCause instanceof CoordinatorException.TransactionMetadataStoreStateException
                || realCause instanceof TransactionBufferClientException.RequestTimeoutException
                || realCause instanceof ManagedLedgerException
                || realCause instanceof PulsarClientException.BrokerPersistenceException
                || realCause instanceof PulsarClientException.LookupException
                || realCause instanceof TransactionBufferClientException.ReachMaxPendingOpsException
                || realCause instanceof PulsarClientException.ConnectException;
    }

    /**
     * Writes the terminal status of a transaction (COMMITTED or ABORTED) to its metadata store.
     *
     * @param txnID transaction being ended
     * @param txnAction numeric value of the {@link TxnAction} that was applied
     * @return completion of the status update, or a future failed with
     *         {@link CoordinatorException.InvalidTxnStatusException} for an unknown action
     */
    private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) {
        if (TxnAction.COMMIT.getValue() == txnAction) {
            return updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false);
        }
        if (TxnAction.ABORT.getValue() == txnAction) {
            return updateTxnStatus(txnID, TxnStatus.ABORTED, TxnStatus.ABORTING, false);
        }
        return FutureUtil.failedFuture(
                new CoordinatorException.InvalidTxnStatusException("Unsupported txnAction " + txnAction));
    }

    /** Derives the coordinator id from the most significant bits of the transaction id. */
    private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnId) {
        return new TransactionCoordinatorID(txnId.getMostSigBits());
    }

    /**
     * Exposes the loaded stores for tests.
     *
     * @return a read-only view of the coordinator-id to store map
     */
    @VisibleForTesting
    public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
        return Collections.unmodifiableMap(stores);
    }

    /**
     * Shuts down the internal executor, starts closing every loaded store and forgets them.
     * The outcome of each asynchronous close is only logged.
     */
    public void close() {
        internalPinnedExecutor.shutdown();
        stores.forEach((tcId, metadataStore) -> metadataStore.closeAsync().whenComplete((v, ex) -> {
            if (ex != null) {
                LOG.error("Close transaction metadata store with id " + tcId, ex);
            } else {
                LOG.info("Removed and closed transaction meta store {}", tcId);
            }
        }));
        stores.clear();
    }
}

