/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.transactions.cleanup;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.manager.bucket.BucketSettings;
import com.couchbase.transactions.atr.ATRIds;
import com.couchbase.transactions.cleanup.ATRStats;
import com.couchbase.transactions.cleanup.AccessError;
import com.couchbase.transactions.cleanup.Cleaner;
import com.couchbase.transactions.cleanup.CleanupRequest;
import com.couchbase.transactions.cleanup.ClientRecord;
import com.couchbase.transactions.cleanup.ClientRecordDetails;
import com.couchbase.transactions.cleanup.ClusterData;
import com.couchbase.transactions.cleanup.TransactionsCleanup;
import com.couchbase.transactions.components.ATR;
import com.couchbase.transactions.components.ATRUtil;
import com.couchbase.transactions.components.ActiveTransactionRecord;
import com.couchbase.transactions.components.CasMode;
import com.couchbase.transactions.config.MergedTransactionConfig;
import com.couchbase.transactions.config.TransactionConfig;
import com.couchbase.transactions.error.internal.ThreadStopRequested;
import com.couchbase.transactions.log.SimpleEventBusLogger;
import com.couchbase.transactions.log.TransactionCleanupAttempt;
import com.couchbase.transactions.log.TransactionCleanupEndRunEvent;
import com.couchbase.transactions.log.TransactionCleanupStartRunEvent;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import com.couchbase.transactions.support.SpanWrapperUtil;
import com.couchbase.transactions.util.DebugUtil;
import com.couchbase.transactions.util.SchedulerUtil;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuples;

/**
 * Background process that looks for expired ("lost") transaction attempts in Active Transaction
 * Records (ATRs) and hands them to a {@link Cleaner}.
 *
 * <p>Each instance identifies itself with a random client UUID. The set of ATR ids is split between
 * the active clients reported by the {@link ClientRecord} (see {@link #atrsToHandle}), and each
 * client polls its share spread across the configured cleanup window.
 *
 * <p>Either a single metadata collection is polled (when configured), or the cluster's buckets are
 * listed periodically and one polling flow is started per newly seen bucket.
 */
@Stability.Internal
public class LostCleanupDistributed {
    private final ClientRecord clientRecord;
    private final SimpleEventBusLogger LOGGER;
    private final ClusterData clusterData;
    private final MergedTransactionConfig config;
    private final Supplier<Cleaner> cleanerSupplier;
    // Written by stopThreads(), polled by every per-ATR check in checkIfThreadStopped().
    private volatile boolean stop = false;
    // Assigned before 'stop' is set to true, so readers that observe stop == true also see the latch.
    private CountDownLatch stopLatch;
    private Disposable cleanupThreadLauncher;
    private final Duration actualCleanupWindow;
    private final String clientUuid = UUID.randomUUID().toString();
    // Log prefix for messages that are not tied to a particular bucket.
    private final String bp;
    // Names of the buckets that currently have a polling flow started by this instance.
    private final Set<String> bucketThreads = ConcurrentHashMap.newKeySet();
    private static final Duration DEFAULT_SAFETY_MARGIN = Duration.of(1500L, ChronoUnit.MILLIS);
    private static final int DEFAULT_CLEANUP_BUCKET_POLLING_INTERVAL_SECS = 600;
    private static final int MAX_BUCKETS = 200;

    /**
     * Convenience constructor that wraps the user-facing config in a {@link MergedTransactionConfig}.
     */
    public LostCleanupDistributed(TransactionConfig config, ClusterData clusterData, Supplier<Cleaner> cleanerSupplier) {
        this(new MergedTransactionConfig(config), clusterData, cleanerSupplier);
    }

    /**
     * @param config          merged transaction configuration (cleanup window, number of ATRs, ...)
     * @param clusterData     access to the cluster, its buckets and its event bus
     * @param cleanerSupplier supplies the {@link Cleaner} used for each ATR that is processed
     */
    public LostCleanupDistributed(MergedTransactionConfig config, ClusterData clusterData, Supplier<Cleaner> cleanerSupplier) {
        this.LOGGER = new SimpleEventBusLogger(clusterData.cluster().environment().eventBus(), TransactionsCleanup.LOST_CATEGORY);
        this.clientRecord = config.clientRecordFactory().create(config, clusterData);
        this.clusterData = clusterData;
        this.config = config;
        this.cleanerSupplier = cleanerSupplier;
        this.actualCleanupWindow = config.cleanupWindow();
        this.bp = String.format("Client %s", this.clientUuid.substring(0, 5));
    }

