/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.transactions.cleanup;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.codec.Transcoder;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.InsertOptions;
import com.couchbase.client.java.kv.LookupInResult;
import com.couchbase.client.java.kv.MutateInOptions;
import com.couchbase.client.java.kv.MutateInSpec;
import com.couchbase.client.java.kv.RemoveOptions;
import com.couchbase.transactions.TransactionGetResult;
import com.couchbase.transactions.cleanup.CleanupRequest;
import com.couchbase.transactions.cleanup.ClusterData;
import com.couchbase.transactions.cleanup.TransactionsCleanup;
import com.couchbase.transactions.components.ATREntry;
import com.couchbase.transactions.components.ATRUtil;
import com.couchbase.transactions.components.DocRecord;
import com.couchbase.transactions.components.DocumentGetter;
import com.couchbase.transactions.config.TransactionConfig;
import com.couchbase.transactions.error.internal.ErrorClasses;
import com.couchbase.transactions.forwards.ForwardCompatibility;
import com.couchbase.transactions.forwards.ForwardCompatibilityStages;
import com.couchbase.transactions.forwards.Supported;
import com.couchbase.transactions.log.SimpleEventBusLogger;
import com.couchbase.transactions.log.TransactionCleanupAttempt;
import com.couchbase.transactions.log.TransactionLogger;
import com.couchbase.transactions.support.AttemptStates;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import com.couchbase.transactions.util.DebugUtil;
import com.couchbase.transactions.util.TriFunction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

@Stability.Internal
public class Cleaner {
    /** Transactions configuration; handed to the option wrappers, the document getter and span creation. */
    private final TransactionConfig config;
    /** Gives access to the cluster (environment, transcoder) and to bucket lookup by name. */
    private final ClusterData clusterData;
    /** Event-bus logger created in the constructor under {@code TransactionsCleanup.CATEGORY + ".cleaner"}. */
    private final SimpleEventBusLogger LOGGER;
    /** Event bus obtained from the cluster environment; cleanup attempt events are published on it. */
    private final EventBus eventBus;
    /** Value returned by {@link #timeBeforeRehandlingFailedCleanup()} when no override is present (10 seconds). */
    private static final Duration TIME_BEFORE_REHANDLING_FAILED_CLEANUP_DEFAULT = Duration.ofSeconds(10L);
    /** Optional override of the default above. Protected, so presumably set by subclasses or tests - TODO confirm. */
    protected Optional<Duration> timeBeforeRehandlingFailedCleanupDefault = Optional.empty();
    /**
     * 2880 minutes, i.e. 48 hours. performCleanup applies this same value as the entry age from which a
     * failed cleanup is published at WARN instead of DEBUG.
     */
    private static final int BEING_LOGGING_FAILED_CLEANUPS_AT_WARN_AFTER_X_MINUTES = 2880;

    public Cleaner(TransactionConfig config, ClusterData clusterData) {
        this.eventBus = clusterData.cluster().environment().eventBus();
        this.LOGGER = new SimpleEventBusLogger(this.eventBus, TransactionsCleanup.CATEGORY + ".cleaner");
        this.config = config;
        this.clusterData = clusterData;
    }

    /**
     * Looks up the transcoder configured on the cluster environment.
     *
     * @return the environment's transcoder
     */
    private Transcoder getTranscoder() {
        return clusterData.cluster().environment().transcoder();
    }

    /**
     * Returns the delay before a failed cleanup is handled again.
     *
     * @return the override held in {@code timeBeforeRehandlingFailedCleanupDefault} when one is present,
     *         otherwise the built-in default
     */
    public Duration timeBeforeRehandlingFailedCleanup() {
        Optional<Duration> override = this.timeBeforeRehandlingFailedCleanupDefault;
        if (override.isPresent()) {
            return override.get();
        }
        return TIME_BEFORE_REHANDLING_FAILED_CLEANUP_DEFAULT;
    }

