/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.transactions;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.error.UnambiguousTimeoutException;
import com.couchbase.client.core.msg.query.SDKAccessUtil;
import com.couchbase.client.core.retry.reactor.DefaultRetry;
import com.couchbase.client.core.retry.reactor.Jitter;
import com.couchbase.client.core.retry.reactor.RetryContext;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.ReactiveScope;
import com.couchbase.client.java.Scope;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.query.QueryResult;
import com.couchbase.client.java.query.ReactiveQueryResult;
import com.couchbase.transactions.AttemptContext;
import com.couchbase.transactions.AttemptContextReactive;
import com.couchbase.transactions.ReactiveSingleQueryTransactionResult;
import com.couchbase.transactions.SingleQueryTransactionResult;
import com.couchbase.transactions.TransactionContext;
import com.couchbase.transactions.TransactionResult;
import com.couchbase.transactions.cleanup.ClusterData;
import com.couchbase.transactions.cleanup.TransactionsCleanup;
import com.couchbase.transactions.components.ATR;
import com.couchbase.transactions.components.ActiveTransactionRecord;
import com.couchbase.transactions.config.MergedTransactionConfig;
import com.couchbase.transactions.config.PerTransactionConfig;
import com.couchbase.transactions.config.PerTransactionConfigBuilder;
import com.couchbase.transactions.config.SingleQueryTransactionConfig;
import com.couchbase.transactions.config.SingleQueryTransactionConfigBuilder;
import com.couchbase.transactions.config.TransactionConfig;
import com.couchbase.transactions.deferred.TransactionSerializedContext;
import com.couchbase.transactions.error.RetryTransaction;
import com.couchbase.transactions.error.TransactionCommitAmbiguous;
import com.couchbase.transactions.error.TransactionFailed;
import com.couchbase.transactions.error.external.TransactionOperationFailed;
import com.couchbase.transactions.forwards.Supported;
import com.couchbase.transactions.log.EventBusPersistedLogger;
import com.couchbase.transactions.log.PersistedLogWriter;
import com.couchbase.transactions.log.TransactionLogEvent;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.util.DebugUtil;
import com.couchbase.transactions.util.QueryUtil;
import com.couchbase.transactions.util.SchedulerUtil;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

public class TransactionsReactive {
    static final int MAX_ATTEMPTS = 100;
    private final TransactionsCleanup cleanup;
    private final TransactionConfig config;
    private EventBusPersistedLogger persistedLogger;

    static TransactionsReactive create(Cluster cluster, TransactionConfig config) {
        return new TransactionsReactive(cluster, config);
    }

    private TransactionsReactive(Cluster cluster, TransactionConfig config) {
        Objects.requireNonNull(cluster);
        Objects.requireNonNull(config);
        ClusterData clusterData = new ClusterData(cluster);
        this.config = config;
        MergedTransactionConfig merged = new MergedTransactionConfig(config, Optional.empty());
        this.cleanup = new TransactionsCleanup(merged, clusterData);
        config.persistentLoggingCollection().ifPresent(collection -> {
            PersistedLogWriter persistedLogWriter = new PersistedLogWriter((Collection)collection, 100000);
            this.persistedLogger = new EventBusPersistedLogger(cluster.environment().eventBus(), persistedLogWriter, merged);
        });
    }