    /**
     * @return the random UUID this cleanup client registers itself under
     */
    public String clientUuid() {
        return this.clientUuid;
    }

    /**
     * Stops the polling flows and then removes this client from the client record of all buckets.
     *
     * <p>Failures while removing the client are logged and otherwise ignored. This call blocks; if
     * the calling thread has been interrupted, the blocking wait may fail with a runtime exception.
     */
    public void stop() {
        this.stopThreads();
        this.clientRecord.removeClientFromAllBuckets(this.clientUuid).onErrorResume(err -> {
            this.LOGGER.warn(String.format("%s failed to remove from all buckets with err: %s", this.bp, err));
            return Mono.empty();
        }).blockLast();
        this.LOGGER.info(String.format("%s stopped lost cleanup process", this.bp));
    }

    /**
     * Signals all polling flows to stop, waits up to 10 seconds for them to acknowledge, and then
     * disposes the bucket-discovery subscription (if one was started).
     */
    public void stopThreads() {
        int numBucketThreads = this.bucketThreads.size();
        this.LOGGER.info(String.format("%s stopping lost cleanup process, waiting for %d threads to end", this.bp, numBucketThreads));
        // The latch must be in place before the flag is raised: flows count it down as soon as they see the flag.
        this.stopLatch = new CountDownLatch(numBucketThreads);
        this.stop = true;
        try {
            if (!this.stopLatch.await(10L, TimeUnit.SECONDS)) {
                this.LOGGER.info("Lost background cleanup threads did not stop in time");
            }
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            this.LOGGER.info(String.format("%s interrupted while waiting for lost cleanup threads to stop", this.bp));
            Thread.currentThread().interrupt();
        }
        if (this.cleanupThreadLauncher != null) {
            this.cleanupThreadLauncher.dispose();
        }
    }

    private boolean logDebug() {
        return true;
    }

    private boolean logVerbose() {
        return true;
    }

    /**
     * Selects the ATR ids this client is responsible for: every {@code numActiveClients}-th id,
     * starting at position {@code indexOfThisClient}.
     *
     * @param indexOfThisClient zero-based position of this client among the active clients
     * @param numActiveClients  number of active clients; must be positive
     * @param numAtrs           total number of ATRs, passed to {@link ATRIds#allAtrs}
     * @return the ATR ids assigned to this client; empty if the index is beyond the last ATR
     * @throws IllegalArgumentException if {@code numActiveClients} is not positive (the stride
     *                                  would otherwise never advance)
     */
    public static List<String> atrsToHandle(int indexOfThisClient, int numActiveClients, int numAtrs) {
        if (numActiveClients <= 0) {
            throw new IllegalArgumentException("numActiveClients must be positive but was " + numActiveClients);
        }
        List<String> allAtrs = ATRIds.allAtrs(numAtrs);
        List<String> out = new ArrayList<>();
        for (int i = indexOfThisClient; i < allAtrs.size(); i += numActiveClients) {
            out.add(allAtrs.get(i));
        }
        return out;
    }

    private RequestTracer tracer() {
        return this.clusterData.cluster().environment().requestTracer();
    }

    /**
     * Same as the six-argument overload, without a parent span.
     */
    public Flux<TransactionCleanupAttempt> handleATRCleanup(String bp, ReactiveCollection atrCollection, String atrId, ATRStats stats, Duration safetyMargin) {
        return this.handleATRCleanup(bp, atrCollection, atrId, stats, safetyMargin, null);
    }