    /**
     * Builds the document-level cleanup for one attempt, chosen by the attempt's state.
     *
     * <ul>
     *   <li>COMMITTED: commit staged inserts, then staged replaces, then remove docs staged for removal.</li>
     *   <li>ABORTED: remove staged inserts, then strip transaction links from staged replaces and removes.</li>
     *   <li>PENDING and any other state: nothing is done to documents; a debug line is logged.</li>
     * </ul>
     *
     * @param perEntryLog logger collecting the log lines for this ATR entry
     * @param req         the cleanup request describing the attempt
     * @param pspan       parent span passed down to the per-document work
     * @return a Mono completing once the three stages have run in sequence, or an empty Mono
     */
    Mono<Void> cleanupDocs(TransactionLogger perEntryLog, CleanupRequest req, SpanWrapper pspan) {
        String attemptId = req.attemptId();
        switch (req.state()) {
            case COMMITTED: {
                Mono<Void> commitInserted = this.commitDocs(perEntryLog, attemptId, req.stagedInserts(), req, pspan);
                Mono<Void> commitReplaced = this.commitDocs(perEntryLog, attemptId, req.stagedReplaces(), req, pspan);
                Mono<Void> removeStaged = this.removeDocsStagedForRemoval(perEntryLog, attemptId, req.stagedRemoves(), req, pspan);
                return commitInserted.then(commitReplaced).then(removeStaged);
            }
            case ABORTED: {
                Mono<Void> dropInserted = this.removeDocs(perEntryLog, attemptId, req.stagedInserts(), req, pspan);
                Mono<Void> unlinkReplaced = this.removeTxnLinks(perEntryLog, attemptId, req.stagedReplaces(), req, pspan);
                Mono<Void> unlinkRemoved = this.removeTxnLinks(perEntryLog, attemptId, req.stagedRemoves(), req, pspan);
                return dropInserted.then(unlinkReplaced).then(unlinkRemoved);
            }
            case PENDING: {
                perEntryLog.logDefer(req.attemptId(), "No docs cleanup possible as txn in state %s, just removing", Event.Severity.DEBUG, new Object[]{req.state()});
                return Mono.empty();
            }
            default: {
                // Every remaining state: no document work, only the ATR entry is removed by the caller.
                perEntryLog.logDefer(req.attemptId(), "No docs cleanup to do as txn in state %s, just removing", Event.Severity.DEBUG, new Object[]{req.state()});
                return Mono.empty();
            }
        }
    }

    /**
     * Commits the staged content of each listed document on behalf of a COMMITTED attempt.
     *
     * <p>Per document (after the checks in {@code doPerDoc}, with CRC32 matching required): if the
     * fetched document is a tombstone the staged content is inserted as a new document; otherwise
     * the {@code txn} xattr is removed and the body replaced in one CAS-guarded mutateIn.
     *
     * @param perEntryLog logger collecting the log lines for this ATR entry
     * @param attemptId   id of the attempt being cleaned up
     * @param docs        records of the documents to commit
     * @param req         cleanup request; supplies the durability level
     * @param pspan       parent span
     * @return a Mono completing when every document has been handled
     */
    private Mono<Void> commitDocs(TransactionLogger perEntryLog, String attemptId, List<DocRecord> docs, CleanupRequest req, SpanWrapper pspan) {
        return this.doPerDoc(perEntryLog, attemptId, docs, pspan, true, (collection, doc, lir) -> {
            assert (doc.links().isDocumentInTransaction());
            assert (doc.links().stagedContent().isPresent());
            JsonObject stagedBody = JsonObject.fromJson((String) doc.links().stagedContent().get());
            return this.beforeCommitDoc(doc.id())
                    .then(Mono.defer(() -> {
                        if (lir.isDeleted()) {
                            // Tombstone: there is no live body to replace, so write the staged content as an insert.
                            InsertOptions insertOptions = (InsertOptions) OptionsWrapperUtil
                                    .wrap(InsertOptions.insertOptions(), this.config, req.durabilityLevel(), collection.core())
                                    .clientContext(OptionsWrapperUtil.createClientContext("Cleaner::commitDocs"));
                            return collection.insert(doc.id(), (Object) stagedBody, insertOptions);
                        }
                        MutateInOptions casGuarded = ((MutateInOptions) MutateInOptions.mutateInOptions()
                                .clientContext(OptionsWrapperUtil.createClientContext("Cleaner::commitDocs")))
                                .cas(doc.cas());
                        return collection.mutateIn(
                                doc.id(),
                                Arrays.asList(MutateInSpec.remove("txn").xattr(), MutateInSpec.replace("", (Object) stagedBody)),
                                OptionsWrapperUtil.wrap(casGuarded, this.config, req.durabilityLevel(), collection.core()));
                    }))
                    .doOnSubscribe(subscription -> perEntryLog.logDefer(attemptId, "removing txn links and writing content to doc %s", Event.Severity.DEBUG, DebugUtil.docId(doc)))
                    .then();
        });
    }

