/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.transactions;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.core.error.CasMismatchException;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DocumentExistsException;
import com.couchbase.client.core.error.DocumentNotFoundException;
import com.couchbase.client.core.error.ErrorCodeAndMessage;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.TimeoutException;
import com.couchbase.client.core.error.context.ErrorContext;
import com.couchbase.client.core.error.context.QueryErrorContext;
import com.couchbase.client.core.error.context.ReducedKeyValueErrorContext;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.kv.MutationToken;
import com.couchbase.client.core.msg.query.QueryRequest;
import com.couchbase.client.core.node.NodeIdentifier;
import com.couchbase.client.core.retry.reactor.Jitter;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.retry.reactor.RetryExhaustedException;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.ReactiveScope;
import com.couchbase.client.java.SDKAccessUtil;
import com.couchbase.client.java.Scope;
import com.couchbase.client.java.codec.JsonSerializer;
import com.couchbase.client.java.codec.RawJsonTranscoder;
import com.couchbase.client.java.codec.Transcoder;
import com.couchbase.client.java.env.ClusterEnvironment;
import com.couchbase.client.java.json.JsonArray;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.InsertOptions;
import com.couchbase.client.java.kv.LookupInOptions;
import com.couchbase.client.java.kv.LookupInResult;
import com.couchbase.client.java.kv.LookupInSpec;
import com.couchbase.client.java.kv.MutateInMacro;
import com.couchbase.client.java.kv.MutateInOptions;
import com.couchbase.client.java.kv.MutateInResult;
import com.couchbase.client.java.kv.MutateInSpec;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.java.kv.RemoveOptions;
import com.couchbase.client.java.kv.StoreSemantics;
import com.couchbase.client.java.query.QueryMetaData;
import com.couchbase.client.java.query.QueryOptions;
import com.couchbase.client.java.query.QueryResult;
import com.couchbase.client.java.query.QueryScanConsistency;
import com.couchbase.client.java.query.QueryStatus;
import com.couchbase.client.java.query.ReactiveQueryResult;
import com.couchbase.transactions.TransactionContext;
import com.couchbase.transactions.TransactionDurabilityLevel;
import com.couchbase.transactions.TransactionGetResult;
import com.couchbase.transactions.TransactionInsertOptions;
import com.couchbase.transactions.TransactionJsonDocumentStatus;
import com.couchbase.transactions.TransactionQueryOptions;
import com.couchbase.transactions.TransactionReplaceOptions;
import com.couchbase.transactions.TransactionsReactive;
import com.couchbase.transactions.atr.ATRIds;
import com.couchbase.transactions.cleanup.CleanupRequest;
import com.couchbase.transactions.components.ATREntry;
import com.couchbase.transactions.components.ATRUtil;
import com.couchbase.transactions.components.ActiveTransactionRecord;
import com.couchbase.transactions.components.DocRecord;
import com.couchbase.transactions.components.DocumentGetter;
import com.couchbase.transactions.components.DocumentMetadata;
import com.couchbase.transactions.components.DurabilityLevelUtil;
import com.couchbase.transactions.components.SerializationUtil;
import com.couchbase.transactions.config.MergedTransactionConfig;
import com.couchbase.transactions.deferred.TransactionSerializedContext;
import com.couchbase.transactions.error.attempts.ActiveTransactionRecordEntryNotFound;
import com.couchbase.transactions.error.attempts.ActiveTransactionRecordFull;
import com.couchbase.transactions.error.attempts.ActiveTransactionRecordNotFound;
import com.couchbase.transactions.error.external.ForwardCompatibilityFailure;
import com.couchbase.transactions.error.external.PreviousOperationFailed;
import com.couchbase.transactions.error.external.TransactionOperationFailed;
import com.couchbase.transactions.error.internal.AttemptExpired;
import com.couchbase.transactions.error.internal.AttemptNotFoundOnQuery;
import com.couchbase.transactions.error.internal.ErrorClasses;
import com.couchbase.transactions.error.internal.ForwardCompatibilityRequiresRetry;
import com.couchbase.transactions.error.internal.RetryAtrCommit;
import com.couchbase.transactions.error.internal.RetryOperation;
import com.couchbase.transactions.error.internal.TransactionOperationFailedBuilder;
import com.couchbase.transactions.forwards.ForwardCompatibility;
import com.couchbase.transactions.forwards.ForwardCompatibilityStages;
import com.couchbase.transactions.forwards.Supported;
import com.couchbase.transactions.log.IllegalDocumentState;
import com.couchbase.transactions.log.TransactionLogger;
import com.couchbase.transactions.query.QueryAccessor;
import com.couchbase.transactions.support.AttemptStates;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import com.couchbase.transactions.support.SpanWrapperUtil;
import com.couchbase.transactions.support.StagedMutation;
import com.couchbase.transactions.support.StagedMutationType;
import com.couchbase.transactions.util.DebugUtil;
import com.couchbase.transactions.util.LogDeferThrowable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

public class AttemptContextReactive {
    // Merged configuration for this transaction (read for KV timeout, durability level, ATR count
    // and the optional metadata collection).
    private final MergedTransactionConfig config;
    // Mutations staged so far by this attempt.
    private final ConcurrentLinkedQueue<StagedMutation> stagedMutations = new ConcurrentLinkedQueue<>();
    private final String attemptId;
    // Context of the overall transaction this attempt belongs to.
    private final TransactionContext overall;
    // Client-side start marker taken from System.nanoTime() when the attempt object is created.
    private final Duration startTimeClient;
    // Initialised to empty so that readers (e.g. dehydrate) never observe a null Optional before
    // a server start time has been recorded.
    private Optional<Duration> startTimeServer = Optional.empty();
    // ATR document id and collection; empty until an ATR has been chosen for this attempt.
    private Optional<String> atrId = Optional.empty();
    private Optional<ReactiveCollection> atrCollection = Optional.empty();
    final TransactionLogger LOGGER;
    private AttemptStates state = AttemptStates.NOT_STARTED;
    private final TransactionsReactive parent;
    // Span covering the whole attempt; per-operation spans are created as its children.
    private final SpanWrapper attemptSpan;
    // TransactionOperationFailed errors raised by operations performed through this attempt.
    private final ConcurrentLinkedQueue<TransactionOperationFailed> errors = new ConcurrentLinkedQueue<>();
    // Set once an expiry has been detected by the pre-commit expiry check; later expiry checks are
    // skipped while this is true.
    private boolean expiryOvertimeMode = false;
    @Nullable
    private volatile NodeIdentifier queryTarget = null;
    // Index handed to each query statement issued by this attempt.
    private final AtomicInteger queryStatementIdx = new AtomicInteger(0);
    private volatile boolean isDone = false;
    // Read from the "com.couchbase.transactions.expiryThresholdMs" system property, defaulting to 10.
    private int EXPIRY_THRESHOLD = Integer.parseInt(System.getProperty("com.couchbase.transactions.expiryThresholdMs", "10"));
    // Stage/hook identifiers. They are passed as the stage name to the expiry checks and to the
    // query wrapper (e.g. HOOK_GET, HOOK_INSERT, HOOK_QUERY_KV_GET).
    public static String HOOK_ROLLBACK = "rollback";
    public static String HOOK_GET = "get";
    public static String HOOK_INSERT = "insert";
    public static String HOOK_REPLACE = "replace";
    public static String HOOK_REMOVE = "remove";
    public static String HOOK_BEFORE_COMMIT = "commit";
    public static String HOOK_ABORT_GET_ATR = "abortGetAtr";
    public static String HOOK_ROLLBACK_DOC = "rollbackDoc";
    public static String HOOK_DELETE_INSERTED = "deleteInserted";
    public static String HOOK_REMOVE_STAGED_INSERT = "removeStagedInsert";
    public static String HOOK_CREATE_STAGED_INSERT = "createdStagedInsert";
    public static String HOOK_INSERT_QUERY = "insertQuery";
    public static String HOOK_REMOVE_DOC = "removeDoc";
    public static String HOOK_COMMIT_DOC = "commitDoc";
    public static String HOOK_QUERY = "query";
    public static String HOOK_QUERY_BEGIN_WORK = "queryBeginWork";
    public static String HOOK_QUERY_COMMIT = "queryCommit";
    public static String HOOK_QUERY_ROLLBACK = "queryRollback";
    public static String HOOK_QUERY_KV_GET = "queryKvGet";
    public static String HOOK_QUERY_KV_REPLACE = "queryKvReplace";
    public static String HOOK_QUERY_KV_REMOVE = "queryKvRemove";
    public static String HOOK_QUERY_KV_INSERT = "queryKvInsert";
    public static String HOOK_ATR_COMMIT = "atrCommit";
    public static String HOOK_ATR_COMMIT_AMBIGUITY_RESOLUTION = "atrCommitAmbiguityResolution";
    public static String HOOK_ATR_ABORT = "atrAbort";
    public static String HOOK_ATR_ROLLBACK_COMPLETE = "atrRollbackComplete";
    public static String HOOK_ATR_PENDING = "atrPending";
    public static String HOOK_ATR_COMPLETE = "atrComplete";
    // Delay used by the fixed-backoff retry policy below; must be declared before that policy.
    public static Duration DEFAULT_DELAY_RETRYING_OPERATION = Duration.ofMillis(3L);
    // Retries on RetryOperation with exponential backoff between 1ms and 100ms and random jitter.
    private static final reactor.util.retry.Retry RETRY_OPERATION_UNTIL_EXPIRY = Retry.anyOf((Class[])new Class[]{RetryOperation.class}).exponentialBackoff(Duration.of(1L, ChronoUnit.MILLIS), Duration.of(100L, ChronoUnit.MILLIS)).jitter(Jitter.random()).toReactorRetry();
    // Retries on RetryOperation with a fixed backoff of DEFAULT_DELAY_RETRYING_OPERATION.
    private static final reactor.util.retry.Retry RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY = Retry.anyOf((Class[])new Class[]{RetryOperation.class}).fixedBackoff(DEFAULT_DELAY_RETRYING_OPERATION).toReactorRetry();
    private final List<MutationToken> finalMutationTokens = new ArrayList<MutationToken>();