    /**
     * Fetches one ATR, finds the entries that have expired and asks the {@link Cleaner} to clean up
     * each of them, one at a time.
     *
     * <p>Errors are recorded in {@code stats} and logged, but never propagated: on failure the
     * returned flux simply completes.
     *
     * @param bp            prefix for log messages
     * @param atrCollection collection holding the ATR
     * @param atrId         id of the ATR document
     * @param stats         mutable statistics object that is filled in while processing
     * @param safetyMargin  margin passed to the entries' expiry check
     * @param pspan         parent tracing span, may be {@code null}
     * @return the cleanup attempts produced for the expired entries
     */
    public Flux<TransactionCleanupAttempt> handleATRCleanup(String bp, ReactiveCollection atrCollection, String atrId, ATRStats stats, Duration safetyMargin, SpanWrapper pspan) {
        return Flux.defer(() -> {
            long start = System.nanoTime();
            // Stays at 0 if the ATR fetch fails, in which case the logged fetch time is meaningless.
            AtomicLong timeToFetchAtr = new AtomicLong(0L);
            AtomicReference<CasMode> casMode = new AtomicReference<>(CasMode.UNKNOWN);
            SpanWrapper span = SpanWrapperUtil.createOp(null, this.tracer(), atrCollection, atrId, "cleanup.atr", pspan);
            Cleaner cleaner = this.cleanerSupplier.get();
            return cleaner.beforeAtrGet(atrId)
                .then(ActiveTransactionRecord.getAtr(atrCollection, atrId, OptionsWrapperUtil.kvTimeoutNonMutating(this.config, atrCollection.core()), span))
                .flatMap(maybeAtr -> {
                    timeToFetchAtr.set(System.nanoTime());
                    if (maybeAtr.isPresent()) {
                        return Mono.just(maybeAtr.get());
                    }
                    return Mono.empty();
                })
                .doOnError(err -> {
                    this.LOGGER.debug(String.format("%s Got error '%s' while getting ATR %s/", bp, err, ATRUtil.getAtrDebug(atrCollection, atrId)));
                    stats.errored = Optional.of(err);
                })
                .flatMapMany(atr -> {
                    casMode.set(atr.casMode());
                    stats.numEntries = atr.entries().size();
                    stats.exists = true;
                    stats.errored = Optional.empty();
                    stats.expired = atr.entries().stream()
                        .filter(entry -> entry.hasExpired(safetyMargin.toMillis()))
                        .collect(Collectors.toList());
                    span.attribute("db.couchbase.transactions.cleanup.atr.num_entries", stats.numEntries);
                    // Record the count, matching the attribute name, rather than the collection itself.
                    span.attribute("db.couchbase.transactions.cleanup.atr.num_expired", stats.expired.size());
                    return Flux.fromIterable(stats.expired).publishOn(SchedulerUtil.schedulerCleanup);
                })
                .concatMap(atrEntry -> {
                    this.LOGGER.verbose(String.format("%s Found expired attempt %s, expires after %d, age %d (started %d, now %d)", bp, atrEntry.attemptId(), atrEntry.expiresAfterMsecs().orElse(-1), atrEntry.ageMsecs(), atrEntry.timestampStartMsecs().orElse(0L), atrEntry.cas() / 1000000L));
                    stats.expiredEntryCleanupTotalAttempts.incrementAndGet();
                    CleanupRequest req = CleanupRequest.fromAtrEntry(atrCollection, atrEntry);
                    // A failed cleanup of one entry is counted but must not stop the remaining entries.
                    return cleaner.performCleanup(req, false, span).onErrorResume(err -> {
                        stats.expiredEntryCleanupFailedAttempts.incrementAndGet();
                        return Mono.empty();
                    });
                })
                .doFinally(signal -> {
                    long now = System.nanoTime();
                    this.LOGGER.verbose(String.format("%s processed ATR %s after %s\u00b5s (%d fetching ATR), CAS=%s: %s", bp, ATRUtil.getAtrDebug(atrCollection, atrId), TimeUnit.NANOSECONDS.toMicros(now - start), TimeUnit.NANOSECONDS.toMicros(timeToFetchAtr.get() - start), casMode.get(), stats));
                    span.finish();
                })
                .onErrorResume(err -> Mono.empty());
        });
    }

    /**
     * Starts a polling flow for the bucket's default collection unless one is already tracked.
     *
     * <p>If the flow ends with an error the bucket is removed from the tracked set again. Stop
     * requests and {@link AccessError}s end the flow quietly; any other error is logged and
     * propagated.
     */
    Mono<Void> createThreadForBucketIfNeeded(String bucketName) {
        return Mono.defer(() -> {
            boolean wasNotTracking = this.bucketThreads.add(bucketName);
            if (!wasNotTracking) {
                return Mono.empty();
            }
            this.LOGGER.info(String.format("%s will start cleaning lost transactions on bucket %s", this.bp, RedactableArgument.redactMeta(bucketName)));
            return this.clusterData.getBucketFromName(bucketName)
                .waitUntilReady(Duration.ofSeconds(Integer.MAX_VALUE))
                .then(this.clusterData.getBucketDefaultCollection(bucketName))
                .flatMap(collection -> this.perBucketThread(collection).onErrorResume(err -> {
                    this.bucketThreads.remove(bucketName);
                    if (err instanceof ThreadStopRequested || err instanceof AccessError) {
                        return Mono.empty();
                    }
                    this.LOGGER.warn(String.format("%s %s/%s lost transactions thread has ended on error %s", this.bp, RedactableArgument.redactMeta(collection.bucketName()), RedactableArgument.redactMeta(collection.name()), DebugUtil.dbg(err)));
                    return Mono.error(err);
                }));
        });
    }