    /**
     * Strips the {@code txn} xattr from each listed document, leaving the body untouched.
     *
     * <p>Used for staged replaces and removes of an ABORTED attempt. The mutateIn is CAS-guarded
     * and accesses deleted documents when the fetched document is a tombstone. CRC32 matching is
     * not required for this operation.
     *
     * @param perEntryLog logger collecting the log lines for this ATR entry
     * @param attemptId   id of the attempt being cleaned up
     * @param docs        records of the documents to unlink
     * @param req         cleanup request; supplies the durability level
     * @param pspan       parent span
     * @return a Mono completing when every document has been handled
     */
    private Mono<Void> removeTxnLinks(TransactionLogger perEntryLog, String attemptId, List<DocRecord> docs, CleanupRequest req, SpanWrapper pspan) {
        return this.doPerDoc(perEntryLog, attemptId, docs, pspan, false, (collection, doc, lir) -> {
            Mono<Integer> hook = this.beforeRemoveLinks(doc.id());
            MutateInOptions casGuarded = ((MutateInOptions) MutateInOptions.mutateInOptions()
                    .clientContext(OptionsWrapperUtil.createClientContext("Cleaner::removeTxnLinks")))
                    .accessDeleted(lir.isDeleted())
                    .cas(doc.cas());
            return hook
                    .then(collection.mutateIn(
                            doc.id(),
                            Arrays.asList(MutateInSpec.remove("txn").xattr()),
                            OptionsWrapperUtil.wrap(casGuarded, this.config, req.durabilityLevel(), collection.core())))
                    .doOnSubscribe(subscription -> perEntryLog.logDefer(attemptId, "removing txn links from doc %s", Event.Severity.DEBUG, DebugUtil.docId(doc)))
                    .then();
        });
    }

    /**
     * Removes each listed document that is still marked as being removed by the transaction.
     *
     * <p>Used for staged removes of a COMMITTED attempt, with CRC32 matching required. A document
     * whose links do not carry the remove indication is only logged and skipped.
     *
     * @param perEntryLog logger collecting the log lines for this ATR entry
     * @param attemptId   id of the attempt being cleaned up
     * @param docs        records of the documents staged for removal
     * @param req         cleanup request; supplies the durability level
     * @param pspan       parent span
     * @return a Mono completing when every document has been handled
     */
    private Mono<Void> removeDocsStagedForRemoval(TransactionLogger perEntryLog, String attemptId, List<DocRecord> docs, CleanupRequest req, SpanWrapper pspan) {
        return this.doPerDoc(perEntryLog, attemptId, docs, pspan, true, (collection, doc, lir) -> {
            if (!doc.links().isDocumentBeingRemoved()) {
                // Nothing to remove: log on subscription and complete.
                return Mono.create(sink -> {
                    perEntryLog.debug(attemptId, "doc %s does not have expected remove indication, skipping", DebugUtil.docId(doc));
                    sink.success();
                });
            }
            return this.beforeRemoveDocStagedForRemoval(doc.id())
                    .then(collection.remove(
                            doc.id(),
                            OptionsWrapperUtil.wrap(RemoveOptions.removeOptions(), this.config, req.durabilityLevel(), collection.core()).cas(doc.cas())))
                    .doOnSubscribe(subscription -> perEntryLog.debug(attemptId, "removing doc %s", doc.id()))
                    .then();
        });
    }

