/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.transactions.cleanup;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.transactions.cleanup.Cleaner;
import com.couchbase.transactions.cleanup.CleanerFactory;
import com.couchbase.transactions.cleanup.CleanupFailedEvent;
import com.couchbase.transactions.cleanup.CleanupRequest;
import com.couchbase.transactions.cleanup.ClusterData;
import com.couchbase.transactions.cleanup.LostCleanupDistributed;
import com.couchbase.transactions.components.ActiveTransactionRecord;
import com.couchbase.transactions.config.TransactionConfig;
import com.couchbase.transactions.error.internal.ThreadStopRequested;
import com.couchbase.transactions.log.SimpleEventBusLogger;
import com.couchbase.transactions.log.TransactionCleanupAttempt;
import com.couchbase.transactions.log.TransactionLogEvent;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Stability.Internal
public class TransactionsCleanup {
    public static String CATEGORY = TransactionLogEvent.DEFAULT_CATEGORY + ".cleanup";
    public static String CATEGORY_STATS = TransactionLogEvent.DEFAULT_CATEGORY + ".cleanup.stats";
    public static String CATEGORY_CLIENT_RECORD = TransactionLogEvent.DEFAULT_CATEGORY + ".clientrecord";
    public static String LOST_CATEGORY = TransactionLogEvent.DEFAULT_CATEGORY + ".cleanup.lost";
    public static String REGULAR_CATEGORY = TransactionLogEvent.DEFAULT_CATEGORY + ".cleanup.regular";
    private final DelayQueue<CleanupRequest> cleanupQueue = new DelayQueue();
    private final TransactionConfig config;
    private volatile boolean stop = false;
    private final CountDownLatch stopLatch;
    private final ClusterData clusterData;
    private final LostCleanupDistributed lostCleanup;
    private final SimpleEventBusLogger LOGGER;
    private final SimpleEventBusLogger LOGGER_REGULAR;
    private final CleanerFactory cleanerFactory;

    public TransactionsCleanup(TransactionConfig config, ClusterData clusterData) {
        this.LOGGER = new SimpleEventBusLogger(clusterData.cluster().environment().eventBus(), CATEGORY);
        this.LOGGER_REGULAR = new SimpleEventBusLogger(clusterData.cluster().environment().eventBus(), REGULAR_CATEGORY);
        this.config = config;
        this.clusterData = clusterData;
        this.lostCleanup = new LostCleanupDistributed(config, clusterData, () -> this.getCleaner());
        int countdown = 0;
        this.cleanerFactory = config.cleanerFactory();
        this.LOGGER.info("Cleanup settings; regular cleanup thread enabled=" + config.runRegularAttemptsCleanupThread() + ", lost cleanup thread enabled=" + config.runLostAttemptsCleanupThread());
        if (config.runRegularAttemptsCleanupThread()) {
            this.runRegularAttemptsCleanupThread();
            ++countdown;
        }
        if (config.runLostAttemptsCleanupThread()) {
            this.lostCleanup.start();
        }
        this.stopLatch = new CountDownLatch(countdown);
    }

    boolean logDebug() {
        return true;
    }

    boolean logVerbose() {
        return true;
    }

    public ClusterData clusterData() {
        return this.clusterData;
    }