    /**
     * Lists the cluster's buckets immediately and then at a fixed interval, starting a polling flow
     * for every bucket that is not tracked yet.
     *
     * <p>The interval in seconds is read from the system property
     * {@code com.couchbase.transactions.cleanupBucketPollingIntervalSecs} and defaults to
     * {@link #DEFAULT_CLEANUP_BUCKET_POLLING_INTERVAL_SECS}.
     */
    void periodicallyConfigureFromExistingBuckets() {
        // NOTE(review): this scheduler is never disposed by this class - confirm whether that is intended.
        Scheduler scheduler = Schedulers.newBoundedElastic(MAX_BUCKETS, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "cb-txn-cleanup");
        int pollingIntervalSecs = Integer.parseInt(System.getProperty("com.couchbase.transactions.cleanupBucketPollingIntervalSecs", Integer.toString(DEFAULT_CLEANUP_BUCKET_POLLING_INTERVAL_SECS)));
        this.cleanupThreadLauncher = Flux.interval(Duration.ZERO, Duration.ofSeconds(pollingIntervalSecs))
            .concatMap(tick -> this.clusterData.cluster().buckets().reactive().getAllBuckets())
            .doOnNext(buckets -> this.LOGGER.debug(String.format("%s found %d buckets", this.bp, buckets.size())))
            .concatMap(buckets -> Flux.fromIterable(buckets.values()))
            .map(BucketSettings::name)
            .publishOn(scheduler)
            .flatMap(this::createThreadForBucketIfNeeded)
            .doOnCancel(() -> {
                this.LOGGER.info(String.format("%s has been told to cancel", this.bp));
                this.stopLatch.countDown();
            })
            .subscribe(v -> this.LOGGER.warn(String.format("%s lost transactions cleanup thread(s) ending", this.bp)), err -> {
                if (err instanceof ThreadStopRequested) {
                    this.LOGGER.info(String.format("%s lost transactions cleanup told to stop", this.bp));
                } else {
                    // Pass the error as an argument: a '%' in its text must not be parsed as a format specifier.
                    this.LOGGER.warn(String.format("%s lost transactions cleanup ended with exception %s", this.bp, err));
                }
            });
    }

    /**
     * Starts lost-transaction cleanup: on the configured metadata collection if there is one,
     * otherwise on every bucket found by periodic discovery.
     */
    void start() {
        if (this.config.metadataCollection().isPresent()) {
            this.perBucketThread(this.config.metadataCollection().get().reactive()).subscribe(ignored -> {}, err -> {
                if (!(err instanceof ThreadStopRequested)) {
                    this.LOGGER.warn(String.format("%s %s/%s lost transactions thread has ended on error %s", this.bp, RedactableArgument.redactMeta(this.config.metadataCollection().get().bucketName()), RedactableArgument.redactMeta(this.config.metadataCollection().get().name()), DebugUtil.dbg(err)));
                }
            });
        } else {
            this.periodicallyConfigureFromExistingBuckets();
        }
    }