    private Mono<TransactionResult> executeTransaction(Mono<AttemptContextReactive> createAttempt, MergedTransactionConfig config, TransactionContext overall, Function<AttemptContextReactive, Mono<Void>> transactionLogic, boolean singleQueryTransactionMode) {
        AtomicReference startTime = new AtomicReference();
        return createAttempt.publishOn(SchedulerUtil.scheduler).doOnSubscribe(v -> {
            if (startTime.get() == null) {
                startTime.set(System.nanoTime());
            }
        }).doOnNext(ctx -> ctx.LOGGER.info(ctx.attemptId(), "starting attempt %d/%s/%s", overall.numAttempts(), ctx.transactionId(), ctx.attemptId())).flatMap(ctx -> ((Mono)transactionLogic.apply((AttemptContextReactive)ctx)).onErrorResume(err -> Mono.error((Throwable)ctx.convertToOperationFailedIfNeeded((Throwable)err, singleQueryTransactionMode))).then(ctx.implicitCommit(singleQueryTransactionMode)).onErrorResume(err -> ctx.lambdaEnd(this.cleanup, (Throwable)err, singleQueryTransactionMode)).then(ctx.lambdaEnd(this.cleanup, null, singleQueryTransactionMode)).then(ctx.transactionEnd(null, singleQueryTransactionMode)).onErrorResume(err -> {
            if (err instanceof RetryTransaction) {
                return Mono.error((Throwable)err);
            }
            if (err instanceof TransactionFailed) {
                if (config.logOnFailure() && !config.logDirectly()) {
                    EventBus eventBus = this.cleanup.clusterData().cluster().environment().eventBus();
                    overall.LOGGER.logs().forEach(log -> eventBus.publish((Event)new TransactionLogEvent(config.logOnFailureLevel(), TransactionLogEvent.DEFAULT_CATEGORY, log.toString())));
                }
                return Mono.error((Throwable)err);
            }
            return ctx.transactionEnd((Throwable)err, singleQueryTransactionMode);
        }).doOnNext(v -> overall.span().attribute("db.couchbase.transactions.retries", overall.numAttempts()).finish()).doOnError(err -> overall.span().attribute("db.couchbase.transactions.retries", overall.numAttempts()).failWith((Throwable)err))).retryWhen(this.executeCreateRetryWhen(overall)).doOnTerminate(() -> overall.LOGGER.info("finished txn in %dus", TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - (Long)startTime.get())));
    }

    private Retry executeCreateRetryWhen(TransactionContext overall) {
        Predicate<RetryContext> predicate = context -> {
            Throwable exception = context.exception();
            return exception instanceof RetryTransaction;
        };
        return DefaultRetry.create(predicate).exponentialBackoff(Duration.of(1L, ChronoUnit.MILLIS), Duration.of(100L, ChronoUnit.MILLIS)).doOnRetry(v -> overall.LOGGER.info("<>", "retrying transaction after backoff %dmillis", v.backoff().toMillis())).jitter(Jitter.random()).retryMax(100L).toReactorRetry();
    }

    AttemptContextReactive createAttemptContext(TransactionContext overall, MergedTransactionConfig config, String attemptId) {
        return config.attemptContextFactory().create(overall, config, attemptId, this, Optional.of(overall.span()));
    }

    public Mono<TransactionResult> run(Function<AttemptContextReactive, Mono<Void>> transactionLogic, PerTransactionConfig perConfig) {
        return Mono.defer(() -> {
            MergedTransactionConfig merged = new MergedTransactionConfig(this.config, Optional.ofNullable(perConfig));
            TransactionContext overall = new TransactionContext(this.cleanup.clusterData().cluster().environment().requestTracer(), this.cleanup.clusterData().cluster().environment().eventBus(), UUID.randomUUID().toString(), TransactionsReactive.now(), Duration.ZERO, merged);
            overall.LOGGER.info(TransactionsReactive.configDebug(this.config, perConfig, this.cleanup.clusterData().cluster().core()));
            Mono createAttempt = Mono.fromCallable(() -> {
                String attemptId = UUID.randomUUID().toString();
                return this.createAttemptContext(overall, merged, attemptId);
            });
            Function<AttemptContextReactive, Mono<Void>> runLogic = ctx -> Mono.defer(() -> (Mono)transactionLogic.apply((AttemptContextReactive)ctx));
            return this.executeTransaction((Mono<AttemptContextReactive>)createAttempt, merged, overall, runLogic, false);
        });
    }

    private void logElidedStacktrace(AttemptContextReactive ctx, Throwable err) {
        ctx.LOGGER.info(ctx.attemptId(), DebugUtil.createElidedStacktrace(err));
    }

    private static String configDebug(TransactionConfig config, @Nullable PerTransactionConfig perConfig, Core core) {
        StringBuilder sb = new StringBuilder();
        sb.append("library version: ");
        sb.append(TransactionsReactive.class.getPackage().getImplementationVersion());
        sb.append(" SDK version: ");
        sb.append(core.context().environment().clientVersion().orElse("-"));
        sb.append(" config: ");
        sb.append("atrs=");
        sb.append(config.numAtrs());
        sb.append(", metadataCollection=");
        sb.append(config.metadataCollection());
        sb.append(", expiry=");
        if (perConfig != null) {
            sb.append(perConfig.expirationTime().orElse(config.transactionExpirationTime()).toMillis());
        } else {
            sb.append(config.transactionExpirationTime().toMillis());
        }
        sb.append("msecs durability=");
        sb.append(config.durabilityLevel());
        if (perConfig != null) {
            sb.append(" per-txn config=");
            sb.append(" durability=");
            sb.append(perConfig.durabilityLevel());
        }
        sb.append(", supported=");
        sb.append(Supported.SUPPORTED);
        return sb.toString();
    }

    public Mono<TransactionResult> run(Function<AttemptContextReactive, Mono<Void>> transactionLogic) {
        return this.run(transactionLogic, PerTransactionConfigBuilder.create().build());
    }

    @Stability.Volatile
    public Mono<TransactionResult> commit(TransactionSerializedContext serialized, PerTransactionConfig perConfig) {
        return this.deferred(serialized, perConfig, ctx -> Mono.empty());
    }

    @Stability.Volatile
    public Mono<TransactionResult> rollback(TransactionSerializedContext serialized, PerTransactionConfig perConfig) {
        return this.deferred(serialized, perConfig, ctx -> ctx.rollback());
    }

    @Stability.Volatile
    private Mono<TransactionResult> deferred(TransactionSerializedContext serialized, PerTransactionConfig perConfig, Function<AttemptContextReactive, Mono<Void>> initial) {
        MergedTransactionConfig merged = new MergedTransactionConfig(this.config, Optional.ofNullable(perConfig));
        JsonObject hydrated = JsonObject.fromJson((String)serialized.encodeAsString());
        String atrBucket = hydrated.getString("atrBucket");
        String atrScope = hydrated.getString("atrScope");
        String atrCollectionName = hydrated.getString("atrCollection");
        String atrId = hydrated.getString("atrId");
        ReactiveCollection atrCollection = this.cleanup.clusterData().getBucketFromName(atrBucket).scope(atrScope).collection(atrCollectionName);
        return ActiveTransactionRecord.getAtr(atrCollection, atrId, OptionsWrapperUtil.kvTimeoutNonMutating(merged, atrCollection.core()), null).flatMap(atrOpt -> {
            if (!atrOpt.isPresent()) {
                return Mono.error((Throwable)new IllegalStateException(String.format("ATR %s/%s could not be found", atrBucket, atrId)));
            }
            ATR atr = (ATR)atrOpt.get();
            Duration currentTimeServer = Duration.ofNanos(atr.cas());
            Duration startTimeServer = Duration.ofMillis(hydrated.getLong("startTimeServerMillis"));
            Duration timeElapsed = currentTimeServer.minus(startTimeServer);
            TransactionContext overall = new TransactionContext(this.cleanup.clusterData().cluster().environment().requestTracer(), this.cleanup.clusterData().cluster().environment().eventBus(), UUID.randomUUID().toString(), Duration.ofNanos(System.nanoTime()), timeElapsed, merged);
            overall.LOGGER.info("elapsed time = %dmsecs (ATR start time %dmsecs, current ATR time %dmsecs)", timeElapsed.toMillis(), startTimeServer.toMillis(), currentTimeServer.toMillis());
            Mono createAttempt = Mono.fromCallable(() -> merged.attemptContextFactory().createFrom(hydrated, overall, merged, this));
            Function<AttemptContextReactive, Mono<Void>> runLogic = ctx -> Mono.defer(() -> (Mono)initial.apply((AttemptContextReactive)ctx));
            return this.executeTransaction((Mono<AttemptContextReactive>)createAttempt, merged, overall, runLogic, false);
        });
    }

    Mono<TransactionResult> runBlocking(Consumer<AttemptContext> txnLogic, PerTransactionConfig perConfig) {
        return Mono.defer(() -> {
            MergedTransactionConfig merged = new MergedTransactionConfig(this.config, Optional.ofNullable(perConfig));
            TransactionContext overall = new TransactionContext(this.cleanup.clusterData().cluster().environment().requestTracer(), this.cleanup.clusterData().cluster().environment().eventBus(), UUID.randomUUID().toString(), TransactionsReactive.now(), Duration.ZERO, merged);
            overall.LOGGER.info(TransactionsReactive.configDebug(this.config, perConfig, this.cleanup.clusterData().cluster().core()));
            Mono createAttempt = Mono.defer(() -> {
                String attemptId = UUID.randomUUID().toString();
                return Mono.just((Object)this.createAttemptContext(overall, merged, attemptId));
            });
            Function<AttemptContextReactive, Mono<Void>> transactionLogic = ctx -> Mono.defer(() -> {
                AttemptContext ctxBlocking = new AttemptContext((AttemptContextReactive)ctx);
                return Mono.fromRunnable(() -> txnLogic.accept(ctxBlocking)).then();
            });
            return this.executeTransaction((Mono<AttemptContextReactive>)createAttempt, merged, overall, transactionLogic, false);
        });
    }

    public TransactionConfig config() {
        return this.config;
    }

    private static Duration now() {
        return Duration.of(System.nanoTime(), ChronoUnit.NANOS);
    }

    TransactionsCleanup cleanup() {
        return this.cleanup;
    }

    @Stability.Uncommitted
    public Mono<ReactiveSingleQueryTransactionResult> query(String statement) {
        return this.query(null, statement, SingleQueryTransactionConfigBuilder.create().build());
    }

    @Stability.Uncommitted
    public Mono<ReactiveSingleQueryTransactionResult> query(String statement, SingleQueryTransactionConfig queryOptions) {
        return this.query(null, statement, queryOptions);
    }

    @Stability.Uncommitted
    public Mono<ReactiveSingleQueryTransactionResult> query(ReactiveScope scope, String statement) {
        return this.query(scope, statement, SingleQueryTransactionConfigBuilder.create().build());
    }

    @Stability.Uncommitted
    public Mono<ReactiveSingleQueryTransactionResult> query(ReactiveScope scope, String statement, SingleQueryTransactionConfig queryOptions) {
        return Mono.defer(() -> {
            MergedTransactionConfig merged = MergedTransactionConfig.createFromSingleQueryTransactionConfig(this.config, Optional.ofNullable(queryOptions));
            TransactionContext overall = new TransactionContext(this.cleanup.clusterData().cluster().environment().requestTracer(), this.cleanup.clusterData().cluster().environment().eventBus(), UUID.randomUUID().toString(), TransactionsReactive.now(), Duration.ZERO, merged);
            overall.LOGGER.info(TransactionsReactive.configDebug(this.config, null, this.cleanup.clusterData().cluster().core()));
            Mono createAttempt = Mono.fromCallable(() -> {
                String attemptId = UUID.randomUUID().toString();
                return this.createAttemptContext(overall, merged, attemptId);
            });
            AtomicReference qr = new AtomicReference();
            Function<AttemptContextReactive, Mono<Void>> runLogic = ctx -> Mono.defer(() -> ctx.query(scope, statement, queryOptions.queryOptions(), true).doOnNext(ret -> qr.set(ret)).then());
            Consumer<Throwable> errorHandler = TransactionsReactive.singleQueryHandleErrorDuringRowStreaming(overall);
            return this.executeTransaction((Mono<AttemptContextReactive>)createAttempt, merged, overall, runLogic, true).map(result -> {
                ReactiveQueryResult queryResponse = SDKAccessUtil.createReactiveQueryResult(overall.LOGGER, (ReactiveQueryResult)qr.get(), errorHandler, this.cleanup.clusterData().cluster().environment().jsonSerializer());
                return new ReactiveSingleQueryTransactionResult(result.log(), queryResponse);
            });
        });
    }

    private static Consumer<Throwable> singleQueryHandleErrorDuringRowStreaming(TransactionContext overall) {
        Consumer<Throwable> errorHandler = err -> {
            TransactionFailed ret;
            RuntimeException converted = QueryUtil.convertQueryError(err);
            overall.LOGGER.warn("", "got error on rows stream %s, converted from %s", DebugUtil.dbg(converted), DebugUtil.dbg(err));
            TransactionResult result = new TransactionResult(overall.attempts(), overall.LOGGER, Duration.of(System.nanoTime() - overall.startTimeClient().toNanos(), ChronoUnit.NANOS), overall.transactionId(), overall.serialized());
            if (converted instanceof TransactionOperationFailed) {
                TransactionOperationFailed tof = (TransactionOperationFailed)converted;
                switch (tof.toRaise()) {
                    case TRANSACTION_FAILED_POST_COMMIT: 
                    case TRANSACTION_SUCCESS: {
                        ret = new TransactionFailed(result);
                        break;
                    }
                    case TRANSACTION_EXPIRED: {
                        String msg = "Transaction has expired configured timeout of " + overall.expirationTime().toMillis() + "msecs.  The transaction is not committed.";
                        ret = new UnambiguousTimeoutException(msg, null);
                        break;
                    }
                    case TRANSACTION_COMMIT_AMBIGUOUS: {
                        String msg = "It is ambiguous whether the transaction committed";
                        ret = new TransactionCommitAmbiguous((Throwable)err, result, msg);
                        break;
                    }
                    default: {
                        ret = new TransactionFailed((Throwable)err, result);
                        break;
                    }
                }
            } else {
                ret = new TransactionFailed(result);
            }
            throw ret;
        };
        return errorHandler;
    }

    SingleQueryTransactionResult queryBlocking(@Nullable Scope scope, String statement, SingleQueryTransactionConfig queryOptions) {
        return (SingleQueryTransactionResult)Mono.defer(() -> {
            MergedTransactionConfig merged = MergedTransactionConfig.createFromSingleQueryTransactionConfig(this.config, Optional.ofNullable(queryOptions));
            TransactionContext overall = new TransactionContext(this.cleanup.clusterData().cluster().environment().requestTracer(), this.cleanup.clusterData().cluster().environment().eventBus(), UUID.randomUUID().toString(), TransactionsReactive.now(), Duration.ZERO, merged);
            overall.LOGGER.info(TransactionsReactive.configDebug(this.config, null, this.cleanup.clusterData().cluster().core()));
            Mono createAttempt = Mono.fromCallable(() -> {
                String attemptId = UUID.randomUUID().toString();
                return this.createAttemptContext(overall, merged, attemptId);
            });
            AtomicReference qr = new AtomicReference();
            Function<AttemptContextReactive, Mono<Void>> runLogic = ctx -> Mono.defer(() -> ctx.queryBlocking(scope, statement, queryOptions.queryOptions(), true).doOnNext(ret -> qr.set(ret)).then());
            return this.executeTransaction((Mono<AttemptContextReactive>)createAttempt, merged, overall, runLogic, true).map(result -> new SingleQueryTransactionResult(result.log(), (QueryResult)qr.get(), result.unstagingComplete()));
        }).block();
    }
}