    public List<TransactionCleanupAttempt> forceCleanupQueueEmpty() {
        this.LOGGER.info("Forcing synchronous cleanup of all " + this.cleanupQueue.size() + " cleanup requests");
        SpanWrapper span = SpanWrapper.create(this.config, "transaction_force_cleanup_queue_empty");
        span.start();
        Flux attempts = Flux.fromStream(this.cleanupQueue.stream()).flatMap(req -> {
            this.LOGGER_REGULAR.info("Forcing cleanup of " + req.attemptId());
            Cleaner cleaner = this.getCleaner();
            return cleaner.performCleanup((CleanupRequest)req, true, span).onErrorResume(err -> {
                this.LOGGER_REGULAR.info(String.format("While trying to force cleanup for attempt %s got error '%s', continuing and leaving a lost txn", req.attemptId(), err.toString()));
                return Mono.empty();
            });
        });
        this.LOGGER_REGULAR.info("Finished synchronous cleanup of all cleanup requests");
        span.finish();
        return (List)attempts.collectList().block();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stopBackgroundProcesses() {
        TransactionsCleanup transactionsCleanup = this;
        synchronized (transactionsCleanup) {
            this.stop = true;
        }
        this.LOGGER.info(String.format("Waiting for %d regular background threads to exit", this.stopLatch.getCount()));
        try {
            if (!this.stopLatch.await(10L, TimeUnit.SECONDS)) {
                this.LOGGER.info("Background threads did not stop in expected time");
            }
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.lostCleanup.stop();
        this.LOGGER.info("Background threads have exitted");
    }

    public LostCleanupDistributed lostCleanup() {
        return this.lostCleanup;
    }

    private void runRegularAttemptsCleanupThread() {
        Objects.requireNonNull(this.LOGGER);
        this.LOGGER_REGULAR.info("Starting background cleanup thread to find transactions from this client");
        Flux.interval((Duration)Duration.of(100L, ChronoUnit.MILLIS), (Scheduler)Schedulers.elastic()).flatMap(v -> {
            if (this.stop) {
                this.LOGGER_REGULAR.info("Stopping background cleanup thread for transactions from this client");
                this.stopLatch.countDown();
                return Mono.error((Throwable)new ThreadStopRequested());
            }
            return Mono.just((Object)v);
        }).flatMap(v -> {
            ArrayList<CleanupRequest> requests = new ArrayList<CleanupRequest>();
            CleanupRequest head = null;
            do {
                if ((head = (CleanupRequest)this.cleanupQueue.poll()) == null) continue;
                requests.add(head);
            } while (head != null);
            return Flux.fromIterable(requests);
        }).flatMap(req -> {
            SpanWrapper span = SpanWrapper.create(this.config, "transaction_cleanup");
            Cleaner cleaner = this.getCleaner();
            return cleaner.performCleanup((CleanupRequest)req, true, span).doOnSuccess(result -> {
                this.LOGGER_REGULAR.debug(String.format("result of cleanup request %s: success=%s", req, result.success()));
                if (!result.success()) {
                    result.logs().forEach(log -> this.LOGGER_REGULAR.debug(String.format("result of cleanup request %s log: %s", req, log.toString())));
                }
            }).onErrorResume(err -> {
                Duration timeBeforeRehandlingFailedCleanupDefault = cleaner.timeBeforeRehandlingFailedCleanup();
                CleanupFailedEvent ev = new CleanupFailedEvent((CleanupRequest)req, (Throwable)err);
                this.clusterData.cluster().environment().eventBus().publish((Event)ev);
                this.LOGGER_REGULAR.debug(String.format("error while handling cleanup request %s, retrying in %dmsecs: '%s'", req, timeBeforeRehandlingFailedCleanupDefault.toMillis(), err));
                CleanupRequest newReq = new CleanupRequest(req.attemptId(), req.atrId(), req.atrCollection(), req.state(), req.stagedReplaces(), req.stagedRemoves(), req.stagedInserts(), timeBeforeRehandlingFailedCleanupDefault, req.forwardCompatibility(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - req.createdAt()), req.durabilityLevel());
                this.cleanupQueue.add(newReq);
                return Mono.empty();
            }).doOnSubscribe(v -> span.start()).doFinally(v -> span.finish());
        }).retryWhen(Retry.allBut((Class[])new Class[]{ThreadStopRequested.class}).exponentialBackoff(Duration.of(10L, ChronoUnit.MILLIS), Duration.of(2000L, ChronoUnit.MILLIS)).doOnRetry(v -> this.LOGGER_REGULAR.debug(String.format("retrying regular cleanup on error '%s'", v.exception()))).retryMax(100000L).toReactorRetry()).subscribe(next -> {}, err -> {
            if (!(err instanceof ThreadStopRequested)) {
                this.LOGGER_REGULAR.warn("regular cleanup thread ended with exception " + err);
            }
        }, () -> this.LOGGER_REGULAR.warn("regular cleanup thread ending"));
    }

    public Cleaner getCleaner() {
        return this.cleanerFactory.create(this.config, this.clusterData);
    }

    public void clearCleanupQueue() {
        this.cleanupQueue.clear();
    }

    public int cleanupQueueLength() {
        return this.cleanupQueue.size();
    }

    public void add(CleanupRequest cleanupRequest) {
        this.cleanupQueue.add(cleanupRequest);
    }

    public Mono<Void> forceATRCleanup(ReactiveCollection atrCollection, String atrId) {
        SpanWrapper span = SpanWrapper.create(this.config, "transaction_force_atr_cleanup");
        return ActiveTransactionRecord.getAtr(atrCollection, atrId, OptionsWrapperUtil.kvTimeoutNonMutating(this.config, atrCollection.core()), span).flatMap(v -> {
            if (v.isPresent()) {
                return Mono.just(v.get());
            }
            return Mono.empty();
        }).doOnError(err -> this.LOGGER_REGULAR.debug(String.format("Got error '%s' while cleaning up ATR %s/%s", err, atrCollection.name(), atrId))).flatMapMany(v -> Flux.fromIterable(v.entries())).flatMap(atrEntry -> {
            Cleaner cleaner = this.getCleaner();
            CleanupRequest req = CleanupRequest.fromAtrEntry(atrCollection, atrEntry);
            return cleaner.performCleanup(req, false, span);
        }).then().doOnSubscribe(v -> span.start()).doFinally(s -> span.finish());
    }
}