    /**
     * Undoes the staged inserts of an ABORTED attempt.
     *
     * <p>Per document: when the fetched document is a tombstone, only its {@code txn} xattr is
     * removed (accessing the deleted document); otherwise the document itself is removed. Both
     * operations are CAS-guarded. CRC32 matching is not required.
     *
     * @param perEntryLog logger collecting the log lines for this ATR entry
     * @param attemptId   id of the attempt being cleaned up
     * @param docs        records of the staged inserts
     * @param req         cleanup request; supplies the durability level
     * @param pspan       parent span
     * @return a Mono completing when every document has been handled
     */
    private Mono<Void> removeDocs(TransactionLogger perEntryLog, String attemptId, List<DocRecord> docs, CleanupRequest req, SpanWrapper pspan) {
        return this.doPerDoc(perEntryLog, attemptId, docs, pspan, false, (collection, doc, lir) -> {
            return this.beforeRemoveDoc(doc.id())
                    .then(Mono.defer(() -> {
                        if (lir.isDeleted()) {
                            return doc.collection().mutateIn(
                                    doc.id(),
                                    Collections.singletonList(MutateInSpec.remove("txn").xattr()),
                                    OptionsWrapperUtil.wrap(MutateInOptions.mutateInOptions(), this.config, req.durabilityLevel(), doc.collection().core())
                                            .cas(doc.cas())
                                            .accessDeleted(true));
                        }
                        return collection.remove(
                                doc.id(),
                                OptionsWrapperUtil.wrap(RemoveOptions.removeOptions(), this.config, req.durabilityLevel(), doc.collection().core())
                                        .cas(doc.cas()));
                    }))
                    .doOnSubscribe(subscription -> perEntryLog.debug(attemptId, "removing doc %s", DebugUtil.docId(doc)))
                    .then();
        });
    }

    /**
     * Applies {@code perDoc} to each document record, one after another.
     *
     * <p>For every record the target collection is resolved from the record's bucket, scope and
     * collection names, the {@code beforeDocGet} hook is run, and the document is then fetched and
     * processed by {@code doPerDocGotDoc}.
     *
     * @param perEntryLog                logger collecting the log lines for this ATR entry
     * @param attemptId                  id of the attempt being cleaned up
     * @param docs                       records to process, in order
     * @param pspan                      parent span
     * @param requireCrc32ToMatchStaging whether a changed CRC32 since staging causes the doc to be skipped
     * @param perDoc                     the action applied to each document that passes the checks
     * @return a Mono completing once all records have been processed
     */
    private Mono<Void> doPerDoc(TransactionLogger perEntryLog, String attemptId, List<DocRecord> docs, SpanWrapper pspan, boolean requireCrc32ToMatchStaging, TriFunction<ReactiveCollection, TransactionGetResult, LookupInResult, Mono<Void>> perDoc) {
        return Flux.fromIterable(docs)
                .concatMap(record -> {
                    ReactiveCollection target = this.clusterData
                            .getBucketFromName(record.bucketName())
                            .scope(record.scopeName())
                            .collection(record.collectionName());
                    Mono<Integer> hook = this.beforeDocGet(record.id());
                    Mono<Void> handleDoc = this.doPerDocGotDoc(perEntryLog, attemptId, pspan, requireCrc32ToMatchStaging, perDoc, (DocRecord) record, target);
                    return hook.then(handleDoc);
                })
                .then();
    }

