/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.transactions;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.error.CasMismatchException;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DocumentExistsException;
import com.couchbase.client.core.error.DocumentNotFoundException;
import com.couchbase.client.core.error.ErrorCodeAndMessage;
import com.couchbase.client.core.error.FeatureNotAvailableException;
import com.couchbase.client.core.error.TimeoutException;
import com.couchbase.client.core.error.context.ErrorContext;
import com.couchbase.client.core.error.context.QueryErrorContext;
import com.couchbase.client.core.error.context.ReducedKeyValueErrorContext;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.kv.MutationToken;
import com.couchbase.client.core.msg.query.QueryRequest;
import com.couchbase.client.core.node.NodeIdentifier;
import com.couchbase.client.core.retry.reactor.Jitter;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.core.retry.reactor.RetryExhaustedException;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.SDKAccessUtil;
import com.couchbase.client.java.Scope;
import com.couchbase.client.java.codec.JsonSerializer;
import com.couchbase.client.java.codec.RawJsonTranscoder;
import com.couchbase.client.java.codec.Transcoder;
import com.couchbase.client.java.env.ClusterEnvironment;
import com.couchbase.client.java.json.JsonArray;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.InsertOptions;
import com.couchbase.client.java.kv.LookupInOptions;
import com.couchbase.client.java.kv.LookupInResult;
import com.couchbase.client.java.kv.LookupInSpec;
import com.couchbase.client.java.kv.MutateInMacro;
import com.couchbase.client.java.kv.MutateInOptions;
import com.couchbase.client.java.kv.MutateInResult;
import com.couchbase.client.java.kv.MutateInSpec;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.java.kv.RemoveOptions;
import com.couchbase.client.java.kv.StoreSemantics;
import com.couchbase.client.java.query.QueryMetaData;
import com.couchbase.client.java.query.QueryOptions;
import com.couchbase.client.java.query.QueryResult;
import com.couchbase.client.java.query.QueryScanConsistency;
import com.couchbase.client.java.query.QueryStatus;
import com.couchbase.client.java.query.ReactiveQueryResult;
import com.couchbase.transactions.TransactionContext;
import com.couchbase.transactions.TransactionGetResult;
import com.couchbase.transactions.TransactionInsertOptions;
import com.couchbase.transactions.TransactionJsonDocumentStatus;
import com.couchbase.transactions.TransactionQueryOptions;
import com.couchbase.transactions.TransactionReplaceOptions;
import com.couchbase.transactions.TransactionsReactive;
import com.couchbase.transactions.atr.ATRIds;
import com.couchbase.transactions.cleanup.CleanupRequest;
import com.couchbase.transactions.components.ATREntry;
import com.couchbase.transactions.components.ATRUtil;
import com.couchbase.transactions.components.ActiveTransactionRecord;
import com.couchbase.transactions.components.DocRecord;
import com.couchbase.transactions.components.DocumentGetter;
import com.couchbase.transactions.components.DocumentMetadata;
import com.couchbase.transactions.components.DurabilityLevelUtil;
import com.couchbase.transactions.components.SerializationUtil;
import com.couchbase.transactions.config.TransactionConfig;
import com.couchbase.transactions.deferred.TransactionSerializedContext;
import com.couchbase.transactions.error.attempts.ActiveTransactionRecordEntryNotFound;
import com.couchbase.transactions.error.attempts.ActiveTransactionRecordFull;
import com.couchbase.transactions.error.attempts.ActiveTransactionRecordNotFound;
import com.couchbase.transactions.error.external.ForwardCompatibilityFailure;
import com.couchbase.transactions.error.external.PreviousOperationFailed;
import com.couchbase.transactions.error.external.TransactionOperationFailed;
import com.couchbase.transactions.error.internal.AttemptExpired;
import com.couchbase.transactions.error.internal.AttemptNotFoundOnQuery;
import com.couchbase.transactions.error.internal.ErrorClasses;
import com.couchbase.transactions.error.internal.ForwardCompatibilityRequiresRetry;
import com.couchbase.transactions.error.internal.RetryAtrCommit;
import com.couchbase.transactions.error.internal.RetryOperation;
import com.couchbase.transactions.error.internal.TransactionOperationFailedBuilder;
import com.couchbase.transactions.forwards.ForwardCompatibility;
import com.couchbase.transactions.forwards.ForwardCompatibilityStages;
import com.couchbase.transactions.forwards.Supported;
import com.couchbase.transactions.log.IllegalDocumentState;
import com.couchbase.transactions.log.TransactionLogger;
import com.couchbase.transactions.query.QueryAccessor;
import com.couchbase.transactions.support.AttemptStates;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import com.couchbase.transactions.support.StagedMutation;
import com.couchbase.transactions.support.StagedMutationType;
import com.couchbase.transactions.util.DebugUtil;
import com.couchbase.transactions.util.LogDeferThrowable;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

public class AttemptContextReactive {
    private final TransactionConfig config;
    private final ConcurrentLinkedQueue<StagedMutation> stagedMutations = new ConcurrentLinkedQueue();
    private final String attemptId;
    private final TransactionContext overall;
    private final Duration startTimeClient;
    private Optional<Duration> startTimeServer;
    private Optional<String> atrId = Optional.empty();
    private Optional<ReactiveCollection> atrCollection = Optional.empty();
    final TransactionLogger LOGGER;
    private AttemptStates state = AttemptStates.NOT_STARTED;
    private final TransactionsReactive parent;
    private final SpanWrapper attemptSpan;
    private final ConcurrentLinkedQueue<TransactionOperationFailed> errors = new ConcurrentLinkedQueue();
    private boolean expiryOvertimeMode = false;
    @Nullable
    private volatile NodeIdentifier queryTarget = null;
    @Nullable
    private String txid;
    private final AtomicInteger queryStatementIdx = new AtomicInteger(0);
    private volatile boolean isDone = false;
    private int EXPIRY_THRESHOLD = Integer.parseInt(System.getProperty("com.couchbase.transactions.expiryThresholdMs", "10"));
    public static String HOOK_ROLLBACK = "rollback";
    public static String HOOK_GET = "get";
    public static String HOOK_INSERT = "insert";
    public static String HOOK_REPLACE = "replace";
    public static String HOOK_REMOVE = "remove";
    public static String HOOK_BEFORE_COMMIT = "commit";
    public static String HOOK_ABORT_GET_ATR = "abortGetAtr";
    public static String HOOK_ROLLBACK_DOC = "rollbackDoc";
    public static String HOOK_DELETE_INSERTED = "deleteInserted";
    public static String HOOK_CREATE_STAGED_INSERT = "createdStagedInsert";
    public static String HOOK_INSERT_QUERY = "insertQuery";
    public static String HOOK_REMOVE_DOC = "removeDoc";
    public static String HOOK_COMMIT_DOC = "commitDoc";
    public static String HOOK_QUERY = "query";
    public static String HOOK_QUERY_BEGIN_WORK = "queryBeginWork";
    public static String HOOK_QUERY_COMMIT = "queryCommit";
    public static String HOOK_QUERY_ROLLBACK = "queryRollback";
    public static String HOOK_QUERY_KV_GET = "queryKvGet";
    public static String HOOK_QUERY_KV_REPLACE = "queryKvReplace";
    public static String HOOK_QUERY_KV_REMOVE = "queryKvRemove";
    public static String HOOK_QUERY_KV_INSERT = "queryKvInsert";
    public static String HOOK_ATR_COMMIT = "atrCommit";
    public static String HOOK_ATR_COMMIT_AMBIGUITY_RESOLUTION = "atrCommitAmbiguityResolution";
    public static String HOOK_ATR_ABORT = "atrAbort";
    public static String HOOK_ATR_ROLLBACK_COMPLETE = "atrRollbackComplete";
    public static String HOOK_ATR_PENDING = "atrPending";
    public static String HOOK_ATR_COMPLETE = "atrComplete";
    public static Duration DEFAULT_DELAY_RETRYING_OPERATION = Duration.ofMillis(3L);
    private static final reactor.util.retry.Retry RETRY_OPERATION_UNTIL_EXPIRY = Retry.anyOf((Class[])new Class[]{RetryOperation.class}).exponentialBackoff(Duration.of(1L, ChronoUnit.MILLIS), Duration.of(100L, ChronoUnit.MILLIS)).jitter(Jitter.random()).toReactorRetry();
    private static final reactor.util.retry.Retry RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY = Retry.anyOf((Class[])new Class[]{RetryOperation.class}).fixedBackoff(DEFAULT_DELAY_RETRYING_OPERATION).toReactorRetry();
    private final List<MutationToken> finalMutationTokens = new ArrayList<MutationToken>();

    public AttemptContextReactive(TransactionContext overall, TransactionConfig config, String attemptId, TransactionsReactive parent, Optional<SpanWrapper> parentSpan) {
        Objects.requireNonNull(overall);
        Objects.requireNonNull(config);
        Objects.requireNonNull(attemptId);
        Objects.requireNonNull(parent);
        this.LOGGER = overall.LOGGER;
        this.overall = overall;
        this.config = config;
        this.attemptId = attemptId;
        this.startTimeClient = Duration.of(System.nanoTime(), ChronoUnit.MILLIS);
        this.parent = parent;
        this.attemptSpan = SpanWrapper.create(config, "transaction_attempt", parentSpan).withTag("couchbase.transaction_id", overall.transactionId()).withTag("couchbase.attempt_id", attemptId).start();
    }

    private JsonObject dehydrate(boolean includeMutationBody) {
        JsonObject ob = JsonObject.create();
        ob.put("transactionId", this.transactionId());
        ob.put("attemptId", this.attemptId);
        this.atrId.ifPresent(v -> ob.put("atrId", v));
        this.atrCollection.ifPresent(ac -> {
            ob.put("atrBucket", ac.bucketName());
            ob.put("atrScope", ac.scopeName());
            ob.put("atrCollection", ac.name());
        });
        long transactionElapsedTimeMillis = (System.nanoTime() - this.overall.startTimeClient().toNanos()) / 1000000L;
        ob.put("transactionElapsedTimeMillis", transactionElapsedTimeMillis);
        this.startTimeServer.ifPresent(v -> ob.put("startTimeServerMillis", v.toMillis()));
        JsonArray mutations = JsonArray.create();
        this.stagedMutations.forEach(sm -> {
            JsonObject m = JsonObject.create();
            m.put("type", sm.type.toString());
            switch (sm.type) {
                case REMOVE: {
                    break;
                }
                default: {
                    if (!includeMutationBody) break;
                    m.put("content", Base64.getEncoder().encodeToString(sm.content));
                }
            }
            sm.doc.serialize(m);
            mutations.add(m);
        });
        ob.put("mutations", mutations);
        return ob;
    }

    private JsonObject makeQueryTxData() {
        ClusterEnvironment env = this.parent.cleanup().clusterData().cluster().environment();
        JsonObject out = JsonObject.create().put("id", JsonObject.create().put("txn", this.transactionId()).put("atmpt", this.attemptId)).put("state", JsonObject.create().put("timeLeftMs", this.expiryRemainingMillis())).put("config", JsonObject.create().put("operationTimeoutMs", this.config.keyValueTimeout().orElse(env.timeoutConfig().kvDurableTimeout()).toMillis()).put("kvTimeoutMs", this.config.keyValueTimeout().orElse(env.timeoutConfig().kvDurableTimeout()).toMillis()).put("durabilityLevel", this.config.durabilityLevel().name()).put("numAtrs", this.config.numAtrs()));
        JsonArray mutations = JsonArray.create();
        this.stagedMutations.forEach(sm -> mutations.add(JsonObject.create().put("scp", sm.doc.collection().scopeName()).put("coll", sm.doc.collection().name()).put("bkt", sm.doc.collection().bucketName()).put("id", sm.doc.id()).put("cas", Long.toString(sm.doc.cas())).put("type", sm.type.name())));
        out.put("mutations", mutations);
        if (this.atrCollection.isPresent() && this.atrId.isPresent()) {
            out.put("atr", JsonObject.create().put("id", this.atrId.get()).put("bkt", this.atrCollection.get().bucketName()).put("scp", this.atrCollection.get().scopeName()).put("coll", this.atrCollection.get().name()));
        }
        return out;
    }

    public AttemptContextReactive(JsonObject pill, TransactionContext overall, TransactionConfig config, TransactionsReactive parent, Optional<SpanWrapper> parentSpan) {
        this.LOGGER = overall.LOGGER;
        this.overall = overall;
        this.config = config;
        this.attemptId = pill.getString("attemptId");
        this.startTimeClient = Duration.of(System.nanoTime(), ChronoUnit.MILLIS);
        this.atrId = Optional.of(pill.getString("atrId"));
        String atrBucket = pill.getString("atrBucket");
        String atrScope = pill.getString("atrScope");
        String atrCollection = pill.getString("atrCollection");
        this.atrCollection = Optional.of(parent.cleanup().clusterData().getBucketFromName(atrBucket).scope(atrScope).collection(atrCollection));
        this.parent = parent;
        this.state = AttemptStates.PENDING;
        JsonArray mutations = pill.getArray("mutations");
        mutations.iterator().forEachRemaining(v -> {
            JsonObject jo = (JsonObject)v;
            TransactionGetResult doc = TransactionGetResult.createFrom(jo, parent.cleanup().clusterData());
            StagedMutationType type = StagedMutationType.REMOVE;
            switch (jo.getString("type")) {
                case "INSERT": {
                    type = StagedMutationType.INSERT;
                    break;
                }
                case "REPLACE": {
                    type = StagedMutationType.REPLACE;
                }
            }
            switch (type) {
                case REMOVE: {
                    StagedMutation sm = new StagedMutation(doc, null, type, null);
                    this.stagedMutations.add(sm);
                    break;
                }
                default: {
                    byte[] bytes = Base64.getDecoder().decode(jo.getString("content"));
                    StagedMutation sm = new StagedMutation(doc, bytes, type, null);
                    this.stagedMutations.add(sm);
                }
            }
        });
        this.attemptSpan = SpanWrapper.create(config, "transaction_attempt", parentSpan).withTag("couchbase.transaction_id", overall.transactionId()).withTag("couchbase.attempt_id", this.attemptId).start();
    }

    SpanWrapper span() {
        return this.attemptSpan;
    }

    Duration startTimeClient() {
        return this.startTimeClient;
    }

    public String attemptId() {
        return this.attemptId;
    }

    public String transactionId() {
        return this.overall.transactionId();
    }