    /**
     * Endless polling loop for one collection. Each run registers this client in the client record,
     * works out which ATRs it owns, checks them at a steady pace across the cleanup window and
     * publishes start-of-run and end-of-run events. Runs repeat until a stop is requested or a
     * non-retryable error occurs; other errors are retried with exponential backoff.
     */
    private Mono<Void> perBucketThread(ReactiveCollection collection) {
        return Mono.defer(() -> {
            String bp = "lost/" + RedactableArgument.redactMeta(collection.bucketName() + "." + collection.scopeName() + "." + collection.name()) + "/clientId=" + this.clientUuid.substring(0, 5);
            AtomicReference<SpanWrapper> span = new AtomicReference<>();
            return Mono.fromRunnable(() -> span.set(SpanWrapperUtil.createOp(null, this.tracer(), collection, null, "cleanup.run", null).attribute("db.couchbase.transactions.cleanup.client_uuid", this.clientUuid)))
                // Deferred so that span.get() is read after the runnable above has created the span;
                // evaluated eagerly it would always pass null as the parent span.
                .then(Mono.defer(() -> this.clientRecord.processClient(this.clientUuid, collection, this.config, span.get())))
                .flatMap(clientDetails -> {
                    long startOfRun = System.nanoTime();
                    Map<String, ATRStats> atrStats = new HashMap<>();
                    List<String> atrsHandledByThisClient = LostCleanupDistributed.atrsToHandle(clientDetails.indexOfThisClient(), clientDetails.numActiveClients(), this.config.numAtrs());
                    // NOTE(review): an empty share makes this division throw ArithmeticException, which the
                    // retry below treats like any other failure (backoff, then a fresh run) - confirm intended.
                    long checkAtrEveryNNanos = Math.max(1L, this.actualCleanupWindow.toNanos() / (long) atrsHandledByThisClient.size());
                    if (atrsHandledByThisClient.size() >= this.config.numAtrs()) {
                        this.LOGGER.verbose(String.format("%s owns all %d ATRs and will check them over next %dmills, checking an ATR every %dnanos", bp, this.config.numAtrs(), this.actualCleanupWindow.toMillis(), checkAtrEveryNNanos));
                    }
                    // The pacing value is in nanoseconds, so the event's duration is built from nanoseconds.
                    TransactionCleanupStartRunEvent ev = new TransactionCleanupStartRunEvent(collection.bucketName(), collection.scopeName(), collection.name(), this.clientUuid, clientDetails, this.actualCleanupWindow, atrsHandledByThisClient.size(), this.config.numAtrs(), Duration.ofNanos(checkAtrEveryNNanos));
                    this.clusterData.cluster().environment().eventBus().publish(ev);
                    // Zipping with an interval paces the ATR ids: one id is released per tick.
                    return Flux.zip(Flux.fromIterable(atrsHandledByThisClient), Flux.interval(Duration.of(checkAtrEveryNNanos, ChronoUnit.NANOS)))
                        .publishOn(SchedulerUtil.schedulerCleanup)
                        .flatMap(paced -> {
                            String atrId = paced.getT1();
                            this.LOGGER.verbose(String.format("%s checking for lost txns in atr %s", bp, ATRUtil.getAtrDebug(collection, atrId)));
                            ATRStats stats = new ATRStats();
                            return this.checkIfThreadStopped(collection, atrId)
                                .thenMany(this.handleATRCleanup(bp, collection, atrId, stats, DEFAULT_SAFETY_MARGIN, span.get()))
                                .then(Mono.fromRunnable(() -> atrStats.put(atrId, stats)))
                                .thenReturn(atrId);
                        })
                        .onErrorResume(err -> {
                            if (err instanceof ThreadStopRequested) {
                                return Mono.error(err);
                            }
                            this.LOGGER.info(String.format("%s lost cleanup thread got error '%s', continuing", bp, err));
                            return Mono.empty();
                        })
                        .then()
                        .thenReturn(Tuples.of(atrStats, ev, startOfRun));
                })
                .doOnNext(run -> {
                    Duration timeForRun = Duration.ofNanos(System.nanoTime() - run.getT3());
                    TransactionCleanupEndRunEvent ev = new TransactionCleanupEndRunEvent(run.getT2(), run.getT1(), timeForRun);
                    this.clusterData.cluster().environment().eventBus().publish(ev);
                })
                .retryWhen(Retry.allBut(ThreadStopRequested.class, AccessError.class)
                    .exponentialBackoff(Duration.ofMillis(Math.min(1000L, this.config.cleanupWindow().toMillis())), this.config.cleanupWindow())
                    .doOnRetry(v -> this.LOGGER.debug(String.format("%s retrying lost cleanup on error %s after %s", bp, DebugUtil.dbg(v.exception()), v.backoff())))
                    .toReactorRetry())
                .doOnNext(v -> span.get().finish())
                .doOnError(err -> span.get().failWith(err))
                .repeat()
                .then();
        });
    }

    /**
     * Emits {@code v} unless a stop has been requested; in that case counts down the stop latch and
     * fails with {@link ThreadStopRequested}.
     */
    private Mono<String> checkIfThreadStopped(ReactiveCollection collection, String v) {
        return Mono.defer(() -> {
            if (this.stop) {
                this.LOGGER.info(String.format("%s Stopping background cleanup thread for lost transactions on %s/%s", this.bp, collection.bucketName(), collection.name()));
                this.stopLatch.countDown();
                return Mono.error(new ThreadStopRequested());
            }
            return Mono.just(v);
        });
    }
}