    /**
     * Fetches one document and, if it still belongs to this attempt, applies {@code perDoc} to it.
     *
     * <p>The document is skipped (empty Mono) when it cannot be fetched, is no longer in a
     * transaction, is staged by a different attempt, or - when {@code requireCrc32ToMatchStaging}
     * is set and a staging CRC32 was recorded - its current CRC32 differs from the staged one.
     * Any error is logged and propagated unchanged; a CAS mismatch gets an additional log line.
     *
     * @param perEntryLog                logger collecting the log lines for this ATR entry
     * @param attemptId                  id of the attempt being cleaned up
     * @param pspan                      parent span
     * @param requireCrc32ToMatchStaging whether to compare the CRC32 recorded at staging with the current one
     * @param perDoc                     the action applied when all checks pass
     * @param docRecord                  record identifying the document
     * @param collection                 collection the document lives in
     * @return a Mono completing when the document has been handled or skipped
     */
    private Mono<Void> doPerDocGotDoc(TransactionLogger perEntryLog, String attemptId, SpanWrapper pspan, boolean requireCrc32ToMatchStaging, TriFunction<ReactiveCollection, TransactionGetResult, LookupInResult, Mono<Void>> perDoc, DocRecord docRecord, ReactiveCollection collection) {
        return DocumentGetter.justGetDoc(collection, this.config, docRecord.id(), pspan, this.getTranscoder(), true).flatMap(docOpt -> {
            if (!docOpt.isPresent()) {
                perEntryLog.debug(attemptId, "could not get doc %s, skipping", DebugUtil.docId(collection, docRecord.id()));
                return Mono.empty();
            }

            Tuple2<?, ?> fetched = (Tuple2<?, ?>) docOpt.get();
            TransactionGetResult doc = (TransactionGetResult) fetched.getT1();
            LookupInResult lir = (LookupInResult) fetched.getT2();
            perEntryLog.debug(attemptId, "handling doc %s with cas %d and links %s, isTombstone=%s", DebugUtil.docId(doc), doc.cas(), doc.links(), lir.isDeleted());

            if (!doc.links().isDocumentInTransaction()) {
                perEntryLog.debug(attemptId, "no staged content for doc %s, assuming it was committed and skipping", DebugUtil.docId(doc));
                return Mono.empty();
            }

            boolean stagedByThisAttempt = doc.links().stagedAttemptId().get().equals(attemptId);
            if (!stagedByThisAttempt) {
                perEntryLog.debug(attemptId, "for doc %s, staged version is for a different attempt %s, skipping", DebugUtil.docId(doc), doc.links().stagedAttemptId().get());
                return Mono.empty();
            }

            if (requireCrc32ToMatchStaging && doc.links().crc32OfStaging().isPresent()) {
                String stagedCrc32 = doc.links().crc32OfStaging().get();
                String currentCrc32 = doc.documentMetadata().get().crc32().get();
                perEntryLog.debug(attemptId, "checking whether document %s has changed since staging, crc32 then %s now %s", DebugUtil.docId(doc), stagedCrc32, currentCrc32);
                if (!currentCrc32.equals(stagedCrc32)) {
                    perEntryLog.warn(attemptId, "document %s has changed since staging, ignoring it to avoid data loss", DebugUtil.docId(doc));
                    return Mono.empty();
                }
            }

            return perDoc.apply(collection, doc, lir);
        }).onErrorResume(err -> {
            ErrorClasses errorClass = ErrorClasses.classify(err);
            perEntryLog.debug(attemptId, "got exception while handling doc %s: %s", DebugUtil.docId(collection, docRecord.id()), DebugUtil.dbg(err));
            if (errorClass == ErrorClasses.FAIL_CAS_MISMATCH) {
                perEntryLog.debug(attemptId, "got CAS mismatch while cleaning up doc %s, failing this cleanup attempt (it will be retried)", DebugUtil.docId(collection, docRecord.id()));
            }
            // Every error, CAS mismatch included, fails this cleanup attempt.
            return Mono.error((Throwable) err);
        });
    }