    TransactionContext overall() {
        return this.overall;
    }

    private List<StagedMutation> stagedReplaces() {
        assert (!this.queryMode());
        return this.stagedMutations.stream().filter(v -> v.type == StagedMutationType.REPLACE).collect(Collectors.toList());
    }

    private List<StagedMutation> stagedRemoves() {
        assert (!this.queryMode());
        return this.stagedMutations.stream().filter(v -> v.type == StagedMutationType.REMOVE).collect(Collectors.toList());
    }

    private List<StagedMutation> stagedInserts() {
        assert (!this.queryMode());
        return this.stagedMutations.stream().filter(v -> v.type == StagedMutationType.INSERT).collect(Collectors.toList());
    }

    private Optional<StagedMutation> checkForOwnWrite(ReactiveCollection collection, String id) {
        assert (!this.queryMode());
        Optional<StagedMutation> ownReplace = this.stagedReplaces().stream().filter(v -> v.doc.collection().bucketName().equals(collection.bucketName()) && v.doc.collection().scopeName().equals(collection.scopeName()) && v.doc.collection().name().equals(collection.name()) && v.doc.id().equals(id)).findFirst();
        if (ownReplace.isPresent()) {
            return ownReplace;
        }
        Optional<StagedMutation> ownInserted = this.stagedInserts().stream().filter(v -> v.doc.collection().bucketName().equals(collection.bucketName()) && v.doc.collection().scopeName().equals(collection.scopeName()) && v.doc.collection().name().equals(collection.name()) && v.doc.id().equals(id)).findFirst();
        if (ownInserted.isPresent()) {
            return ownInserted;
        }
        return Optional.empty();
    }

    private Mono<Void> errorIfExpiredAndNotInExpiryOvertimeMode(String stage, Optional<String> docId) {
        if (this.expiryOvertimeMode) {
            this.LOGGER.info(this.attemptId, "not doing expiry check in %s as already in expiry-overtime-mode", stage);
            return Mono.empty();
        }
        if (this.hasExpiredClientSide(stage, docId)) {
            this.LOGGER.info(this.attemptId, "has expired in stage %s", stage);
            return Mono.error((Throwable)new AttemptExpired(this, "Attempt has expired in stage " + stage));
        }
        return Mono.empty();
    }