    /**
     * Creates a fresh attempt within the given transaction.
     *
     * @param overall    context of the overall transaction; also supplies the logger
     * @param config     merged transaction configuration
     * @param attemptId  identifier of this attempt
     * @param parent     owning transactions object
     * @param parentSpan optional parent span under which the "attempt" span is created
     * @throws NullPointerException if {@code overall}, {@code config}, {@code attemptId} or {@code parent} is null
     */
    public AttemptContextReactive(TransactionContext overall, MergedTransactionConfig config, String attemptId, TransactionsReactive parent, Optional<SpanWrapper> parentSpan) {
        Objects.requireNonNull(overall);
        Objects.requireNonNull(config);
        Objects.requireNonNull(attemptId);
        Objects.requireNonNull(parent);
        this.LOGGER = overall.LOGGER;
        this.overall = overall;
        this.config = config;
        this.attemptId = attemptId;
        // System.nanoTime() is expressed in nanoseconds, so the Duration must be built in nanoseconds too;
        // tagging the value as milliseconds inflates it by 10^6 and makes toNanos() prone to overflow.
        this.startTimeClient = Duration.ofNanos(System.nanoTime());
        this.parent = parent;
        this.attemptSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "attempt", parentSpan.orElse(null));
    }

    /**
     * Builds a JSON snapshot of this attempt: transaction and attempt ids, the ATR location (when
     * known), elapsed transaction time, the server start time (when known) and every staged mutation.
     *
     * @param includeMutationBody when true, the Base64-encoded content of each non-REMOVE mutation
     *                            is written under "content"
     * @return the populated JSON object
     */
    private JsonObject dehydrate(boolean includeMutationBody) {
        JsonObject snapshot = JsonObject.create();
        snapshot.put("transactionId", this.transactionId());
        snapshot.put("attemptId", this.attemptId);
        if (this.atrId.isPresent()) {
            snapshot.put("atrId", this.atrId.get());
        }
        if (this.atrCollection.isPresent()) {
            ReactiveCollection atr = this.atrCollection.get();
            snapshot.put("atrBucket", atr.bucketName());
            snapshot.put("atrScope", atr.scopeName());
            snapshot.put("atrCollection", atr.name());
        }

        long elapsedNanos = System.nanoTime() - this.overall.startTimeClient().toNanos();
        snapshot.put("transactionElapsedTimeMillis", elapsedNanos / 1000000L);
        if (this.startTimeServer.isPresent()) {
            snapshot.put("startTimeServerMillis", this.startTimeServer.get().toMillis());
        }

        JsonArray serialized = JsonArray.create();
        for (StagedMutation staged : this.stagedMutations) {
            JsonObject entry = JsonObject.create();
            entry.put("type", staged.type.toString());
            // Removes carry no body; other mutation types only carry one when requested.
            if (staged.type != StagedMutationType.REMOVE && includeMutationBody) {
                entry.put("content", Base64.getEncoder().encodeToString(staged.content));
            }
            staged.doc.serialize(entry);
            serialized.add(entry);
        }
        snapshot.put("mutations", serialized);
        return snapshot;
    }

    /**
     * Builds the JSON transaction data describing this attempt: ids, remaining time, relevant
     * configuration, the staged mutations and, when available, the ATR location (or only the
     * configured metadata collection if no ATR has been selected yet).
     *
     * @return the populated JSON object
     */
    private JsonObject makeQueryTxData() {
        ClusterEnvironment env = this.parent.cleanup().clusterData().cluster().environment();

        JsonObject out = JsonObject.create();
        out.put("id", JsonObject.create()
                .put("txn", this.transactionId())
                .put("atmpt", this.attemptId));
        out.put("state", JsonObject.create()
                .put("timeLeftMs", this.expiryRemainingMillis()));
        out.put("config", JsonObject.create()
                .put("kvTimeoutMs", this.config.keyValueTimeout().orElse(env.timeoutConfig().kvDurableTimeout()).toMillis())
                .put("durabilityLevel", this.config.transactionDurabilityLevel().name())
                .put("numAtrs", this.config.numAtrs()));

        JsonArray mutations = JsonArray.create();
        for (StagedMutation staged : this.stagedMutations) {
            JsonObject entry = JsonObject.create()
                    .put("scp", staged.doc.collection().scopeName())
                    .put("coll", staged.doc.collection().name())
                    .put("bkt", staged.doc.collection().bucketName())
                    .put("id", staged.doc.id())
                    .put("cas", Long.toString(staged.doc.cas()))
                    .put("type", staged.type.name());
            mutations.add(entry);
        }
        out.put("mutations", mutations);

        boolean atrKnown = this.atrCollection.isPresent() && this.atrId.isPresent();
        if (atrKnown) {
            JsonObject atr = JsonObject.create()
                    .put("id", this.atrId.get())
                    .put("bkt", this.atrCollection.get().bucketName())
                    .put("scp", this.atrCollection.get().scopeName())
                    .put("coll", this.atrCollection.get().name());
            out.put("atr", atr);
        } else if (this.config.metadataCollection().isPresent()) {
            // No ATR chosen yet: only the configured metadata collection is reported, without an id.
            Collection metadata = this.config.metadataCollection().get();
            JsonObject atr = JsonObject.create()
                    .put("bkt", metadata.bucketName())
                    .put("scp", metadata.scopeName())
                    .put("coll", metadata.name());
            out.put("atr", atr);
        }
        return out;
    }

    /**
     * Recreates an attempt from a serialized JSON form ("pill").
     *
     * <p>Reads "attemptId", "atrId", "atrBucket", "atrScope", "atrCollection" and the "mutations"
     * array from the pill, sets the state to PENDING and rebuilds the staged mutations. A mutation
     * whose "type" is neither "INSERT" nor "REPLACE" is treated as a REMOVE; non-REMOVE mutations
     * have their Base64 "content" decoded.
     *
     * @param pill       serialized attempt
     * @param overall    context of the overall transaction; also supplies the logger
     * @param config     merged transaction configuration
     * @param parent     owning transactions object, used to resolve buckets and collections
     * @param parentSpan optional parent span under which the "attempt" span is created
     * @throws NullPointerException if {@code pill}, {@code overall}, {@code config} or {@code parent} is null
     */
    public AttemptContextReactive(JsonObject pill, TransactionContext overall, MergedTransactionConfig config, TransactionsReactive parent, Optional<SpanWrapper> parentSpan) {
        Objects.requireNonNull(pill);
        Objects.requireNonNull(overall);
        Objects.requireNonNull(config);
        Objects.requireNonNull(parent);
        this.LOGGER = overall.LOGGER;
        this.overall = overall;
        this.config = config;
        this.attemptId = pill.getString("attemptId");
        // System.nanoTime() is expressed in nanoseconds, so the Duration must be built in nanoseconds too;
        // tagging the value as milliseconds inflates it by 10^6 and makes toNanos() prone to overflow.
        this.startTimeClient = Duration.ofNanos(System.nanoTime());
        this.atrId = Optional.of(pill.getString("atrId"));
        String atrBucket = pill.getString("atrBucket");
        String atrScope = pill.getString("atrScope");
        String atrCollection = pill.getString("atrCollection");
        this.atrCollection = Optional.of(parent.cleanup().clusterData().getBucketFromName(atrBucket).scope(atrScope).collection(atrCollection));
        this.parent = parent;
        this.state = AttemptStates.PENDING;
        JsonArray mutations = pill.getArray("mutations");
        mutations.iterator().forEachRemaining(v -> {
            JsonObject jo = (JsonObject)v;
            TransactionGetResult doc = TransactionGetResult.createFrom(jo, parent.cleanup().clusterData());
            // Any type other than INSERT/REPLACE falls back to REMOVE.
            StagedMutationType type = StagedMutationType.REMOVE;
            switch (jo.getString("type")) {
                case "INSERT": {
                    type = StagedMutationType.INSERT;
                    break;
                }
                case "REPLACE": {
                    type = StagedMutationType.REPLACE;
                }
            }
            switch (type) {
                case REMOVE: {
                    // Removes have no body to restore.
                    StagedMutation sm = new StagedMutation(doc, null, type, null);
                    this.stagedMutations.add(sm);
                    break;
                }
                default: {
                    byte[] bytes = Base64.getDecoder().decode(jo.getString("content"));
                    StagedMutation sm = new StagedMutation(doc, bytes, type, null);
                    this.stagedMutations.add(sm);
                }
            }
        });
        this.attemptSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "attempt", parentSpan.orElse(null));
    }

    /** @return the span created for this attempt */
    SpanWrapper span() {
        return attemptSpan;
    }

    /** @return the client-side start marker captured when this attempt object was constructed */
    Duration startTimeClient() {
        return startTimeClient;
    }

    /** @return the identifier of this attempt */
    public String attemptId() {
        return attemptId;
    }

    /** @return the identifier of the overall transaction, as reported by its context */
    public String transactionId() {
        TransactionContext ctx = this.overall;
        return ctx.transactionId();
    }

    /** @return the context of the overall transaction this attempt belongs to */
    TransactionContext overall() {
        return overall;
    }

    /** @return a new list of the staged mutations of type REPLACE, in staging order */
    private List<StagedMutation> stagedReplaces() {
        assert (!this.queryMode());
        List<StagedMutation> replaces = new ArrayList<>();
        for (StagedMutation candidate : this.stagedMutations) {
            if (candidate.type == StagedMutationType.REPLACE) {
                replaces.add(candidate);
            }
        }
        return replaces;
    }

    /** @return a new list of the staged mutations of type REMOVE, in staging order */
    private List<StagedMutation> stagedRemoves() {
        assert (!this.queryMode());
        List<StagedMutation> removes = new ArrayList<>();
        for (StagedMutation candidate : this.stagedMutations) {
            if (candidate.type == StagedMutationType.REMOVE) {
                removes.add(candidate);
            }
        }
        return removes;
    }

    /** @return a new list of the staged mutations of type INSERT, in staging order */
    private List<StagedMutation> stagedInserts() {
        assert (!this.queryMode());
        List<StagedMutation> inserts = new ArrayList<>();
        for (StagedMutation candidate : this.stagedMutations) {
            if (candidate.type == StagedMutationType.INSERT) {
                inserts.add(candidate);
            }
        }
        return inserts;
    }

    /**
     * Looks for a mutation staged by this attempt against the given document. Staged replaces are
     * searched first, then staged inserts; staged removes are not considered.
     *
     * @param collection collection of the document
     * @param id         document id
     * @return the first matching staged replace or insert, or empty if there is none
     */
    private Optional<StagedMutation> checkForOwnWrite(ReactiveCollection collection, String id) {
        assert (!this.queryMode());
        // A staged mutation targets the same document when bucket, scope, collection and id all match.
        java.util.function.Predicate<StagedMutation> sameDocument = staged ->
                staged.doc.collection().bucketName().equals(collection.bucketName())
                        && staged.doc.collection().scopeName().equals(collection.scopeName())
                        && staged.doc.collection().name().equals(collection.name())
                        && staged.doc.id().equals(id);

        Optional<StagedMutation> replaced = this.stagedReplaces().stream().filter(sameDocument).findFirst();
        if (replaced.isPresent()) {
            return replaced;
        }
        Optional<StagedMutation> inserted = this.stagedInserts().stream().filter(sameDocument).findFirst();
        return inserted.isPresent() ? inserted : Optional.empty();
    }

    /**
     * Performs a client-side expiry check unless the attempt is already in expiry-overtime-mode.
     *
     * @param stage name of the stage being checked, used in logging and in the error message
     * @param docId optional id of the document involved
     * @return an empty Mono when the check is skipped or passes, otherwise a Mono failing with
     *         {@link AttemptExpired}
     */
    private Mono<Void> errorIfExpiredAndNotInExpiryOvertimeMode(String stage, Optional<String> docId) {
        if (!this.expiryOvertimeMode) {
            if (!this.hasExpiredClientSide(stage, docId)) {
                return Mono.empty();
            }
            this.LOGGER.info(this.attemptId, "has expired in stage %s", stage);
            AttemptExpired expired = new AttemptExpired(this, "Attempt has expired in stage " + stage);
            return Mono.error((Throwable)expired);
        }
        this.LOGGER.info(this.attemptId, "not doing expiry check in %s as already in expiry-overtime-mode", stage);
        return Mono.empty();
    }

    /**
     * Performs a client-side expiry check; on expiry, switches the attempt into
     * expiry-overtime-mode and throws.
     *
     * @param stage name of the stage being checked
     * @param docId optional id of the document involved
     * @throws TransactionOperationFailed classified FAIL_EXPIRY and raising TRANSACTION_EXPIRED, if expired
     */
    private void checkExpiryPreCommitAndSetExpiryOvertimeMode(String stage, Optional<String> docId) {
        boolean expired = this.hasExpiredClientSide(stage, docId);
        if (!expired) {
            return;
        }
        this.LOGGER.info(this.attemptId, "has expired in stage %s, setting expiry-overtime-mode", stage);
        this.expiryOvertimeMode = true;
        TransactionOperationFailedBuilder failure = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY);
        throw failure.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).build();
    }

    /**
     * @return a FAIL_OTHER error, marked not to roll back the attempt, whose cause states that no
     *         operations may be performed after commit or rollback
     */
    private TransactionOperationFailed transactionIsDone() {
        TransactionOperationFailedBuilder failure = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER);
        IllegalStateException reason = new IllegalStateException("Cannot perform operations after transaction has been committed or rolled back");
        return failure.cause(reason).doNotRollbackAttempt().build();
    }

    /** @return a FAIL_OTHER error whose cause is a {@link PreviousOperationFailed} */
    private TransactionOperationFailed existingError() {
        TransactionOperationFailedBuilder failure = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER);
        return failure.cause(new PreviousOperationFailed()).build();
    }

    /**
     * Fetches a document inside this attempt.
     *
     * @param collection collection holding the document
     * @param id         document id
     * @return a Mono of the document wrapped in an Optional, as produced by the internal get
     */
    public Mono<Optional<TransactionGetResult>> getOptional(ReactiveCollection collection, String id) {
        Mono<Optional<TransactionGetResult>> fetched = this.getInternal(collection, id);
        return fetched;
    }

    /**
     * Routes a get to the query path or the KV path. The mode is evaluated lazily, at
     * subscription time.
     *
     * @param collection collection holding the document
     * @param id         document id
     * @return a Mono of the document wrapped in an Optional
     */
    private Mono<Optional<TransactionGetResult>> getInternal(ReactiveCollection collection, String id) {
        return Mono.defer(() -> this.queryMode()
                ? this.getWithQuery(collection, id)
                : this.getWithKV(collection, id, Optional.empty()));
    }

    /**
     * Fetches a document through the KV path.
     *
     * <p>In order: fails if the attempt is done or already has an error; runs the pre-commit
     * expiry check; answers from this attempt's own staged mutations when one targets the document
     * (own insert/replace returns the staged content with status OWN_WRITE, own remove returns
     * empty); otherwise invokes the before-get hook and {@code DocumentGetter.getAsync}.
     *
     * <p>Any {@link TransactionOperationFailed} that propagates out of the chain is recorded in
     * {@code errors}.
     *
     * @param collection               collection holding the document
     * @param id                       document id
     * @param resolvingMissingATREntry attempt id taken from a previous ATR-not-found style error
     *                                 when this call is a retry, otherwise empty
     * @return a Mono of the document, or of an empty Optional when it is not found or was removed
     *         by this attempt
     */
    private Mono<Optional<TransactionGetResult>> getWithKV(ReactiveCollection collection, String id, Optional<String> resolvingMissingATREntry) {
        return Mono.defer(() -> {
            // NOTE(review): the span is created before the early exits below, none of which finish it — confirm intended.
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "user.get", this.attemptSpan);
            this.LOGGER.info(this.attemptId, "getting doc %s, resolvingMissingATREntry=%s", DebugUtil.docId(collection, id), resolvingMissingATREntry.orElse("<empty>"));
            if (this.isDone()) {
                return Mono.error((Throwable)this.transactionIsDone());
            }
            if (this.isExistingError()) {
                return Mono.error((Throwable)this.existingError());
            }
            // Throws synchronously (inside the defer) if the attempt has expired.
            this.checkExpiryPreCommitAndSetExpiryOvertimeMode(HOOK_GET, Optional.of(id));
            // Read-your-own-writes: a staged insert/replace of this document is returned directly.
            Optional<StagedMutation> ownWrite = this.checkForOwnWrite(collection, id);
            if (ownWrite.isPresent()) {
                this.LOGGER.info(this.attemptId, "found own-write of mutated doc %s", DebugUtil.docId(collection, id));
                return Mono.just(Optional.of(TransactionGetResult.createFrom(ownWrite.get().doc, ownWrite.get().content, TransactionJsonDocumentStatus.OWN_WRITE)));
            }
            // A document this attempt has staged for removal is reported as absent.
            Optional<TransactionGetResult> ownRemove = this.stagedRemoves().stream().filter(v -> v.doc.collection().bucketName().equals(collection.bucketName()) && v.doc.collection().scopeName().equals(collection.scopeName()) && v.doc.collection().name().equals(collection.name()) && v.doc.id().equals(id)).findFirst().map(v -> v.doc);
            if (ownRemove.isPresent()) {
                this.LOGGER.info(this.attemptId, "found own-write of removed doc %s", DebugUtil.docId(collection, id));
                return Mono.just(Optional.empty());
            }
            return this.beforeDocGet(this, id).then(DocumentGetter.getAsync(this.cluster(), this.LOGGER, collection, this.config, id, this.attemptId, false, span, this.getTranscoder(), resolvingMissingATREntry)).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                this.LOGGER.warn(this.attemptId, "got error while getting doc %s in %dus: %s", DebugUtil.docId(collection, id), span.elapsed(), AttemptContextReactive.dbg(err));
                // Forward-compatibility errors become FAIL_OTHER; only the "requires retry" variant retries the transaction.
                if (err instanceof ForwardCompatibilityRequiresRetry || err instanceof ForwardCompatibilityFailure) {
                    TransactionOperationFailedBuilder error = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new ForwardCompatibilityFailure());
                    if (err instanceof ForwardCompatibilityRequiresRetry) {
                        error.retryTransaction();
                    }
                    return Mono.error((Throwable)error.build());
                }
                // Already a TransactionOperationFailed: propagate unchanged.
                if (ec == ErrorClasses.TRANSACTION_OPERATION_FAILED) {
                    return Mono.error((Throwable)err);
                }
                // ATR (or ATR entry) missing: fetch again, passing along the attempt id reported by the error.
                if (err instanceof ActiveTransactionRecordNotFound) {
                    return this.getWithKV(collection, id, Optional.of(((ActiveTransactionRecordNotFound)err).attemptId()));
                }
                if (err instanceof ActiveTransactionRecordEntryNotFound) {
                    this.LOGGER.info(this.attemptId, "trying to get doc again to see if it's in same state");
                    return this.getWithKV(collection, id, Optional.of(((ActiveTransactionRecordEntryNotFound)err).attemptId()));
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    return Mono.error((Throwable)builder.retryTransaction().build());
                }
                return Mono.error((Throwable)builder.build());
            }).flatMap(v -> {
                long elapsed = span.finish();
                if (v.isPresent()) {
                    this.LOGGER.info(this.attemptId, "completed get of %s in %dus", v.get(), elapsed);
                } else {
                    this.LOGGER.info(this.attemptId, "completed get of %s, could not find, in %dus", DebugUtil.docId(collection, id), elapsed);
                }
                return this.afterGetComplete(this, id).thenReturn(v);
            }).flatMap(doc -> {
                // Only a document that was actually found is subject to the forward-compatibility check.
                if (doc.isPresent()) {
                    return this.forwardCompatibilityCheck(ForwardCompatibilityStages.GETS, doc.flatMap(v -> v.links().forwardCompatibility())).thenReturn(doc);
                }
                return Mono.just((Object)doc);
            }).doOnError(err -> {
                if (err instanceof TransactionOperationFailed) {
                    this.errors.add((TransactionOperationFailed)err);
                }
                AttemptContextReactive.failSpan(span, err);
                // NOTE(review): span.finish() was already called in the first flatMap above, so on success it runs twice — confirm finish() tolerates that.
            }).doOnNext(v -> span.finish());
        });
    }

    /** @return a new JSON object containing the single entry {@code "kv": true} */
    private JsonObject makeTxdata() {
        JsonObject txdata = JsonObject.create();
        return txdata.put("kv", true);
    }

    /**
     * Fetches a document through the query path by running "EXECUTE __get" with the keyspace and
     * document id as parameters.
     *
     * <p>An empty result set, or a {@link DocumentNotFoundException}, yields an empty Optional.
     * Otherwise the first row's "scas", "doc" and "txnMeta" fields are used to build the result.
     * A {@link TransactionOperationFailed} is propagated unchanged; any other error is wrapped in
     * one built from its classified error class. Propagated TransactionOperationFailed errors are
     * recorded in {@code errors}, and the span is finished when the Mono terminates.
     *
     * @param collection collection holding the document
     * @param id         document id
     * @return a Mono of the document, or of an empty Optional when it is not found
     */
    private Mono<Optional<TransactionGetResult>> getWithQuery(ReactiveCollection collection, String id) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "user.query_get", this.attemptSpan);
            int sidx = this.queryStatementIdx.getAndIncrement();
            JsonArray params = JsonArray.create().add(AttemptContextReactive.makeKeyspace(collection)).add(id);
            return this.queryWrapper(sidx, null, "EXECUTE __get", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_GET, false, true, this.makeTxdata(), params, span, false).map(result -> {
                Optional<Object> ret;
                List rows = result.rowsAsObject();
                if (rows.size() == 0) {
                    ret = Optional.empty();
                } else {
                    // Only the first row is consulted.
                    JsonObject row = (JsonObject)rows.get(0);
                    // The CAS arrives as a string under "scas" and is parsed into a long.
                    String scas = row.getString("scas");
                    long cas = Long.parseLong(scas);
                    JsonObject doc = row.getObject("doc");
                    JsonObject txnMeta = row.getObject("txnMeta");
                    ret = Optional.of(new TransactionGetResult(id, doc.toBytes(), cas, collection, null, null, null, this.getTranscoder(), Optional.ofNullable(txnMeta)));
                }
                return ret;
            }).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                // A missing document is a normal outcome for a get, not an error.
                if (err instanceof DocumentNotFoundException) {
                    return Mono.just(Optional.empty());
                }
                if (err instanceof TransactionOperationFailed) {
                    return Mono.error((Throwable)err);
                }
                return Mono.error((Throwable)builder.build());
            }).doOnError(err -> {
                if (err instanceof TransactionOperationFailed) {
                    this.errors.add((TransactionOperationFailed)err);
                }
            }).doFinally(ignore -> span.finish());
        });
    }

    /**
     * Fetches a document inside this attempt, failing when it does not exist.
     *
     * @param collection collection holding the document
     * @param id         document id
     * @return a Mono of the document; if it is absent, the Mono fails with a
     *         {@link TransactionOperationFailed} classified FAIL_DOC_NOT_FOUND (caused by a
     *         {@link DocumentNotFoundException}), which is also recorded in this attempt's errors
     */
    public Mono<TransactionGetResult> get(ReactiveCollection collection, String id) {
        return this.getOptional(collection, id).flatMap(maybeDoc -> {
            if (!maybeDoc.isPresent()) {
                TransactionOperationFailedBuilder failure = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_NOT_FOUND);
                ErrorContext context = ReducedKeyValueErrorContext.create(id);
                TransactionOperationFailed notFound = failure.cause((Throwable)new DocumentNotFoundException(context)).build();
                this.errors.add(notFound);
                return Mono.error((Throwable)notFound);
            }
            return Mono.just(maybeDoc.get());
        });
    }

    /**
     * Evaluates both the overall transaction's client-side expiry and the expiry hook, logging
     * whichever fired. Both are always evaluated.
     *
     * @param place name of the stage being checked, used in logging and passed to the hook
     * @param docId optional id of the document involved, passed to the hook
     * @return true if either the overall transaction or the hook reports expiry
     */
    boolean hasExpiredClientSide(String place, Optional<String> docId) {
        boolean overallExpired = this.overall.hasExpiredClientSide();
        boolean hookExpired = this.hasExpiredClientSideHook(this, place, docId);
        if (overallExpired) {
            this.LOGGER.info(this.attemptId, "expired in %s", place);
        }
        if (hookExpired) {
            this.LOGGER.info(this.attemptId, "fake expiry in %s", place);
        }
        if (overallExpired) {
            return true;
        }
        return hookExpired;
    }

    /** @return the live (not copied) list of final mutation tokens held by this attempt */
    List<MutationToken> finalMutationTokens() {
        return finalMutationTokens;
    }

    /** @return the ATR document id of this attempt, or empty if none has been set */
    public Optional<String> atrId() {
        return atrId;
    }

    /** @return the collection holding this attempt's ATR, or empty if none has been set */
    public Optional<ReactiveCollection> atrCollection() {
        return atrCollection;
    }

    /** @return the document ids of all staged replaces, in staging order */
    List<String> stagedReplaceIds() {
        List<String> ids = new ArrayList<>();
        for (StagedMutation staged : this.stagedReplaces()) {
            ids.add(staged.doc.id());
        }
        return ids;
    }

    /** @return the document ids of all staged inserts, in staging order */
    List<String> stagedInsertIds() {
        List<String> ids = new ArrayList<>();
        for (StagedMutation staged : this.stagedInserts()) {
            ids.add(staged.doc.id());
        }
        return ids;
    }

    /** @return the document ids of all staged removes, in staging order */
    List<String> stagedRemoveIds() {
        List<String> ids = new ArrayList<>();
        for (StagedMutation staged : this.stagedRemoves()) {
            ids.add(staged.doc.id());
        }
        return ids;
    }

    /**
     * Inserts a document inside this attempt using {@code TransactionInsertOptions.DEFAULT}.
     *
     * @param collection collection to insert into
     * @param id         document id
     * @param content    document content
     * @return a Mono of the inserted document
     */
    public Mono<TransactionGetResult> insert(ReactiveCollection collection, String id, Object content) {
        TransactionInsertOptions defaults = TransactionInsertOptions.DEFAULT;
        return this.insert(collection, id, content, defaults);
    }

    /**
     * Chooses the collection that holds ATRs: the configured metadata collection when one is
     * set, otherwise the default collection of the document's bucket.
     *
     * @param docCollection collection of the document being mutated
     * @return the collection to use for the ATR
     */
    private ReactiveCollection getAtrCollection(ReactiveCollection docCollection) {
        return this.config.metadataCollection()
                .map(metadata -> metadata.reactive())
                .orElseGet(() -> this.parent.cleanup().clusterData().getBucketFromName(docCollection.bucketName()).defaultCollection());
    }

    /**
     * Formats the backtick-quoted keyspace path {@code default:`bucket`.`scope`.`collection`}
     * for the given collection.
     *
     * @param collection collection to describe
     * @return the keyspace string
     */
    private static String makeKeyspace(ReactiveCollection collection) {
        String bucket = collection.bucketName();
        String scope = collection.scopeName();
        String name = collection.name();
        return String.format("default:`%s`.`%s`.`%s`", bucket, scope, name);
    }

    /**
     * Inserts a document inside this attempt.
     *
     * <p>In query mode the insert is delegated to the query path. Otherwise (KV path) the call
     * fails if the attempt is done or already has an error, or with
     * {@link DocumentExistsException} if this attempt has already staged an insert or replace of
     * the same document. After the expiry check and ATR initialisation, the ATR is set pending if
     * the attempt has not started, and the insert is staged. If this attempt previously staged a
     * remove of the same document, a staged replace is created instead of a staged insert.
     *
     * <p>A {@link TransactionOperationFailed} raised by the staging chain is recorded in this
     * attempt's errors.
     *
     * @param collection collection to insert into
     * @param id         document id
     * @param content    document content
     * @param options    insert options
     * @return a Mono of the staged document
     */
    public Mono<TransactionGetResult> insert(ReactiveCollection collection, String id, Object content, TransactionInsertOptions options) {
        return Mono.defer(() -> {
            TransactionInsertOptions.BuiltOptions built = options.build();
            if (this.queryMode()) {
                // The query path creates and finishes its own span, so no "user.insert" span is
                // opened here; opening one before this check left it unfinished in query mode.
                return this.insertWithQuery(collection, id, content);
            }
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "user.insert", this.attemptSpan);
            return Mono.defer(() -> {
                if (this.isDone()) {
                    return Mono.error((Throwable)this.transactionIsDone());
                }
                if (this.isExistingError()) {
                    return Mono.error((Throwable)this.existingError());
                }
                Optional<StagedMutation> existing = this.findStagedMutation(collection, id);
                if (existing.isPresent()) {
                    StagedMutation op = existing.get();
                    // The document already exists from this attempt's point of view.
                    if (op.type == StagedMutationType.INSERT || op.type == StagedMutationType.REPLACE) {
                        return Mono.error((Throwable)new DocumentExistsException(null));
                    }
                }
                this.checkExpiryPreCommitAndSetExpiryOvertimeMode(HOOK_INSERT, Optional.of(id));
                this.initAtrIfNeeded(collection, id);
                return Mono.defer(() -> {
                    // First mutation of the attempt: the ATR entry is set pending before staging.
                    if (this.state == AttemptStates.NOT_STARTED) {
                        return this.atrPending(this.atrCollection.get(), span);
                    }
                    return Mono.empty();
                }).then(Mono.defer(() -> {
                    // Insert after a staged remove of the same document is staged as a replace.
                    if (existing.isPresent() && ((StagedMutation)existing.get()).type == StagedMutationType.REMOVE) {
                        return this.createStagedReplace(((StagedMutation)existing.get()).doc, content, span, false);
                    }
                    return this.createStagedInsert(collection, id, content, span, built, Optional.empty());
                })).doFinally(v -> span.finish()).doOnError(err -> {
                    AttemptContextReactive.failSpan(span, err);
                    if (err instanceof TransactionOperationFailed) {
                        this.errors.add((TransactionOperationFailed)err);
                    }
                });
            });
        });
    }

    /**
     * Performs an insert through the query service by executing the {@code __insert}
     * statement with the keyspace, document id and encoded content as parameters.
     *
     * <p>The returned result carries the transcoder-encoded bytes of {@code content}, so
     * it can be decoded again with the same transcoder whatever {@code content.toString()}
     * would produce.
     *
     * @param collection the collection the document belongs to
     * @param id the document id
     * @param content the content to insert; must encode to a JSON object
     * @return a Mono emitting the inserted document with the CAS parsed from the "scas"
     *         field of the first result row, or an error. {@code TransactionOperationFailed}
     *         and {@code DocumentExistsException} are passed through unchanged; any other
     *         error is classified, wrapped and recorded in {@code errors}.
     */
    private Mono<TransactionGetResult> insertWithQuery(ReactiveCollection collection, String id, Object content) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "user.query_insert", this.attemptSpan);
            Transcoder.EncodedValue encoded = this.getTranscoder().encode(content);
            byte[] encodedContent = encoded.encoded();
            JsonObject reencoded = JsonObject.fromJson(encodedContent);
            JsonArray params = JsonArray.create().add(AttemptContextReactive.makeKeyspace(collection)).add(id).add(reencoded).add(JsonObject.create());
            int sidx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(sidx, null, "EXECUTE __insert", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_INSERT, false, true, this.makeTxdata(), params, span, false).map(result -> {
                JsonObject row = (JsonObject)result.rowsAsObject().get(0);
                String scas = row.getString("scas");
                long cas = Long.parseLong(scas);
                // Use the encoded bytes rather than content.toString(), which is not JSON for arbitrary objects.
                return TransactionGetResult.createFromInsert(collection, id, encodedContent, this.transactionId(), this.attemptId, null, null, null, null, cas, this.getTranscoder());
            }).onErrorResume(err -> {
                if (err instanceof TransactionOperationFailed) {
                    return Mono.error((Throwable)err);
                }
                if (err instanceof DocumentExistsException) {
                    return Mono.error((Throwable)err);
                }
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailed out = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).build();
                this.errors.add(out);
                return Mono.error((Throwable)out);
            }).doFinally(ignore -> span.finish());
        });
    }

    /**
     * Chooses an ATR id for the given vbucket by delegating to
     * {@code ATRIds.randomAtrIdForVbucket}.
     *
     * @param self the attempt context; not used by this implementation
     * @param vbucketIdForDoc the vbucket id of the document
     * @param numAtrs the number of ATRs to choose from
     * @return the selected ATR id
     */
    protected String randomAtrIdForVbucket(AttemptContextReactive self, Integer vbucketIdForDoc, int numAtrs) {
        String chosenAtrId = ATRIds.randomAtrIdForVbucket(vbucketIdForDoc, numAtrs);
        return chosenAtrId;
    }

    /**
     * Selects the ATR id and ATR collection for this attempt if no ATR id is set yet.
     *
     * <p>The ATR id is derived from the vbucket of the given document id. The ATR
     * collection is the configured metadata collection when one is present, otherwise the
     * result of {@code getAtrCollection(collection)}.
     *
     * @param collection the collection of the document being mutated
     * @param id the id of the document being mutated
     */
    private void initAtrIfNeeded(ReactiveCollection collection, String id) {
        if (this.atrId.isPresent()) {
            // Already initialised by an earlier mutation of this attempt.
            return;
        }
        long vbucketIdForDoc = ATRIds.vbucketForKey(id, 1024);
        String atr = this.randomAtrIdForVbucket(this, (int)vbucketIdForDoc, this.config.numAtrs());
        this.atrId = Optional.of(atr);
        if (this.config.metadataCollection().isPresent()) {
            this.atrCollection = Optional.of(this.config.metadataCollection().get().reactive());
        } else {
            this.atrCollection = Optional.of(this.getAtrCollection(collection));
        }
        this.LOGGER.info(this.attemptId, "First mutated doc in txn is '%s' on vbucket %d, so using atr %s", DebugUtil.docId(collection, id), vbucketIdForDoc, atr);
    }

    /**
     * Stages a replace of the given document using {@code TransactionReplaceOptions.DEFAULT}.
     *
     * @param doc the document to replace
     * @param content the new content
     * @return the result of the three-argument {@code replace} overload
     */
    public Mono<TransactionGetResult> replace(TransactionGetResult doc, Object content) {
        TransactionReplaceOptions defaults = TransactionReplaceOptions.DEFAULT;
        return this.replace(doc, content, defaults);
    }

    /**
     * Stages a replace of the given document as part of this transaction attempt.
     *
     * <p>The steps, in order: fail if the attempt is done or already has an error; run the
     * pre-commit expiry check; delegate to {@code replaceWithQuery} when in query mode;
     * reject the replace if this attempt already staged a removal of the document; check
     * for a blocking transaction; initialise the ATR and set it to pending on the first
     * mutation; finally stage the mutation.
     *
     * <p>Any {@code TransactionOperationFailed} raised is appended to {@code errors}.
     *
     * @param doc the document to replace
     * @param content the new content
     * @param options replace options; not read by this method body
     * @return a Mono emitting the staged document, or an error
     */
    public Mono<TransactionGetResult> replace(TransactionGetResult doc, Object content, TransactionReplaceOptions options) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "user.replace", this.attemptSpan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "replace doc %s", doc);
                if (this.isDone()) {
                    return Mono.error((Throwable)this.transactionIsDone());
                }
                if (this.isExistingError()) {
                    return Mono.error((Throwable)this.existingError());
                }
                this.checkExpiryPreCommitAndSetExpiryOvertimeMode(HOOK_REPLACE, Optional.of(doc.id()));
                if (this.queryMode()) {
                    return this.replaceWithQuery(doc, content);
                }
                Optional<StagedMutation> existing = this.findStagedMutation(doc);
                if (existing.isPresent()) {
                    StagedMutation op = existing.get();
                    this.LOGGER.info(this.attemptId, "found previous write of %s as %s on replace", new Object[]{DebugUtil.docId(doc), op.type});
                    // A document this attempt already staged for removal cannot be replaced.
                    if (op.type == StagedMutationType.REMOVE) {
                        return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_NOT_FOUND).cause((Throwable)new DocumentNotFoundException(null)).build());
                    }
                }
                return this.checkAndHandleBlockingTxn(doc, span, ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_REPLACING).then(Mono.defer(() -> {
                    this.initAtrIfNeeded(doc.collection(), doc.id());
                    // Only the first mutation of the attempt writes the pending ATR entry.
                    if (this.state == AttemptStates.NOT_STARTED) {
                        return this.atrPending(this.atrCollection.get(), span);
                    }
                    return Mono.empty();
                })).then(Mono.defer(() -> {
                    // Replacing a document this attempt staged as an insert re-stages it as an insert, using the document's current CAS.
                    if (existing.isPresent() && ((StagedMutation)existing.get()).type == StagedMutationType.INSERT) {
                        return this.createStagedInsert(doc.collection(), doc.id(), content, span, TransactionInsertOptions.insertOptions().build(), Optional.of(doc.cas()));
                    }
                    return this.createStagedReplace(doc, content, span, doc.links().isDeleted());
                })).doOnError(err -> AttemptContextReactive.failSpan(span, err));
            }).doFinally(s -> span.finish());
        }).doOnError(err -> {
            if (err instanceof TransactionOperationFailed) {
                this.errors.add((TransactionOperationFailed)err);
            }
        });
    }

    /**
     * Performs a replace through the query service by executing the {@code __update}
     * statement. The document's CAS (and transaction metadata, when present) is sent in
     * the transaction data.
     *
     * @param doc the document to replace
     * @param content the new content; must encode to a JSON object
     * @return a Mono emitting a result built from the "scas" and "doc" fields of the first
     *         returned row. A {@code TransactionOperationFailed} is passed through; any
     *         other error is classified, wrapped (flagged for transaction retry on
     *         FAIL_DOC_NOT_FOUND / FAIL_CAS_MISMATCH) and recorded in {@code errors}.
     */
    private Mono<TransactionGetResult> replaceWithQuery(TransactionGetResult doc, Object content) {
        return Mono.defer(() -> {
            SpanWrapper opSpan = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "user.query_replace", this.attemptSpan);
            Transcoder.EncodedValue encodedContent = this.getTranscoder().encode(content);
            JsonObject contentJson = JsonObject.fromJson((byte[])encodedContent.encoded());
            JsonObject txData = this.makeTxdata().put("scas", Long.toString(doc.cas()));
            doc.txnMeta().ifPresent(meta -> txData.put("txnMeta", meta));
            JsonArray params = JsonArray.create()
                    .add(AttemptContextReactive.makeKeyspace(doc.collection()))
                    .add(doc.id())
                    .add(contentJson)
                    .add(JsonObject.create());
            int statementIdx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(statementIdx, null, "EXECUTE __update", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_REPLACE, false, true, txData, params, opSpan, false)
                    .map(result -> {
                        JsonObject firstRow = (JsonObject)result.rowsAsObject().get(0);
                        long newCas = Long.parseLong(firstRow.getString("scas"));
                        JsonObject updatedDoc = firstRow.getObject("doc");
                        return new TransactionGetResult(doc.id(), updatedDoc.toBytes(), newCas, doc.collection(), null, null, null, this.getTranscoder(), Optional.empty());
                    })
                    .onErrorResume(failure -> {
                        ErrorClasses errorClass = ErrorClasses.classify(failure);
                        TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, errorClass).cause((Throwable)failure);
                        if (failure instanceof TransactionOperationFailed) {
                            return Mono.error((Throwable)failure);
                        }
                        boolean retryable = errorClass == ErrorClasses.FAIL_DOC_NOT_FOUND || errorClass == ErrorClasses.FAIL_CAS_MISMATCH;
                        TransactionOperationFailed wrapped = retryable ? builder.retryTransaction().build() : builder.build();
                        this.errors.add(wrapped);
                        return Mono.error((Throwable)wrapped);
                    })
                    .doFinally(ignore -> opSpan.finish());
        });
    }

    /**
     * Performs a remove through the query service by executing the {@code __delete}
     * statement. The document's CAS (and transaction metadata, when present) is sent in
     * the transaction data.
     *
     * @param doc the document to remove
     * @return a Mono completing when the statement succeeds. A
     *         {@code TransactionOperationFailed} is passed through; any other error is
     *         classified, wrapped (flagged for transaction retry on FAIL_DOC_NOT_FOUND /
     *         FAIL_CAS_MISMATCH) and recorded in {@code errors}.
     */
    private Mono<Void> removeWithQuery(TransactionGetResult doc) {
        return Mono.defer(() -> {
            SpanWrapper opSpan = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "user.query_remove", this.attemptSpan);
            JsonObject txData = this.makeTxdata().put("scas", Long.toString(doc.cas()));
            doc.txnMeta().ifPresent(meta -> txData.put("txnMeta", meta));
            JsonArray params = JsonArray.create()
                    .add(AttemptContextReactive.makeKeyspace(doc.collection()))
                    .add(doc.id())
                    .add(JsonObject.create());
            int statementIdx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(statementIdx, null, "EXECUTE __delete", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_REMOVE, false, true, txData, params, opSpan, false)
                    .then()
                    .onErrorResume(failure -> {
                        ErrorClasses errorClass = ErrorClasses.classify(failure);
                        TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, errorClass).cause((Throwable)failure);
                        if (failure instanceof TransactionOperationFailed) {
                            return Mono.error((Throwable)failure);
                        }
                        boolean retryable = errorClass == ErrorClasses.FAIL_DOC_NOT_FOUND || errorClass == ErrorClasses.FAIL_CAS_MISMATCH;
                        TransactionOperationFailed wrapped = retryable ? builder.retryTransaction().build() : builder.build();
                        this.errors.add(wrapped);
                        return Mono.error((Throwable)wrapped);
                    })
                    .doFinally(ignore -> opSpan.finish());
        });
    }

    /**
     * Runs the forward-compatibility check for the given stage and maps any failure to a
     * FAIL_OTHER {@code TransactionOperationFailed} whose cause is a new
     * {@code ForwardCompatibilityFailure}.
     *
     * @param stage the protocol stage being checked
     * @param fc the forward-compatibility metadata, if any
     * @return a Mono that completes when the check passes, or errors otherwise; the error
     *         is flagged for transaction retry when the underlying failure is a
     *         {@code ForwardCompatibilityRequiresRetry}
     */
    private Mono<Void> forwardCompatibilityCheck(ForwardCompatibilityStages stage, Optional<ForwardCompatibility> fc) {
        return ForwardCompatibility.check(stage, fc, this.logger(), Supported.SUPPORTED).onErrorResume(checkFailure -> {
            TransactionOperationFailedBuilder failure = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new ForwardCompatibilityFailure());
            boolean shouldRetry = checkFailure instanceof ForwardCompatibilityRequiresRetry;
            if (shouldRetry) {
                failure.retryTransaction();
            }
            return Mono.error((Throwable)failure.build());
        });
    }

    /**
     * Looks up the ATR entry of the transaction that currently has {@code doc} staged and
     * decides whether this attempt may overwrite it.
     *
     * <p>The lookup is preceded by the pre-commit expiry check and the
     * {@code beforeCheckATREntryForBlockingDoc} hook. The outcome is:
     * <ul>
     *   <li>entry missing, or entry in state COMPLETED / ROLLED_BACK: completes empty;</li>
     *   <li>entry in any other state: signals {@code RetryOperation}, which is retried with
     *       exponential backoff (50ms to 500ms) for up to one second;</li>
     *   <li>{@code DocumentNotFoundException}: completes empty;</li>
     *   <li>any other error, including retry exhaustion: a FAIL_WRITE_WRITE_CONFLICT error
     *       flagged for transaction retry.</li>
     * </ul>
     *
     * @param doc the document staged by another transaction; its links must carry the ATR id and staged attempt id
     * @param collection the collection holding the blocking transaction's ATR
     * @param span the tracing span passed to the ATR lookup
     * @return a Mono that completes when overwriting may proceed, or errors otherwise
     */
    private Mono<Void> checkATREntryForBlockingDocInternal(TransactionGetResult doc, ReactiveCollection collection, SpanWrapper span) {
        return Mono.fromRunnable(() -> this.checkExpiryPreCommitAndSetExpiryOvertimeMode("staging.check_atr_entry_blocking_doc", Optional.empty())).then(this.beforeCheckATREntryForBlockingDoc(this, doc.links().atrId().get())).then(ActiveTransactionRecord.findEntryForTransaction(collection, doc.links().atrId().get(), doc.links().stagedAttemptId().get(), this.config, span, this.logger()).flatMap(atrEntry -> {
            if (atrEntry.isPresent()) {
                ATREntry ae = (ATREntry)atrEntry.get();
                this.LOGGER.info(this.attemptId, "fetched ATR entry for blocking txn: hasExpired=%s entry=%s", ae.hasExpired(), ae);
                return this.forwardCompatibilityCheck(ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_READING_ATR, ae.forwardCompatibility()).then(Mono.defer(() -> {
                    switch (ae.state()) {
                        case COMPLETED: 
                        case ROLLED_BACK: {
                            this.LOGGER.info(this.attemptId, "ATR entry state of %s indicates we can proceed to overwrite", new Object[]{((ATREntry)atrEntry.get()).state()});
                            return Mono.empty();
                        }
                    }
                    // Any other state: signal RetryOperation so the retryWhen below re-runs the lookup.
                    return Mono.error((Throwable)new RetryOperation());
                }));
            }
            this.LOGGER.info(this.attemptId, "blocking txn %s's entry has been removed indicating the txn expired, so proceeding to overwrite", doc.links().stagedAttemptId().get());
            return Mono.empty();
        })).retryWhen(Retry.anyOf((Class[])new Class[]{RetryOperation.class}).exponentialBackoff(Duration.ofMillis(50L), Duration.ofMillis(500L)).timeout(Duration.ofSeconds(1L)).toReactorRetry()).onErrorResume(err -> {
            if (err instanceof RetryExhaustedException) {
                this.LOGGER.info(this.attemptId, "still blocked by a valid transaction, retrying to unlock documents");
            } else {
                if (err instanceof DocumentNotFoundException) {
                    this.LOGGER.info(this.attemptId, "blocking txn's ATR has been removed so proceeding to overwrite");
                    return Mono.empty();
                }
                this.LOGGER.warn(this.attemptId, "got error in checkATREntryForBlockingDoc: %s", AttemptContextReactive.dbg(err));
            }
            // Everything except DocumentNotFoundException ends as a write-write conflict that retries the transaction.
            return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_WRITE_WRITE_CONFLICT).cause((Throwable)err).retryTransaction().build());
        }).then();
    }

    /**
     * Resolves the collection holding the blocking transaction's ATR from the document's
     * links and runs {@code checkATREntryForBlockingDocInternal} inside a
     * "staging.check_atr_blocking" span.
     *
     * @param doc the document staged by another transaction
     * @param pspan the parent tracing span
     * @return the Mono produced by the internal check, with span bookkeeping attached
     */
    private Mono<Void> checkATREntryForBlockingDoc(TransactionGetResult doc, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper checkSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "staging.check_atr_blocking", pspan);
            ReactiveCollection atrLocation = this.parent.cleanup().clusterData().cluster()
                    .bucket(doc.links().atrBucketName().get())
                    .scope(doc.links().atrScopeName().get())
                    .collection(doc.links().atrCollectionName().get())
                    .reactive();
            Mono<Void> check = this.checkATREntryForBlockingDocInternal(doc, atrLocation, checkSpan);
            return check
                    .doFinally(signal -> checkSpan.finish())
                    .doOnError(failure -> AttemptContextReactive.failSpan(checkSpan, failure));
        });
    }

    /**
     * Builds the loggable description of an ATR via {@code ATRUtil.getAtrDebug}.
     *
     * @param collection the ATR collection
     * @param atrId the ATR id, if known
     * @return the redactable log argument
     */
    private RedactableArgument getAtrDebug(ReactiveCollection collection, Optional<String> atrId) {
        RedactableArgument description = ATRUtil.getAtrDebug(collection, atrId);
        return description;
    }

    /**
     * Builds the loggable description of an ATR via {@code ATRUtil.getAtrDebug}, for an
     * ATR collection that may not be known yet.
     *
     * @param collection the ATR collection, if known
     * @param atrId the ATR id, if known
     * @return the redactable log argument
     */
    private RedactableArgument getAtrDebug(Optional<ReactiveCollection> collection, Optional<String> atrId) {
        RedactableArgument description = ATRUtil.getAtrDebug(collection, atrId);
        return description;
    }

    /**
     * Passes the remove options through {@code OptionsWrapperUtil.wrap} together with the
     * parent span, this attempt's config and the given core.
     *
     * @param opts the options to wrap
     * @param pspan the parent tracing span
     * @param core the SDK core
     * @return the wrapped options
     */
    private RemoveOptions wrap(RemoveOptions opts, SpanWrapper pspan, Core core) {
        RemoveOptions wrapped = OptionsWrapperUtil.wrap(opts, pspan, this.config, core);
        return wrapped;
    }

    /**
     * Passes the mutate-in options through {@code OptionsWrapperUtil.wrap} together with
     * the parent span, this attempt's config and the given core.
     *
     * @param opts the options to wrap
     * @param pspan the parent tracing span
     * @param core the SDK core
     * @return the wrapped options
     */
    private MutateInOptions wrap(MutateInOptions opts, SpanWrapper pspan, Core core) {
        MutateInOptions wrapped = OptionsWrapperUtil.wrap(opts, pspan, this.config, core);
        return wrapped;
    }

    /**
     * Obtains mutate-in options from {@code OptionsWrapperUtil.wrap} for the parent span,
     * this attempt's config and the given core.
     *
     * @param pspan the parent tracing span
     * @param core the SDK core
     * @return the options returned by the utility
     */
    private MutateInOptions wrap(SpanWrapper pspan, Core core) {
        MutateInOptions wrapped = OptionsWrapperUtil.wrap(pspan, this.config, core);
        return wrapped;
    }

    /**
     * Computes how many milliseconds remain before the transaction expires.
     *
     * @return the configured expiration time minus the time elapsed since the transaction
     *         started, clamped to the range {@code [0, configured expiration time]}
     */
    private long expiryRemainingMillis() {
        long elapsedMillis = this.overall.timeSinceStartOfTransactionsMillis(System.nanoTime());
        long budgetMillis = this.config.expirationTime().toMillis();
        long remaining = budgetMillis - elapsedMillis;
        if (remaining > budgetMillis) {
            remaining = budgetMillis;
        }
        return remaining < 0L ? 0L : remaining;
    }

    /**
     * Returns the request tracer configured on the cluster environment.
     *
     * @return the request tracer
     */
    private RequestTracer tracer() {
        RequestTracer requestTracer = this.cluster().environment().requestTracer();
        return requestTracer;
    }

    /**
     * Writes this attempt's entry into the ATR document with state PENDING.
     *
     * <p>After the expiry check and the {@code beforeAtrPending} hook, a single mutateIn
     * with UPSERT store semantics inserts the transaction id, state, start timestamp (CAS
     * macro), remaining expiry and durability level under {@code attempts.<attemptId>}.
     * On success the server CAS is stored in {@code startTimeServer} and the attempt
     * state becomes PENDING.
     *
     * <p>Errors are classified and handled as follows:
     * <ul>
     *   <li>already in expiry-overtime mode: mapped to an expired error;</li>
     *   <li>FAIL_EXPIRY: switches to expiry-overtime mode and fails;</li>
     *   <li>FAIL_ATR_FULL: fails with an {@code ActiveTransactionRecordFull} cause;</li>
     *   <li>FAIL_AMBIGUOUS: the operation is retried after a delay;</li>
     *   <li>FAIL_PATH_ALREADY_EXISTS: treated as success, completing empty;</li>
     *   <li>FAIL_TRANSIENT: fails, flagged for transaction retry;</li>
     *   <li>FAIL_HARD: fails, flagged to skip rollback;</li>
     *   <li>anything else: fails with the classified error.</li>
     * </ul>
     *
     * @param collection the collection holding the ATR document
     * @param pspan the parent tracing span
     * @return a Mono emitting the mutation result, completing empty on
     *         FAIL_PATH_ALREADY_EXISTS, or erroring
     */
    private Mono<MutateInResult> atrPending(ReactiveCollection collection, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, this.atrId.orElse(null), "atr.pending", pspan);
            String prefix = "attempts." + this.attemptId;
            if (!this.atrId.isPresent()) {
                return Mono.error((Throwable)new IllegalStateException("atrId not present"));
            }
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "about to set ATR %s to Pending", this.getAtrDebug(collection, this.atrId));
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_PENDING, Optional.empty());
            }).then(this.beforeAtrPending(this)).then(collection.mutateIn(this.atrId.get(), Arrays.asList(MutateInSpec.insert((String)(prefix + "." + "tid"), (Object)this.transactionId()).xattr().createPath(), MutateInSpec.insert((String)(prefix + "." + "st"), (Object)AttemptStates.PENDING.name()).xattr(), MutateInSpec.insert((String)(prefix + "." + "tst"), (Object)MutateInMacro.CAS), MutateInSpec.insert((String)(prefix + "." + "exp"), (Object)this.expiryRemainingMillis()).xattr(), MutateInSpec.insert((String)(prefix + "." + "d"), (Object)DurabilityLevelUtil.convertDurabilityLevel(this.config.durabilityLevel())).xattr(), MutateInSpec.replace((String)"", (Object)new byte[]{0})), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("atrPending"))).storeSemantics(StoreSemantics.UPSERT), span, collection.core()))).flatMap(v -> this.afterAtrPending(this).map(x -> v)).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while setting ATR %s to Pending in %dus: %s", this.getAtrDebug(collection, this.atrId), span.elapsed(), AttemptContextReactive.dbg(err));
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(HOOK_ATR_PENDING, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    return this.setExpiryOvertimeModeAndFail((Throwable)err, HOOK_ATR_PENDING, ec);
                }
                if (ec == ErrorClasses.FAIL_ATR_FULL) {
                    return Mono.error((Throwable)out.cause(new ActiveTransactionRecordFull(this, (Throwable)err)).build());
                }
                if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                    this.LOGGER.info(this.attemptId, "retrying the op on %s to resolve ambiguity", new Object[]{ec});
                    // Retries by calling this method again, with the current span as the parent.
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.atrPending(collection, span));
                }
                if (ec == ErrorClasses.FAIL_PATH_ALREADY_EXISTS) {
                    this.LOGGER.info(this.attemptId, "assuming this is caused by resolved ambiguity, and proceeding as though successful", new Object[]{ec});
                    // NOTE(review): an empty Mono emits no value, so the doOnNext below (span.finish(),
                    // startTimeServer, state = PENDING) does not run on this path - confirm that is intended.
                    return Mono.empty();
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    this.LOGGER.info(this.attemptId, "transient error likely to be solved by retry", new Object[]{ec});
                    return Mono.error((Throwable)out.retryTransaction().build());
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)out.doNotRollbackAttempt().build());
                }
                return Mono.error((Throwable)out.build());
            }).doOnNext(v -> {
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "set ATR %s to Pending in %dus, got CAS (start time) %s", this.getAtrDebug(collection, this.atrId), elapsed, this.dbg((MutationResult)v));
                // The CAS of the ATR write is recorded as the server-side start time.
                this.startTimeServer = Optional.of(Duration.ofNanos(v.cas()));
                this.state = AttemptStates.PENDING;
            }).doOnError(err -> {
                AttemptContextReactive.failSpan(span, err);
                span.finish();
            });
        });
    }

    /**
     * Returns the transcoder configured on the environment of the cluster reachable
     * through {@code parent.cleanup().clusterData()}.
     *
     * @return the transcoder
     */
    private Transcoder getTranscoder() {
        Transcoder configured = this.parent.cleanup().clusterData().cluster().environment().transcoder();
        return configured;
    }

    /**
     * Marks the span as failed with the given error. For a
     * {@code TransactionOperationFailed} it also records the retry flag and a failure
     * reason as span attributes.
     *
     * @param span the span to mark
     * @param err the error that occurred
     */
    private static void failSpan(SpanWrapper span, Throwable err) {
        span.failWith(err);
        if (!(err instanceof TransactionOperationFailed)) {
            return;
        }
        TransactionOperationFailed failure = (TransactionOperationFailed)err;
        span.attribute("db.couchbase.transactions.retry", failure.retryTransaction());
        // The reason is the cause's message when a cause exists, otherwise the causing error class.
        String reason;
        if (failure.getCause() == null) {
            reason = failure.causingErrorClass().toString();
        } else {
            reason = failure.getCause().getMessage();
        }
        span.attribute("db.couchbase.transactions.failure_reason", reason);
    }

    /**
     * Stages a replace by writing the transaction metadata, the encoded new content and a
     * CRC32 macro into the document's "txn" xattrs with a CAS-guarded mutateIn.
     *
     * <p>On success the document's CAS is updated in place and a REPLACE staged mutation
     * is recorded. Failures are routed through {@code handleErrorOnStagedMutation}.
     *
     * @param doc the document to stage against; its CAS is used for the mutateIn and updated on success
     * @param content the new content, encoded with the configured transcoder
     * @param pspan the parent tracing span
     * @param accessDeleted passed to the mutateIn options' {@code accessDeleted}
     * @return a Mono emitting {@code doc} after staging, or an error
     */
    private Mono<TransactionGetResult> createStagedReplace(TransactionGetResult doc, Object content, SpanWrapper pspan, boolean accessDeleted) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "staging.replace", pspan);
            Transcoder.EncodedValue encoded = this.getTranscoder().encode(content);
            JsonObject txn = this.createDocumentMetadata("replace", doc);
            return this.beforeStagedReplace(this, doc.id()).then(doc.collection().mutateIn(doc.id(), Arrays.asList(MutateInSpec.upsert((String)"txn", (Object)txn).xattr().createPath(), MutateInSpec.upsert((String)"txn.op.stgd", (Object)encoded.encoded()).xattr(), MutateInSpec.upsert((String)"txn.op.crc32", (Object)MutateInMacro.VALUE_CRC_32C).xattr()), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().accessDeleted(accessDeleted).clientContext(OptionsWrapperUtil.createClientContext("createStagedReplace"))).cas(doc.cas()), span, doc.collection().core()))).doOnSubscribe(v -> this.LOGGER.info(this.attemptId, "about to replace doc %s with cas %d, accessDeleted=%s", DebugUtil.docId(doc), doc.cas(), accessDeleted)).flatMap(result -> this.afterStagedReplaceComplete(this, doc.id()).map(x -> result)).doOnNext(updatedDoc -> {
                // NOTE(review): span.finish() is called here and again in the doFinally below on the success path.
                long elapsed = span.finish();
                doc.cas(updatedDoc.cas());
                this.LOGGER.info(this.attemptId, "replaced doc %s got %s, in %dus", DebugUtil.docId(doc), this.dbg((MutationResult)updatedDoc), elapsed);
                this.addStagedMutation(new StagedMutation(doc, encoded.encoded(), StagedMutationType.REPLACE, (MutateInResult)updatedDoc));
            }).thenReturn((Object)doc).onErrorResume(err -> this.handleErrorOnStagedMutation("replacing", doc, (Throwable)err, span.elapsed()).thenReturn((Object)doc)).doFinally(v -> span.finish()).doOnError(err -> AttemptContextReactive.failSpan(span, err));
        });
    }

    /**
     * Builds the transaction metadata object stored alongside a staged document.
     *
     * <p>The result holds an "id" section (transaction and attempt ids), an "atr" section
     * (ATR id plus its bucket, scope and collection names) and an "op" section with the
     * operation type. When {@code doc} carries document metadata, the present values among
     * CAS, exptime and revid are added under "restore".
     *
     * @param opType the operation type stored under "op.type"
     * @param doc the document whose metadata should be captured, or {@code null}
     * @return the metadata object
     */
    private JsonObject createDocumentMetadata(String opType, @Nullable TransactionGetResult doc) {
        JsonObject opSection = JsonObject.create().put("type", opType);
        JsonObject idSection = JsonObject.create()
                .put("txn", this.transactionId())
                .put("atmpt", this.attemptId);
        JsonObject atrSection = JsonObject.create()
                .put("id", this.atrId.get())
                .put("bkt", this.atrCollection.get().bucketName())
                .put("scp", this.atrCollection.get().scopeName())
                .put("coll", this.atrCollection.get().name());
        JsonObject metadata = JsonObject.create()
                .put("id", idSection)
                .put("atr", atrSection)
                .put("op", opSection);

        JsonObject restore = JsonObject.create();
        if (doc != null) {
            doc.documentMetadata().flatMap(DocumentMetadata::cas).ifPresent(casValue -> restore.put("CAS", casValue));
            doc.documentMetadata().flatMap(DocumentMetadata::exptime).ifPresent(expiry -> restore.put("exptime", (Number)expiry));
            doc.documentMetadata().flatMap(DocumentMetadata::revid).ifPresent(revision -> restore.put("revid", revision));
        }
        // The "restore" section is only written when at least one value was captured.
        if (restore.size() > 0) {
            metadata.put("restore", restore);
        }
        return metadata;
    }

    /**
     * Stages a removal by writing the transaction metadata and a CRC32 macro into the
     * document's "txn" xattrs with a CAS-guarded mutateIn.
     *
     * <p>On success the document's CAS is updated in place and a REMOVE staged mutation
     * (with no content) is recorded. Failures are routed through
     * {@code handleErrorOnStagedMutation}.
     *
     * @param doc the document to stage against; its CAS is used for the mutateIn and updated on success
     * @param pspan the parent tracing span
     * @param accessDeleted passed to the mutateIn options' {@code accessDeleted}
     * @return a Mono completing once the removal is staged, or an error
     */
    private Mono<Void> createStagedRemove(TransactionGetResult doc, SpanWrapper pspan, boolean accessDeleted) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "staging.remove", pspan);
            JsonObject txn = this.createDocumentMetadata("remove", doc);
            return this.beforeStagedRemove(this, doc.id()).doOnSubscribe(x -> this.LOGGER.info(this.attemptId, "about to remove doc %s with cas %d", DebugUtil.docId(doc), doc.cas())).then(doc.collection().mutateIn(doc.id(), Arrays.asList(MutateInSpec.upsert((String)"txn", (Object)txn).xattr().createPath(), MutateInSpec.upsert((String)"txn.op.crc32", (Object)MutateInMacro.VALUE_CRC_32C).xattr()), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().accessDeleted(accessDeleted).clientContext(OptionsWrapperUtil.createClientContext("createdStagedRemove"))).cas(doc.cas()), span, doc.collection().core()))).flatMap(updatedDoc -> this.afterStagedRemoveComplete(this, doc.id()).thenReturn(updatedDoc)).doOnNext(updatedDoc -> {
                // NOTE(review): span.finish() is called here and again in the doFinally below on the success path.
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "staged remove of doc %s got %s, in %dus", DebugUtil.docId(doc), this.dbg((MutationResult)updatedDoc), elapsed);
                doc.cas(updatedDoc.cas());
                this.addStagedMutation(new StagedMutation(doc, null, StagedMutationType.REMOVE, (MutateInResult)updatedDoc));
            }).then().onErrorResume(err -> this.handleErrorOnStagedMutation("removing", doc, (Throwable)err, span.elapsed())).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Records a staged mutation, first dropping any mutation previously recorded for the
     * same document so that at most one entry per document remains.
     *
     * @param sm the staged mutation to record
     */
    private void addStagedMutation(StagedMutation sm) {
        TransactionGetResult target = sm.doc;
        this.removeStagedMutation(target);
        this.stagedMutations.add(sm);
    }

    /**
     * Converts an error raised while staging a mutation into the appropriate
     * {@code TransactionOperationFailed}.
     *
     * <p>The original error is kept as the cause on every branch that builds an error
     * here, including FAIL_DOC_NOT_FOUND and FAIL_CAS_MISMATCH, so the underlying failure
     * stays visible to callers and to tracing.
     *
     * @param stage a short description of the operation, used for logging and as the expiry hook name
     * @param doc the document being staged
     * @param err the error that occurred
     * @param elapsed the elapsed time of the operation, logged with the "us" unit suffix
     * @return a Mono that always errors: FAIL_EXPIRY switches to expiry-overtime mode;
     *         FAIL_DOC_NOT_FOUND, FAIL_CAS_MISMATCH, FAIL_AMBIGUOUS and FAIL_TRANSIENT are
     *         flagged for transaction retry; FAIL_HARD is flagged to skip rollback;
     *         anything else fails with the classified error
     */
    private Mono<Void> handleErrorOnStagedMutation(String stage, TransactionGetResult doc, Throwable err, long elapsed) {
        ErrorClasses ec = ErrorClasses.classify(err);
        TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause(err);
        this.LOGGER.info(this.attemptId, "error while %s doc %s in %dus: %s", stage, DebugUtil.docId(doc), elapsed, AttemptContextReactive.dbg(err));
        if (this.expiryOvertimeMode) {
            this.LOGGER.warn(this.attemptId, "should not reach here in expiryOvertimeMode");
        }
        if (ec == ErrorClasses.FAIL_EXPIRY) {
            return this.setExpiryOvertimeModeAndFail(err, stage, ec);
        }
        boolean retryTransaction = ec == ErrorClasses.FAIL_DOC_NOT_FOUND
                || ec == ErrorClasses.FAIL_CAS_MISMATCH
                || ec == ErrorClasses.FAIL_AMBIGUOUS
                || ec == ErrorClasses.FAIL_TRANSIENT;
        if (retryTransaction) {
            // Uses the builder that already carries err as cause, so the cause is not dropped.
            return Mono.error((Throwable)out.retryTransaction().build());
        }
        if (ec == ErrorClasses.FAIL_HARD) {
            return Mono.error((Throwable)out.doNotRollbackAttempt().build());
        }
        return Mono.error((Throwable)out.build());
    }

    /**
     * Finds the mutation this attempt has staged for the given document, if any.
     *
     * @param doc the document to look up
     * @return the staged mutation for the document's collection and id, or empty
     */
    private Optional<StagedMutation> findStagedMutation(TransactionGetResult doc) {
        ReactiveCollection owner = doc.collection();
        String key = doc.id();
        return this.findStagedMutation(owner, key);
    }

    /**
     * Finds the first staged mutation whose document lives in the same bucket, scope and
     * collection as {@code collection} and has the given id.
     *
     * @param collection the collection of the document
     * @param docId the document id
     * @return the first matching staged mutation, or empty
     */
    private Optional<StagedMutation> findStagedMutation(ReactiveCollection collection, String docId) {
        for (StagedMutation candidate : this.stagedMutations) {
            boolean sameKeyspace = candidate.doc.collection().bucketName().equals(collection.bucketName())
                    && candidate.doc.collection().scopeName().equals(collection.scopeName())
                    && candidate.doc.collection().name().equals(collection.name());
            if (sameKeyspace && candidate.doc.id().equals(docId)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Removes every staged mutation recorded for the given document, matched by bucket,
     * scope, collection name and document id.
     *
     * @param doc the document whose staged mutations should be dropped
     */
    private void removeStagedMutation(TransactionGetResult doc) {
        ReactiveCollection target = doc.collection();
        this.stagedMutations.removeIf(staged -> {
            ReactiveCollection owner = staged.doc.collection();
            return owner.bucketName().equals(target.bucketName())
                    && owner.scopeName().equals(target.scopeName())
                    && owner.name().equals(target.name())
                    && staged.doc.id().equals(doc.id());
        });
    }

    /**
     * Finds the first entry of {@code stagedInserts()} recorded for the given document,
     * matched by bucket, scope, collection name and document id.
     *
     * @param doc the document to look up
     * @return the first matching entry, or empty
     */
    private Optional<StagedMutation> findStagedInsert(TransactionGetResult doc) {
        ReactiveCollection target = doc.collection();
        return this.stagedInserts().stream()
                .filter(staged -> staged.doc.collection().bucketName().equals(target.bucketName())
                        && staged.doc.collection().scopeName().equals(target.scopeName())
                        && staged.doc.collection().name().equals(target.name())
                        && staged.doc.id().equals(doc.id()))
                .findFirst();
    }

    /**
     * Finds the first entry of {@code stagedReplaces()} recorded for the given document,
     * matched by bucket, scope, collection name and document id.
     *
     * @param doc the document to look up
     * @return the first matching entry, or empty
     */
    private Optional<StagedMutation> findStagedReplace(TransactionGetResult doc) {
        ReactiveCollection target = doc.collection();
        Optional<StagedMutation> match = this.stagedReplaces().stream()
                .filter(staged -> staged.doc.collection().bucketName().equals(target.bucketName())
                        && staged.doc.collection().scopeName().equals(target.scopeName())
                        && staged.doc.collection().name().equals(target.name())
                        && staged.doc.id().equals(doc.id()))
                .findFirst();
        return match;
    }

    /**
     * Wraps an error for logging via {@code DebugUtil.dbg}.
     *
     * @param err the error to wrap
     * @return the loggable wrapper
     */
    private static LogDeferThrowable dbg(Throwable err) {
        LogDeferThrowable loggable = DebugUtil.dbg(err);
        return loggable;
    }

    /**
     * Decides how to proceed with a staged insert of a document that already exists, by
     * fetching the existing document and inspecting its transaction metadata.
     *
     * <p>Outcomes, after the forward-compatibility check:
     * <ul>
     *   <li>the document is deleted and not in a transaction: the insert is staged over
     *       it using its CAS;</li>
     *   <li>the document is not in a transaction: fails with FAIL_DOC_ALREADY_EXISTS;</li>
     *   <li>the document is in a transaction but its staged op is not "insert": fails with
     *       FAIL_DOC_ALREADY_EXISTS;</li>
     *   <li>the document is another transaction's staged insert: the blocking transaction
     *       is checked and the staged insert is then overwritten;</li>
     *   <li>the document can no longer be found: fails with FAIL_DOC_NOT_FOUND, flagged
     *       for transaction retry.</li>
     * </ul>
     * An error while fetching the document is classified and wrapped; FAIL_TRANSIENT and
     * FAIL_PATH_NOT_FOUND are flagged for transaction retry.
     *
     * @param collection the collection of the document
     * @param id the document id
     * @param content the content to insert
     * @param pspan the parent tracing span
     * @param options the built insert options
     * @return a Mono emitting the staged document, or an error
     */
    private Mono<TransactionGetResult> handleDocExistsDuringStagedInsert(ReactiveCollection collection, String id, Object content, SpanWrapper pspan, TransactionInsertOptions.BuiltOptions options) {
        // Common prefix for the log lines emitted by this method.
        String bp = "DocExists on " + DebugUtil.docId(collection, id) + ": ";
        return this.beforeGetDocInExistsDuringStagedInsert(this, id).then(DocumentGetter.justGetDoc(collection, this.config, id, pspan, this.getTranscoder(), true, this.logger())).doOnSubscribe(x -> this.LOGGER.info(this.attemptId, "%s getting doc", bp)).onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            TransactionOperationFailedBuilder e = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
            this.LOGGER.warn(this.attemptId, "%s got error while getting doc: %s", bp, AttemptContextReactive.dbg(err));
            if (ec == ErrorClasses.FAIL_TRANSIENT || ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                e.retryTransaction();
            }
            return Mono.error((Throwable)e.build());
        }).flatMap(v -> {
            if (v.isPresent()) {
                Tuple2 results = (Tuple2)v.get();
                TransactionGetResult r = (TransactionGetResult)results.getT1();
                LookupInResult lir = (LookupInResult)results.getT2();
                this.LOGGER.info(this.attemptId, "%s doc %s exists inTransaction=%s isDeleted=%s", bp, DebugUtil.docId(collection, id), r.links(), lir.isDeleted());
                return this.forwardCompatibilityCheck(ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_INSERTING_GET, r.links().forwardCompatibility()).then(Mono.defer(() -> {
                    if (lir.isDeleted() && !r.links().isDocumentInTransaction()) {
                        this.LOGGER.info(this.attemptId, "%s doc %s is a regular tombstone without txn metadata, proceeding to overwrite", bp, DebugUtil.docId(collection, id));
                        return this.createStagedInsert(collection, id, content, pspan, options, Optional.of(r.cas()));
                    }
                    if (!r.links().isDocumentInTransaction()) {
                        this.LOGGER.info(this.attemptId, "%s doc %s exists but is not in txn, raising DocumentAlreadyExistsException", bp, DebugUtil.docId(collection, id));
                        return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_ALREADY_EXISTS).cause((Throwable)new DocumentExistsException((ErrorContext)ReducedKeyValueErrorContext.create((String)id))).build());
                    }
                    if (!r.links().op().get().equals("insert")) {
                        this.LOGGER.info(this.attemptId, "%s doc %s is in a txn but is not a staged insert, raising DocumentAlreadyExistsException", bp, DebugUtil.docId(collection, id));
                        return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_ALREADY_EXISTS).cause((Throwable)new DocumentExistsException((ErrorContext)ReducedKeyValueErrorContext.create((String)id))).build());
                    }
                    // Remaining case: the document is a staged insert that is in a transaction.
                    return this.checkAndHandleBlockingTxn(r, pspan, ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_INSERTING).then(this.overwriteStagedInsert(collection, id, content, pspan, options, bp, r, lir));
                }));
            }
            this.LOGGER.info(this.attemptId, "%s completed get of %s, could not find, throwing to retry txn which should succeed now", bp, DebugUtil.docId(collection, id));
            return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_NOT_FOUND).retryTransaction().build());
        });
    }

    /**
     * Replaces a staged insert that already exists on document {@code id} with a new staged insert
     * carrying {@code content}.
     * <p>
     * If {@code lir} reports the document as deleted, the staged insert is re-created directly using the
     * CAS taken from {@code r}. Otherwise the existing document is removed first and the staged insert is
     * then created without a CAS.
     *
     * @param collection collection holding the document
     * @param id         document key
     * @param content    content to stage
     * @param pspan      parent span for the operations issued here
     * @param options    insert options, forwarded to {@code createStagedInsert}
     * @param bp         prefix used in this method's log lines
     * @param r          previously fetched document state; asserted to be a staged insert in a transaction
     * @param lir        raw lookup result for the same document, consulted for its deleted flag
     * @return a Mono emitting the result of the new staged insert
     */
    private Mono<TransactionGetResult> overwriteStagedInsert(ReactiveCollection collection, String id, Object content, SpanWrapper pspan, TransactionInsertOptions.BuiltOptions options, String bp, TransactionGetResult r, LookupInResult lir) {
        return Mono.defer(() -> {
            // Sanity checks only; they are skipped unless the JVM runs with assertions enabled.
            assert (r.links().isDocumentInTransaction());
            assert (r.links().op().get().equals("insert"));
            if (lir.isDeleted()) {
                // Document is already deleted: stage over it in place, guarded by the CAS we read.
                return this.createStagedInsert(collection, id, content, pspan, options, Optional.of(r.cas()));
            }
            this.LOGGER.info(this.attemptId, "%s removing %s as it's a protocol 1.0 staged insert", bp, DebugUtil.docId(collection, id));
            // The error handler below sits after the remove, so it also sees failures raised by the
            // beforeOverwritingStagedInsertRemoval hook.
            return this.beforeOverwritingStagedInsertRemoval(this, id).then(collection.remove(id, this.wrap(RemoveOptions.removeOptions(), pspan, collection.core()))).onErrorResume(err -> {
                this.LOGGER.warn(this.attemptId, "%s hit error %s while removing %s", bp, DebugUtil.dbg(err), DebugUtil.docId(collection, id));
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                // Only these three classes are flagged to retry the whole transaction; any other class
                // is raised without the retry flag.
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_CAS_MISMATCH || ec == ErrorClasses.FAIL_TRANSIENT) {
                    out.retryTransaction();
                }
                return Mono.error((Throwable)out.build());
                // After a successful remove there is nothing to CAS against, hence Optional.empty().
            }).then(this.createStagedInsert(collection, id, content, pspan, options, Optional.empty()));
        });
    }

    /**
     * Stages an insert of {@code content} for document {@code id} by writing transaction metadata, the
     * encoded content and a CRC macro into the document's {@code txn} extended attributes.
     * <p>
     * The mutation is issued with {@code accessDeleted(true)} and {@code createAsDeleted(true)}. When
     * {@code cas} is present the write uses {@code StoreSemantics.REPLACE} with that CAS; when it is empty
     * the write uses {@code StoreSemantics.INSERT} with CAS 0. On success the staged mutation is recorded
     * on this attempt via {@code addStagedMutation}.
     *
     * @param collection collection holding the document
     * @param id         document key
     * @param content    content to encode and stage
     * @param pspan      parent span for the "staging.insert" span created here
     * @param options    insert options; only forwarded to retries and to the doc-exists handler
     * @param cas        CAS to replace against, or empty to insert
     * @return a Mono emitting the staged document, or failing with the error built in the handler below
     */
    private Mono<TransactionGetResult> createStagedInsert(ReactiveCollection collection, String id, Object content, SpanWrapper pspan, TransactionInsertOptions.BuiltOptions options, Optional<Long> cas) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), collection, id, "staging.insert", pspan);
            // Encoding and metadata creation happen once per subscription, before the expiry check runs.
            Transcoder.EncodedValue encoded = this.getTranscoder().encode(content);
            JsonObject txn = this.createDocumentMetadata("insert", null);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "about to insert staged doc %s as shadow document, cas=%s", DebugUtil.docId(collection, id), cas);
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_CREATE_STAGED_INSERT, Optional.of(id));
                // Chain: expiry check -> beforeStagedInsert hook -> xattr mutateIn -> afterStagedInsertComplete hook.
            }).then(this.beforeStagedInsert(this, id)).then(collection.mutateIn(id, Arrays.asList(MutateInSpec.upsert((String)"txn", (Object)txn).xattr().createPath(), MutateInSpec.upsert((String)"txn.op.stgd", (Object)encoded.encoded()).xattr(), MutateInSpec.upsert((String)"txn.op.crc32", (Object)MutateInMacro.VALUE_CRC_32C).xattr()), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("createStagedInsert"))).accessDeleted(true).createAsDeleted(true).cas(cas.orElse(0L).longValue()).storeSemantics(cas.isPresent() ? StoreSemantics.REPLACE : StoreSemantics.INSERT), span, collection.core()))).flatMap(updatedDoc -> this.afterStagedInsertComplete(this, id).thenReturn(updatedDoc)).doOnNext(updatedDoc -> {
                // NOTE(review): span.finish() is also called from doFinally at the end of this chain, so it
                // runs twice on success; presumably SpanWrapper.finish tolerates repeat calls - confirm.
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "inserted doc %s got %s, in %dus", DebugUtil.docId(collection, id), this.dbg((MutationResult)updatedDoc), elapsed);
            }).map(updatedDoc -> {
                // Build the in-memory view of the staged doc and remember it so commit/rollback can find it.
                TransactionGetResult out = TransactionGetResult.createFromInsert(collection, id, encoded.encoded(), this.transactionId(), this.attemptId, this.atrId.get(), this.atrCollection.get().bucketName(), this.atrCollection.get().scopeName(), this.atrCollection.get().name(), updatedDoc.cas(), this.getTranscoder());
                this.addStagedMutation(new StagedMutation(out, encoded.encoded(), StagedMutationType.INSERT, (MutateInResult)updatedDoc));
                return out;
            }).onErrorResume(err -> {
                this.LOGGER.info(this.attemptId, "got err while staging insert of %s: %s", DebugUtil.docId(collection, id), AttemptContextReactive.dbg(err));
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                // The checks below are order-sensitive: the first matching condition wins.
                if (err instanceof FeatureNotAvailableException) {
                    return Mono.error((Throwable)out.build());
                }
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(HOOK_CREATE_STAGED_INSERT, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    return this.setExpiryOvertimeModeAndFail((Throwable)err, HOOK_CREATE_STAGED_INSERT, ec);
                }
                if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                    // Retry the same staged insert after a delay; the retry is parented to this
                    // operation's span rather than to pspan.
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.createStagedInsert(collection, id, content, span, options, cas));
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    return Mono.error((Throwable)out.retryTransaction().build());
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)out.doNotRollbackAttempt().build());
                }
                if (ec == ErrorClasses.FAIL_DOC_ALREADY_EXISTS || ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    // Something is already at this key (or changed under us): delegate to the
                    // doc-exists handling path.
                    return this.handleDocExistsDuringStagedInsert(collection, id, content, span, options);
                }
                return Mono.error((Throwable)out.build());
            }).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Stages the removal of {@code doc} within this attempt.
     * <p>
     * Fails immediately if the attempt is already done or a previous operation has failed. In query mode
     * the removal is delegated to {@code removeWithQuery}. If this attempt already staged a mutation for
     * the same document, a prior REMOVE yields a document-not-found failure and a prior INSERT is undone
     * via {@code removeStagedInsert}. Otherwise the method checks for a blocking transaction, sets the ATR
     * to pending if the attempt has not started, and creates the staged remove.
     * <p>
     * A {@code TransactionOperationFailed} raised by the staged-insert-removal path or the main path is
     * appended to {@code this.errors}.
     *
     * @param doc the previously fetched document to remove
     * @return a Mono completing when the remove has been staged
     */
    public Mono<Void> remove(TransactionGetResult doc) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "user.remove", this.attemptSpan);
            // NOTE(review): the early returns below (done, existing error, query mode, already-removed)
            // leave this block without calling span.finish() - confirm whether that is intended.
            if (this.isDone()) {
                return Mono.error((Throwable)this.transactionIsDone());
            }
            if (this.isExistingError()) {
                return Mono.error((Throwable)this.existingError());
            }
            // Return value is unused; presumably this throws or flips expiry-overtime mode - verify in callee.
            this.checkExpiryPreCommitAndSetExpiryOvertimeMode(HOOK_REMOVE, Optional.of(doc.id()));
            if (this.queryMode()) {
                return this.removeWithQuery(doc);
            }
            // Always an empty Mono here; it only serves as the head of the chain built below.
            Mono first = Mono.empty();
            Optional<StagedMutation> existing = this.findStagedMutation(doc);
            if (existing.isPresent()) {
                StagedMutation op = existing.get();
                this.LOGGER.info(this.attemptId, "found previous write of %s as %s on remove", new Object[]{DebugUtil.docId(doc), op.type});
                if (op.type == StagedMutationType.REMOVE) {
                    // Removing a document this attempt has already removed is reported as not-found.
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_NOT_FOUND).cause((Throwable)new DocumentNotFoundException(null)).build());
                }
                if (op.type == StagedMutationType.INSERT) {
                    // Insert followed by remove in the same attempt: drop the staged insert instead of
                    // staging a remove. Unlike the main path, this error hook does not call failSpan.
                    return this.removeStagedInsert(doc, span).doOnSubscribe(v -> this.LOGGER.info(this.attemptId, "removing staged insert %s", DebugUtil.docId(doc))).doFinally(v -> span.finish()).doOnError(err -> {
                        if (err instanceof TransactionOperationFailed) {
                            this.errors.add((TransactionOperationFailed)err);
                        }
                    });
                }
                // Any other staged type (e.g. a replace) falls through to the normal path.
            }
            return first.then(this.checkAndHandleBlockingTxn(doc, span, ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_REMOVING)).then(Mono.defer(() -> {
                this.initAtrIfNeeded(doc.collection(), doc.id());
                // The ATR entry only needs to be set to pending for the first mutation of the attempt.
                if (this.state == AttemptStates.NOT_STARTED) {
                    return this.atrPending(this.atrCollection.get(), span);
                }
                return Mono.empty();
            })).then(this.createStagedRemove(doc, span, doc.links().isDeleted())).doOnSubscribe(v -> this.LOGGER.info(this.attemptId, "remove doc %s", DebugUtil.docId(doc))).doFinally(v -> span.finish()).doOnError(err -> {
                AttemptContextReactive.failSpan(span, err);
                // Record the failure so later operations and commit can see it via this.errors.
                if (err instanceof TransactionOperationFailed) {
                    this.errors.add((TransactionOperationFailed)err);
                }
            });
        });
    }

    /**
     * Decides whether a write to {@code doc} has to wait on another transaction's staged write.
     * <p>
     * Completes empty when the document carries no staged write, when the staged write belongs to this
     * transaction, or when the owning ATR cannot be located. Otherwise runs the forward-compatibility
     * check for {@code stage} and then inspects the owning ATR entry.
     *
     * @param doc   the document about to be written
     * @param pspan parent span handed to the ATR entry check
     * @param stage forward-compatibility stage to evaluate before the ATR check
     * @return a Mono that completes when it is fine to proceed, or fails as decided by the delegated checks
     */
    private Mono<Void> checkAndHandleBlockingTxn(TransactionGetResult doc, SpanWrapper pspan, ForwardCompatibilityStages stage) {
        // Nothing staged on the document: nobody can be blocking us.
        if (!doc.links().hasStagedWrite()) {
            return Mono.empty();
        }

        boolean stagedByThisTransaction = doc.links().stagedTransactionId().get().equals(this.transactionId());
        if (stagedByThisTransaction) {
            this.LOGGER.info(this.attemptId, "doc %s has been written by this transaction, ok to continue", DebugUtil.docId(doc));
            return Mono.empty();
        }

        boolean atrIsLocatable = doc.links().atrId().isPresent() && doc.links().atrBucketName().isPresent();
        if (!atrIsLocatable) {
            // Without the ATR coordinates there is nothing to consult, so the write goes ahead.
            this.LOGGER.info(this.attemptId, "doc %s is in another txn %s, cannot check ATR entry - probably a bug, so proceeding to overwrite", DebugUtil.docId(doc), doc.links().stagedAttemptId().get());
            return Mono.empty();
        }

        this.LOGGER.info(this.attemptId, "doc %s is in another txn %s, checking ATR entry %s/%s/%s to see if blocked", DebugUtil.docId(doc), doc.links().stagedAttemptId().get(), doc.links().atrBucketName().orElse(""), doc.links().atrCollectionName().orElse(""), doc.links().atrId().orElse(""));
        return this.forwardCompatibilityCheck(stage, doc.links().forwardCompatibility())
                .then(this.checkATREntryForBlockingDoc(doc, pspan));
    }

    /**
     * Converts staged mutations into the JSON document records produced by {@code listToDocRecords}.
     *
     * @param docs staged mutations whose documents should be described
     * @return one JSON record per staged mutation, in the same order
     */
    private List<JsonObject> listStagedToDocRecords(List<StagedMutation> docs) {
        List<TransactionGetResult> stagedDocs = new ArrayList<TransactionGetResult>(docs.size());
        for (StagedMutation mutation : docs) {
            stagedDocs.add(mutation.doc);
        }
        return this.listToDocRecords(stagedDocs);
    }

    /**
     * Builds one JSON record per document holding its key ({@code id}), bucket ({@code bkt}),
     * scope ({@code scp}) and collection ({@code col}).
     *
     * @param docs documents to describe
     * @return the records, in the same order as {@code docs}
     */
    private List<JsonObject> listToDocRecords(List<TransactionGetResult> docs) {
        List<JsonObject> records = new ArrayList<JsonObject>(docs.size());
        for (TransactionGetResult doc : docs) {
            JsonObject record = JsonObject.create();
            record.put("id", doc.id());
            record.put("bkt", doc.collection().bucketName());
            record.put("scp", doc.collection().scopeName());
            record.put("col", doc.collection().name());
            records.add(record);
        }
        return records;
    }

    /**
     * Produces the three xattr upsert specs that record this attempt's staged inserts ({@code ins}),
     * replaces ({@code rep}) and removes ({@code rem}) under {@code attempts.<attemptId>}.
     *
     * @return the specs in the order inserts, replaces, removes
     */
    private List<MutateInSpec> addDocsToBuilder() {
        final String attemptPath = "attempts." + this.attemptId + ".";
        MutateInSpec inserts = MutateInSpec.upsert(attemptPath + "ins", this.listStagedToDocRecords(this.stagedInserts())).xattr();
        MutateInSpec replaces = MutateInSpec.upsert(attemptPath + "rep", this.listStagedToDocRecords(this.stagedReplaces())).xattr();
        MutateInSpec removes = MutateInSpec.upsert(attemptPath + "rem", this.listStagedToDocRecords(this.stagedRemoves())).xattr();
        return Arrays.asList(inserts, replaces, removes);
    }

    /**
     * Serializes this attempt so that it can be committed later, storing the result on the overall
     * transaction context.
     * <p>
     * Fails with a {@code FeatureNotAvailableException}-caused error when the attempt is in query mode.
     *
     * @return a Mono completing once the serialized context has been stored
     */
    @Stability.Volatile
    public Mono<Void> defer() {
        Runnable serializeAttempt = () -> {
            boolean usesQuery = this.queryMode();
            if (usesQuery) {
                throw TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause((Throwable)new FeatureNotAvailableException("Deferred transactions are not supported when the transaction includes a query")).build();
            }
            String serializedAttempt = this.dehydrate(true).toString();
            this.LOGGER.info(this.attemptId, "deferring commit, serialized is %d chars", serializedAttempt.length());
            this.overall.serialized(TransactionSerializedContext.createFrom(serializedAttempt));
        };
        return Mono.fromRunnable(serializeAttempt).then();
    }

    /**
     * Exposes the serialized context held by the overall transaction context.
     *
     * @return whatever {@code overall.serialized()} currently reports
     */
    @Stability.Volatile
    Optional<TransactionSerializedContext> serialized() {
        Optional<TransactionSerializedContext> storedContext = this.overall.serialized();
        return storedContext;
    }

    /**
     * Commits this attempt.
     * <p>
     * Fails if an earlier operation failed or the attempt is already done, and delegates to
     * {@code commitWithQuery} in query mode. Otherwise it marks the attempt done, and - when an ATR has
     * been chosen - writes the COMMITTED state plus the staged document lists to the ATR entry, commits
     * each staged document, and finally removes the ATR entry. An attempt with no ATR (no mutations)
     * completes immediately.
     *
     * @return a Mono completing when the commit sequence has finished
     */
    public Mono<Void> commit() {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "user.commit", this.attemptSpan);
            return Mono.defer(() -> {
                // Existing errors are checked before isDone, so a failed attempt reports the earlier failure.
                if (this.isExistingError()) {
                    return this.previousOperationFailedAtCommit().doOnError(err -> this.LOGGER.warn(this.attemptId, "Cannot proceed with commit as previous operations failed: " + AttemptContextReactive.dbg(err)));
                }
                if (this.isDone()) {
                    return Mono.error((Throwable)this.transactionIsDone());
                }
                if (this.queryMode()) {
                    return this.commitWithQuery(span);
                }
                this.LOGGER.info(this.attemptId, "commit %s", this);
                // Return value is unused; presumably this throws or flips expiry-overtime mode - verify in callee.
                this.checkExpiryPreCommitAndSetExpiryOvertimeMode(HOOK_BEFORE_COMMIT, Optional.empty());
                // From here on the attempt counts as done, including on the no-mutations path below.
                this.isDone = true;
                if (!this.atrCollection.isPresent() || !this.atrId.isPresent()) {
                    return Mono.create(s -> {
                        this.LOGGER.info(this.attemptId, "calling commit on attempt that's got no mutations, skipping");
                        s.success();
                    });
                }
                String prefix = "attempts." + this.attemptId;
                ArrayList<MutateInSpec> specs = new ArrayList<MutateInSpec>();
                specs.add((MutateInSpec)MutateInSpec.upsert((String)(prefix + "." + "st"), (Object)AttemptStates.COMMITTED.name()).xattr());
                // NOTE(review): unlike the neighbouring specs this one has no explicit .xattr(); presumably the
                // SDK treats macro values as xattr writes - confirm against the SDK version in use.
                specs.add((MutateInSpec)MutateInSpec.upsert((String)(prefix + "." + "tsc"), (Object)MutateInMacro.CAS));
                specs.addAll(this.addDocsToBuilder());
                // insert (not upsert) of the "p" field with value 0.
                specs.add((MutateInSpec)MutateInSpec.insert((String)(prefix + "." + "p"), (Object)0).xattr());
                // Shared start-time holder passed to atrCommit and later read by atrComplete for timing logs.
                AtomicReference<Long> overallStartTime = new AtomicReference<Long>(0L);
                Mono<Void> atrCommit = this.atrCommit(specs, overallStartTime, span);
                // Order matters: ATR -> COMMITTED, then the documents, then remove the ATR entry.
                return atrCommit.then(this.commitDocs(span)).then(this.atrComplete(prefix, overallStartTime, span)).doOnSuccess(ignore -> this.LOGGER.info(this.attemptId, "overall commit completed")).doFinally(v -> span.finish()).then();
                // NOTE(review): span.finish() is registered both above and below, so the main path finishes the
                // span twice; presumably harmless - confirm SpanWrapper.finish semantics.
            }).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Commits a query-mode attempt by issuing a {@code COMMIT} statement through {@code queryWrapper}.
     * <p>
     * On a result the attempt state becomes COMPLETED and the attempt is marked done. Failures are
     * converted to {@code TransactionOperationFailed} with rollback disabled; an expiry additionally
     * raises TRANSACTION_COMMIT_AMBIGUOUS. Errors already classified as TRANSACTION_OPERATION_FAILED are
     * passed through unchanged.
     *
     * @param span parent span for the query
     * @return a Mono completing when the COMMIT statement has succeeded
     */
    private Mono<Void> commitWithQuery(SpanWrapper span) {
        return Mono.defer(() -> {
            // Each query statement in the attempt consumes the next statement index.
            int sidx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(sidx, null, "COMMIT", QueryOptions.queryOptions(), HOOK_QUERY_COMMIT, false, true, null, null, span, false).doOnNext(v -> {
                this.state = AttemptStates.COMPLETED;
            }).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    // Expiry while committing: the outcome is reported as commit-ambiguous.
                    TransactionOperationFailed e = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).cause((Throwable)err).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).doNotRollbackAttempt().build();
                    this.errors.add(e);
                    return Mono.error((Throwable)e);
                }
                if (ec == ErrorClasses.TRANSACTION_OPERATION_FAILED) {
                    // Propagated as-is and not added to this.errors here.
                    return Mono.error((Throwable)err);
                }
                TransactionOperationFailed e = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).doNotRollbackAttempt().build();
                this.errors.add(e);
                return Mono.error((Throwable)e);
                // Every branch of the handler above re-raises, so this hook only runs for a successful result.
            }).doOnNext(v -> {
                this.isDone = true;
            }).then();
        });
    }

    /**
     * Builds the failure raised when commit is attempted after earlier operations have failed.
     * <p>
     * The resulting error retries the transaction only if every recorded error allows a retry, and
     * disables rollback if any recorded error disabled it. The non-null causes of the recorded errors
     * are wrapped in a {@code PreviousOperationFailed}.
     *
     * @return a Mono that always fails with the aggregated error
     */
    private Mono<Void> previousOperationFailedAtCommit() {
        ArrayList<Throwable> causes = new ArrayList<Throwable>();
        boolean allPermitRetry = true;
        boolean allPermitRollback = true;
        for (TransactionOperationFailed failure : this.errors) {
            // A single dissenting error is enough to switch either flag off for good.
            allPermitRetry &= failure.retryTransaction();
            allPermitRollback &= failure.autoRollbackAttempt();
            Throwable cause = failure.getCause();
            if (cause != null) {
                causes.add(cause);
            }
        }

        TransactionOperationFailedBuilder aggregate = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new PreviousOperationFailed(causes));
        if (allPermitRetry) {
            aggregate.retryTransaction();
        }
        if (!allPermitRollback) {
            aggregate.doNotRollbackAttempt();
        }
        return Mono.error((Throwable)aggregate.build());
    }

    /**
     * Switches the attempt into expiry-overtime mode if it has expired client-side during commit or
     * rollback. Once in that mode, further expiry is only logged.
     *
     * @param stage name of the stage being executed, used for the expiry check and in log output
     * @param id    document key involved in the stage, if any
     */
    private void checkExpiryDuringCommitOrRollback(String stage, Optional<String> id) {
        if (this.expiryOvertimeMode) {
            this.LOGGER.info(this.attemptId, "ignoring expiry in stage %s, as in expiry-overtime mode", stage);
            return;
        }
        // Only consult the expiry check while not yet in overtime mode.
        if (this.hasExpiredClientSide(stage, id)) {
            this.LOGGER.info(this.attemptId, "has expired in stage %s, entering expiry-overtime mode (one attempt to complete)", stage);
            this.expiryOvertimeMode = true;
        }
    }

    /**
     * Final tidy-up step of commit: removes this attempt's entry ({@code prefix}) from the ATR document.
     * <p>
     * Failures here are treated as success unless they classify as FAIL_HARD, because the transaction's
     * documents have already been committed by the time this runs.
     *
     * @param prefix           xattr path of this attempt's ATR entry
     * @param overallStartTime start timestamp (nanoseconds) used to log the overall commit duration
     * @param pspan            parent span for the "atr.complete" span created here
     * @return a Mono completing when the entry is removed or the failure has been ignored
     */
    private Mono<Void> atrComplete(String prefix, AtomicReference<Long> overallStartTime, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "atr.complete", pspan);
            this.LOGGER.info(this.attemptId, "about to remove ATR entry %s", this.getAtrDebug(this.atrCollection, this.atrId));
            return Mono.defer(() -> {
                if (!this.expiryOvertimeMode && this.hasExpiredClientSide(HOOK_ATR_COMPLETE, Optional.empty())) {
                    String msg = "has expired in stage atrComplete, but transaction has successfully completed so returning success";
                    this.LOGGER.info(this.attemptId, msg);
                    // This error skips the ATR removal and is then handled by onErrorResume below, which
                    // turns it into success unless it classifies as FAIL_HARD.
                    return Mono.error((Throwable)new AttemptExpired(this, msg));
                }
                return Mono.empty();
            }).then(this.beforeAtrComplete(this)).then(this.atrCollection.get().mutateIn(this.atrId.get(), Collections.singletonList(MutateInSpec.remove((String)prefix).xattr()), (MutateInOptions)this.wrap(span, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext(HOOK_ATR_COMPLETE)))).flatMap(v -> this.afterAtrComplete(this)).doOnNext(v -> {
                // Only reached when afterAtrComplete emits a value; the ignored-error path below does not
                // set the state to COMPLETED.
                this.state = AttemptStates.COMPLETED;
                long now = System.nanoTime();
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "removed ATR %s in %dus, overall commit completed in %dus", this.getAtrDebug(this.atrCollection, this.atrId), elapsed, TimeUnit.NANOSECONDS.toMicros(now - (Long)overallStartTime.get()));
            }).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                this.LOGGER.info(this.attemptId, "error '%s' ec=%s while removing ATR %s", new Object[]{err, ec, this.getAtrDebug(this.atrCollection, this.atrId)});
                if (ec == ErrorClasses.FAIL_HARD) {
                    // Raised as a post-commit failure with rollback disabled; the original error is not
                    // attached as the cause here.
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT).doNotRollbackAttempt().build());
                }
                this.LOGGER.info(this.attemptId, "ignoring error during transaction tidyup, regarding as success");
                return Mono.empty();
            }).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Converts an error hit while in expiry-overtime mode into a FAIL_EXPIRY failure that disables
     * rollback and raises {@code toRaise}. Logs a warning if called while not in expiry-overtime mode.
     *
     * @param stage   name of the stage in which the error occurred (for logging)
     * @param err     the original error, wrapped in an {@code AttemptExpired} as the cause
     * @param toRaise the final error the transaction should surface
     * @param <T>     element type of the returned Mono; it never emits
     * @return a Mono that always fails with the converted error
     */
    private <T> Mono<T> mapErrorInOvertimeToExpired(String stage, Throwable err, TransactionOperationFailed.FinalErrorToRaise toRaise) {
        Object[] logArgs = new Object[]{err, toRaise, stage};
        this.LOGGER.info(this.attemptId, "in expiry-overtime mode so changing error '%s' to raise %s in stage '%s'; no rollback will be tried", logArgs);

        boolean inOvertime = this.expiryOvertimeMode;
        if (!inOvertime) {
            this.LOGGER.warn(this.attemptId, "not in expiry-overtime mode handling error '%s' in stage %s, possibly a bug", err, stage);
        }

        TransactionOperationFailedBuilder expired = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY)
                .doNotRollbackAttempt()
                .raiseException(toRaise)
                .cause(new AttemptExpired(this, err));
        return Mono.error((Throwable)expired.build());
    }

    /**
     * Commit-phase removal of a document whose remove was staged by this attempt.
     * <p>
     * Runs the expiry check for the stage, the before/after test hooks and the actual KV remove, and
     * records the resulting mutation token. Any failure is surfaced as a post-commit failure with
     * rollback disabled, except that an ambiguous failure is retried after a delay and, in
     * expiry-overtime mode, the failure is converted via {@code mapErrorInOvertimeToExpired}.
     *
     * @param pspan                   parent span for the "commit.remove" span created here
     * @param doc                     the document to remove
     * @param ambiguityResolutionMode {@code true} when this call is a retry after an ambiguous failure;
     *                                only used in log output by this method
     * @return a Mono completing when the document has been removed
     */
    private Mono<Void> removeDoc(SpanWrapper pspan, TransactionGetResult doc, boolean ambiguityResolutionMode) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "commit.remove", pspan);
            return Mono.fromRunnable(() -> {
                this.LOGGER.info(this.attemptId, "about to remove doc %s, ambiguityResolutionMode=%s", DebugUtil.docId(doc), ambiguityResolutionMode);
                this.checkExpiryDuringCommitOrRollback(HOOK_REMOVE_DOC, Optional.of(doc.id()));
                // NOTE(review): the KV remove is wrapped with pspan rather than the span created above;
                // left unchanged here - confirm whether parenting to pspan is intended.
            }).then(this.beforeDocRemoved(this, doc.id())).then(doc.collection().remove(doc.id(), this.wrap(RemoveOptions.removeOptions(), pspan, doc.collection().core()))).flatMap(mutationResult -> this.afterDocRemovedPreRetry(this, doc.id()).thenReturn(mutationResult)).doOnNext(mutationResult -> {
                this.LOGGER.info(this.attemptId, "commit - removed doc %s, mt = %s", DebugUtil.docId(doc), mutationResult.mutationToken());
                mutationResult.mutationToken().ifPresent(mt -> this.finalMutationTokens.add((MutationToken)mt));
            }).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder e = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
                // The attempt id must be the first argument, as in every other log call in this class;
                // without it the format string is taken as the attempt id and the doc id as the format.
                this.LOGGER.info(this.attemptId, "got error while removing doc %s in %dus: %s", DebugUtil.docId(doc), span.elapsed(), AttemptContextReactive.dbg(err));
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(HOOK_REMOVE_DOC, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
                }
                if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                    // The remove may or may not have been applied: try again after a short delay.
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.removeDoc(span, doc, true));
                }
                // FAIL_DOC_NOT_FOUND, FAIL_HARD and every other class all surface the same
                // post-commit failure, so they share this single exit.
                return Mono.error((Throwable)e.build());
            }).then(this.afterDocRemovedPostRetry(this, doc.id())).then().doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Commits every staged mutation of this attempt, one after another in staging order, then runs the
     * {@code afterDocsCommitted} hook.
     *
     * @param pspan parent span for the "commit.docs" span created here
     * @return a Mono completing when all staged documents have been committed
     */
    private Mono<Void> commitDocs(SpanWrapper pspan) {
        return Mono.defer(() -> {
            final SpanWrapper docsSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "commit.docs", pspan);
            return Flux.fromIterable(this.stagedMutations)
                    // concatMap keeps the commits strictly sequential.
                    .concatMap(staged -> this.commitDocWrapper(docsSpan, (StagedMutation)staged, staged.doc))
                    .then(Mono.defer(() -> {
                        long tookMicros = docsSpan.finish();
                        this.LOGGER.info(this.attemptId, "commit - all %d docs committed in %dus", this.stagedMutations.size(), tookMicros);
                        return this.afterDocsCommitted(this);
                    }))
                    .then()
                    .doOnError(err -> AttemptContextReactive.failSpan(docsSpan, err))
                    .doFinally(v -> docsSpan.finish());
        });
    }

    /**
     * Builds the warning text used when a document was modified by another party between staging and
     * committing and will be overwritten with CAS=0.
     *
     * @param collection collection holding the document
     * @param id         document key
     * @param dbg        description of the last known document state, appended verbatim
     * @return the full warning message
     */
    private static String msgDocChangedUnexpectedly(ReactiveCollection collection, String id, String dbg) {
        StringBuilder msg = new StringBuilder("Tried committing document ");
        msg.append(DebugUtil.docId(collection, id));
        msg.append(", but found that it has been modified by another party in-between staging and committing.");
        msg.append("  The application must ensure that non-transactional writes cannot happen at the same time as transactional writes on a document.");
        msg.append("  The change will be committed with CAS=0, which will overwrite the other change.");
        msg.append("  This document may need manual review to verify that no changes have been lost.");
        msg.append("  Last document state=");
        msg.append(dbg);
        return msg.toString();
    }

    /**
     * Builds the warning text used when a document was removed by another party between staging and
     * committing.
     *
     * @param collection collection holding the document
     * @param id         document key
     * @param dbg        description of the last known document state, appended verbatim
     * @return the full warning message
     */
    private static String msgDocRemovedUnexpectedly(ReactiveCollection collection, String id, String dbg) {
        StringBuilder msg = new StringBuilder("Tried committing document ");
        msg.append(DebugUtil.docId(collection, id));
        msg.append(", but found that it has been removed by another party in-between replacing and committing.");
        msg.append("  The application must ensure that non-transactional writes cannot happen at the same time as transactional writes on a document.");
        msg.append("  The document will be left removed, and the transaction's changes will not be written to this document.");
        msg.append("  Last document state=");
        msg.append(dbg);
        return msg.toString();
    }

    /**
     * Dispatches the commit of one staged mutation: staged removes go to {@code removeDoc}, everything
     * else to {@code commitDoc}, with insert mode enabled for staged inserts.
     *
     * @param pspan  parent span for the commit operation
     * @param staged the staged mutation being committed
     * @param doc    the document the mutation applies to
     * @return a Mono completing when the document has been committed
     */
    private Mono<Void> commitDocWrapper(SpanWrapper pspan, StagedMutation staged, TransactionGetResult doc) {
        return Mono.defer(() -> {
            boolean stagedAsRemove = staged.type == StagedMutationType.REMOVE;
            if (stagedAsRemove) {
                return this.removeDoc(pspan, doc, false);
            }
            boolean stagedAsInsert = staged.type == StagedMutationType.INSERT;
            return this.commitDoc(pspan, staged, doc, false, stagedAsInsert, false);
        });
    }

    /**
     * Renders a mutation result for log output as {@code cas=<cas>}, followed by
     * {@code ,seqno=<n>,vbucket=<id>} when a mutation token is present.
     *
     * @param result the mutation result, may be {@code null}
     * @return the rendered text, or {@code "<unavailable>"} for a {@code null} result
     */
    private String dbg(MutationResult result) {
        if (null == result) {
            return "<unavailable>";
        }
        StringBuilder text = new StringBuilder("cas=").append(result.cas());
        result.mutationToken().ifPresent(token -> text
                .append(",seqno=").append(token.sequenceNumber())
                .append(",vbucket=").append(token.partitionID()));
        return text.toString();
    }

    /**
     * Commit-phase write of one staged insert or replace: moves the staged content into the document
     * body and clears the {@code txn} xattr.
     * <p>
     * In insert mode the document is written with a plain insert; otherwise a mutateIn clears the
     * {@code txn} xattr and replaces the body, guarded by the document's CAS unless {@code casZeroMode}
     * is set. On success the document's CAS is updated and the mutation token recorded.
     * <p>
     * Failures are surfaced as post-commit failures with rollback disabled, except for the recoverable
     * cases handled below, which retry this method in a different mode.
     *
     * @param pspan                   parent span for the "commit.doc" span created here
     * @param staged                  the staged mutation supplying the content to write
     * @param doc                     the document being committed; its CAS is updated on success
     * @param casZeroMode             when {@code true}, write with CAS 0 so the write is unconditional
     * @param insertMode              when {@code true}, write with insert instead of mutateIn
     * @param ambiguityResolutionMode {@code true} when retrying after an ambiguous failure; CAS mismatch
     *                                and doc-exists are then raised instead of being retried
     * @return a Mono completing when the document has been committed
     */
    private Mono<Void> commitDoc(SpanWrapper pspan, StagedMutation staged, TransactionGetResult doc, boolean casZeroMode, boolean insertMode, boolean ambiguityResolutionMode) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "commit.doc", pspan);
            return Mono.fromRunnable(() -> {
                this.LOGGER.info(this.attemptId, "commit - committing doc %s, casZeroMode=%s, insertMode=%s, ambiguity-resolution=%s", DebugUtil.docId(doc), casZeroMode, insertMode, ambiguityResolutionMode);
                this.checkExpiryDuringCommitOrRollback(HOOK_COMMIT_DOC, Optional.of(doc.id()));
            }).then(this.beforeDocCommitted(this, doc.id())).then(Mono.defer(() -> {
                if (insertMode) {
                    // The staged content is already encoded, so it is written with the raw JSON transcoder.
                    return doc.collection().insert(doc.id(), (Object)staged.content, OptionsWrapperUtil.wrap(InsertOptions.insertOptions(), span, this.config, doc.collection().core()).transcoder((Transcoder)RawJsonTranscoder.INSTANCE));
                }
                // Upsert-then-remove of "txn" clears the xattr whether or not it currently exists; the
                // empty-path replace swaps in the document body.
                return doc.collection().mutateIn(doc.id(), Arrays.asList(MutateInSpec.upsert((String)"txn", null).xattr(), MutateInSpec.remove((String)"txn").xattr(), MutateInSpec.replace((String)"", (Object)staged.content)), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("commitDoc"))).cas(casZeroMode ? 0L : doc.cas()), span, doc.collection().core()));
            })).flatMap(v -> this.afterDocCommittedBeforeSavingCAS(this, doc.id()).thenReturn(v)).flatMap(updatedDoc -> {
                this.LOGGER.info(this.attemptId, "commit - committed doc %s got %s", DebugUtil.docId(doc), this.dbg((MutationResult)updatedDoc));
                doc.cas(updatedDoc.cas());
                updatedDoc.mutationToken().ifPresent(mt -> this.finalMutationTokens.add((MutationToken)mt));
                return this.afterDocCommitted(this, doc.id());
            }).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder e = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
                this.LOGGER.info(this.attemptId, "error while committing doc %s in %dus: %s", DebugUtil.docId(doc), span.elapsed(), AttemptContextReactive.dbg(err));
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(HOOK_COMMIT_DOC, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT).thenReturn((Object)0);
                }
                if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                    // The write may or may not have landed: retry with ambiguity-resolution enabled.
                    this.LOGGER.warn(this.attemptId, "%s while committing doc %s: as op is ambiguously successful, retrying op in ambiguity-resolution mode", DebugUtil.dbg(err), DebugUtil.docId(doc));
                    return this.commitDoc(span, staged, doc, casZeroMode, insertMode, true).thenReturn((Object)0);
                }
                if (ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    if (ambiguityResolutionMode) {
                        return Mono.error((Throwable)e.build());
                    }
                    // Document changed since staging: warn, then overwrite unconditionally (CAS 0).
                    String msg = AttemptContextReactive.msgDocChangedUnexpectedly(doc.collection(), doc.id(), this.dbg((MutationResult)staged.mr));
                    this.LOGGER.warn(this.attemptId, msg);
                    this.LOGGER.eventBus().publish((Event)new IllegalDocumentState(Event.Severity.WARN, msg, doc.id()));
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.commitDoc(span, staged, doc, true, false, ambiguityResolutionMode).thenReturn((Object)0));
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                    // Document vanished since staging: warn, then retry in insert mode.
                    String msg = AttemptContextReactive.msgDocRemovedUnexpectedly(doc.collection(), doc.id(), this.dbg((MutationResult)staged.mr));
                    this.LOGGER.warn(this.attemptId, msg);
                    this.LOGGER.eventBus().publish((Event)new IllegalDocumentState(Event.Severity.WARN, msg, doc.id()));
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.commitDoc(span, staged, doc, false, true, ambiguityResolutionMode).thenReturn((Object)0));
                }
                if (ec == ErrorClasses.FAIL_DOC_ALREADY_EXISTS) {
                    if (ambiguityResolutionMode) {
                        return Mono.error((Throwable)e.build());
                    }
                    // The insert found a document already present, i.e. another party wrote it, and the
                    // retry below overwrites it with CAS 0. The "changed unexpectedly" message describes
                    // exactly that; the "removed unexpectedly" text would misreport the document state.
                    String msg = AttemptContextReactive.msgDocChangedUnexpectedly(doc.collection(), doc.id(), this.dbg((MutationResult)staged.mr));
                    this.LOGGER.warn(this.attemptId, msg);
                    this.LOGGER.eventBus().publish((Event)new IllegalDocumentState(Event.Severity.WARN, msg, doc.id()));
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.commitDoc(span, staged, doc, true, false, ambiguityResolutionMode).thenReturn((Object)0));
                }
                // FAIL_HARD and every remaining class surface the same post-commit failure.
                return Mono.error((Throwable)e.build());
            }).then().doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Resolves an ambiguous ATR commit by reading this attempt's status field
     * ({@code attempts.<attemptId>.st}) back from the ATR document.
     * <p>
     * Outcome by stored status, after conversion through {@code AttemptStates.convert}:
     * <ul>
     *   <li>COMMITTED: completes empty.</li>
     *   <li>ABORTED: fails with a FAIL_OTHER error flagged to retry the transaction.</li>
     *   <li>anything else: fails with a do-not-rollback FAIL_OTHER error wrapping an IllegalStateException.</li>
     * </ul>
     * Failures are then classified. {@code RetryAtrCommit} and already-built transaction failures pass
     * through untouched; FAIL_TRANSIENT and FAIL_OTHER become {@code RetryOperation}; every other class is
     * raised as TRANSACTION_COMMIT_AMBIGUOUS without rolling the attempt back.
     *
     * @param overallStartTime overwritten with {@code System.nanoTime()} each time the inner pipeline is subscribed
     * @param pspan            parent of the "atr.commit_ambiguity_resolution" span opened here
     * @return a Mono that completes empty once the ATR shows this attempt as committed
     */
    private Mono<Void> atrCommitAmbiguityResolution(AtomicReference<Long> overallStartTime, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "atr.commit_ambiguity_resolution", pspan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "about to fetch status of ATR %s to resolve ambiguity, expiryOvertimeMode=%s", this.getAtrDebug(this.atrCollection, this.atrId), this.expiryOvertimeMode);
                overallStartTime.set(System.nanoTime());
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_COMMIT_AMBIGUITY_RESOLUTION, Optional.empty());
            }).then(this.beforeAtrCommitAmbiguityResolution(this)).then(this.atrCollection.get().lookupIn(this.atrId.get(), Collections.singletonList(LookupInSpec.get((String)("attempts." + this.attemptId + "." + "st")).xattr()), (LookupInOptions)LookupInOptions.lookupInOptions().serializer((JsonSerializer)SerializationUtil.DEFAULT_JSON_SERIALIZER).parentSpan(span.span()))).flatMap(result -> {
                String status = (String)result.contentAs(0, String.class);
                this.LOGGER.info(this.attemptId, "got status of ATR %s: '%s'", this.getAtrDebug(this.atrCollection, this.atrId), status);
                AttemptStates state = AttemptStates.convert(status);
                switch (state) {
                    case COMMITTED: {
                        // The earlier commit write did land: nothing more to do.
                        return Mono.empty();
                    }
                    case ABORTED: {
                        return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).retryTransaction().build());
                    }
                }
                // Any state other than COMMITTED/ABORTED is reported as an unexpected external change.
                return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).doNotRollbackAttempt().cause(new IllegalStateException("This transaction has been changed by another actor to be in unexpected state " + status)).build());
            }).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                // Shared builder for the FAIL_HARD and catch-all branches; already marked do-not-rollback.
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause((Throwable)err);
                // Control-flow signals and already-built transaction failures are propagated unchanged.
                if (err instanceof RetryAtrCommit || ec == ErrorClasses.TRANSACTION_OPERATION_FAILED) {
                    return Mono.error((Throwable)err);
                }
                this.LOGGER.info(this.attemptId, "error while resolving ATR %s ambiguity in %dus: %s", this.getAtrDebug(this.atrCollection, this.atrId), span.elapsed(), AttemptContextReactive.dbg(err));
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause(new AttemptExpired(this, (Throwable)err)).build());
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    // doNotRollbackAttempt() repeats what the builder was already given above.
                    return Mono.error((Throwable)builder.doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build());
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT || ec == ErrorClasses.FAIL_OTHER) {
                    // Presumably the signal the retryWhen policy below reacts to — confirm against its definition.
                    return Mono.error((Throwable)new RetryOperation());
                }
                if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause(new ActiveTransactionRecordEntryNotFound(this.atrId.get(), this.attemptId)).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build());
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause(new ActiveTransactionRecordNotFound(this.atrId.get(), this.attemptId)).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build());
                }
                // Everything else: the commit outcome stays unknown, so report it as ambiguous.
                return Mono.error((Throwable)builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Applies {@code specs} to the ATR document to mark this attempt as committed.
     * <p>
     * On success the {@code afterAtrCommit} hook runs and, when it emits, {@code state} is set to COMMITTED.
     * On failure the error is classified and mapped:
     * <ul>
     *   <li>FAIL_EXPIRY: do-not-rollback error raising TRANSACTION_EXPIRED, or TRANSACTION_COMMIT_AMBIGUOUS
     *       once ambiguity-resolution mode is on.</li>
     *   <li>FAIL_AMBIGUOUS: switches ambiguity-resolution mode on and signals {@code RetryOperation}.</li>
     *   <li>FAIL_HARD: do-not-rollback error (commit-ambiguous when in ambiguity-resolution mode).</li>
     *   <li>FAIL_TRANSIENT: {@code RetryOperation} in ambiguity-resolution mode, otherwise retry the transaction.</li>
     *   <li>FAIL_PATH_ALREADY_EXISTS: delegates to {@code atrCommitAmbiguityResolution}.</li>
     *   <li>anything else: fails, with rollback disabled for the ATR-not-found / entry-not-found / ATR-full cases.</li>
     * </ul>
     *
     * @param specs            sub-document mutations to apply to the ATR
     * @param overallStartTime overwritten with {@code System.nanoTime()} each time the inner pipeline is subscribed
     * @param pspan            parent of the "atr.commit" span opened here
     * @return a Mono that completes empty once the ATR write has succeeded
     */
    private Mono<Void> atrCommit(List<MutateInSpec> specs, AtomicReference<Long> overallStartTime, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "atr.commit", pspan);
            // Turned on after a FAIL_AMBIGUOUS result; from then on failures are reported as commit-ambiguous.
            AtomicBoolean ambiguityResolutionMode = new AtomicBoolean(false);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "about to set ATR %s to Committed, expiryOvertimeMode=%s, ambiguityResolutionMode=%s", this.getAtrDebug(this.atrCollection, this.atrId), this.expiryOvertimeMode, ambiguityResolutionMode);
                overallStartTime.set(System.nanoTime());
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_COMMIT, Optional.empty());
            }).then(this.beforeAtrCommit(this)).then(this.atrCollection.get().mutateIn(this.atrId.get(), specs, (MutateInOptions)this.wrap(span, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext("atrCommit")))).flatMap(v -> this.afterAtrCommit(this)).doOnNext(v -> {
                this.state = AttemptStates.COMMITTED;
                this.LOGGER.info(this.attemptId, "set ATR %s to Committed in %dus", this.getAtrDebug(this.atrCollection, this.atrId), TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - (Long)overallStartTime.get()));
            }).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while setting ATR %s to Committed in %dus: %s", this.getAtrDebug(this.atrCollection, this.atrId), span.elapsed(), AttemptContextReactive.dbg(err));
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    // Once a write may already have landed, expiry can no longer be reported as a clean expiry.
                    TransactionOperationFailed.FinalErrorToRaise toRaise = ambiguityResolutionMode.get() ? TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS : TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED;
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(toRaise).cause(new AttemptExpired(this, (Throwable)err)).build());
                }
                if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                    ambiguityResolutionMode.set(true);
                    return Mono.error((Throwable)new RetryOperation());
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    if (ambiguityResolutionMode.get()) {
                        return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause((Throwable)err).build());
                    }
                    return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    if (ambiguityResolutionMode.get()) {
                        // NOTE(review): thrown here rather than returned via Mono.error as in the
                        // FAIL_AMBIGUOUS branch above — confirm the two are meant to be equivalent.
                        throw new RetryOperation();
                    }
                    return Mono.error((Throwable)builder.retryTransaction().build());
                }
                if (ec == ErrorClasses.FAIL_PATH_ALREADY_EXISTS) {
                    return this.atrCommitAmbiguityResolution(overallStartTime, pspan).onErrorResume(e -> {
                        if (e instanceof RetryAtrCommit) {
                            // Leave ambiguity-resolution mode and retry the commit write.
                            ambiguityResolutionMode.set(false);
                            throw new RetryOperation();
                        }
                        return Mono.error((Throwable)e);
                    });
                }
                // Remaining classes: choose a more specific cause and decide whether a rollback is attempted.
                Throwable cause = err;
                boolean rollback = true;
                switch (ec) {
                    case FAIL_PATH_NOT_FOUND: {
                        cause = new ActiveTransactionRecordEntryNotFound(this.atrId.get(), this.attemptId);
                        rollback = false;
                        break;
                    }
                    case FAIL_DOC_NOT_FOUND: {
                        cause = new ActiveTransactionRecordNotFound(this.atrId.get(), this.attemptId);
                        rollback = false;
                        break;
                    }
                    case FAIL_ATR_FULL: {
                        cause = new ActiveTransactionRecordFull(this, (Throwable)err);
                        rollback = false;
                    }
                }
                if (ambiguityResolutionMode.get()) {
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause(cause).build());
                }
                return Mono.error((Throwable)builder.cause(cause).rollbackAttempt(rollback).build());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Builds a Mono that, when subscribed, logs the transition and switches this attempt into
     * expiry-overtime mode. Nothing happens until subscription.
     *
     * @param stage name of the stage in which expiry was detected; used only in the log line
     * @param <T>   element type expected by the caller's pipeline
     * @return a Mono wrapping the state change
     */
    private <T> Mono<T> setExpiryOvertimeMode(String stage) {
        Runnable enterOvertime = () -> {
            this.LOGGER.info(this.attemptId, "moving to expiry-overtime-mode in stage %s", stage);
            this.expiryOvertimeMode = true;
        };
        return Mono.fromRunnable(enterOvertime);
    }

    /**
     * Switches this attempt into expiry-overtime mode immediately (at call time, not on subscription)
     * and returns a Mono that fails with a TRANSACTION_EXPIRED error wrapping {@code err}.
     *
     * @param err   the failure that revealed the expiry; becomes the cause of the AttemptExpired
     * @param stage name of the stage in which expiry was detected; used only in the log line
     * @param ec    error class to record on the resulting failure
     * @param <T>   element type expected by the caller's pipeline
     * @return a Mono that always errors
     */
    private <T> Mono<T> setExpiryOvertimeModeAndFail(Throwable err, String stage, ErrorClasses ec) {
        this.LOGGER.info(this.attemptId, "moving to expiry-overtime-mode in stage %s, and raising error", stage);
        this.expiryOvertimeMode = true;
        TransactionOperationFailed expiredFailure = TransactionOperationFailedBuilder.createError(this, ec)
                .raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED)
                .cause(new AttemptExpired(this, err))
                .build();
        return Mono.error((Throwable)expiredFailure);
    }

    /**
     * Rolls back this attempt at the application's request.
     *
     * @return a Mono that completes when the rollback has finished
     */
    public Mono<Void> rollback() {
        final boolean isAppRollback = true;
        return rollbackInternal(isAppRollback);
    }

    /**
     * Rolls back this attempt.
     * <p>
     * In query mode the work is delegated to {@code rollbackQuery()}. Otherwise the ATR is marked aborted,
     * every staged mutation is rolled back, and the ATR entry is removed. If no ATR has been chosen yet
     * there were no mutations and the rollback completes immediately. An
     * {@code ActiveTransactionRecordNotFound} failure is treated as a successful rollback.
     *
     * @param isAppRollback true when the application asked for the rollback; in that case an attempt
     *                      that is already done is rejected with the "transaction is done" error
     * @return a Mono that completes when the rollback has finished
     */
    Mono<Void> rollbackInternal(boolean isAppRollback) {
        // Hand off to the query path BEFORE opening the KV rollback span: previously the span was
        // created first and never finished when this early return was taken.
        if (this.queryMode()) {
            return this.rollbackQuery();
        }
        // NOTE(review): the attribute is always true, even when isAppRollback is false — confirm intended.
        SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "rollback", this.attemptSpan).attribute("db.couchbase.transactions.user_initiated_rollback", true);
        return Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "rollback %s expiryOvertimeMode=%s isAppRollback=%s", this, this.expiryOvertimeMode, isAppRollback);
            if (!this.expiryOvertimeMode && this.hasExpiredClientSide(HOOK_ROLLBACK, Optional.empty())) {
                this.LOGGER.info(this.attemptId, "has expired before rollback, entering expiry-overtime mode");
                this.expiryOvertimeMode = true;
            }
            if (isAppRollback && this.isDone()) {
                return Mono.error((Throwable)this.transactionIsDone());
            }
            this.isDone = true;
            if (!this.atrCollection.isPresent() || !this.atrId.isPresent()) {
                // No ATR was ever selected, so nothing has been staged.
                return Mono.create(s -> {
                    this.LOGGER.info(this.attemptId, "Calling rollback when it's had no mutations, so nothing to do");
                    s.success();
                });
            }
            String prefix = "attempts." + this.attemptId;
            return this.atrAbort(prefix, span, isAppRollback, false).then(this.rollbackDocs(span)).then(this.atrRollbackComplete(prefix, span)).onErrorResume(err -> {
                if (err instanceof ActiveTransactionRecordNotFound) {
                    this.LOGGER.info(this.attemptId, "ActiveTransactionRecordNotFound indicates that nothing needs to be done for this rollback: treating as successful rollback");
                    return Mono.empty();
                }
                return Mono.error((Throwable)err);
            });
        }).doFinally(v -> span.finish());
    }

    /**
     * Rolls back a query-mode attempt by issuing a {@code ROLLBACK} statement.
     * <p>
     * On success the attempt is marked ROLLED_BACK and done. A {@code TransactionOperationFailed} is
     * propagated as-is, {@code AttemptNotFoundOnQuery} is swallowed, and any other failure is wrapped in
     * a do-not-rollback FAIL_OTHER error that is also recorded in {@code errors}.
     *
     * @return a Mono that completes when the statement has been handled
     */
    private Mono<Void> rollbackQuery() {
        return Mono.defer(() -> {
            SpanWrapper opSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "rollback_query", this.attemptSpan);
            int sidx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(sidx, null, "ROLLBACK", QueryOptions.queryOptions(), HOOK_QUERY_ROLLBACK, false, false, null, null, opSpan, false)
                    .then(Mono.fromRunnable(() -> {
                        this.state = AttemptStates.ROLLED_BACK;
                        this.isDone = true;
                    }))
                    .onErrorResume(failure -> {
                        // Already in final form: pass straight through.
                        if (failure instanceof TransactionOperationFailed) {
                            return Mono.error((Throwable)failure);
                        }
                        if (failure instanceof AttemptNotFoundOnQuery) {
                            return Mono.empty();
                        }
                        TransactionOperationFailed wrapped = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER)
                                .cause((Throwable)failure)
                                .doNotRollbackAttempt()
                                .build();
                        this.errors.add(wrapped);
                        return Mono.error((Throwable)wrapped);
                    })
                    .doFinally(signal -> opSpan.finish())
                    .then();
        });
    }

    /**
     * Removes this attempt's entry ({@code prefix}) from the ATR to mark the rollback as complete.
     * <p>
     * On success the {@code afterAtrRolledBack} hook runs and, when it emits, {@code state} becomes
     * ROLLED_BACK. Failure handling:
     * <ul>
     *   <li>in expiry-overtime mode: mapped through {@code mapErrorInOvertimeToExpired};</li>
     *   <li>FAIL_EXPIRY: do-not-rollback error raising TRANSACTION_EXPIRED;</li>
     *   <li>FAIL_PATH_NOT_FOUND / FAIL_DOC_NOT_FOUND: treated as already done;</li>
     *   <li>FAIL_HARD: do-not-rollback error;</li>
     *   <li>anything else: {@code RetryOperation}.</li>
     * </ul>
     * The FAIL_EXPIRY and FAIL_HARD errors carry the underlying exception as their cause, matching the
     * other ATR operations in this class, so the root failure is not lost from diagnostics.
     *
     * @param prefix path of this attempt's entry inside the ATR
     * @param pspan  parent of the "atr.rollback_complete" span opened here
     * @return a Mono that completes empty when the entry has been removed or was already gone
     */
    private Mono<Void> atrRollbackComplete(String prefix, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "atr.rollback_complete", pspan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "marking ATR %s as rollback complete", this.getAtrDebug(this.atrCollection, this.atrId));
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_ROLLBACK_COMPLETE, Optional.empty());
            }).then(this.beforeAtrRolledBack(this)).then(this.atrCollection.get().mutateIn(this.atrId.get(), Collections.singletonList(MutateInSpec.remove((String)prefix).xattr()), (MutateInOptions)this.wrap(span, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext(HOOK_ATR_ROLLBACK_COMPLETE)))).flatMap(v -> this.afterAtrRolledBack(this)).doOnNext(v -> {
                this.state = AttemptStates.ROLLED_BACK;
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "rollback - atr rolled back in %dus", elapsed);
            }).onErrorResume(err -> {
                this.LOGGER.info(this.attemptId, "error while marking ATR %s as rollback complete in %dus: %s", this.getAtrDebug(this.atrCollection, this.atrId), span.elapsed(), AttemptContextReactive.dbg(err));
                ErrorClasses ec = ErrorClasses.classify(err);
                // Keep the original failure as the cause so it is not lost from the raised error.
                TransactionOperationFailedBuilder error = TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause((Throwable)err);
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(HOOK_ATR_ROLLBACK_COMPLETE, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).cause(new AttemptExpired(this, (Throwable)err)).build());
                }
                if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND || ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                    // The entry (or the whole ATR) is already gone: nothing left to remove.
                    return Mono.empty();
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)error.build());
                }
                return Mono.error((Throwable)new RetryOperation());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).then().doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Rolls back every staged mutation, one document at a time and in staging order.
     * <p>
     * Staged inserts go through {@code rollbackStagedInsert}; everything else goes through
     * {@code rollbackStagedReplaceOrRemove}. Both helpers return {@code Void} publishers, so the combined
     * Flux never emits an element. The completion log line is therefore attached after {@code then()}
     * with {@code doOnSuccess}; the previous {@code doOnNext} on the element-less Flux could never run.
     *
     * @param pspan parent of the "rollback.docs" span opened here
     * @return a Mono that completes empty once all staged documents have been rolled back
     */
    private Mono<Void> rollbackDocs(SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "rollback.docs", pspan);
            return Flux.fromIterable(this.stagedMutations).concatMap(staged -> {
                TransactionGetResult doc = staged.doc;
                switch (staged.type) {
                    case INSERT: {
                        return this.rollbackStagedInsert(span, doc);
                    }
                }
                return this.rollbackStagedReplaceOrRemove(span, doc);
            }).then().doOnSuccess(ignored -> this.LOGGER.info(this.attemptId, "rollback - docs rolled back")).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Rolls back a staged replace or remove by deleting the {@code txn} xattr from the document,
     * guarded by the document's CAS.
     * <p>
     * Failure handling:
     * <ul>
     *   <li>in expiry-overtime mode: mapped through {@code mapErrorInOvertimeToExpired};</li>
     *   <li>FAIL_EXPIRY: enters expiry-overtime mode, then signals {@code RetryOperation};</li>
     *   <li>FAIL_PATH_NOT_FOUND: treated as already rolled back;</li>
     *   <li>FAIL_DOC_NOT_FOUND, FAIL_CAS_MISMATCH, FAIL_HARD: do-not-rollback error;</li>
     *   <li>anything else: {@code RetryOperation}.</li>
     * </ul>
     *
     * @param pspan parent of the "rollback.doc" span opened here
     * @param doc   the document whose staged mutation is to be removed
     * @return a Mono that completes empty when the staged data is gone
     */
    private Mono<Void> rollbackStagedReplaceOrRemove(SpanWrapper pspan, TransactionGetResult doc) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "rollback.doc", pspan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "rolling back doc %s with cas %d by removing staged mutation", DebugUtil.docId(doc), doc.cas());
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ROLLBACK_DOC, Optional.of(doc.id()));
            }).then(this.beforeDocRolledBack(this, doc.id())).then(doc.collection().mutateIn(doc.id(), Arrays.asList(MutateInSpec.remove((String)"txn").xattr()), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("rollbackDoc"))).cas(doc.cas()), span, doc.collection().core()))).flatMap(updatedDoc -> this.afterRollbackReplaceOrRemove(this, doc.id()).thenReturn(updatedDoc)).doOnNext(updatedDoc -> this.LOGGER.info(this.attemptId, "rolled back doc %s, got cas %d and mt %s", DebugUtil.docId(doc), updatedDoc.cas(), updatedDoc.mutationToken())).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                // Shared builder for the terminal branches below; already marked do-not-rollback.
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause((Throwable)err);
                // NOTE(review): this one line logs through logger() while the rest of the method uses LOGGER — confirm they are the same sink.
                this.logger().info(this.attemptId, "got error while rolling back doc %s in %dus: %s", DebugUtil.docId(doc), span.elapsed(), AttemptContextReactive.dbg(err));
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(HOOK_ROLLBACK_DOC, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    // Enter overtime mode first, then retry this operation.
                    return this.setExpiryOvertimeMode(HOOK_ROLLBACK_DOC).then(Mono.error((Throwable)new RetryOperation()));
                }
                if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                    this.LOGGER.info(this.attemptId, "got PATH_NOT_FOUND while cleaning up staged doc %s, it must have already been rolled back, continuing", DebugUtil.docId(doc));
                    return Mono.empty();
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                    return Mono.error((Throwable)builder.build());
                }
                if (ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    return Mono.error((Throwable)builder.build());
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    // doNotRollbackAttempt() repeats what the builder was already given above.
                    return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
                }
                return Mono.error((Throwable)new RetryOperation());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Rolls back a staged insert by deleting the {@code txn} xattr from the (possibly tombstoned)
     * document, guarded by the document's CAS.
     * <p>
     * Failure handling:
     * <ul>
     *   <li>in expiry-overtime mode: mapped through {@code mapErrorInOvertimeToExpired};</li>
     *   <li>FAIL_EXPIRY: enters expiry-overtime mode, then signals {@code RetryOperation};</li>
     *   <li>FAIL_DOC_NOT_FOUND / FAIL_PATH_NOT_FOUND: treated as already rolled back;</li>
     *   <li>FAIL_HARD / FAIL_CAS_MISMATCH: do-not-rollback error;</li>
     *   <li>anything else: {@code RetryOperation}.</li>
     * </ul>
     * All three stage references now use {@code HOOK_DELETE_INSERTED}, the stage the expiry pre-check
     * already used. The error handler previously reported {@code HOOK_REMOVE_DOC} / {@code HOOK_REMOVE},
     * attributing rollback failures to a different operation.
     *
     * @param pspan parent of the "rollback.insert" span opened here
     * @param doc   the staged-insert document to clean up
     * @return a publisher that completes empty when the staged insert is gone
     */
    private Publisher<Void> rollbackStagedInsert(SpanWrapper pspan, TransactionGetResult doc) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "rollback.insert", pspan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "rolling back staged insert %s with cas %d", DebugUtil.docId(doc), doc.cas());
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_DELETE_INSERTED, Optional.of(doc.id()));
            }).then(this.beforeRollbackDeleteInserted(this, doc.id())).then(Mono.defer(() -> doc.collection().mutateIn(doc.id(), Collections.singletonList(MutateInSpec.remove((String)"txn").xattr()), this.wrap(MutateInOptions.mutateInOptions(), span, doc.collection().core()).accessDeleted(true).cas(doc.cas())))).flatMap(updatedDoc -> this.afterRollbackDeleteInserted(this, doc.id()).thenReturn(updatedDoc)).doOnNext(updatedDoc -> this.LOGGER.info(this.attemptId, "deleted inserted doc %s, mt %s", DebugUtil.docId(doc), updatedDoc.mutationToken())).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while rolling back inserted doc %s in %dus: %s", DebugUtil.docId(doc), span.elapsed(), AttemptContextReactive.dbg(err));
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(HOOK_DELETE_INSERTED, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    // Enter overtime mode first, then retry this operation.
                    return this.setExpiryOvertimeMode(HOOK_DELETE_INSERTED).then(Mono.error((Throwable)new RetryOperation()));
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                    this.LOGGER.info(this.attemptId, "got %s while removing staged insert doc %s, it must have already been rolled back, continuing", new Object[]{ec, DebugUtil.docId(doc)});
                    return Mono.empty();
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
                }
                if (ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
                }
                return Mono.error((Throwable)new RetryOperation());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Removes a staged insert made earlier in this same attempt by deleting the {@code txn} xattr from
     * the (possibly tombstoned) document, guarded by the document's CAS.
     * <p>
     * On success the document's CAS is updated and its staged mutation is dropped from this attempt.
     * An existing transaction failure is propagated as-is; FAIL_HARD becomes a do-not-rollback error;
     * every other failure becomes a retry-the-transaction error.
     * <p>
     * The span is now marked failed and finished on error paths too (it used to be finished only on
     * success and so leaked on every failure), and the KV call is parented to this operation's own
     * span rather than {@code pspan}, as the sibling rollback operations do.
     *
     * @param doc   the staged-insert document
     * @param pspan parent of the "staging.remove_staged_insert" span opened here
     * @return a Mono that completes empty when the staged insert has been removed
     */
    private Mono<Void> removeStagedInsert(TransactionGetResult doc, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), doc.collection(), doc.id(), "staging.remove_staged_insert", pspan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "removing staged insert %s with cas %d", DebugUtil.docId(doc), doc.cas());
                if (this.hasExpiredClientSide(HOOK_REMOVE_STAGED_INSERT, Optional.of(doc.id()))) {
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).doNotRollbackAttempt().cause(new AttemptExpired(this, "Attempt has expired in stage " + HOOK_REMOVE_STAGED_INSERT)).build());
                }
                return Mono.empty();
            }).then(this.beforeRemoveStagedInsert(this, doc.id())).then(Mono.defer(() -> doc.collection().mutateIn(doc.id(), Collections.singletonList(MutateInSpec.remove((String)"txn").xattr()), this.wrap(MutateInOptions.mutateInOptions(), span, doc.collection().core()).accessDeleted(true).cas(doc.cas())))).flatMap(updatedDoc -> this.afterRemoveStagedInsert(this, doc.id()).thenReturn(updatedDoc)).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).retryTransaction().cause((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while removing staged insert doc %s in %dus: %s", DebugUtil.docId(doc), span.elapsed(), AttemptContextReactive.dbg(err));
                // Already in final form (e.g. the expiry failure raised above): pass straight through.
                if (ec == ErrorClasses.TRANSACTION_OPERATION_FAILED) {
                    return Mono.error((Throwable)err);
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
                }
                return Mono.error((Throwable)builder.build());
            }).doOnNext(v -> {
                doc.cas(v.cas());
                this.removeStagedMutation(doc);
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "removed staged insert from doc %s in %dus", DebugUtil.docId(doc), elapsed);
            }).then().doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * Marks this attempt as ABORTED in the ATR: upserts the status and a rollback-start timestamp under
     * {@code prefix} and appends the specs produced by {@code addDocsToBuilder()}.
     * <p>
     * When the {@code afterAtrAborted} hook emits, {@code state} becomes ABORTED. Failure handling:
     * <ul>
     *   <li>in expiry-overtime mode: mapped through {@code mapErrorInOvertimeToExpired};</li>
     *   <li>FAIL_EXPIRY: enters expiry-overtime mode, then signals {@code RetryOperation};</li>
     *   <li>FAIL_PATH_NOT_FOUND, FAIL_DOC_NOT_FOUND, FAIL_ATR_FULL, FAIL_HARD: do-not-rollback error
     *       (with a more specific cause for the first three);</li>
     *   <li>anything else: {@code RetryOperation}.</li>
     * </ul>
     *
     * @param prefix                  path of this attempt's entry inside the ATR
     * @param pspan                   parent of the "atr.abort" span opened here
     * @param isAppRollback           only used in the log line within this method
     * @param ambiguityResolutionMode only used in the log line within this method
     * @return a Mono that completes empty once the ATR entry has been marked aborted
     */
    private Mono<Void> atrAbort(String prefix, SpanWrapper pspan, boolean isAppRollback, boolean ambiguityResolutionMode) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), this.atrCollection.orElse(null), this.atrId.orElse(null), "atr.abort", pspan);
            ArrayList<Object> spec = new ArrayList<Object>();
            spec.add(MutateInSpec.upsert((String)(prefix + "." + "st"), (Object)AttemptStates.ABORTED.name()).xattr());
            // NOTE(review): unlike the status spec above, this spec does not call .xattr() — confirm that is intended for the CAS macro.
            spec.add(MutateInSpec.upsert((String)(prefix + "." + "tsrs"), (Object)MutateInMacro.CAS));
            spec.addAll(this.addDocsToBuilder());
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "aborting ATR %s isAppRollback=%s ambiguityResolutionMode=%s", this.getAtrDebug(this.atrCollection, this.atrId), isAppRollback, ambiguityResolutionMode);
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_ABORT, Optional.empty());
            }).then(this.beforeAtrAborted(this)).then(this.atrCollection.get().mutateIn(this.atrId.get(), spec, (MutateInOptions)this.wrap(span, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext("atrAbort")))).then(this.afterAtrAborted(this)).doOnNext(v -> {
                this.state = AttemptStates.ABORTED;
                this.LOGGER.info(this.attemptId, "aborted ATR %s", this.getAtrDebug(this.atrCollection, this.atrId));
            }).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                // Shared builder for the terminal branches below; already marked do-not-rollback.
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).doNotRollbackAttempt();
                this.LOGGER.info(this.attemptId, "error %s while aborting ATR %s", DebugUtil.dbg(err), this.getAtrDebug(this.atrCollection, this.atrId));
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(HOOK_ATR_ABORT, (Throwable)err, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    // Enter overtime mode first, then retry this operation.
                    return this.setExpiryOvertimeMode(HOOK_ATR_ABORT).then(Mono.error((Throwable)new RetryOperation()));
                }
                if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                    return Mono.error((Throwable)builder.cause(new ActiveTransactionRecordEntryNotFound(this.atrId.get(), this.attemptId)).build());
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                    return Mono.error((Throwable)builder.cause(new ActiveTransactionRecordNotFound(this.atrId.get(), this.attemptId)).build());
                }
                if (ec == ErrorClasses.FAIL_ATR_FULL) {
                    // NOTE(review): the original error is not passed to ActiveTransactionRecordFull here — confirm intended.
                    return Mono.error((Throwable)builder.cause(new ActiveTransactionRecordFull(this)).build());
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    // doNotRollbackAttempt() repeats what the builder was already given above.
                    return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
                }
                return Mono.error((Throwable)new RetryOperation());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).doOnError(err -> AttemptContextReactive.failSpan(span, err)).doFinally(v -> span.finish());
        });
    }

    /**
     * @return the current value of the {@code isDone} flag
     */
    boolean isDone() {
        return isDone;
    }

    /**
     * @return true when at least one error has been recorded in {@code errors}
     */
    boolean isExistingError() {
        boolean nothingRecorded = this.errors.isEmpty();
        return !nothingRecorded;
    }

    /**
     * @return the state currently recorded for this attempt
     */
    AttemptStates state() {
        return state;
    }

    /**
     * @return true once a query target has been recorded for this attempt
     */
    boolean queryMode() {
        boolean targetAbsent = this.queryTarget == null;
        return !targetAbsent;
    }

    /**
     * Runs {@code statement} using default transaction query options.
     *
     * @param statement the statement to execute
     * @return the result of the two-argument {@code query} overload
     */
    @Stability.Uncommitted
    public Mono<ReactiveQueryResult> query(String statement) {
        TransactionQueryOptions defaultOptions = TransactionQueryOptions.queryOptions();
        return this.query(statement, defaultOptions);
    }

    /**
     * Runs {@code statement} through {@code queryWrapper} with no scope and without the implicit-transaction flag.
     * The statement index is taken when the returned Mono is subscribed.
     *
     * @param statement the statement to execute
     * @param options   transaction query options; their builder is handed to {@code queryWrapper}
     * @return a Mono emitting the query result
     */
    Mono<QueryResult> queryBlocking(String statement, TransactionQueryOptions options) {
        return Mono.defer(() -> this.queryWrapper(
                this.queryStatementIdx.getAndIncrement(),
                null,
                statement,
                options.builder(),
                HOOK_QUERY,
                false,
                true,
                null,
                null,
                null,
                false));
    }

    /**
     * Runs {@code statement} through {@code queryWrapper} against the given scope.
     * The statement index is taken when the returned Mono is subscribed.
     *
     * @param scope      scope passed through to {@code queryWrapper}
     * @param statement  the statement to execute
     * @param options    transaction query options; their builder is handed to {@code queryWrapper}
     * @param tximplicit passed through to {@code queryWrapper} unchanged
     * @return a Mono emitting the query result
     */
    Mono<QueryResult> queryBlocking(Scope scope, String statement, TransactionQueryOptions options, boolean tximplicit) {
        return Mono.defer(() -> this.queryWrapper(
                this.queryStatementIdx.getAndIncrement(),
                scope,
                statement,
                options.builder(),
                HOOK_QUERY,
                false,
                true,
                null,
                null,
                null,
                tximplicit));
    }

    /**
     * Renders the optional metrics and profile of a query's metadata as one debug string.
     *
     * @param md metadata to describe
     * @return the metrics (if present) followed by {@code ", profile=<profile>"} (if present); empty when neither is set
     */
    private String debugMetrics(QueryMetaData md) {
        StringBuilder out = new StringBuilder();
        md.metrics().ifPresent(out::append);
        md.profile().ifPresent(profile -> out.append(", profile=").append(profile));
        return out.toString();
    }

    /**
     * Computes the timeout to apply to a query: remaining attempt time, plus the configured key-value
     * timeout (falling back to the environment's durable KV timeout), plus one second.
     *
     * @return the timeout for the next query request
     */
    private Duration queryTimeout() {
        Duration remaining = Duration.ofMillis(this.expiryRemainingMillis());
        Duration kvTimeout = this.config.keyValueTimeout().orElse(this.cluster().environment().timeoutConfig().kvDurableTimeout());
        return remaining.plus(kvTimeout).plusSeconds(1L);
    }

    /**
     * Executes one statement reactively, bracketed by the {@code beforeQuery} / {@code afterQuery} hooks.
     * <p>
     * The request is aimed at {@code queryTarget}. If no target is recorded yet when a result arrives,
     * the node the request was last dispatched to is stored as the target.
     *
     * @param sidx       statement index, used in the log line
     * @param scope      optional scope; when set, a query context is built from its bucket and scope names
     * @param statement  the statement to execute
     * @param options    transaction query options
     * @param pspan      optional parent span; the attempt span is used when null
     * @param tximplicit passed through to the request factory unchanged
     * @return a Mono emitting the reactive query result
     */
    private Mono<ReactiveQueryResult> queryInternalReactive(int sidx, @Nullable ReactiveScope scope, String statement, TransactionQueryOptions options, @Nullable SpanWrapper pspan, boolean tximplicit) {
        return this.beforeQuery(this, statement).then(Mono.defer(() -> {
            QueryOptions.Built builtOptions = ((QueryOptions)options.builder().parentSpan(pspan != null ? pspan.span() : this.attemptSpan.span())).build();
            // Fall back to the environment's serializer when the options carry none.
            JsonSerializer serializer = builtOptions.serializer();
            if (serializer == null) {
                serializer = this.cluster().environment().jsonSerializer();
            }
            String queryContext = null;
            if (scope != null) {
                queryContext = "`default`:`" + scope.bucketName() + "`.`" + scope.name() + "`";
            }
            QueryRequest request = QueryAccessor.targetedQueryRequest(statement, builtOptions, queryContext, this.queryTarget, this.queryTimeout(), this.cluster(), tximplicit);
            return SDKAccessUtil.queryAccessor(this.cluster()).queryReactive(request, builtOptions, serializer).doOnNext(v -> {
                if (this.queryTarget != null) {
                    return;
                }
                this.queryTarget = request.context().lastDispatchedToNode();
                this.logger().info(this.attemptId, "q%d got query node to use for future queries %s", sidx, RedactableArgument.redactMeta((Object)this.queryTarget));
            }).doFinally(ignore -> request.context().logicallyComplete());
        })).flatMap(queryResult -> this.afterQuery(this, statement).thenReturn(queryResult));
    }

    /**
     * Executes one statement via the async query accessor, bracketed by the {@code beforeQuery} /
     * {@code afterQuery} hooks.
     * <p>
     * On subscription the supplied {@code options} are mutated: metrics are switched on and the parent
     * span is set. If no query target is recorded yet when a result arrives, the node the request was
     * last dispatched to is stored as the target.
     *
     * @param sidx       statement index, used in the log line
     * @param scope      optional scope; when set, a query context is built from its bucket and scope names
     * @param statement  the statement to execute
     * @param options    query options; modified as described above
     * @param pspan      optional parent span; the attempt span is used when null
     * @param tximplicit passed through to the request factory unchanged
     * @return a Mono emitting the query result
     */
    private Mono<QueryResult> queryInternal(int sidx, @Nullable Scope scope, String statement, QueryOptions options, @Nullable SpanWrapper pspan, boolean tximplicit) {
        return this.beforeQuery(this, statement).then(Mono.defer(() -> {
            options.metrics(true);
            options.parentSpan(pspan != null ? pspan.span() : this.attemptSpan.span());
            QueryOptions.Built builtOptions = options.build();
            // Fall back to the environment's serializer when the options carry none.
            JsonSerializer serializer = builtOptions.serializer();
            if (serializer == null) {
                serializer = this.cluster().environment().jsonSerializer();
            }
            String queryContext = null;
            if (scope != null) {
                queryContext = "`default`:`" + scope.bucketName() + "`.`" + scope.name() + "`";
            }
            QueryRequest request = QueryAccessor.targetedQueryRequest(statement, builtOptions, queryContext, this.queryTarget, this.queryTimeout(), this.cluster(), tximplicit);
            return Mono.fromFuture((CompletableFuture)SDKAccessUtil.queryAccessor(this.cluster()).queryAsync(request, builtOptions, serializer)).doOnNext(v -> {
                if (this.queryTarget != null) {
                    return;
                }
                this.queryTarget = request.context().lastDispatchedToNode();
                this.logger().info(this.attemptId, "q%d got query node id %s", sidx, RedactableArgument.redactMeta((Object)this.queryTarget));
            }).doFinally(ignore -> request.context().logicallyComplete());
        })).flatMap(queryResult -> this.afterQuery(this, statement).thenReturn(queryResult));
    }

    /**
     * Wraps {@link #queryInternal} with tracing, an optional implicit BEGIN WORK, pre-flight
     * checks, error conversion and a check for a FATAL query status.
     *
     * @param sidx               statement index, used in log output
     * @param scope              optional scope forwarded to {@code queryInternal}
     * @param statement          the statement text
     * @param options            query options; {@code txdata} and {@code txid} raw parameters are added to it
     * @param hookPoint          forwarded to {@code queryInternalPre}
     * @param isBeginWork        when {@code true}, no implicit BEGIN WORK is issued and no {@code txid} is attached
     * @param existingErrorCheck forwarded to {@code queryInternalPre}
     * @param txdata             optional payload sent as the {@code txdata} raw parameter
     * @param params             optional parameters; only logged here
     * @param pspan              optional parent span; the attempt span is used when {@code null}
     * @param tximplicit         forwarded to {@code queryInternal} and recorded on the span
     * @return the query result, or an error (converted where {@code convertQueryError} recognises it)
     */
    private Mono<QueryResult> queryWrapper(int sidx, @Nullable Scope scope, String statement, QueryOptions options, String hookPoint, boolean isBeginWork, boolean existingErrorCheck, @Nullable JsonObject txdata, @Nullable JsonArray params, @Nullable SpanWrapper pspan, boolean tximplicit) {
        return Mono.defer(() -> {
            SpanWrapper wrapperSpan = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "query.wrapper", pspan != null ? pspan : this.attemptSpan)
                    .attribute("db.statement", statement)
                    .attribute("db.couchbase.transactions.tximplicit", tximplicit);

            // BEGIN WORK is only issued implicitly when not yet in query mode and this call is not BEGIN WORK itself.
            Mono<Void> maybeBeginWork = (this.queryMode() || isBeginWork)
                    ? Mono.<Void>empty()
                    : this.queryBeginWork(wrapperSpan);

            this.logger().debug(this.attemptId, "q%d: '%s' params=%s txdata=%s tximplicit=%s", sidx, RedactableArgument.redactUser((Object)statement), RedactableArgument.redactUser((Object)params), RedactableArgument.redactUser((Object)txdata), tximplicit);

            if (txdata != null) {
                options.raw("txdata", (Object)txdata);
            }

            Mono<Void> preChecks = this.queryInternalPre(sidx, statement, hookPoint, existingErrorCheck);
            Mono<QueryResult> execution = Mono.defer(() -> {
                // The txid is attached lazily, i.e. only once any implicit BEGIN WORK has completed.
                if (!isBeginWork) {
                    options.raw("txid", (Object)this.attemptId);
                }
                return this.queryInternal(sidx, scope, statement, options, wrapperSpan, tximplicit);
            });

            return maybeBeginWork
                    .then(preChecks)
                    .then(execution)
                    .onErrorResume(err -> {
                        RuntimeException converted = this.convertQueryError(sidx, (Throwable)err);
                        long elapsedMicros = wrapperSpan.finish();
                        this.logger().warn(this.attemptId, "q%d got error %s after %dus, converted to %s", sidx, AttemptContextReactive.dbg(err), elapsedMicros, AttemptContextReactive.dbg(converted));
                        if (converted == null) {
                            // Not a recognised query error: propagate the original untouched.
                            return Mono.error((Throwable)err);
                        }
                        if (converted instanceof TransactionOperationFailed) {
                            this.errors.add((TransactionOperationFailed)converted);
                        }
                        return Mono.error((Throwable)converted);
                    })
                    .flatMap(result -> {
                        long elapsedMicros = wrapperSpan.finish();
                        this.logger().info(this.attemptId, "q%d returned with metrics %s after %dus", sidx, this.debugMetrics(result.metaData()), elapsedMicros);
                        if (result.metaData().status() != QueryStatus.FATAL) {
                            return Mono.just(result);
                        }
                        TransactionOperationFailed fatal = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).build();
                        this.errors.add(fatal);
                        return Mono.error((Throwable)fatal);
                    });
        });
    }

    /**
     * Picks the most informative error from a query error context.
     *
     * <p>Preference order: the first error whose context has a {@code "cause"} entry; otherwise
     * the first error whose code lies in {@code [17000, 18000]}; otherwise the first error.
     *
     * @param ctx query error context; must contain at least one error
     * @return the selected error
     */
    private ErrorCodeAndMessage chooseQueryError(QueryErrorContext ctx) {
        ErrorCodeAndMessage firstInRange = null;
        for (ErrorCodeAndMessage candidate : ctx.errors()) {
            if (candidate.context().containsKey("cause")) {
                // Highest priority: return as soon as one is seen.
                return candidate;
            }
            boolean inRange = candidate.code() >= 17000 && candidate.code() <= 18000;
            if (firstInRange == null && inRange) {
                firstInRange = candidate;
            }
        }
        if (firstInRange != null) {
            return firstInRange;
        }
        return (ErrorCodeAndMessage)ctx.errors().get(0);
    }

    /**
     * Translates an error raised by a query into the exception this attempt should surface.
     *
     * <p>Timeouts become {@code AttemptExpired}. For a {@code CouchbaseException} carrying a
     * {@code QueryErrorContext} with at least one error, the error chosen by
     * {@link #chooseQueryError} is mapped by code; any other code is mapped from the error's
     * {@code "cause"} map ({@code raise}, {@code retry}, {@code rollback}) when one is present.
     *
     * <p>This runs inside an error handler, so it must not throw: a missing or malformed
     * {@code cause}/{@code raise} is tolerated rather than dereferenced. A missing or
     * unrecognised {@code raise} maps to {@code TRANSACTION_FAILED}.
     *
     * @param sidx statement index, used only in log output
     * @param err  the error raised by the query
     * @return the converted exception, or {@code null} when {@code err} is not recognised and
     *         should be propagated unchanged
     */
    private RuntimeException convertQueryError(int sidx, Throwable err) {
        if (err instanceof TimeoutException) {
            return new AttemptExpired(this, err);
        }
        if (!(err instanceof CouchbaseException)) {
            return null;
        }
        ErrorContext rawContext = ((CouchbaseException)err).context();
        if (!(rawContext instanceof QueryErrorContext)) {
            return null;
        }
        QueryErrorContext ctx = (QueryErrorContext)rawContext;
        if (ctx.errors().isEmpty()) {
            return null;
        }

        ErrorCodeAndMessage chosenError = this.chooseQueryError(ctx);
        int code = chosenError.code();
        switch (code) {
            case 1065: {
                return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause((Throwable)new FeatureNotAvailableException("Unknown query parameter: note that query support in transactions is available from Couchbase Server 7.0 onwards", err)).build();
            }
            case 17004: {
                return new AttemptNotFoundOnQuery();
            }
            case 1080:
            case 17010: {
                return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).cause(new AttemptExpired(this, err)).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).build();
            }
            case 17012: {
                return new DocumentExistsException((ErrorContext)ctx);
            }
            case 17014: {
                return new DocumentNotFoundException((ErrorContext)ctx);
            }
            case 17015: {
                return new CasMismatchException((ErrorContext)ctx);
            }
            default: {
                break;
            }
        }

        // Any other code: rely on the structured "cause" supplied with the error, if any.
        Object rawCause = chosenError.context().get("cause");
        if (!(rawCause instanceof Map)) {
            return null;
        }
        Map<?, ?> cause = (Map<?, ?>)rawCause;
        Object rawRaise = cause.get("raise");
        String raise = rawRaise instanceof String ? (String)rawRaise : null;
        this.logger().info(this.attemptId, "q%d query code=%d cause=%s raise=%s", sidx, code, RedactableArgument.redactUser((Object)cause), raise);

        TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(err);
        builder.raiseException(AttemptContextReactive.finalErrorForRaise(raise));
        // Boolean.X.equals(...) treats an absent or non-boolean value as "not set" without casting.
        if (Boolean.TRUE.equals(cause.get("retry"))) {
            builder.retryTransaction();
        }
        if (Boolean.FALSE.equals(cause.get("rollback"))) {
            builder.doNotRollbackAttempt();
        }
        return builder.build();
    }

    /** Maps the {@code raise} field of a query error cause to the final error to raise; {@code null} or unknown values map to TRANSACTION_FAILED. */
    private static TransactionOperationFailed.FinalErrorToRaise finalErrorForRaise(String raise) {
        if (raise == null) {
            // A switch on a null String would throw NullPointerException.
            return TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED;
        }
        switch (raise) {
            case "failed_post_commit": {
                return TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT;
            }
            case "commit_ambiguous": {
                return TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS;
            }
            case "expired": {
                return TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED;
            }
            default: {
                return TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED;
            }
        }
    }

    /**
     * Issues {@code BEGIN WORK} to the query service, configured from this attempt's settings.
     *
     * <p>The statement carries the configured scan consistency (if any), the durability level,
     * the remaining expiry as {@code txtimeout}, the metadata collection (if configured) as
     * {@code atrcollection}, and {@code numatrs}. When a result arrives, the locally staged
     * mutations are cleared and each returned {@code txid} is logged.
     *
     * @param pspan parent span for the {@code query.begin_work} span created here
     * @return a Mono completing when BEGIN WORK has finished, or erroring with the failure
     * @throws IllegalArgumentException (as an error signal) for an unrecognised durability level
     */
    private Mono<Void> queryBeginWork(SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "query.begin_work", pspan);
            JsonObject txdata = this.makeQueryTxData();
            QueryScanConsistency scanConsistency = this.config.scanConsistency().orElse(null);
            this.logger().info(this.attemptId, "BEGIN WORK scanConsistency: c=%s", scanConsistency);

            QueryOptions options = QueryOptions.queryOptions();
            if (scanConsistency != null) {
                options.scanConsistency(scanConsistency);
            }
            if (scanConsistency == QueryScanConsistency.NOT_BOUNDED) {
                // NOT_BOUNDED is additionally sent as an explicit raw parameter.
                options.raw("scan_consistency", (Object)QueryScanConsistency.NOT_BOUNDED.toString());
            }

            String durabilityLevelString = AttemptContextReactive.queryDurabilityLevel(this.config.transactionDurabilityLevel());
            options.raw("durability_level", (Object)durabilityLevelString);
            options.raw("txtimeout", (Object)(this.expiryRemainingMillis() + "ms"));
            this.config.metadataCollection().ifPresent(metadataCollection -> options.raw("atrcollection", (Object)String.format("`%s`.`%s`.`%s`", metadataCollection.bucketName(), metadataCollection.scopeName(), metadataCollection.name())));
            options.raw("numatrs", (Object)this.config.numAtrs());

            String statement = "BEGIN WORK";
            int statementIdx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(statementIdx, null, statement, options, HOOK_QUERY_BEGIN_WORK, true, true, txdata, null, span, false).doOnNext(v -> {
                this.stagedMutations.clear();
                List<JsonObject> rows = v.rowsAsObject();
                for (JsonObject row : rows) {
                    String txid = row.getString("txid");
                    this.logger().info(this.attemptId, "BEGIN WORK got txid %s", txid);
                }
            })
            // Finish the span opened above. Finishing this.span() instead would leave the
            // query.begin_work span open and finish a span this method does not own.
            .doFinally(ignore -> span.finish()).then();
        });
    }

    /** Returns the string sent as the {@code durability_level} query parameter for the given level. */
    private static String queryDurabilityLevel(TransactionDurabilityLevel durabilityLevel) {
        switch (durabilityLevel) {
            case NONE: {
                return "none";
            }
            case MAJORITY: {
                return "majority";
            }
            case MAJORITY_AND_PERSIST_TO_ACTIVE: {
                return "majorityAndPersistActive";
            }
            case PERSIST_TO_MAJORITY: {
                return "persistToMajority";
            }
            default: {
                throw new IllegalArgumentException("Unknown durability level " + durabilityLevel);
            }
        }
    }

    /**
     * Returns the {@code Cluster} reached through the parent's cleanup component
     * ({@code parent.cleanup().clusterData().cluster()}).
     *
     * @return the cluster used by this attempt for query execution
     */
    private Cluster cluster() {
        return this.parent.cleanup().clusterData().cluster();
    }

    /**
     * Runs a statement inside this transaction attempt without a scope.
     *
     * @param statement the statement to execute
     * @param options   options for the statement
     * @return the reactive query result
     */
    @Stability.Uncommitted
    public Mono<ReactiveQueryResult> query(String statement, TransactionQueryOptions options) {
        // The cast makes the scoped overload being targeted explicit.
        return this.query((ReactiveScope)null, statement, options);
    }

    /**
     * Runs a statement inside this transaction attempt against the given scope, using default
     * query options.
     *
     * @param scope     the scope supplying the query context; may be {@code null}
     * @param statement the statement to execute
     * @return the reactive query result
     */
    @Stability.Uncommitted
    public Mono<ReactiveQueryResult> query(ReactiveScope scope, String statement) {
        // Pass real default options: the delegate dereferences its options argument, so
        // handing it null fails with a NullPointerException on subscription.
        return this.query(scope, statement, TransactionQueryOptions.queryOptions(), false);
    }

    /**
     * Runs a statement inside this transaction attempt against the given scope.
     *
     * @param scope     the scope supplying the query context; may be {@code null}
     * @param statement the statement to execute
     * @param options   options for the statement
     * @return the reactive query result
     */
    @Stability.Uncommitted
    public Mono<ReactiveQueryResult> query(ReactiveScope scope, String statement, TransactionQueryOptions options) {
        final boolean tximplicit = false;
        return this.query(scope, statement, options, tximplicit);
    }

    /**
     * Runs a statement inside this transaction attempt and streams its result.
     *
     * <p>On subscription: opens a {@code user.query} span, issues BEGIN WORK first when the
     * attempt is not yet in query mode, runs the pre-flight checks, attaches the attempt id as
     * {@code txid} and executes the statement. Once a result is emitted, its metadata is
     * subscribed to separately so that a FATAL status is recorded in {@code errors}.
     *
     * @param scope      the scope supplying the query context; may be {@code null}
     * @param statement  the statement to execute
     * @param options    options for the statement; {@code null} is treated as default options
     * @param tximplicit forwarded to the query execution and recorded on the span
     * @return the reactive query result
     */
    @Stability.Uncommitted
    Mono<ReactiveQueryResult> query(ReactiveScope scope, String statement, TransactionQueryOptions options, boolean tximplicit) {
        return Mono.defer(() -> {
            // Callers may pass null options; options are mutated below, so substitute a
            // fresh default instance per subscription rather than dereferencing null.
            TransactionQueryOptions effectiveOptions = options != null ? options : TransactionQueryOptions.queryOptions();
            SpanWrapper span = SpanWrapperUtil.createOp(this, this.tracer(), null, null, "user.query", this.attemptSpan).attribute("db.statement", statement).attribute("db.couchbase.transactions.tximplicit", tximplicit);
            long start = System.nanoTime();
            int sidx = this.queryStatementIdx.getAndIncrement();
            Mono<Void> beginWorkIfNeeded = Mono.empty();
            if (!this.queryMode()) {
                beginWorkIfNeeded = this.queryBeginWork(span);
            }
            this.logger().debug(this.attemptId, "q%d: '%s' tximplicit=%s", sidx, RedactableArgument.redactUser((Object)statement), tximplicit);
            return beginWorkIfNeeded.then(this.queryInternalPre(sidx, statement, HOOK_QUERY, true)).then(Mono.defer(() -> {
                effectiveOptions.raw("txid", this.attemptId);
                return this.queryInternalReactive(sidx, scope, statement, effectiveOptions, span, tximplicit);
            })).doOnNext(result -> {
                this.LOGGER.info(this.attemptId, "q%d async started streaming rows after %dus", sidx, TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start));
                // NOTE(review): this is a detached subscription. An error produced here is recorded in
                // `errors` but is not delivered to the subscriber of the returned Mono.
                result.metaData().flatMap(metaData -> {
                    this.LOGGER.info(this.attemptId, "q%d async received query status of %s and finished streaming after %dus", sidx, metaData.status(), TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start));
                    if (metaData.status() == QueryStatus.FATAL) {
                        TransactionOperationFailed err = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).build();
                        this.errors.add(err);
                        return Mono.error((Throwable)err);
                    }
                    return Mono.empty();
                }).subscribe();
            }).doOnError(err -> this.logger().warn(this.attemptId, "q%d got error %s while performing query '%s'", sidx, AttemptContextReactive.dbg(err), RedactableArgument.redactUser((Object)statement))).doFinally(ignore -> span.finish());
        });
    }

    /**
     * Pre-flight checks run before a statement is sent.
     *
     * <p>Fails when the transaction is already done, when {@code existingErrorCheck} is set and
     * an earlier error exists, or when the attempt has expired client-side or has less than
     * {@code EXPIRY_THRESHOLD} milliseconds remaining.
     *
     * @param sidx               statement index (not used by the checks themselves)
     * @param statement          the statement text, passed to the client-side expiry check
     * @param hookPoint          stage identifier passed to the expiry check and logged
     * @param existingErrorCheck whether an earlier error should fail this statement
     * @return an empty Mono when the statement may proceed, otherwise an error
     */
    Mono<Void> queryInternalPre(int sidx, String statement, String hookPoint, boolean existingErrorCheck) {
        return Mono.defer(() -> {
            if (this.isDone()) {
                return Mono.error((Throwable)this.transactionIsDone());
            }
            if (existingErrorCheck && this.isExistingError()) {
                return Mono.error((Throwable)this.existingError());
            }

            long remainingMillis = this.expiryRemainingMillis();
            boolean belowThreshold = remainingMillis < (long)this.EXPIRY_THRESHOLD;
            // The client-side expiry check is always invoked, regardless of the threshold result.
            boolean expiredClientSide = this.hasExpiredClientSide(hookPoint, Optional.of(statement));
            if (!expiredClientSide && !belowThreshold) {
                return Mono.empty();
            }

            this.logger().info(this.attemptId, "transaction has expired in stage '%s' remaining=%d threshold=%d", hookPoint, remainingMillis, this.EXPIRY_THRESHOLD);
            return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).doNotRollbackAttempt().build());
        });
    }

    /**
     * Builds a {@code CleanupRequest} describing this attempt: its id, ATR id and collection,
     * current state, the staged replaces/removes/inserts as document records, the elapsed
     * transaction time in milliseconds, and the configured durability level.
     *
     * <p>With assertions enabled, requires that the attempt has started, has not completed, and
     * is not in query mode. Calls {@code get()} on the ATR id and ATR collection, so both must
     * be present.
     *
     * @return the cleanup request for this attempt
     */
    CleanupRequest createCleanupRequest() {
        assert this.state != AttemptStates.NOT_STARTED;
        assert this.state != AttemptStates.COMPLETED;
        assert !this.queryMode();

        long elapsedNanos = System.nanoTime() - this.overall.startTimeClient().toNanos();
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);

        return new CleanupRequest(
                this.attemptId,
                this.atrId().get(),
                this.atrCollection().get(),
                this.state,
                this.toDocRecords(this.stagedReplaces()),
                this.toDocRecords(this.stagedRemoves()),
                this.toDocRecords(this.stagedInserts()),
                Duration.ZERO,
                Optional.empty(),
                elapsedMillis,
                Optional.of(this.config.durabilityLevel()));
    }

    /**
     * Converts staged mutations into document records, preserving order.
     *
     * @param mutations the staged mutations to convert
     * @return one record (bucket, scope, collection, id) per mutation
     */
    private List<DocRecord> toDocRecords(List<StagedMutation> mutations) {
        return mutations.stream()
                .map(this::toDocRecord)
                .collect(Collectors.toList());
    }

    /** Builds the record identifying the document targeted by a single staged mutation. */
    private DocRecord toDocRecord(StagedMutation mutation) {
        return new DocRecord(
                mutation.doc.collection().bucketName(),
                mutation.doc.collection().scopeName(),
                mutation.doc.collection().name(),
                mutation.doc.id());
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before the ATR commit; the call site is not
     * visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeAtrCommit(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before an ambiguous ATR commit is resolved; the
     * call site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeAtrCommitAmbiguityResolution(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after the ATR commit; the call site is not
     * visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterAtrCommit(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before a document is committed; the call site is
     * not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeDocCommitted(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before a document is rolled back; the call site
     * is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeDocRolledBack(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after a document commit but before its CAS is
     * saved; the call site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterDocCommittedBeforeSavingCAS(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after a document is committed; the call site is
     * not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterDocCommitted(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after all documents are committed; the call site
     * is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterDocsCommitted(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before a document is removed; the call site is
     * not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeDocRemoved(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after a document removal, ahead of any retry
     * handling; the call site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterDocRemovedPreRetry(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after a document removal, following any retry
     * handling; the call site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterDocRemovedPostRetry(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after all documents are removed; the call site is
     * not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterDocsRemoved(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before the ATR entry is set to pending; the call
     * site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeAtrPending(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after the ATR entry is set to pending; the call
     * site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterAtrPending(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after the ATR entry is marked complete; the call
     * site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterAtrComplete(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before the ATR entry is marked complete; the call
     * site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeAtrComplete(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before the ATR entry is marked rolled back; the
     * call site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeAtrRolledBack(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after a document get completes; the call site is
     * not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterGetComplete(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before rollback deletes a staged insert; the call
     * site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeRollbackDeleteInserted(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after a replace has been staged; the call site is
     * not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterStagedReplaceComplete(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after a remove has been staged; the call site is
     * not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterStagedRemoveComplete(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before an insert is staged; the call site is not
     * visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeStagedInsert(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before a remove is staged; the call site is not
     * visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeStagedRemove(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before a replace is staged; the call site is not
     * visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeStagedReplace(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after an insert has been staged; the call site is
     * not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterStagedInsertComplete(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before the ATR entry is marked aborted; the call
     * site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeAtrAborted(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after the ATR entry is marked aborted; the call
     * site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterAtrAborted(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after the ATR entry is marked rolled back; the
     * call site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterAtrRolledBack(AttemptContextReactive self) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after a staged replace or remove is rolled back;
     * the call site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterRollbackReplaceOrRemove(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after rollback deletes a staged insert; the call
     * site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterRollbackDeleteInserted(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before the ATR entry of a blocking document is
     * checked; the call site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeCheckATREntryForBlockingDoc(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before a document is fetched; the call site is
     * not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeDocGet(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before the document is fetched while handling an
     * "already exists" result during a staged insert; the call site is not visible in this
     * part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeGetDocInExistsDuringStagedInsert(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before a staged insert is removed; the call site
     * is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeRemoveStagedInsert(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs after a staged insert is removed; the call site
     * is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterRemoveStagedInsert(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default always answers {@code false}.
     * NOTE(review): presumably consulted by the client-side expiry check so a subclass can
     * force expiry; the call site is not visible in this part of the file, so confirm.
     *
     * @param self  the attempt context supplied by the caller; unused by this default
     * @param place stage identifier supplied by the caller; unused by this default
     * @param docId optional document id supplied by the caller; unused by this default
     * @return {@code false}
     */
    protected Boolean hasExpiredClientSideHook(AttemptContextReactive self, String place, Optional<String> docId) {
        return Boolean.FALSE;
    }

    /**
     * Overridable extension point that {@code queryInternal} subscribes to before it builds and
     * dispatches a statement; this default does nothing and emits {@code 0}.
     *
     * @param self      the attempt context supplied by the caller; unused by this default
     * @param statement the statement about to be executed; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeQuery(AttemptContextReactive self, String statement) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point that {@code queryInternal} subscribes to after a statement
     * has produced its result and before that result is emitted; this default does nothing and
     * emits {@code 0}.
     *
     * @param self      the attempt context supplied by the caller; unused by this default
     * @param statement the statement that was executed; unused by this default
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> afterQuery(AttemptContextReactive self, String statement) {
        return Mono.just(0);
    }

    /**
     * Overridable extension point; this default does nothing and emits {@code 0}.
     * NOTE(review): the name suggests it runs before the removal of a staged insert is
     * overwritten; the call site is not visible in this part of the file, so confirm.
     *
     * @param self the attempt context supplied by the caller; unused by this default
     * @param id   presumably the id of the document involved (unused here); confirm against caller
     * @return a {@code Mono} emitting {@code 0}
     */
    protected Mono<Integer> beforeOverwritingStagedInsertRemoval(AttemptContextReactive self, String id) {
        return Mono.just(0);
    }

    /**
     * Returns the logger this attempt writes its diagnostics to.
     *
     * @return the {@code LOGGER} field of this attempt
     */
    public TransactionLogger logger() {
        return LOGGER;
    }

    /**
     * Short diagnostic description of this attempt: the first five characters of the attempt
     * id, the current state, the ATR debug string and the staged mutations.
     *
     * @return a string of the form {@code AttemptContextReactive{id=...,state=...,atr=...,staged=[...]}}
     */
    @Override
    public String toString() {
        return new StringBuilder("AttemptContextReactive{")
                .append("id=")
                .append(this.attemptId.substring(0, 5))
                .append(",state=")
                .append((Object)this.state)
                .append(",atr=")
                .append(ATRUtil.getAtrDebug(this.atrCollection, this.atrId))
                .append(",staged=")
                // Rendered exactly like List.toString(): "[a, b]", or "[]" when empty.
                .append(this.stagedMutations.stream().map(StagedMutation::toString).collect(Collectors.joining(", ", "[", "]")))
                .append('}')
                .toString();
    }
}