    /**
     * Cleans up a single ATR entry by converting it to a cleanup request and delegating to
     * {@link #performCleanup(CleanupRequest, boolean, SpanWrapper)} with no parent span.
     *
     * @param atrCollection    collection holding the ATR
     * @param atrId            not read by this method; the request is built from {@code atrEntry}
     * @param attemptId        not read by this method; the request is built from {@code atrEntry}
     * @param atrEntry         the ATR entry to clean up
     * @param isRegularCleanup passed through to {@code performCleanup}
     * @return the cleanup attempt event produced by {@code performCleanup}
     */
    public Mono<TransactionCleanupAttempt> cleanupATREntry(ReactiveCollection atrCollection, String atrId, String attemptId, ATREntry atrEntry, boolean isRegularCleanup) {
        return this.performCleanup(CleanupRequest.fromAtrEntry(atrCollection, atrEntry), isRegularCleanup, null);
    }

    /**
     * Runs the full cleanup of one attempt: forward-compatibility check, document cleanup, then
     * removal of the ATR entry.
     *
     * <p>On success a {@link TransactionCleanupAttempt} with DEBUG severity and {@code success=true}
     * is published on the event bus and emitted. On any error the failure is logged, and a
     * {@code success=false} event is published and emitted instead of the error, so the returned
     * Mono does not fail for errors raised inside the cleanup chain. The failure event is DEBUG
     * unless the entry is at least
     * {@code BEING_LOGGING_FAILED_CLEANUPS_AT_WARN_AFTER_X_MINUTES} minutes old, in which case it is WARN.
     *
     * @param req              the cleanup request describing the attempt and its ATR
     * @param isRegularCleanup recorded in the log line and in the emitted event
     * @param pspan            optional parent span for the {@code transaction_atr_cleanup} span
     * @return a Mono emitting the published cleanup attempt event
     */
    public Mono<TransactionCleanupAttempt> performCleanup(CleanupRequest req, boolean isRegularCleanup, @Nullable SpanWrapper pspan) {
        SpanWrapper span = SpanWrapper.create(this.config, "transaction_atr_cleanup", pspan);
        return Mono.defer(() -> {
            ReactiveCollection atrCollection = req.atrCollection();
            String atrId = req.atrId();
            String attemptId = req.attemptId();
            TransactionLogger perEntryLog = new TransactionLogger(this.clusterData.cluster().environment().eventBus(), ATRUtil.getAtrDebug(atrCollection, atrId).toString(), false, Event.Severity.INFO);
            perEntryLog.logDefer(attemptId, "Cleaning up ATR entry (isRegular=%s) %s", Event.Severity.DEBUG, isRegularCleanup, req);
            Mono<Void> cleanupDocs = this.cleanupDocs(perEntryLog, req, span);
            Mono<Object> cleanupEntry = this.removeATREntry(req.state(), atrCollection, atrId, attemptId, perEntryLog, span, req);
            return ForwardCompatibility.check(ForwardCompatibilityStages.CLEANUP_ENTRY, req.forwardCompatibility(), perEntryLog, Supported.SUPPORTED)
                    .then(cleanupDocs)
                    .doOnSuccess(v -> this.onCleanupDocsCompleted())
                    .then(cleanupEntry)
                    .then(Mono.fromCallable(() -> {
                        this.onCleanupCompleted();
                        TransactionCleanupAttempt event = new TransactionCleanupAttempt(Event.Severity.DEBUG, true, isRegularCleanup, perEntryLog.logs(), attemptId, atrId, atrCollection, req, "");
                        this.eventBus.publish((Event) event);
                        return event;
                    }))
                    .onErrorResume(err -> {
                        long ageInMinutes = TimeUnit.MILLISECONDS.toMinutes(req.ageMillis());
                        perEntryLog.logDefer(attemptId, "error while attempting to cleanup ATR entry %s, entry is %d mins old, aborting, lost txn cleanup process will retry later: %s", Event.Severity.WARN, ATRUtil.getAtrDebug(atrCollection, atrId), ageInMinutes, DebugUtil.dbg(err));
                        Event.Severity level = Event.Severity.DEBUG;
                        String addlDebug = "";
                        // Old entries that still fail are escalated so they become visible to operators.
                        if (ageInMinutes >= BEING_LOGGING_FAILED_CLEANUPS_AT_WARN_AFTER_X_MINUTES) {
                            level = Event.Severity.WARN;
                            addlDebug = "despite being " + ageInMinutes + " mins old which could indicate a serious error - please raise with support.  Diagnostics: ";
                        }
                        TransactionCleanupAttempt event = new TransactionCleanupAttempt(level, false, isRegularCleanup, perEntryLog.logs(), attemptId, atrId, atrCollection, req, addlDebug);
                        this.eventBus.publish((Event) event);
                        // The failure is reported through the event; the error itself is swallowed here.
                        return Mono.just(event);
                    })
                    .doOnSubscribe(v -> span.start())
                    .doFinally(v -> span.finish());
        });
    }