    private void checkExpiryPreCommitAndSetExpiryOvertimeMode(String stage, Optional<String> docId) {
        if (this.hasExpiredClientSide(stage, docId)) {
            this.LOGGER.info(this.attemptId, "has expired in stage %s, setting expiry-overtime-mode", stage);
            this.expiryOvertimeMode = true;
            throw TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).build();
        }
    }

    private TransactionOperationFailed transactionIsDone() {
        return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new IllegalStateException("Cannot perform operations after transaction has been committed or rolled back")).doNotRollbackAttempt().build();
    }

    private TransactionOperationFailed existingError() {
        return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new PreviousOperationFailed()).build();
    }

    public Mono<Optional<TransactionGetResult>> getOptional(ReactiveCollection collection, String id) {
        return this.getInternal(collection, id);
    }

    private Mono<Optional<TransactionGetResult>> getInternal(ReactiveCollection collection, String id) {
        return Mono.defer(() -> {
            if (this.queryMode()) {
                return this.getWithQuery(collection, id);
            }
            return this.getWithKV(collection, id, Optional.empty());
        });
    }

    private Mono<Optional<TransactionGetResult>> getWithKV(ReactiveCollection collection, String id, Optional<String> resolvingMissingATREntry) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapper.create(this.config, collection, "transaction_get", id, this.attemptSpan);
            this.LOGGER.info(this.attemptId, "getting doc %s, resolvingMissingATREntry=%s", DebugUtil.docId(collection, id), resolvingMissingATREntry.orElse("<empty>"));
            if (this.isDone()) {
                return Mono.error((Throwable)this.transactionIsDone());
            }
            if (this.isExistingError()) {
                return Mono.error((Throwable)this.existingError());
            }
            this.checkExpiryPreCommitAndSetExpiryOvertimeMode(HOOK_GET, Optional.of(id));
            Optional<StagedMutation> ownWrite = this.checkForOwnWrite(collection, id);
            if (ownWrite.isPresent()) {
                this.LOGGER.info(this.attemptId, "found own-write of mutated doc %s", DebugUtil.docId(collection, id));
                return Mono.just(Optional.of(TransactionGetResult.createFrom(ownWrite.get().doc, ownWrite.get().content, TransactionJsonDocumentStatus.OWN_WRITE)));
            }
            Optional<TransactionGetResult> ownRemove = this.stagedRemoves().stream().filter(v -> v.doc.collection().bucketName().equals(collection.bucketName()) && v.doc.collection().scopeName().equals(collection.scopeName()) && v.doc.collection().name().equals(collection.name()) && v.doc.id().equals(id)).findFirst().map(v -> v.doc);
            if (ownRemove.isPresent()) {
                this.LOGGER.info(this.attemptId, "found own-write of removed doc %s", DebugUtil.docId(collection, id));
                return Mono.just(Optional.empty());
            }
            return this.beforeDocGet(this, id).doOnSubscribe(x -> span.start()).then(DocumentGetter.getAsync(this.cluster(), this.LOGGER, collection, this.config, id, this.attemptId, false, span, this.getTranscoder(), resolvingMissingATREntry)).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                this.LOGGER.warn(this.attemptId, "got error while getting doc %s: %s", DebugUtil.docId(collection, id), AttemptContextReactive.dbg(err));
                if (err instanceof ForwardCompatibilityRequiresRetry || err instanceof ForwardCompatibilityFailure) {
                    TransactionOperationFailedBuilder error = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new ForwardCompatibilityFailure());
                    if (err instanceof ForwardCompatibilityRequiresRetry) {
                        error.retryTransaction();
                    }
                    return Mono.error((Throwable)error.build());
                }
                if (ec == ErrorClasses.TRANSACTION_OPERATION_FAILED) {
                    return Mono.error((Throwable)err);
                }
                if (err instanceof ActiveTransactionRecordNotFound) {
                    return this.getWithKV(collection, id, Optional.of(((ActiveTransactionRecordNotFound)err).attemptId()));
                }
                if (err instanceof ActiveTransactionRecordEntryNotFound) {
                    this.LOGGER.info(this.attemptId, "trying to get doc again to see if it's in same state");
                    return this.getWithKV(collection, id, Optional.of(((ActiveTransactionRecordEntryNotFound)err).attemptId()));
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    return Mono.error((Throwable)builder.retryTransaction().build());
                }
                return Mono.error((Throwable)builder.build());
            }).flatMap(v -> {
                long elapsed = span.finish();
                if (v.isPresent()) {
                    this.LOGGER.info(this.attemptId, "completed get of %s in %dms", v.get(), elapsed);
                } else {
                    this.LOGGER.info(this.attemptId, "completed get of %s, could not find, in %dms", DebugUtil.docId(collection, id), elapsed);
                }
                return this.afterGetComplete(this, id).thenReturn(v);
            }).flatMap(doc -> {
                if (doc.isPresent()) {
                    return this.forwardCompatibilityCheck(ForwardCompatibilityStages.GETS, doc.flatMap(v -> v.links().forwardCompatibility())).thenReturn(doc);
                }
                return Mono.just((Object)doc);
            }).doOnError(err -> {
                if (err instanceof TransactionOperationFailed) {
                    this.errors.add((TransactionOperationFailed)err);
                }
            });
        });
    }

    private JsonObject makeTxdata() {
        return JsonObject.create().put("kv", true);
    }

    private Mono<Optional<TransactionGetResult>> getWithQuery(ReactiveCollection collection, String id) {
        return Mono.defer(() -> {
            int sidx = this.queryStatementIdx.getAndIncrement();
            JsonArray params = JsonArray.create().add(AttemptContextReactive.makeKeyspace(collection)).add(id);
            return this.queryWrapper(sidx, null, "EXECUTE __get", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_GET, false, true, this.makeTxdata(), params).map(result -> {
                Optional<Object> ret;
                List rows = result.rowsAsObject();
                if (rows.size() == 0) {
                    ret = Optional.empty();
                } else {
                    JsonObject row = (JsonObject)rows.get(0);
                    String scas = row.getString("scas");
                    long cas = Long.parseLong(scas);
                    JsonObject doc = row.getObject("doc");
                    JsonObject txnMeta = row.getObject("txnMeta");
                    ret = Optional.of(new TransactionGetResult(id, doc.toBytes(), cas, collection, null, null, null, this.getTranscoder(), Optional.ofNullable(txnMeta)));
                }
                return ret;
            }).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                if (err instanceof DocumentNotFoundException) {
                    return Mono.just(Optional.empty());
                }
                if (err instanceof TransactionOperationFailed) {
                    return Mono.error((Throwable)err);
                }
                return Mono.error((Throwable)builder.build());
            }).doOnError(err -> {
                if (err instanceof TransactionOperationFailed) {
                    this.errors.add((TransactionOperationFailed)err);
                }
            });
        });
    }

    public Mono<TransactionGetResult> get(ReactiveCollection collection, String id) {
        return this.getOptional(collection, id).flatMap(doc -> {
            if (doc.isPresent()) {
                return Mono.just(doc.get());
            }
            TransactionOperationFailed err = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_NOT_FOUND).cause((Throwable)new DocumentNotFoundException((ErrorContext)ReducedKeyValueErrorContext.create((String)id))).build();
            this.errors.add(err);
            return Mono.error((Throwable)err);
        });
    }

    boolean hasExpiredClientSide(String place, Optional<String> docId) {
        boolean over = this.overall.hasExpiredClientSide(this.config);
        boolean hook = this.hasExpiredClientSideHook(this, place, docId);
        if (over) {
            this.LOGGER.info(this.attemptId, "expired in %s", place);
        }
        if (hook) {
            this.LOGGER.info(this.attemptId, "fake expiry in %s", place);
        }
        return over || hook;
    }

    List<MutationToken> finalMutationTokens() {
        return this.finalMutationTokens;
    }

    public Optional<String> atrId() {
        return this.atrId;
    }

    public Optional<ReactiveCollection> atrCollection() {
        return this.atrCollection;
    }

    List<String> stagedReplaceIds() {
        return this.stagedReplaces().stream().map(v -> v.doc.id()).collect(Collectors.toList());
    }

    List<String> stagedInsertIds() {
        return this.stagedInserts().stream().map(v -> v.doc.id()).collect(Collectors.toList());
    }

    List<String> stagedRemoveIds() {
        return this.stagedRemoves().stream().map(v -> v.doc.id()).collect(Collectors.toList());
    }

    public Mono<TransactionGetResult> insert(ReactiveCollection collection, String id, Object content) {
        return this.insert(collection, id, content, TransactionInsertOptions.DEFAULT);
    }

    private ReactiveCollection getAtrCollection(ReactiveCollection docCollection) {
        if (this.config.metadataCollection().isPresent()) {
            return this.config.metadataCollection().get().reactive();
        }
        return this.parent.cleanup().clusterData().getBucketFromName(docCollection.bucketName()).defaultCollection();
    }

    private static String makeKeyspace(ReactiveCollection collection) {
        return String.format("default:`%s`.`%s`.`%s`", collection.bucketName(), collection.scopeName(), collection.name());
    }

    public Mono<TransactionGetResult> insert(ReactiveCollection collection, String id, Object content, TransactionInsertOptions options) {
        TransactionInsertOptions.BuiltOptions built = options.build();
        SpanWrapper span = SpanWrapper.create(this.config, collection, HOOK_INSERT, id, this.attemptSpan);
        if (this.queryMode()) {
            return this.insertWithQuery(collection, id, content);
        }
        return Mono.defer(() -> {
            if (this.isDone()) {
                return Mono.error((Throwable)this.transactionIsDone());
            }
            if (this.isExistingError()) {
                return Mono.error((Throwable)this.existingError());
            }
            Optional<StagedMutation> ownWrite = this.checkForOwnWrite(collection, id);
            if (ownWrite.isPresent()) {
                this.LOGGER.info(this.attemptId, "found own-write of mutated doc %s on insert", DebugUtil.docId(collection, id));
                return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new IllegalStateException("Cannot insert a document that has already been mutated inside the same transaction")).build());
            }
            this.checkExpiryPreCommitAndSetExpiryOvertimeMode(HOOK_INSERT, Optional.of(id));
            this.initAtrIfNeeded(collection, id);
            if (this.state == AttemptStates.NOT_STARTED) {
                return this.atrPending(this.atrCollection.get(), span);
            }
            return Mono.empty();
        }).then(this.createStagedInsert(collection, id, content, span, built, Optional.empty())).doFinally(v -> span.finish()).doOnSubscribe(s -> span.start()).doOnError(err -> {
            if (err instanceof TransactionOperationFailed) {
                this.errors.add((TransactionOperationFailed)err);
            }
        });
    }

    private Mono<TransactionGetResult> insertWithQuery(ReactiveCollection collection, String id, Object content) {
        return Mono.defer(() -> {
            Transcoder.EncodedValue encoded = this.getTranscoder().encode(content);
            JsonObject reencoded = JsonObject.fromJson((byte[])encoded.encoded());
            JsonArray params = JsonArray.create().add(AttemptContextReactive.makeKeyspace(collection)).add(id).add(reencoded).add(JsonObject.create());
            int sidx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(sidx, null, "EXECUTE __insert", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_INSERT, false, true, this.makeTxdata(), params).map(result -> {
                JsonObject row = (JsonObject)result.rowsAsObject().get(0);
                String scas = row.getString("scas");
                long cas = Long.parseLong(scas);
                return TransactionGetResult.createFromInsert(collection, id, content.toString().getBytes(StandardCharsets.UTF_8), this.transactionId(), this.attemptId, null, null, null, null, cas, this.getTranscoder());
            }).onErrorResume(err -> {
                if (err instanceof TransactionOperationFailed) {
                    return Mono.error((Throwable)err);
                }
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    return this.setExpiryOvertimeModeAndFail((Throwable)err, HOOK_INSERT_QUERY, ec);
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    return Mono.error((Throwable)out.retryTransaction().build());
                }
                if (ec == ErrorClasses.FAIL_DOC_ALREADY_EXISTS) {
                    return Mono.error((Throwable)err);
                }
                return Mono.error((Throwable)out.build());
            }).doOnError(err -> {
                if (err instanceof TransactionOperationFailed) {
                    this.errors.add((TransactionOperationFailed)err);
                }
            });
        });
    }

    protected String randomAtrIdForVbucket(AttemptContextReactive self, Integer vbucketIdForDoc, int numAtrs) {
        return ATRIds.randomAtrIdForVbucket(vbucketIdForDoc, numAtrs);
    }

    private void initAtrIfNeeded(ReactiveCollection collection, String id) {
        if (!this.atrId.isPresent()) {
            long vbucketIdForDoc = ATRIds.vbucketForKey(id, 1024);
            String atr = this.randomAtrIdForVbucket(this, (int)vbucketIdForDoc, this.config.numAtrs());
            this.atrId = Optional.of(atr);
            this.atrCollection = this.config.metadataCollection().isPresent() ? Optional.of(this.config.metadataCollection().get().reactive()) : Optional.of(this.getAtrCollection(collection));
            this.LOGGER.info(this.attemptId, "First mutated doc in txn is '%s' on vbucket %d, so using atr %s", DebugUtil.docId(collection, id), vbucketIdForDoc, atr);
        }
    }

    public Mono<TransactionGetResult> replace(TransactionGetResult doc, Object content) {
        return this.replace(doc, content, TransactionReplaceOptions.DEFAULT);
    }

    public Mono<TransactionGetResult> replace(TransactionGetResult doc, Object content, TransactionReplaceOptions options) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapper.create(this.config, doc, HOOK_REPLACE, this.attemptSpan);
            return Mono.defer(() -> {
                this.LOGGER.info(this.attemptId, "replace doc %s", doc);
                if (this.isDone()) {
                    return Mono.error((Throwable)this.transactionIsDone());
                }
                if (this.isExistingError()) {
                    return Mono.error((Throwable)this.existingError());
                }
                this.checkExpiryPreCommitAndSetExpiryOvertimeMode(HOOK_REPLACE, Optional.of(doc.id()));
                if (this.queryMode()) {
                    return this.replaceWithQuery(doc, content);
                }
                return this.checkAndHandleBlockingTxn(doc, span, ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_REPLACING).then(Mono.defer(() -> {
                    this.initAtrIfNeeded(doc.collection(), doc.id());
                    if (this.state == AttemptStates.NOT_STARTED) {
                        return this.atrPending(this.atrCollection.get(), span);
                    }
                    return Mono.empty();
                })).then(this.createStagedReplace(doc, content, options.build(), span, doc.links().isDeleted()));
            }).doOnSubscribe(s -> span.start()).doFinally(s -> span.finish());
        }).doOnError(err -> {
            if (err instanceof TransactionOperationFailed) {
                this.errors.add((TransactionOperationFailed)err);
            }
        });
    }

    private Mono<TransactionGetResult> replaceWithQuery(TransactionGetResult doc, Object content) {
        return Mono.defer(() -> {
            Transcoder.EncodedValue encoded = this.getTranscoder().encode(content);
            JsonObject reencoded = JsonObject.fromJson((byte[])encoded.encoded());
            JsonObject txData = this.makeTxdata().put("scas", Long.toString(doc.cas()));
            doc.txnMeta().ifPresent(v -> txData.put("txnMeta", v));
            JsonArray params = JsonArray.create().add(AttemptContextReactive.makeKeyspace(doc.collection())).add(doc.id()).add(reencoded).add(JsonObject.create());
            int sidx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(sidx, null, "EXECUTE __update", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_REPLACE, false, true, txData, params).map(result -> {
                List rows = result.rowsAsObject();
                JsonObject row = (JsonObject)rows.get(0);
                String scas = row.getString("scas");
                long cas = Long.parseLong(scas);
                JsonObject updatedDoc = row.getObject("doc");
                return new TransactionGetResult(doc.id(), updatedDoc.toBytes(), cas, doc.collection(), null, null, null, this.getTranscoder(), Optional.empty());
            }).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                if (err instanceof TransactionOperationFailed) {
                    return Mono.error((Throwable)err);
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    return Mono.error((Throwable)builder.retryTransaction().build());
                }
                return Mono.error((Throwable)builder.build());
            }).doOnError(err -> {
                if (err instanceof TransactionOperationFailed) {
                    this.errors.add((TransactionOperationFailed)err);
                }
            });
        });
    }

    private Mono<Void> removeWithQuery(TransactionGetResult doc) {
        return Mono.defer(() -> {
            JsonObject txData = this.makeTxdata().put("scas", Long.toString(doc.cas()));
            doc.txnMeta().ifPresent(v -> txData.put("txnMeta", v));
            JsonArray params = JsonArray.create().add(AttemptContextReactive.makeKeyspace(doc.collection())).add(doc.id()).add(JsonObject.create());
            int sidx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(sidx, null, "EXECUTE __delete", QueryOptions.queryOptions().parameters(params), HOOK_QUERY_KV_REMOVE, false, true, txData, params).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                if (err instanceof TransactionOperationFailed) {
                    return Mono.error((Throwable)err);
                }
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    return Mono.error((Throwable)builder.retryTransaction().build());
                }
                return Mono.error((Throwable)builder.build());
            }).doOnError(err -> {
                if (err instanceof TransactionOperationFailed) {
                    this.errors.add((TransactionOperationFailed)err);
                }
            });
        });
    }

    private Mono<Void> forwardCompatibilityCheck(ForwardCompatibilityStages stage, Optional<ForwardCompatibility> fc) {
        return ForwardCompatibility.check(stage, fc, this.logger(), Supported.SUPPORTED).onErrorResume(err -> {
            TransactionOperationFailedBuilder error = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new ForwardCompatibilityFailure());
            if (err instanceof ForwardCompatibilityRequiresRetry) {
                error.retryTransaction();
            }
            return Mono.error((Throwable)error.build());
        });
    }

    private Mono<Void> checkATREntryForBlockingDocInternal(TransactionGetResult doc, ReactiveCollection collection, SpanWrapper span) {
        return Mono.fromRunnable(() -> this.checkExpiryPreCommitAndSetExpiryOvertimeMode("checkATREntryForBlockingDoc", Optional.empty())).then(this.beforeCheckATREntryForBlockingDoc(this, doc.links().atrId().get())).then(ActiveTransactionRecord.findEntryForTransaction(collection, doc.links().atrId().get(), doc.links().stagedAttemptId().get(), this.config).flatMap(atrEntry -> {
            if (atrEntry.isPresent()) {
                ATREntry ae = (ATREntry)atrEntry.get();
                this.LOGGER.info(this.attemptId, "fetched ATR entry for blocking txn: hasExpired=%s entry=%s", ae.hasExpired(), ae);
                return this.forwardCompatibilityCheck(ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_READING_ATR, ae.forwardCompatibility()).then(Mono.defer(() -> {
                    if (ae.hasExpired()) {
                        return Mono.empty();
                    }
                    switch (ae.state()) {
                        case COMPLETED: 
                        case ROLLED_BACK: {
                            this.LOGGER.info(this.attemptId, "ATR entry state of %s indicates we can proceed to overwrite", new Object[]{((ATREntry)atrEntry.get()).state()});
                            return Mono.empty();
                        }
                    }
                    return Mono.error((Throwable)new RetryOperation());
                }));
            }
            this.LOGGER.info(this.attemptId, "blocking txn %s's entry has been removed indicating the txn expired, so proceeding to overwrite", doc.links().stagedAttemptId().get());
            return Mono.empty();
        })).retryWhen(Retry.anyOf((Class[])new Class[]{RetryOperation.class}).exponentialBackoff(Duration.ofMillis(50L), Duration.ofMillis(500L)).timeout(Duration.ofSeconds(1L)).toReactorRetry()).onErrorResume(err -> {
            if (err instanceof RetryExhaustedException) {
                this.LOGGER.info(this.attemptId, "still blocked by a valid transaction, retrying to unlock documents");
            } else {
                this.LOGGER.warn(this.attemptId, "got error in checkATREntryForBlockingDoc: %s", AttemptContextReactive.dbg(err));
            }
            return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_WRITE_WRITE_CONFLICT).cause((Throwable)err).retryTransaction().build());
        }).then();
    }

    private Mono<Void> checkATREntryForBlockingDoc(TransactionGetResult doc, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapper.create(this.config, doc, "transaction_check_atr_blocking", this.attemptSpan);
            ReactiveCollection collection = this.parent.cleanup().clusterData().cluster().bucket(doc.links().atrBucketName().get()).scope(doc.links().atrScopeName().get()).collection(doc.links().atrCollectionName().get()).reactive();
            return this.checkATREntryForBlockingDocInternal(doc, collection, span).doOnSubscribe(s -> span.start()).doFinally(s -> span.finish());
        });
    }

    private RedactableArgument getAtrDebug(ReactiveCollection collection, Optional<String> atrId) {
        return ATRUtil.getAtrDebug(collection, atrId);
    }

    private RedactableArgument getAtrDebug(Optional<ReactiveCollection> collection, Optional<String> atrId) {
        return ATRUtil.getAtrDebug(collection, atrId);
    }

    private RemoveOptions wrap(RemoveOptions opts, SpanWrapper pspan, Core core) {
        return OptionsWrapperUtil.wrap(opts, pspan, this.config, this.overall.perTransactionConfig(), core);
    }

    private MutateInOptions wrap(MutateInOptions opts, SpanWrapper pspan, Core core) {
        return OptionsWrapperUtil.wrap(opts, pspan, this.config, this.overall.perTransactionConfig(), core);
    }

    private MutateInOptions wrap(SpanWrapper pspan, Core core) {
        return OptionsWrapperUtil.wrap(pspan, this.config, this.overall.perTransactionConfig(), core);
    }

    private long expiryRemainingMillis() {
        long nowNanos = System.nanoTime();
        long expiredMillis = this.overall.timeSinceStartOfTransactionsMillis(nowNanos);
        long remainingMillis = this.config.transactionExpirationTime().toMillis() - expiredMillis;
        return Math.max(Math.min(remainingMillis, this.config.transactionExpirationTime().toMillis()), 0L);
    }

    private Mono<MutateInResult> atrPending(ReactiveCollection collection, SpanWrapper pspan) {
        SpanWrapper span = SpanWrapper.createATROp(this.config, this.atrCollection, this.atrId, HOOK_ATR_PENDING, this.attemptSpan);
        String prefix = "attempts." + this.attemptId;
        if (!this.atrId.isPresent()) {
            return Mono.error((Throwable)new IllegalStateException("atrId not present"));
        }
        return Mono.defer(() -> {
            span.start();
            this.LOGGER.info(this.attemptId, "about to set ATR %s to Pending", this.getAtrDebug(collection, this.atrId));
            return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_PENDING, Optional.empty());
        }).then(this.beforeAtrPending(this)).then(collection.mutateIn(this.atrId.get(), Arrays.asList(MutateInSpec.insert((String)(prefix + "." + "tid"), (Object)this.transactionId()).xattr().createPath(), MutateInSpec.insert((String)(prefix + "." + "st"), (Object)AttemptStates.PENDING.name()).xattr(), MutateInSpec.insert((String)(prefix + "." + "tst"), (Object)MutateInMacro.CAS), MutateInSpec.insert((String)(prefix + "." + "exp"), (Object)this.expiryRemainingMillis()).xattr(), MutateInSpec.insert((String)(prefix + "." + "d"), (Object)DurabilityLevelUtil.convertDurabilityLevel(this.config.durabilityLevel())).xattr(), MutateInSpec.replace((String)"", (Object)new byte[]{0})), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("atrPending"))).storeSemantics(StoreSemantics.UPSERT), pspan, collection.core()))).flatMap(v -> this.afterAtrPending(this).map(x -> v)).onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
            this.LOGGER.info(this.attemptId, "error while setting ATR %s to Pending: %s", this.getAtrDebug(collection, this.atrId), AttemptContextReactive.dbg(err));
            if (this.expiryOvertimeMode) {
                return this.mapErrorInOvertimeToExpired(HOOK_ATR_PENDING, (Throwable)err, ec, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
            }
            if (ec == ErrorClasses.FAIL_EXPIRY) {
                return this.setExpiryOvertimeModeAndFail((Throwable)err, HOOK_ATR_PENDING, ec);
            }
            if (ec == ErrorClasses.FAIL_ATR_FULL) {
                return Mono.error((Throwable)out.cause(new ActiveTransactionRecordFull(this, (Throwable)err)).build());
            }
            if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                this.LOGGER.info(this.attemptId, "retrying the op on %s to resolve ambiguity", new Object[]{ec});
                return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.atrPending(collection, pspan));
            }
            if (ec == ErrorClasses.FAIL_PATH_ALREADY_EXISTS) {
                this.LOGGER.info(this.attemptId, "assuming this is caused by resolved ambiguity, and proceeding as though successful", new Object[]{ec});
                return Mono.empty();
            }
            if (ec == ErrorClasses.FAIL_TRANSIENT) {
                this.LOGGER.info(this.attemptId, "transient error likely to be solved by retry", new Object[]{ec});
                return Mono.error((Throwable)out.retryTransaction().build());
            }
            if (ec == ErrorClasses.FAIL_HARD) {
                return Mono.error((Throwable)out.doNotRollbackAttempt().build());
            }
            return Mono.error((Throwable)out.build());
        }).doOnNext(v -> {
            long elapsed = span.finish();
            this.LOGGER.info(this.attemptId, "set ATR %s to Pending in %dms, got CAS (start time) %s", this.getAtrDebug(collection, this.atrId), elapsed, this.dbg((MutationResult)v));
            this.startTimeServer = Optional.of(Duration.ofNanos(v.cas()));
            this.state = AttemptStates.PENDING;
        });
    }

    private Transcoder getTranscoder() {
        return this.parent.cleanup().clusterData().cluster().environment().transcoder();
    }

    private Mono<TransactionGetResult> createStagedReplace(TransactionGetResult doc, Object content, TransactionReplaceOptions.BuiltOptions options, SpanWrapper pspan, boolean accessDeleted) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapper.create(this.config, doc, "transaction_staged_replace", this.attemptSpan);
            Transcoder.EncodedValue encoded = this.getTranscoder().encode(content);
            List<MutateInSpec> ops = this.createMutationOps("replace", doc);
            ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.op.stgd", (Object)encoded.encoded()).xattr());
            return this.beforeStagedReplace(this, doc.id()).then(doc.collection().mutateIn(doc.id(), ops, this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().accessDeleted(accessDeleted).clientContext(OptionsWrapperUtil.createClientContext("createStagedReplace"))).cas(doc.cas()), pspan, doc.collection().core()))).doOnSubscribe(v -> {
                this.LOGGER.info(this.attemptId, "about to replace doc %s with cas %d, accessDeleted=%s", DebugUtil.docId(doc), doc.cas(), accessDeleted);
                span.start();
            }).flatMap(result -> this.afterStagedReplaceComplete(this, doc.id()).map(x -> result)).doOnNext(updatedDoc -> {
                long elapsed = span.finish();
                doc.cas(updatedDoc.cas());
                this.LOGGER.info(this.attemptId, "replaced doc %s got %s, in %dms", DebugUtil.docId(doc), this.dbg((MutationResult)updatedDoc), elapsed);
                Optional<StagedMutation> overwritingInsert = this.findStagedInsert(doc);
                Optional<StagedMutation> overwritingReplace = this.findStagedReplace(doc);
                overwritingReplace.ifPresent(stagedMutation -> {
                    this.LOGGER.info(this.attemptId, "doc %s is being replaced after being replaced in the same txn", DebugUtil.docId(doc));
                    this.stagedMutations.remove(stagedMutation);
                });
                if (overwritingInsert.isPresent()) {
                    this.LOGGER.info(this.attemptId, "doc %s is being replaced after being inserted in the same txn", DebugUtil.docId(doc));
                    this.stagedMutations.remove(overwritingInsert.get());
                    this.stagedMutations.add(new StagedMutation(doc, encoded.encoded(), StagedMutationType.INSERT, (MutateInResult)updatedDoc));
                } else {
                    this.stagedMutations.add(new StagedMutation(doc, encoded.encoded(), StagedMutationType.REPLACE, (MutateInResult)updatedDoc));
                }
            }).thenReturn((Object)doc).onErrorResume(err -> this.handleErrorOnStagedMutation("replacing", doc, (Throwable)err).thenReturn((Object)doc));
        });
    }

    private List<MutateInSpec> createMutationOps(String op, TransactionGetResult doc) {
        ArrayList<MutateInSpec> ops = new ArrayList<MutateInSpec>();
        ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.id.txn", (Object)this.transactionId()).xattr().createPath());
        ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.id.atmpt", (Object)this.attemptId).xattr().createPath());
        ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.atr.id", (Object)this.atrId.get()).xattr().createPath());
        ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.atr.bkt", (Object)this.atrCollection.get().bucketName()).xattr());
        ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.atr.scp", (Object)this.atrCollection.get().scopeName()).xattr());
        ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.atr.coll", (Object)this.atrCollection.get().name()).xattr());
        ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.op.type", (Object)op).xattr().createPath());
        ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.op.crc32", (Object)MutateInMacro.VALUE_CRC_32C).xattr());
        doc.documentMetadata().flatMap(DocumentMetadata::cas).ifPresent(v -> ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.restore.CAS", (Object)v).xattr().createPath()));
        doc.documentMetadata().flatMap(DocumentMetadata::exptime).ifPresent(v -> ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.restore.exptime", (Object)v).xattr().createPath()));
        doc.documentMetadata().flatMap(DocumentMetadata::revid).ifPresent(v -> ops.add((MutateInSpec)MutateInSpec.upsert((String)"txn.restore.revid", (Object)v).xattr().createPath()));
        return ops;
    }

    private Mono<Void> createStagedRemove(TransactionGetResult doc, SpanWrapper pspan, boolean accessDeleted) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapper.create(this.config, doc, "transaction_staged_remove", this.attemptSpan);
            List<MutateInSpec> ops = this.createMutationOps("remove", doc);
            return this.beforeStagedRemove(this, doc.id()).doOnSubscribe(x -> {
                this.LOGGER.info(this.attemptId, "about to remove doc %s with cas %d", DebugUtil.docId(doc), doc.cas());
                span.start();
            }).then(doc.collection().mutateIn(doc.id(), ops, this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().accessDeleted(accessDeleted).clientContext(OptionsWrapperUtil.createClientContext("createdStagedRemove"))).cas(doc.cas()), pspan, doc.collection().core()))).flatMap(updatedDoc -> this.afterStagedRemoveComplete(this, doc.id()).thenReturn(updatedDoc)).doOnNext(updatedDoc -> {
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "staged remove of doc %s got %s, in %dms", DebugUtil.docId(doc), this.dbg((MutationResult)updatedDoc), elapsed);
                doc.cas(updatedDoc.cas());
                this.stagedMutations.add(new StagedMutation(doc, null, StagedMutationType.REMOVE, (MutateInResult)updatedDoc));
            }).then().onErrorResume(err -> this.handleErrorOnStagedMutation("removing", doc, (Throwable)err));
        });
    }

    private Mono<Void> handleErrorOnStagedMutation(String stage, TransactionGetResult doc, Throwable err) {
        ErrorClasses ec = ErrorClasses.classify(err);
        TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause(err);
        this.LOGGER.info(this.attemptId, "error while %s doc %s: %s", stage, DebugUtil.docId(doc), AttemptContextReactive.dbg(err));
        if (this.expiryOvertimeMode) {
            this.LOGGER.warn(this.attemptId, "should not reach here in expiryOvertimeMode");
        }
        if (ec == ErrorClasses.FAIL_EXPIRY) {
            return this.setExpiryOvertimeModeAndFail(err, stage, ec);
        }
        if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_CAS_MISMATCH) {
            return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).retryTransaction().build());
        }
        if (ec == ErrorClasses.FAIL_AMBIGUOUS || ec == ErrorClasses.FAIL_TRANSIENT) {
            return Mono.error((Throwable)out.retryTransaction().build());
        }
        if (ec == ErrorClasses.FAIL_HARD) {
            return Mono.error((Throwable)out.doNotRollbackAttempt().build());
        }
        return Mono.error((Throwable)out.build());
    }

    private Optional<StagedMutation> findStagedInsert(TransactionGetResult doc) {
        ReactiveCollection collection = doc.collection();
        Optional<StagedMutation> overwritingInsert = this.stagedInserts().stream().filter(v -> v.doc.collection().bucketName().equals(collection.bucketName()) && v.doc.collection().scopeName().equals(collection.scopeName()) && v.doc.collection().name().equals(collection.name()) && v.doc.id().equals(doc.id())).findFirst();
        return overwritingInsert;
    }

    private Optional<StagedMutation> findStagedReplace(TransactionGetResult doc) {
        ReactiveCollection collection = doc.collection();
        return this.stagedReplaces().stream().filter(v -> v.doc.collection().bucketName().equals(collection.bucketName()) && v.doc.collection().scopeName().equals(collection.scopeName()) && v.doc.collection().name().equals(collection.name()) && v.doc.id().equals(doc.id())).findFirst();
    }

    private static LogDeferThrowable dbg(Throwable err) {
        return DebugUtil.dbg(err);
    }

    private Mono<TransactionGetResult> handleDocExistsDuringStagedInsert(ReactiveCollection collection, String id, Object content, SpanWrapper pspan, TransactionInsertOptions.BuiltOptions options) {
        String bp = "DocExists on " + DebugUtil.docId(collection, id) + ": ";
        return this.beforeGetDocInExistsDuringStagedInsert(this, id).then(DocumentGetter.justGetDoc(collection, this.config, id, pspan, this.getTranscoder(), true)).doOnSubscribe(x -> this.LOGGER.info(this.attemptId, "%s getting doc", bp)).onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            TransactionOperationFailedBuilder e = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
            this.LOGGER.warn(this.attemptId, "%s got error while getting doc: %s", bp, AttemptContextReactive.dbg(err));
            if (ec == ErrorClasses.FAIL_TRANSIENT || ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                e.retryTransaction();
            }
            return Mono.error((Throwable)e.build());
        }).flatMap(v -> {
            if (v.isPresent()) {
                Tuple2 results = (Tuple2)v.get();
                TransactionGetResult r = (TransactionGetResult)results.getT1();
                LookupInResult lir = (LookupInResult)results.getT2();
                this.LOGGER.info(this.attemptId, "%s doc %s exists inTransaction=%s isDeleted=%s", bp, DebugUtil.docId(collection, id), r.links(), lir.isDeleted());
                return this.forwardCompatibilityCheck(ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_INSERTING_GET, r.links().forwardCompatibility()).then(Mono.defer(() -> {
                    if (lir.isDeleted() && !r.links().isDocumentInTransaction()) {
                        this.LOGGER.info(this.attemptId, "%s doc %s is a regular tombstone without txn metadata, proceeding to overwrite", bp, DebugUtil.docId(collection, id));
                        return this.createStagedInsert(collection, id, content, pspan, options, Optional.of(r.cas()));
                    }
                    if (!r.links().isDocumentInTransaction()) {
                        this.LOGGER.info(this.attemptId, "%s doc %s exists but is not in txn, raising DocumentAlreadyExistsException", bp, DebugUtil.docId(collection, id));
                        return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_ALREADY_EXISTS).cause((Throwable)new DocumentExistsException((ErrorContext)ReducedKeyValueErrorContext.create((String)id))).build());
                    }
                    if (!r.links().op().get().equals("insert")) {
                        this.LOGGER.info(this.attemptId, "%s doc %s is in a txn but is not a staged insert, raising DocumentAlreadyExistsException", bp, DebugUtil.docId(collection, id));
                        return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_ALREADY_EXISTS).cause((Throwable)new DocumentExistsException((ErrorContext)ReducedKeyValueErrorContext.create((String)id))).build());
                    }
                    return this.checkAndHandleBlockingTxn(r, pspan, ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_INSERTING).then(this.overwriteStagedInsert(collection, id, content, pspan, options, bp, r, lir));
                }));
            }
            this.LOGGER.info(this.attemptId, "%s completed get of %s, could not find, throwing to retry txn which should succeed now", bp, DebugUtil.docId(collection, id));
            return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_DOC_NOT_FOUND).retryTransaction().build());
        });
    }

    private Mono<TransactionGetResult> overwriteStagedInsert(ReactiveCollection collection, String id, Object content, SpanWrapper pspan, TransactionInsertOptions.BuiltOptions options, String bp, TransactionGetResult r, LookupInResult lir) {
        return Mono.defer(() -> {
            assert (r.links().isDocumentInTransaction());
            assert (r.links().op().get().equals("insert"));
            if (lir.isDeleted()) {
                return this.createStagedInsert(collection, id, content, pspan, options, Optional.of(r.cas()));
            }
            this.LOGGER.info(this.attemptId, "%s removing %s as it's a protocol 1.0 staged insert", bp, DebugUtil.docId(collection, id));
            return this.beforeOverwritingStagedInsertRemoval(this, id).then(collection.remove(id)).onErrorResume(err -> {
                this.LOGGER.warn(this.attemptId, "%s hit error %s while removing %s", bp, DebugUtil.dbg(err), DebugUtil.docId(collection, id));
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_CAS_MISMATCH || ec == ErrorClasses.FAIL_TRANSIENT) {
                    out.retryTransaction();
                }
                return Mono.error((Throwable)out.build());
            }).then(this.createStagedInsert(collection, id, content, pspan, options, Optional.empty()));
        });
    }

    private Mono<TransactionGetResult> createStagedInsert(ReactiveCollection collection, String id, Object content, SpanWrapper pspan, TransactionInsertOptions.BuiltOptions options, Optional<Long> cas) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapper.create(this.config, HOOK_CREATE_STAGED_INSERT, this.attemptSpan).withTag("couchbase.bucket_name", collection.bucketName()).withTag("couchbase.collection_name", collection.name()).withTag("couchbase.document_key", id);
            Transcoder.EncodedValue encoded = this.getTranscoder().encode(content);
            return Mono.defer(() -> {
                span.start();
                this.LOGGER.info(this.attemptId, "about to insert staged doc %s as shadow document, cas=%s", DebugUtil.docId(collection, id), cas);
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_CREATE_STAGED_INSERT, Optional.of(id));
            }).then(this.beforeStagedInsert(this, id)).then(collection.mutateIn(id, Arrays.asList(MutateInSpec.upsert((String)"txn.id.txn", (Object)this.transactionId()).xattr().createPath(), MutateInSpec.upsert((String)"txn.id.atmpt", (Object)this.attemptId).xattr().createPath(), MutateInSpec.upsert((String)"txn.atr.id", (Object)this.atrId.get()).xattr().createPath(), MutateInSpec.upsert((String)"txn.op.stgd", (Object)encoded.encoded()).xattr().createPath(), MutateInSpec.upsert((String)"txn.atr.bkt", (Object)this.atrCollection.get().bucketName()).xattr(), MutateInSpec.upsert((String)"txn.atr.scp", (Object)this.atrCollection.get().scopeName()).xattr(), MutateInSpec.upsert((String)"txn.atr.coll", (Object)this.atrCollection.get().name()).xattr(), MutateInSpec.upsert((String)"txn.op.type", (Object)"insert").xattr(), MutateInSpec.upsert((String)"txn.op.crc32", (Object)MutateInMacro.VALUE_CRC_32C).xattr()), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("createStagedInsert"))).accessDeleted(true).createAsDeleted(true).cas(cas.orElse(0L).longValue()).storeSemantics(cas.isPresent() ? StoreSemantics.REPLACE : StoreSemantics.INSERT), pspan, collection.core()))).flatMap(updatedDoc -> this.afterStagedInsertComplete(this, id).thenReturn(updatedDoc)).doOnNext(updatedDoc -> {
                long elapsed = span.finish();
                this.LOGGER.info(this.attemptId, "inserted doc %s got %s, in %dms", DebugUtil.docId(collection, id), this.dbg((MutationResult)updatedDoc), elapsed);
            }).map(updatedDoc -> {
                TransactionGetResult out = TransactionGetResult.createFromInsert(collection, id, encoded.encoded(), this.transactionId(), this.attemptId, this.atrId.get(), this.atrCollection.get().bucketName(), this.atrCollection.get().scopeName(), this.atrCollection.get().name(), updatedDoc.cas(), this.getTranscoder());
                this.stagedMutations.add(new StagedMutation(out, encoded.encoded(), StagedMutationType.INSERT, (MutateInResult)updatedDoc));
                return out;
            }).onErrorResume(err -> {
                this.LOGGER.info(this.attemptId, "got err while staging insert of %s: %s", DebugUtil.docId(collection, id), AttemptContextReactive.dbg(err));
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder out = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                if (err instanceof FeatureNotAvailableException) {
                    return Mono.error((Throwable)out.build());
                }
                if (this.expiryOvertimeMode) {
                    return this.mapErrorInOvertimeToExpired(HOOK_CREATE_STAGED_INSERT, (Throwable)err, ec, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                }
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    return this.setExpiryOvertimeModeAndFail((Throwable)err, HOOK_CREATE_STAGED_INSERT, ec);
                }
                if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                    return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.createStagedInsert(collection, id, content, span, options, cas));
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    return Mono.error((Throwable)out.retryTransaction().build());
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    return Mono.error((Throwable)out.doNotRollbackAttempt().build());
                }
                if (ec == ErrorClasses.FAIL_DOC_ALREADY_EXISTS || ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                    return this.handleDocExistsDuringStagedInsert(collection, id, content, pspan, options);
                }
                return Mono.error((Throwable)out.build());
            });
        });
    }

    public Mono<Void> remove(TransactionGetResult doc) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapper.create(this.config, doc, HOOK_REMOVE, this.attemptSpan);
            if (this.isDone()) {
                return Mono.error((Throwable)this.transactionIsDone());
            }
            if (this.isExistingError()) {
                return Mono.error((Throwable)this.existingError());
            }
            this.checkExpiryPreCommitAndSetExpiryOvertimeMode(HOOK_REMOVE, Optional.of(doc.id()));
            if (this.queryMode()) {
                return this.removeWithQuery(doc);
            }
            Optional<StagedMutation> overwritingInsert = this.findStagedInsert(doc);
            if (overwritingInsert.isPresent()) {
                return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new IllegalStateException("doc " + DebugUtil.docId(doc) + " is being removed after being inserted in the same txn: this is an application error")).build());
            }
            return this.checkAndHandleBlockingTxn(doc, span, ForwardCompatibilityStages.WRITE_WRITE_CONFLICT_REMOVING).then(Mono.defer(() -> {
                this.initAtrIfNeeded(doc.collection(), doc.id());
                if (this.state == AttemptStates.NOT_STARTED) {
                    return this.atrPending(this.atrCollection.get(), span);
                }
                return Mono.empty();
            })).then(this.createStagedRemove(doc, span, doc.links().isDeleted())).doOnSubscribe(v -> {
                span.start();
                this.LOGGER.info(this.attemptId, "remove doc %s", DebugUtil.docId(doc));
            }).doFinally(v -> span.finish()).doOnError(err -> {
                if (err instanceof TransactionOperationFailed) {
                    this.errors.add((TransactionOperationFailed)err);
                }
            });
        });
    }

    private Mono<Void> checkAndHandleBlockingTxn(TransactionGetResult doc, SpanWrapper pspan, ForwardCompatibilityStages stage) {
        if (doc.links().hasStagedWrite()) {
            if (doc.links().stagedTransactionId().get().equals(this.transactionId())) {
                this.LOGGER.info(this.attemptId, "doc %s has been written by this transaction, ok to continue", DebugUtil.docId(doc));
                return Mono.empty();
            }
            if (doc.links().atrId().isPresent() && doc.links().atrBucketName().isPresent()) {
                this.LOGGER.info(this.attemptId, "doc %s is in another txn %s, checking ATR entry %s/%s/%s to see if blocked", DebugUtil.docId(doc), doc.links().stagedAttemptId().get(), doc.links().atrBucketName().orElse(""), doc.links().atrCollectionName().orElse(""), doc.links().atrId().orElse(""));
                return this.forwardCompatibilityCheck(stage, doc.links().forwardCompatibility()).then(this.checkATREntryForBlockingDoc(doc, pspan));
            }
            this.LOGGER.info(this.attemptId, "doc %s is in another txn %s, cannot check ATR entry - probably a bug, so proceeding to overwrite", DebugUtil.docId(doc), doc.links().stagedAttemptId().get());
            return Mono.empty();
        }
        return Mono.empty();
    }

    private List<JsonObject> listStagedToDocRecords(List<StagedMutation> docs) {
        return this.listToDocRecords(docs.stream().map(v -> v.doc).collect(Collectors.toList()));
    }

    private List<JsonObject> listToDocRecords(List<TransactionGetResult> docs) {
        return docs.stream().map(v -> JsonObject.create().put("id", v.id()).put("bkt", v.collection().bucketName()).put("scp", v.collection().scopeName()).put("col", v.collection().name())).collect(Collectors.toList());
    }

    private List<MutateInSpec> addDocsToBuilder() {
        String prefix = "attempts." + this.attemptId;
        return Arrays.asList(MutateInSpec.upsert((String)(prefix + "." + "ins"), this.listStagedToDocRecords(this.stagedInserts())).xattr(), MutateInSpec.upsert((String)(prefix + "." + "rep"), this.listStagedToDocRecords(this.stagedReplaces())).xattr(), MutateInSpec.upsert((String)(prefix + "." + "rem"), this.listStagedToDocRecords(this.stagedRemoves())).xattr());
    }

    @Stability.Volatile
    public Mono<Void> defer() {
        return Mono.fromRunnable(() -> {
            if (this.queryMode()) {
                throw TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause((Throwable)new FeatureNotAvailableException("Deferred transactions are not supported when the transaction includes a query")).build();
            }
            String json = this.dehydrate(true).toString();
            this.LOGGER.info(this.attemptId, "deferring commit, serialized is %d chars", json.length());
            this.overall.serialized(TransactionSerializedContext.createFrom(json));
        }).then();
    }

    @Stability.Volatile
    Optional<TransactionSerializedContext> serialized() {
        return this.overall.serialized();
    }

    public Mono<Void> commit() {
        if (this.queryMode()) {
            return this.commitWithQuery();
        }
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapper.create(this.config, "transaction_commit", this.attemptSpan);
            if (this.isExistingError()) {
                return this.previousOperationFailedAtCommit().doOnError(err -> this.LOGGER.warn(this.attemptId, "Cannot proceed with commit as previous operations failed: " + AttemptContextReactive.dbg(err)));
            }
            this.LOGGER.info(this.attemptId, "commit %s", this);
            this.checkExpiryPreCommitAndSetExpiryOvertimeMode(HOOK_BEFORE_COMMIT, Optional.empty());
            if (this.isDone()) {
                return Mono.error((Throwable)this.transactionIsDone());
            }
            this.isDone = true;
            if (!this.atrCollection.isPresent() || !this.atrId.isPresent()) {
                return Mono.create(s -> {
                    this.LOGGER.info(this.attemptId, "calling commit on attempt that's got no mutations, skipping");
                    s.success();
                });
            }
            String prefix = "attempts." + this.attemptId;
            ArrayList<MutateInSpec> specs = new ArrayList<MutateInSpec>();
            specs.add((MutateInSpec)MutateInSpec.upsert((String)(prefix + "." + "st"), (Object)AttemptStates.COMMITTED.name()).xattr());
            specs.add((MutateInSpec)MutateInSpec.upsert((String)(prefix + "." + "tsc"), (Object)MutateInMacro.CAS));
            specs.addAll(this.addDocsToBuilder());
            specs.add((MutateInSpec)MutateInSpec.insert((String)(prefix + "." + "p"), (Object)0).xattr());
            AtomicReference<Long> overallStartTime = new AtomicReference<Long>(0L);
            Mono<Void> atrCommit = this.atrCommit(specs, overallStartTime, span);
            return atrCommit.then(this.commitDocs(span)).then(this.atrComplete(prefix, overallStartTime, span)).doOnSuccess(ignore -> this.LOGGER.info(this.attemptId, "overall commit completed")).doOnSubscribe(v -> span.start()).doFinally(v -> span.finish()).then();
        });
    }

    private Mono<Void> commitWithQuery() {
        return Mono.defer(() -> {
            int sidx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(sidx, null, "COMMIT", QueryOptions.queryOptions().raw("txid", (Object)this.txid), HOOK_QUERY_COMMIT, false, true, null, null).doOnNext(v -> {
                this.state = AttemptStates.COMPLETED;
            }).onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    TransactionOperationFailed e = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).cause((Throwable)err).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).doNotRollbackAttempt().build();
                    this.errors.add(e);
                    return Mono.error((Throwable)e);
                }
                if (ec == ErrorClasses.TRANSACTION_OPERATION_FAILED) {
                    TransactionOperationFailed e = (TransactionOperationFailed)err;
                    TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, e.causingErrorClass()).cause(e.getCause()).raiseException(e.toRaise()).doNotRollbackAttempt();
                    if (e.retryTransaction()) {
                        builder.retryTransaction();
                    }
                    return Mono.error((Throwable)builder.build());
                }
                return Mono.error((Throwable)err);
            }).doFinally(v -> {
                this.isDone = true;
            }).then();
        });
    }

    private Mono<Void> previousOperationFailedAtCommit() {
        boolean retryTransaction = true;
        boolean autoRollbackAttempt = true;
        ArrayList<Throwable> causes = new ArrayList<Throwable>();
        for (TransactionOperationFailed ew : this.errors) {
            if (!ew.retryTransaction()) {
                retryTransaction = false;
            }
            if (!ew.autoRollbackAttempt()) {
                autoRollbackAttempt = false;
            }
            if (ew.getCause() == null) continue;
            causes.add(ew.getCause());
        }
        TransactionOperationFailedBuilder eb = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(new PreviousOperationFailed(causes));
        if (retryTransaction) {
            eb.retryTransaction();
        }
        if (!autoRollbackAttempt) {
            eb.doNotRollbackAttempt();
        }
        return Mono.error((Throwable)eb.build());
    }

    private void checkExpiryDuringCommitOrRollback(String stage, Optional<String> id) {
        if (!this.expiryOvertimeMode) {
            if (this.hasExpiredClientSide(stage, id)) {
                this.LOGGER.info(this.attemptId, "has expired in stage %s, entering expiry-overtime mode (one attempt to complete)", stage);
                this.expiryOvertimeMode = true;
            }
        } else {
            this.LOGGER.info(this.attemptId, "ignoring expiry in stage %s, as in expiry-overtime mode", stage);
        }
    }

    private Mono<Void> atrComplete(String prefix, AtomicReference<Long> overallStartTime, SpanWrapper pspan) {
        SpanWrapper span = SpanWrapper.createATROp(this.config, this.atrCollection, this.atrId, HOOK_ATR_COMPLETE, this.attemptSpan);
        return Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "about to set ATR %s to Completed", this.getAtrDebug(this.atrCollection, this.atrId));
            span.start();
            if (!this.expiryOvertimeMode && this.hasExpiredClientSide(HOOK_ATR_COMPLETE, Optional.empty())) {
                String msg = "has expired in stage atrComplete, but transaction has successfully completed so returning success";
                this.LOGGER.info(this.attemptId, msg);
                return Mono.error((Throwable)new AttemptExpired(this, msg));
            }
            return Mono.empty();
        }).then(this.beforeAtrComplete(this)).then(this.atrCollection.get().mutateIn(this.atrId.get(), Arrays.asList(MutateInSpec.upsert((String)(prefix + "." + "st"), (Object)AttemptStates.COMPLETED.name()).xattr(), MutateInSpec.upsert((String)(prefix + "." + "tsco"), (Object)MutateInMacro.CAS)), (MutateInOptions)this.wrap(pspan, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext("atrComplete")))).flatMap(v -> this.afterAtrComplete(this)).doOnNext(v -> {
            this.state = AttemptStates.COMPLETED;
            long now = System.nanoTime();
            long elapsed = span.finish();
            this.LOGGER.info(this.attemptId, "set ATR %s to Completed in %dms, overall commit completed in %dms", this.getAtrDebug(this.atrCollection, this.atrId), elapsed, (now - (Long)overallStartTime.get()) / 1000000L);
        }).then().onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            this.LOGGER.info(this.attemptId, "error '%s' ec=%s while setting ATR %s to Completed", new Object[]{err, ec, this.getAtrDebug(this.atrCollection, this.atrId)});
            if (ec == ErrorClasses.FAIL_HARD) {
                return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT).doNotRollbackAttempt().build());
            }
            this.LOGGER.info(this.attemptId, "ignoring error during transaction tidyup, regarding as success");
            return Mono.empty();
        });
    }

    private <T> Mono<T> mapErrorInOvertimeToExpired(String stage, Throwable err, ErrorClasses ec, TransactionOperationFailed.FinalErrorToRaise toRaise) {
        this.LOGGER.info(this.attemptId, "in expiry-overtime mode so changing error '%s' to raise %s in stage '%s'; no rollback will be tried", new Object[]{err, toRaise, stage});
        if (!this.expiryOvertimeMode) {
            this.LOGGER.warn(this.attemptId, "not in expiry-overtime mode handling error '%s' in stage %s, possibly a bug", err, stage);
        }
        return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).doNotRollbackAttempt().raiseException(toRaise).cause(new AttemptExpired(this, err)).build());
    }

    private Mono<Void> removeDoc(SpanWrapper pspan, TransactionGetResult doc, boolean ambiguityResolutionMode) {
        return Mono.fromRunnable(() -> {
            this.LOGGER.info(this.attemptId, "about to remove doc %s, ambiguityResolutionMode=%s", DebugUtil.docId(doc), ambiguityResolutionMode);
            this.checkExpiryDuringCommitOrRollback(HOOK_REMOVE_DOC, Optional.of(doc.id()));
        }).then(this.beforeDocRemoved(this, doc.id())).then(doc.collection().remove(doc.id(), this.wrap(RemoveOptions.removeOptions(), pspan, doc.collection().core()))).flatMap(mutationResult -> this.afterDocRemovedPreRetry(this, doc.id()).thenReturn(mutationResult)).doOnNext(mutationResult -> {
            this.LOGGER.info(this.attemptId, "commit - removed doc %s, mt = %s", DebugUtil.docId(doc), mutationResult.mutationToken());
            mutationResult.mutationToken().ifPresent(mt -> this.finalMutationTokens.add((MutationToken)mt));
        }).then().onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            TransactionOperationFailedBuilder e = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
            this.LOGGER.info("got error while removing doc %s: %s", DebugUtil.docId(doc), AttemptContextReactive.dbg(err));
            if (this.expiryOvertimeMode) {
                return this.mapErrorInOvertimeToExpired(HOOK_REMOVE_DOC, (Throwable)err, ec, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
            }
            if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.removeDoc(pspan, doc, true));
            }
            if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                return Mono.error((Throwable)e.build());
            }
            if (ec == ErrorClasses.FAIL_HARD) {
                return Mono.error((Throwable)e.build());
            }
            return Mono.error((Throwable)e.build());
        }).then(this.afterDocRemovedPostRetry(this, doc.id())).then();
    }

    private Mono<Void> commitDocs(SpanWrapper pspan) {
        SpanWrapper span = SpanWrapper.create(this.config, "transaction_commit_docs", this.attemptSpan);
        return Flux.fromIterable(this.stagedMutations).concatMap(staged -> {
            TransactionGetResult doc = staged.doc;
            return this.commitDocWrapper(pspan, (StagedMutation)staged, doc);
        }).doOnSubscribe(x -> span.start()).then(Mono.defer(() -> {
            long elapsed = span.finish();
            this.LOGGER.info(this.attemptId, "commit - all %d docs committed in %dms", this.stagedMutations.size(), elapsed);
            return this.afterDocsCommitted(this);
        })).then();
    }

    private static String msgDocChangedUnexpectedly(ReactiveCollection collection, String id, String dbg) {
        return "Tried committing document " + DebugUtil.docId(collection, id) + ", but found that it has been modified by another party in-between staging and committing.  The application must ensure that non-transactional writes cannot happen at the same time as transactional writes on a document.  The change will be committed with CAS=0, which will overwrite the other change.  This document may need manual review to verify that no changes have been lost.  Last document state=" + dbg;
    }

    private static String msgDocRemovedUnexpectedly(ReactiveCollection collection, String id, String dbg) {
        return "Tried committing document " + DebugUtil.docId(collection, id) + ", but found that it has been removed by another party in-between replacing and committing.  The application must ensure that non-transactional writes cannot happen at the same time as transactional writes on a document.  The document will be left removed, and the transaction's changes will not be written to this document.  Last document state=" + dbg;
    }

    private Mono<Void> commitDocWrapper(SpanWrapper pspan, StagedMutation staged, TransactionGetResult doc) {
        return Mono.defer(() -> {
            if (staged.type == StagedMutationType.REMOVE) {
                return this.removeDoc(pspan, doc, false);
            }
            return this.commitDoc(pspan, staged, doc, false, staged.type == StagedMutationType.INSERT, false);
        });
    }

    private String dbg(MutationResult result) {
        if (result == null) {
            return "<unavailable>";
        }
        StringBuilder sb = new StringBuilder();
        sb.append("cas=");
        sb.append(result.cas());
        result.mutationToken().ifPresent(mt -> {
            sb.append(",seqno=");
            sb.append(mt.sequenceNumber());
            sb.append(",vbucket=");
            sb.append(mt.partitionID());
        });
        return sb.toString();
    }

    private Mono<Void> commitDoc(SpanWrapper pspan, StagedMutation staged, TransactionGetResult doc, boolean casZeroMode, boolean insertMode, boolean ambiguityResolutionMode) {
        return Mono.fromRunnable(() -> {
            this.LOGGER.info(this.attemptId, "commit - committing doc %s, casZeroMode=%s, insertMode=%s, ambiguity-resolution=%s", DebugUtil.docId(doc), casZeroMode, insertMode, ambiguityResolutionMode);
            this.checkExpiryDuringCommitOrRollback(HOOK_COMMIT_DOC, Optional.of(doc.id()));
        }).then(this.beforeDocCommitted(this, doc.id())).then(Mono.defer(() -> {
            if (insertMode) {
                return doc.collection().insert(doc.id(), (Object)staged.content, OptionsWrapperUtil.wrap(InsertOptions.insertOptions().transcoder((Transcoder)RawJsonTranscoder.INSTANCE), pspan, this.config, this.overall.perTransactionConfig(), doc.collection().core()));
            }
            return doc.collection().mutateIn(doc.id(), Arrays.asList(MutateInSpec.upsert((String)"txn", null).xattr(), MutateInSpec.remove((String)"txn").xattr(), MutateInSpec.replace((String)"", (Object)staged.content)), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("commitDoc"))).cas(casZeroMode ? 0L : doc.cas()), pspan, doc.collection().core()));
        })).flatMap(v -> this.afterDocCommittedBeforeSavingCAS(this, doc.id()).thenReturn(v)).flatMap(updatedDoc -> {
            this.LOGGER.info(this.attemptId, "commit - committed doc %s got %s", DebugUtil.docId(doc), this.dbg((MutationResult)updatedDoc));
            doc.cas(updatedDoc.cas());
            updatedDoc.mutationToken().ifPresent(mt -> this.finalMutationTokens.add((MutationToken)mt));
            return this.afterDocCommitted(this, doc.id());
        }).onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            TransactionOperationFailedBuilder e = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
            this.LOGGER.info(this.attemptId, "error while committing doc %s: %s", DebugUtil.docId(doc), AttemptContextReactive.dbg(err));
            if (this.expiryOvertimeMode) {
                return this.mapErrorInOvertimeToExpired(HOOK_COMMIT_DOC, (Throwable)err, ec, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT).thenReturn((Object)0);
            }
            if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                this.LOGGER.warn(this.attemptId, "%s while committing doc %s: as op is ambiguously successful, retrying op in ambiguity-resolution mode", err.getClass().getSimpleName(), DebugUtil.docId(doc));
                return this.commitDoc(pspan, staged, doc, casZeroMode, insertMode, true).thenReturn((Object)0);
            }
            if (ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                if (ambiguityResolutionMode) {
                    return Mono.error((Throwable)e.build());
                }
                String msg = AttemptContextReactive.msgDocChangedUnexpectedly(doc.collection(), doc.id(), this.dbg((MutationResult)staged.mr));
                this.LOGGER.warn(this.attemptId, msg);
                this.LOGGER.eventBus().publish((Event)new IllegalDocumentState(Event.Severity.WARN, msg, doc.id()));
                return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.commitDoc(pspan, staged, doc, true, false, ambiguityResolutionMode).thenReturn((Object)0));
            }
            if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                String msg = AttemptContextReactive.msgDocRemovedUnexpectedly(doc.collection(), doc.id(), this.dbg((MutationResult)staged.mr));
                this.LOGGER.warn(this.attemptId, msg);
                this.LOGGER.eventBus().publish((Event)new IllegalDocumentState(Event.Severity.WARN, msg, doc.id()));
                return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.commitDoc(pspan, staged, doc, false, true, ambiguityResolutionMode).thenReturn((Object)0));
            }
            if (ec == ErrorClasses.FAIL_DOC_ALREADY_EXISTS) {
                if (ambiguityResolutionMode) {
                    return Mono.error((Throwable)e.build());
                }
                String msg = AttemptContextReactive.msgDocRemovedUnexpectedly(doc.collection(), doc.id(), this.dbg((MutationResult)staged.mr));
                this.LOGGER.warn(this.attemptId, msg);
                this.LOGGER.eventBus().publish((Event)new IllegalDocumentState(Event.Severity.WARN, msg, doc.id()));
                return Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION).then(this.commitDoc(pspan, staged, doc, true, false, ambiguityResolutionMode).thenReturn((Object)0));
            }
            if (ec == ErrorClasses.FAIL_HARD) {
                return Mono.error((Throwable)e.build());
            }
            return Mono.error((Throwable)e.build());
        }).then();
    }

    private Mono<Void> atrCommitAmbiguityResolution(List<MutateInSpec> specs, AtomicReference<Long> overallStartTime, SpanWrapper pspan) {
        SpanWrapper span = SpanWrapper.createATROp(this.config, this.atrCollection, this.atrId, HOOK_ATR_COMMIT_AMBIGUITY_RESOLUTION, this.attemptSpan);
        return Mono.defer(() -> {
            span.start();
            this.LOGGER.info(this.attemptId, "about to fetch status of ATR %s to resolve ambiguity, expiryOvertimeMode=%s", this.getAtrDebug(this.atrCollection, this.atrId), this.expiryOvertimeMode);
            overallStartTime.set(System.nanoTime());
            return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_COMMIT_AMBIGUITY_RESOLUTION, Optional.empty());
        }).then(this.beforeAtrCommitAmbiguityResolution(this)).then(this.atrCollection.get().lookupIn(this.atrId.get(), Collections.singletonList(LookupInSpec.get((String)("attempts." + this.attemptId + "." + "st")).xattr()), LookupInOptions.lookupInOptions().serializer((JsonSerializer)SerializationUtil.DEFAULT_JSON_SERIALIZER))).flatMap(result -> {
            String status = (String)result.contentAs(0, String.class);
            this.LOGGER.info(this.attemptId, "got status of ATR %s: '%s'", this.getAtrDebug(this.atrCollection, this.atrId), status);
            AttemptStates state = AttemptStates.convert(status);
            switch (state) {
                case COMMITTED: {
                    return Mono.empty();
                }
                case ABORTED: {
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).retryTransaction().build());
                }
            }
            return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).doNotRollbackAttempt().cause(new IllegalStateException("This transaction has been changed by another actor to be in unexpected state " + status)).build());
        }).then().onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause((Throwable)err);
            if (err instanceof RetryAtrCommit || ec == ErrorClasses.TRANSACTION_OPERATION_FAILED) {
                return Mono.error((Throwable)err);
            }
            this.LOGGER.info(this.attemptId, "error while resolving ATR %s ambiguity: %s", this.getAtrDebug(this.atrCollection, this.atrId), AttemptContextReactive.dbg(err));
            if (ec == ErrorClasses.FAIL_EXPIRY) {
                return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause(new AttemptExpired(this, (Throwable)err)).build());
            }
            if (ec == ErrorClasses.FAIL_HARD) {
                return Mono.error((Throwable)builder.doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build());
            }
            if (ec == ErrorClasses.FAIL_TRANSIENT || ec == ErrorClasses.FAIL_OTHER) {
                return Mono.error((Throwable)new RetryOperation());
            }
            if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause(new ActiveTransactionRecordEntryNotFound(this.atrId.get(), this.attemptId)).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build());
            }
            if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause(new ActiveTransactionRecordNotFound(this.atrId.get(), this.attemptId)).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build());
            }
            return Mono.error((Throwable)builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).build());
        }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY).doFinally(v -> span.finish());
    }

    private Mono<Void> atrCommit(List<MutateInSpec> specs, AtomicReference<Long> overallStartTime, SpanWrapper pspan) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapper.createATROp(this.config, this.atrCollection, this.atrId, HOOK_ATR_COMMIT, this.attemptSpan);
            AtomicBoolean ambiguityResolutionMode = new AtomicBoolean(false);
            return Mono.defer(() -> {
                span.start();
                this.LOGGER.info(this.attemptId, "about to set ATR %s to Committed, expiryOvertimeMode=%s, ambiguityResolutionMode=%s", this.getAtrDebug(this.atrCollection, this.atrId), this.expiryOvertimeMode, ambiguityResolutionMode);
                overallStartTime.set(System.nanoTime());
                return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_COMMIT, Optional.empty());
            }).then(this.beforeAtrCommit(this)).then(this.atrCollection.get().mutateIn(this.atrId.get(), specs, (MutateInOptions)this.wrap(pspan, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext("atrCommit")))).flatMap(v -> this.afterAtrCommit(this)).doOnNext(v -> {
                this.state = AttemptStates.COMMITTED;
                this.LOGGER.info(this.attemptId, "set ATR %s to Committed in %dms", this.getAtrDebug(this.atrCollection, this.atrId), (System.nanoTime() - (Long)overallStartTime.get()) / 1000000L);
            }).then().onErrorResume(err -> {
                ErrorClasses ec = ErrorClasses.classify(err);
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
                this.LOGGER.info(this.attemptId, "error while setting ATR %s to Committed: %s", this.getAtrDebug(this.atrCollection, this.atrId), AttemptContextReactive.dbg(err));
                if (ec == ErrorClasses.FAIL_EXPIRY) {
                    TransactionOperationFailed.FinalErrorToRaise toRaise = ambiguityResolutionMode.get() ? TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS : TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED;
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(toRaise).cause(new AttemptExpired(this, (Throwable)err)).build());
                }
                if (ec == ErrorClasses.FAIL_AMBIGUOUS) {
                    ambiguityResolutionMode.set(true);
                    return Mono.error((Throwable)new RetryOperation());
                }
                if (ec == ErrorClasses.FAIL_HARD) {
                    if (ambiguityResolutionMode.get()) {
                        return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause((Throwable)err).build());
                    }
                    return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
                }
                if (ec == ErrorClasses.FAIL_TRANSIENT) {
                    if (ambiguityResolutionMode.get()) {
                        throw new RetryOperation();
                    }
                    return Mono.error((Throwable)builder.retryTransaction().build());
                }
                if (ec == ErrorClasses.FAIL_PATH_ALREADY_EXISTS) {
                    return this.atrCommitAmbiguityResolution(specs, overallStartTime, pspan).onErrorResume(e -> {
                        if (e instanceof RetryAtrCommit) {
                            ambiguityResolutionMode.set(false);
                            throw new RetryOperation();
                        }
                        return Mono.error((Throwable)e);
                    });
                }
                Throwable cause = err;
                boolean rollback = true;
                switch (ec) {
                    case FAIL_PATH_NOT_FOUND: {
                        cause = new ActiveTransactionRecordEntryNotFound(this.atrId.get(), this.attemptId);
                        rollback = false;
                        break;
                    }
                    case FAIL_DOC_NOT_FOUND: {
                        cause = new ActiveTransactionRecordNotFound(this.atrId.get(), this.attemptId);
                        rollback = false;
                        break;
                    }
                    case FAIL_ATR_FULL: {
                        cause = new ActiveTransactionRecordFull(this, (Throwable)err);
                        rollback = false;
                    }
                }
                if (ambiguityResolutionMode.get()) {
                    return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS).cause(cause).build());
                }
                return Mono.error((Throwable)builder.cause(cause).rollbackAttempt(rollback).build());
            }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY_WITH_FIXED_RETRY).doFinally(v -> span.finish());
        });
    }

    private <T> Mono<T> setExpiryOvertimeMode(Throwable err, String stage, ErrorClasses ec) {
        return Mono.fromRunnable(() -> {
            this.LOGGER.info(this.attemptId, "moving to expiry-overtime-mode in stage %s", stage);
            this.expiryOvertimeMode = true;
        });
    }

    private <T> Mono<T> setExpiryOvertimeModeAndFail(Throwable err, String stage, ErrorClasses ec) {
        this.LOGGER.info(this.attemptId, "moving to expiry-overtime-mode in stage %s, and raising error", stage);
        this.expiryOvertimeMode = true;
        return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ec).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).cause(new AttemptExpired(this, err)).build());
    }

    public Mono<Void> rollback() {
        return this.rollbackInternal(true);
    }

    Mono<Void> rollbackInternal(boolean isAppRollback) {
        SpanWrapper span = SpanWrapper.create(this.config, "transaction_rollback", this.attemptSpan);
        if (this.queryMode()) {
            return this.rollbackQuery();
        }
        return Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "rollback %s expiryOvertimeMode=%s isAppRollback=%s", this, this.expiryOvertimeMode, isAppRollback);
            if (!this.expiryOvertimeMode && this.hasExpiredClientSide(HOOK_ROLLBACK, Optional.empty())) {
                this.LOGGER.info(this.attemptId, "has expired before rollback, entering expiry-overtime mode");
                this.expiryOvertimeMode = true;
            }
            if (isAppRollback && this.isDone()) {
                return Mono.error((Throwable)this.transactionIsDone());
            }
            this.isDone = true;
            if (!this.atrCollection.isPresent() || !this.atrId.isPresent()) {
                return Mono.create(s -> {
                    this.LOGGER.info(this.attemptId, "Calling rollback when it's had no mutations, so nothing to do");
                    s.success();
                });
            }
            String prefix = "attempts." + this.attemptId;
            return this.atrAbort(prefix, span, isAppRollback, false).then(this.rollbackDocs(span)).then(this.atrRollbackComplete(prefix, span, isAppRollback)).onErrorResume(err -> {
                if (err instanceof ActiveTransactionRecordNotFound) {
                    this.LOGGER.info(this.attemptId, "ActiveTransactionRecordNotFound indicates that nothing needs to be done for this rollback: treating as successful rollback");
                    return Mono.empty();
                }
                return Mono.error((Throwable)err);
            });
        }).doOnSubscribe(v -> span.start()).doFinally(v -> span.finish());
    }

    private Mono<Void> rollbackQuery() {
        return Mono.defer(() -> {
            int statementIdx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(statementIdx, null, "ROLLBACK", QueryOptions.queryOptions(), HOOK_QUERY_ROLLBACK, false, false, null, null).then(Mono.fromRunnable(() -> {
                this.state = AttemptStates.ROLLED_BACK;
            })).onErrorResume(err -> {
                if (err instanceof TransactionOperationFailed) {
                    TransactionOperationFailed e = (TransactionOperationFailed)err;
                    TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, e.causingErrorClass()).cause(e.getCause()).raiseException(e.toRaise()).doNotRollbackAttempt();
                    if (e.retryTransaction()) {
                        builder.retryTransaction();
                    }
                    return Mono.error((Throwable)builder.build());
                }
                if (err instanceof AttemptNotFoundOnQuery) {
                    return Mono.empty();
                }
                return Mono.error((Throwable)err);
            }).doFinally(v -> {
                this.isDone = true;
            }).then();
        });
    }

    private Mono<Void> atrRollbackComplete(String prefix, SpanWrapper pspan, boolean isAppRollback) {
        SpanWrapper span = SpanWrapper.createATROp(this.config, this.atrCollection, this.atrId, HOOK_ATR_ROLLBACK_COMPLETE, this.attemptSpan);
        return Mono.defer(() -> {
            span.start();
            this.LOGGER.info(this.attemptId, "marking ATR %s as rollback complete", this.getAtrDebug(this.atrCollection, this.atrId));
            return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_ROLLBACK_COMPLETE, Optional.empty());
        }).then(this.beforeAtrRolledBack(this)).then(this.atrCollection.get().mutateIn(this.atrId.get(), Arrays.asList(MutateInSpec.upsert((String)(prefix + "." + "st"), (Object)AttemptStates.ROLLED_BACK.name()).xattr(), MutateInSpec.upsert((String)(prefix + "." + "tsrc"), (Object)MutateInMacro.CAS)), (MutateInOptions)this.wrap(pspan, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext("atrRollbackComplete")))).flatMap(v -> this.afterAtrRolledBack(this)).onErrorResume(err -> {
            this.LOGGER.info(this.attemptId, "error while marking ATR %s as rollback complete: %s", this.getAtrDebug(this.atrCollection, this.atrId), AttemptContextReactive.dbg(err));
            ErrorClasses ec = ErrorClasses.classify(err);
            TransactionOperationFailedBuilder error = TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt();
            if (this.expiryOvertimeMode) {
                return this.mapErrorInOvertimeToExpired(HOOK_ATR_ROLLBACK_COMPLETE, (Throwable)err, ec, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
            }
            if (ec == ErrorClasses.FAIL_EXPIRY) {
                return this.setExpiryOvertimeMode((Throwable)err, HOOK_ATR_ROLLBACK_COMPLETE, ec).then(Mono.delay((Duration)DEFAULT_DELAY_RETRYING_OPERATION)).then(Mono.error((Throwable)new RetryOperation()));
            }
            if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                return Mono.empty();
            }
            if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                return Mono.error((Throwable)error.cause(new ActiveTransactionRecordNotFound(this.atrId.get(), this.attemptId)).build());
            }
            if (ec == ErrorClasses.FAIL_HARD) {
                return Mono.error((Throwable)error.build());
            }
            return Mono.error((Throwable)new RetryOperation());
        }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).then(Mono.fromRunnable(() -> {
            this.state = AttemptStates.ROLLED_BACK;
            this.LOGGER.info(this.attemptId, "rollback - atr rolled back");
        })).then().doFinally(v -> span.finish());
    }

    private Mono<Void> rollbackDocs(SpanWrapper pspan) {
        SpanWrapper span = SpanWrapper.create(this.config, "transaction_rollback_docs", this.attemptSpan);
        return Flux.fromIterable(this.stagedMutations).concatMap(staged -> {
            TransactionGetResult doc = staged.doc;
            switch (staged.type) {
                case INSERT: {
                    return this.rollbackStagedInsert(pspan, doc);
                }
            }
            return this.rollbackStagedReplaceOrRemove(pspan, doc);
        }).doOnNext(v -> this.LOGGER.info(this.attemptId, "rollback - docs rolled back")).then().doOnSubscribe(v -> span.start()).doFinally(v -> span.finish());
    }

    private Mono<Void> rollbackStagedReplaceOrRemove(SpanWrapper pspan, TransactionGetResult doc) {
        return Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "rolling back doc %s with cas %d by removing staged mutation", DebugUtil.docId(doc), doc.cas());
            return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ROLLBACK_DOC, Optional.of(doc.id()));
        }).then(this.beforeDocRolledBack(this, doc.id())).then(doc.collection().mutateIn(doc.id(), Arrays.asList(MutateInSpec.remove((String)"txn").xattr()), this.wrap(((MutateInOptions)MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("rollbackDoc"))).cas(doc.cas()), pspan, doc.collection().core()))).flatMap(updatedDoc -> this.afterRollbackReplaceOrRemove(this, doc.id()).thenReturn(updatedDoc)).doOnNext(updatedDoc -> this.LOGGER.info(this.attemptId, "rolled back doc %s, got cas %d and mt %s", DebugUtil.docId(doc), updatedDoc.cas(), updatedDoc.mutationToken())).then().onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).doNotRollbackAttempt().cause((Throwable)err);
            this.logger().info(this.attemptId, "got error while rolling back doc %s: %s", DebugUtil.docId(doc), AttemptContextReactive.dbg(err));
            if (this.expiryOvertimeMode) {
                return this.mapErrorInOvertimeToExpired(HOOK_ROLLBACK_DOC, (Throwable)err, ec, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
            }
            if (ec == ErrorClasses.FAIL_EXPIRY) {
                return this.setExpiryOvertimeMode((Throwable)err, HOOK_ROLLBACK_DOC, ec).then(Mono.error((Throwable)new RetryOperation()));
            }
            if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                this.LOGGER.info(this.attemptId, "got PATH_NOT_FOUND while cleaning up staged doc %s, it must have already been rolled back, continuing", DebugUtil.docId(doc));
                return Mono.empty();
            }
            if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                return Mono.error((Throwable)builder.build());
            }
            if (ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                return Mono.error((Throwable)builder.build());
            }
            if (ec == ErrorClasses.FAIL_HARD) {
                return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
            }
            return Mono.error((Throwable)new RetryOperation());
        }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY);
    }

    private Publisher<Void> rollbackStagedInsert(SpanWrapper pspan, TransactionGetResult doc) {
        return Mono.defer(() -> {
            this.LOGGER.info(this.attemptId, "rolling back staged insert %s with cas %d", DebugUtil.docId(doc), doc.cas());
            return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_DELETE_INSERTED, Optional.of(doc.id()));
        }).then(this.beforeRollbackDeleteInserted(this, doc.id())).then(Mono.defer(() -> doc.collection().mutateIn(doc.id(), Collections.singletonList(MutateInSpec.remove((String)"txn").xattr()), this.wrap(MutateInOptions.mutateInOptions(), pspan, doc.collection().core()).accessDeleted(true).cas(doc.cas())))).flatMap(updatedDoc -> this.afterRollbackDeleteInserted(this, doc.id()).thenReturn(updatedDoc)).doOnNext(updatedDoc -> this.LOGGER.info(this.attemptId, "deleted inserted doc %s, mt %s", DebugUtil.docId(doc), updatedDoc.mutationToken())).then().onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err);
            this.LOGGER.info(this.attemptId, "error while rolling back inserted doc %s: %s", DebugUtil.docId(doc), AttemptContextReactive.dbg(err));
            if (this.expiryOvertimeMode) {
                return this.mapErrorInOvertimeToExpired(HOOK_REMOVE_DOC, (Throwable)err, ec, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
            }
            if (ec == ErrorClasses.FAIL_EXPIRY) {
                return this.setExpiryOvertimeMode((Throwable)err, HOOK_REMOVE, ec).then(Mono.error((Throwable)new RetryOperation()));
            }
            if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND || ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                this.LOGGER.info(this.attemptId, "got %s while removing staged insert doc %s, it must have already been rolled back, continuing", new Object[]{ec, DebugUtil.docId(doc)});
                return Mono.empty();
            }
            if (ec == ErrorClasses.FAIL_HARD) {
                return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
            }
            if (ec == ErrorClasses.FAIL_CAS_MISMATCH) {
                return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
            }
            return Mono.error((Throwable)new RetryOperation());
        }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY);
    }

    private Mono<Void> atrAbort(String prefix, SpanWrapper pspan, boolean isAppRollback, boolean ambiguityResolutionMode) {
        SpanWrapper span = SpanWrapper.createATROp(this.config, this.atrCollection, this.atrId, HOOK_ATR_ABORT, this.attemptSpan);
        ArrayList<Object> spec = new ArrayList<Object>();
        spec.add(MutateInSpec.upsert((String)(prefix + "." + "st"), (Object)AttemptStates.ABORTED.name()).xattr());
        spec.add(MutateInSpec.upsert((String)(prefix + "." + "tsrs"), (Object)MutateInMacro.CAS));
        spec.addAll(this.addDocsToBuilder());
        return Mono.defer(() -> {
            span.start();
            this.LOGGER.info(this.attemptId, "aborting ATR %s isAppRollback=%s ambiguityResolutionMode=%s", this.getAtrDebug(this.atrCollection, this.atrId), isAppRollback, ambiguityResolutionMode);
            return this.errorIfExpiredAndNotInExpiryOvertimeMode(HOOK_ATR_ABORT, Optional.empty());
        }).then(this.beforeAtrAborted(this)).then(this.atrCollection.get().mutateIn(this.atrId.get(), spec, (MutateInOptions)this.wrap(pspan, this.atrCollection.get().core()).clientContext(OptionsWrapperUtil.createClientContext("atrAbort")))).then(this.afterAtrAborted(this)).doOnNext(v -> {
            this.state = AttemptStates.ABORTED;
            this.LOGGER.info(this.attemptId, "aborted ATR %s", this.getAtrDebug(this.atrCollection, this.atrId));
        }).then().onErrorResume(err -> {
            ErrorClasses ec = ErrorClasses.classify(err);
            TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ec).cause((Throwable)err).doNotRollbackAttempt();
            this.LOGGER.info(this.attemptId, "error '%s' '%s' %s while aborting ATR %s", new Object[]{err.getClass().getSimpleName(), err.getMessage(), ec, this.getAtrDebug(this.atrCollection, this.atrId)});
            if (this.expiryOvertimeMode) {
                return this.mapErrorInOvertimeToExpired(HOOK_ATR_ABORT, (Throwable)err, ec, TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
            }
            if (ec == ErrorClasses.FAIL_EXPIRY) {
                return this.setExpiryOvertimeMode((Throwable)err, HOOK_ATR_ABORT, ec).then(Mono.error((Throwable)new RetryOperation()));
            }
            if (ec == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                return Mono.error((Throwable)builder.cause(new ActiveTransactionRecordEntryNotFound(this.atrId.get(), this.attemptId)).build());
            }
            if (ec == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                return Mono.error((Throwable)builder.cause(new ActiveTransactionRecordNotFound(this.atrId.get(), this.attemptId)).build());
            }
            if (ec == ErrorClasses.FAIL_ATR_FULL) {
                return Mono.error((Throwable)builder.cause(new ActiveTransactionRecordFull(this)).build());
            }
            if (ec == ErrorClasses.FAIL_HARD) {
                return Mono.error((Throwable)builder.doNotRollbackAttempt().build());
            }
            return Mono.error((Throwable)new RetryOperation());
        }).retryWhen(RETRY_OPERATION_UNTIL_EXPIRY).doFinally(v -> span.finish());
    }

    boolean isDone() {
        return this.isDone;
    }

    boolean isExistingError() {
        return !this.errors.isEmpty();
    }

    AttemptStates state() {
        return this.state;
    }

    boolean queryMode() {
        return this.queryTarget != null;
    }

    @Stability.Uncommitted
    public Mono<ReactiveQueryResult> query(String statement) {
        return this.query(statement, TransactionQueryOptions.DEFAULT);
    }

    Mono<QueryResult> queryBlocking(String statement, TransactionQueryOptions options) {
        return Mono.defer(() -> {
            int sidx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(sidx, null, statement, options.builder(), HOOK_QUERY, true, true, null, null);
        });
    }

    Mono<QueryResult> queryBlocking(Scope scope, String statement, TransactionQueryOptions options) {
        return Mono.defer(() -> {
            int sidx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(sidx, scope, statement, options.builder(), HOOK_QUERY, true, true, null, null);
        });
    }

    private String debugMetrics(QueryMetaData md) {
        StringBuilder sb = new StringBuilder();
        md.metrics().ifPresent(m -> sb.append(m));
        md.profile().ifPresent(p -> {
            sb.append(", profile=");
            sb.append(p);
        });
        return sb.toString();
    }

    private Duration queryTimeout() {
        return Duration.ofMillis(this.expiryRemainingMillis()).plus(this.config.keyValueTimeout().orElse(this.cluster().environment().timeoutConfig().kvDurableTimeout())).plusSeconds(1L);
    }

    private Mono<ReactiveQueryResult> queryInternalReactive(int sidx, @Nullable Scope scope, String statement, TransactionQueryOptions options) {
        return this.beforeQuery(this, statement).then(Mono.defer(() -> {
            QueryOptions.Built built = options.builder().build();
            JsonSerializer serializer = built.serializer() == null ? this.cluster().environment().jsonSerializer() : built.serializer();
            QueryRequest request = QueryAccessor.targetedQueryRequest(statement, built, scope, this.queryTarget, this.queryTimeout(), this.cluster());
            return SDKAccessUtil.queryAccessor(this.cluster()).queryReactive(request, built, serializer).doOnNext(v -> {
                if (this.queryTarget == null) {
                    this.queryTarget = request.context().lastDispatchedToNode();
                    this.logger().info(this.attemptId, "q%d got query node to use for future queries %s", sidx, RedactableArgument.redactMeta((Object)this.queryTarget));
                }
            });
        })).flatMap(result -> this.afterQuery(this, statement).thenReturn(result));
    }

    private Mono<QueryResult> queryInternal(int sidx, @Nullable Scope scope, String statement, QueryOptions options) {
        return this.beforeQuery(this, statement).then(Mono.defer(() -> {
            options.metrics(true);
            QueryOptions.Built built = options.build();
            JsonSerializer serializer = built.serializer() == null ? this.cluster().environment().jsonSerializer() : built.serializer();
            QueryRequest request = QueryAccessor.targetedQueryRequest(statement, built, scope, this.queryTarget, this.queryTimeout(), this.cluster());
            return Mono.fromFuture((CompletableFuture)SDKAccessUtil.queryAccessor(this.cluster()).queryAsync(request, built, serializer)).doOnNext(v -> {
                if (this.queryTarget == null) {
                    this.queryTarget = request.context().lastDispatchedToNode();
                    this.logger().info(this.attemptId, "q%d got query node id %s", sidx, RedactableArgument.redactMeta((Object)this.queryTarget));
                }
            });
        })).flatMap(result -> this.afterQuery(this, statement).thenReturn(result));
    }

    private Mono<QueryResult> queryWrapper(int sidx, @Nullable Scope scope, String statement, QueryOptions options, String hookPoint, boolean allowedToBeginWork, boolean existingErrorCheck, @Nullable JsonObject txdata, @Nullable JsonArray params) {
        return Mono.defer(() -> {
            SpanWrapper span = SpanWrapper.create(this.config, "user.query", this.attemptSpan).start();
            Mono<Void> beginWorkIfNeeded = Mono.empty();
            if (!this.queryMode() && allowedToBeginWork) {
                beginWorkIfNeeded = this.queryBeginWork();
            }
            this.logger().debug(this.attemptId, "q%d: '%s' params=%s txdata=%s", sidx, RedactableArgument.redactUser((Object)statement), RedactableArgument.redactUser((Object)params), txdata);
            if (txdata != null) {
                options.raw("txdata", (Object)txdata);
            }
            return beginWorkIfNeeded.then(this.queryInternalPre(sidx, statement, hookPoint, existingErrorCheck)).then(Mono.defer(() -> {
                if (this.txid != null) {
                    options.raw("txid", (Object)this.txid);
                }
                return this.queryInternal(sidx, scope, statement, options);
            })).onErrorResume(err -> {
                RuntimeException converted = this.convertQueryError(sidx, (Throwable)err);
                long elapsed = span.finish();
                this.logger().warn(this.attemptId, "q%d got error %s after %dms, converted to %s", sidx, AttemptContextReactive.dbg(err), elapsed, AttemptContextReactive.dbg(converted));
                if (converted != null) {
                    return Mono.error((Throwable)converted);
                }
                return Mono.error((Throwable)err);
            }).flatMap(result -> {
                long elapsed = span.finish();
                this.logger().info(this.attemptId, "q%d returned with metrics %s after %dms", sidx, this.debugMetrics(result.metaData()), elapsed);
                if (result.metaData().status() == QueryStatus.FATAL) {
                    TransactionOperationFailed err = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).build();
                    this.errors.add(err);
                    return Mono.error((Throwable)err);
                }
                return Mono.just((Object)result);
            });
        });
    }

    private RuntimeException convertQueryError(int sidx, Throwable err) {
        QueryErrorContext ctx;
        CouchbaseException ce;
        if (err instanceof TimeoutException) {
            return new AttemptExpired(this, err);
        }
        if (err instanceof CouchbaseException && (ce = (CouchbaseException)err).context() instanceof QueryErrorContext && (ctx = (QueryErrorContext)ce.context()).errors().size() >= 1) {
            ErrorCodeAndMessage first = (ErrorCodeAndMessage)ctx.errors().get(0);
            int code = first.code();
            switch (code) {
                case 1065: {
                    return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause((Throwable)new FeatureNotAvailableException("N1QL queries inside transactions are supported from Couchbase Server 7.0", err)).build();
                }
                case 17004: {
                    return new AttemptNotFoundOnQuery();
                }
                case 17010: {
                    return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).cause(new AttemptExpired(this, err)).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).build();
                }
                case 17012: {
                    return new DocumentExistsException((ErrorContext)ctx);
                }
                case 17014: {
                    return new DocumentNotFoundException((ErrorContext)ctx);
                }
                case 17015: {
                    return new CasMismatchException((ErrorContext)ctx);
                }
            }
            if (first.context().containsKey("cause")) {
                Map cause = (Map)first.context().get("cause");
                Boolean rollbackRaw = (Boolean)cause.get("rollback");
                Boolean retryRaw = (Boolean)cause.get("retry");
                Object raiseRaw = cause.get("raise");
                this.logger().info(this.attemptId, "q%d query code=%d cause=%s raise_type=%s", sidx, code, RedactableArgument.redactUser((Object)cause), raiseRaw.getClass().getSimpleName());
                TransactionOperationFailedBuilder builder = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).cause(err);
                if (raiseRaw instanceof Integer) {
                    int raise = (Integer)raiseRaw;
                    if (raise < TransactionOperationFailed.FinalErrorToRaise.values().length) {
                        builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.values()[raise]);
                    } else {
                        builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED);
                    }
                } else if (raiseRaw instanceof String) {
                    String raise;
                    switch (raise = (String)raiseRaw) {
                        case "failed_post_commit": {
                            builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT);
                            break;
                        }
                        case "commit_ambiguous": {
                            builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_COMMIT_AMBIGUOUS);
                            break;
                        }
                        case "expired": {
                            builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED);
                            break;
                        }
                        default: {
                            builder.raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED);
                        }
                    }
                }
                if (retryRaw != null && retryRaw.booleanValue()) {
                    builder.retryTransaction();
                }
                if (rollbackRaw != null && !rollbackRaw.booleanValue()) {
                    builder.doNotRollbackAttempt();
                }
                return builder.build();
            }
            if (first.message().contains("write write conflict")) {
                return TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_WRITE_WRITE_CONFLICT).cause(err).retryTransaction().doNotRollbackAttempt().build();
            }
        }
        return null;
    }

    private Mono<Void> queryBeginWork() {
        return Mono.defer(() -> {
            JsonObject txdata = this.makeQueryTxData();
            QueryScanConsistency scanConsistency = null;
            if (this.overall.perTransactionConfig().scanConsistency().isPresent()) {
                scanConsistency = this.overall.perTransactionConfig().scanConsistency().get();
            } else if (this.config.scanConsistency().isPresent()) {
                scanConsistency = this.config.scanConsistency().get();
            }
            this.logger().info(this.attemptId, "BEGIN WORK scanConsistency: c=%s ptc=%s chosen=%s", this.config.scanConsistency(), this.overall.perTransactionConfig().scanConsistency(), scanConsistency);
            QueryOptions options = QueryOptions.queryOptions();
            if (scanConsistency != null) {
                options.scanConsistency(scanConsistency);
            }
            if (scanConsistency == QueryScanConsistency.NOT_BOUNDED) {
                options.raw("scan_consistency", (Object)QueryScanConsistency.NOT_BOUNDED.toString());
            }
            String statement = "BEGIN WORK";
            int statementIdx = this.queryStatementIdx.getAndIncrement();
            return this.queryWrapper(statementIdx, null, statement, options, HOOK_QUERY_BEGIN_WORK, false, true, txdata, null).doOnNext(v -> {
                List rows = v.rowsAsObject();
                for (JsonObject row : rows) {
                    String txid;
                    this.txid = txid = row.getString("txid");
                }
            }).then();
        }).doOnNext(v -> this.stagedMutations.clear());
    }

    private Cluster cluster() {
        return this.parent.cleanup().clusterData().cluster();
    }

    @Stability.Uncommitted
    public Mono<ReactiveQueryResult> query(String statement, TransactionQueryOptions options) {
        return this.query(null, statement, options);
    }

    @Stability.Uncommitted
    public Mono<ReactiveQueryResult> query(Scope scope, String statement, TransactionQueryOptions options) {
        return Mono.defer(() -> {
            long start = System.nanoTime();
            int sidx = this.queryStatementIdx.getAndIncrement();
            Mono<Void> beginWorkIfNeeded = Mono.empty();
            if (!this.queryMode()) {
                beginWorkIfNeeded = this.queryBeginWork();
            }
            return beginWorkIfNeeded.then(this.queryInternalPre(sidx, statement, HOOK_QUERY, true)).then(Mono.defer(() -> {
                if (this.txid != null) {
                    options.raw("txid", this.txid);
                }
                return this.queryInternalReactive(sidx, scope, statement, options);
            })).doOnNext(result -> {
                this.LOGGER.info(this.attemptId, "q%d async started streaming rows after %dms", sidx, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
                result.metaData().flatMap(metaData -> {
                    this.LOGGER.info(this.attemptId, "q%d async received query status of %s and finished streaming after %d millis", sidx, metaData.status(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
                    if (metaData.status() == QueryStatus.FATAL) {
                        TransactionOperationFailed err = TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_OTHER).build();
                        this.errors.add(err);
                        return Mono.error((Throwable)err);
                    }
                    return Mono.empty();
                }).subscribe();
            }).doOnError(err -> this.logger().warn(this.attemptId, "q%d got error %s while performing query '%s'", sidx, AttemptContextReactive.dbg(err), RedactableArgument.redactUser((Object)statement)));
        });
    }

    Mono<Void> queryInternalPre(int sidx, String statement, String hookPoint, boolean existingErrorCheck) {
        return Mono.defer(() -> {
            boolean expiresSoon;
            this.logger().info(this.attemptId, "q%d performing query '%s'", sidx, RedactableArgument.redactUser((Object)statement));
            if (this.isDone()) {
                return Mono.error((Throwable)this.transactionIsDone());
            }
            if (existingErrorCheck && this.isExistingError()) {
                return Mono.error((Throwable)this.existingError());
            }
            long remaining = this.expiryRemainingMillis();
            boolean bl = expiresSoon = remaining < (long)this.EXPIRY_THRESHOLD;
            if (this.hasExpiredClientSide(hookPoint, Optional.of(statement)) || expiresSoon) {
                this.logger().info(this.attemptId, "transaction has expired in stage '%s' remaining=%d threshold=%d", hookPoint, remaining, this.EXPIRY_THRESHOLD);
                return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(this, ErrorClasses.FAIL_EXPIRY).raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).build());
            }
            return Mono.empty();
        });
    }

    CleanupRequest createCleanupRequest() {
        assert (this.state != AttemptStates.NOT_STARTED);
        assert (!this.queryMode());
        long transactionElapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.overall.startTimeClient().toNanos());
        return new CleanupRequest(this.attemptId, this.atrId().get(), this.atrCollection().get(), this.state, this.toDocRecords(this.stagedReplaces()), this.toDocRecords(this.stagedRemoves()), this.toDocRecords(this.stagedInserts()), this.config.transactionExpirationTime(), Optional.empty(), transactionElapsedTimeMillis, Optional.of(this.config.durabilityLevel()));
    }

    private List<DocRecord> toDocRecords(List<StagedMutation> mutations) {
        return mutations.stream().map(m -> new DocRecord(m.doc.collection().bucketName(), m.doc.collection().scopeName(), m.doc.collection().name(), m.doc.id())).collect(Collectors.toList());
    }

    protected Mono<Integer> beforeAtrCommit(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeAtrCommitAmbiguityResolution(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterAtrCommit(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeDocCommitted(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeDocRolledBack(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterDocCommittedBeforeSavingCAS(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterDocCommitted(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterDocsCommitted(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeDocRemoved(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterDocRemovedPreRetry(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterDocRemovedPostRetry(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterDocsRemoved(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeAtrPending(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterAtrPending(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterAtrComplete(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeAtrComplete(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeAtrRolledBack(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterGetComplete(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeRollbackDeleteInserted(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterStagedReplaceComplete(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterStagedRemoveComplete(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeStagedInsert(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeStagedRemove(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeStagedReplace(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterStagedInsertComplete(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeAtrAborted(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterAtrAborted(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterAtrRolledBack(AttemptContextReactive self) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterRollbackReplaceOrRemove(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterRollbackDeleteInserted(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeCheckATREntryForBlockingDoc(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeDocGet(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeGetDocInExistsDuringStagedInsert(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    protected Boolean hasExpiredClientSideHook(AttemptContextReactive self, String place, Optional<String> docId) {
        return false;
    }

    protected Mono<Integer> beforeQuery(AttemptContextReactive self, String statement) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> afterQuery(AttemptContextReactive self, String statement) {
        return Mono.just((Object)0);
    }

    protected Mono<Integer> beforeOverwritingStagedInsertRemoval(AttemptContextReactive self, String id) {
        return Mono.just((Object)0);
    }

    public TransactionLogger logger() {
        return this.LOGGER;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("AttemptContextReactive{");
        sb.append("id=").append(this.attemptId.substring(0, 5));
        sb.append(",state=").append((Object)this.state);
        sb.append(",atr=").append(ATRUtil.getAtrDebug(this.atrCollection, this.atrId));
        sb.append(",staged=").append(this.stagedMutations.stream().map(StagedMutation::toString).collect(Collectors.toList()));
        sb.append('}');
        return sb.toString();
    }
}