    /**
     * Removes the attempt's entry from its ATR document with a single xattr mutateIn.
     *
     * <p>For a PENDING attempt an insert of {@code attempts.<attemptId>.p} is placed before the
     * remove. If the mutateIn then fails as path-already-exists, the handler below logs that the
     * entry has changed from PENDING to COMMITTED and propagates the error.
     *
     * <p>A path-not-found failure is treated as success (the entry is already gone, likely due to
     * concurrent cleanup) and yields an empty Mono. All other errors are propagated.
     *
     * @param state         state of the attempt being removed
     * @param atrCollection collection holding the ATR
     * @param atrId         id of the ATR document
     * @param attemptId     id of the attempt whose entry is removed
     * @param perEntryLog   logger collecting the log lines for this ATR entry
     * @param pspan         parent span (not read by this method)
     * @param req           cleanup request; supplies the durability level
     * @return a Mono emitting the mutation result on success, or empty if the entry was already gone
     */
    Mono<Object> removeATREntry(AttemptStates state, ReactiveCollection atrCollection, String atrId, String attemptId, TransactionLogger perEntryLog, SpanWrapper pspan, CleanupRequest req) {
        String entryPath = "attempts." + attemptId;
        List<MutateInSpec> specs = new ArrayList<>();
        if (state == AttemptStates.PENDING) {
            specs.add(MutateInSpec.insert(entryPath + ".p", 0).xattr());
        }
        specs.add(MutateInSpec.remove(entryPath).xattr());
        return this.beforeAtrRemove()
                .then(atrCollection.mutateIn(atrId, specs, OptionsWrapperUtil.wrap((MutateInOptions) MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("Cleaner::removeATREntry")), this.config, req.durabilityLevel(), atrCollection.core())))
                .doOnNext(v -> perEntryLog.debug(attemptId, "successfully removed ATR entry"))
                .onErrorResume(err -> {
                    ErrorClasses ec = ErrorClasses.classify(err);
                    perEntryLog.debug(attemptId, "got exception while removing ATR entry %s: %s", atrId, DebugUtil.dbg(err));
                    if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                        perEntryLog.logDefer(attemptId, "failed to remove %s as entry isn't there, likely due to concurrent cleanup", Event.Severity.DEBUG, ATRUtil.getAtrDebug(atrCollection, atrId));
                        return Mono.empty();
                    }
                    if (ec == ErrorClasses.FAIL_PATH_ALREADY_EXISTS) {
                        perEntryLog.logDefer(attemptId, "not removing %s as it has changed from PENDING to COMMITTED", Event.Severity.DEBUG, ATRUtil.getAtrDebug(atrCollection, atrId));
                    }
                    return Mono.error((Throwable) err);
                })
                // Identity map only widens the element type to Object for the declared return type.
                .map(v -> v);
    }

    /**
     * Hook subscribed to by {@code commitDocs} before a document's staged content is committed.
     * This default emits {@code 1} and does nothing else; being protected, it is presumably
     * overridden by tests to inject behaviour - TODO confirm.
     *
     * @param id id of the document about to be committed
     * @return a Mono emitting {@code 1}
     */
    protected Mono<Integer> beforeCommitDoc(String id) {
        return Mono.just(1);
    }

    /**
     * Hook subscribed to by {@code doPerDoc} before a document is fetched.
     * This default emits {@code 1} and does nothing else; being protected, it is presumably
     * overridden by tests to inject behaviour - TODO confirm.
     *
     * @param id id of the document about to be fetched
     * @return a Mono emitting {@code 1}
     */
    protected Mono<Integer> beforeDocGet(String id) {
        return Mono.just(1);
    }

    /**
     * Hook subscribed to by {@code removeDocsStagedForRemoval} before a document is removed.
     * This default emits {@code 1} and does nothing else; being protected, it is presumably
     * overridden by tests to inject behaviour - TODO confirm.
     *
     * @param id id of the document about to be removed
     * @return a Mono emitting {@code 1}
     */
    protected Mono<Integer> beforeRemoveDocStagedForRemoval(String id) {
        return Mono.just(1);
    }

    /**
     * Hook subscribed to by {@code removeDocs} before a staged insert is undone.
     * This default emits {@code 1} and does nothing else; being protected, it is presumably
     * overridden by tests to inject behaviour - TODO confirm.
     *
     * @param id id of the document about to be removed
     * @return a Mono emitting {@code 1}
     */
    protected Mono<Integer> beforeRemoveDoc(String id) {
        return Mono.just(1);
    }

    /**
     * Hook that emits {@code 1}. It is not called from within this class as shown; being
     * protected, it is presumably used by subclasses or callers elsewhere - TODO confirm.
     *
     * @param id id passed by the caller (presumably the ATR id - verify against callers)
     * @return a Mono emitting {@code 1}
     */
    protected Mono<Integer> beforeAtrGet(String id) {
        return Mono.just(1);
    }

    /**
     * Hook subscribed to by {@code removeATREntry} before the ATR entry is removed.
     * This default emits {@code 1} and does nothing else; being protected, it is presumably
     * overridden by tests to inject behaviour - TODO confirm.
     *
     * @return a Mono emitting {@code 1}
     */
    protected Mono<Integer> beforeAtrRemove() {
        return Mono.just(1);
    }

    /**
     * Hook subscribed to by {@code removeTxnLinks} before a document's transaction links are removed.
     * This default emits {@code 1} and does nothing else; being protected, it is presumably
     * overridden by tests to inject behaviour - TODO confirm.
     *
     * @param id id of the document about to be unlinked
     * @return a Mono emitting {@code 1}
     */
    protected Mono<Integer> beforeRemoveLinks(String id) {
        return Mono.just(1);
    }

    /**
     * Callback invoked by {@code performCleanup} when the document cleanup stage completes
     * successfully, before the ATR entry is removed. Intentionally empty here; presumably
     * overridden in tests - TODO confirm.
     */
    void onCleanupDocsCompleted() {
    }

    /**
     * Callback invoked by {@code performCleanup} after the ATR entry removal stage, just before
     * the success event is created and published. Intentionally empty here; presumably
     * overridden in tests - TODO confirm.
     */
    void onCleanupCompleted() {
    }
}

