/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.transactions;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.retry.reactor.DefaultRetry;
import com.couchbase.client.core.retry.reactor.Jitter;
import com.couchbase.client.core.retry.reactor.RetryContext;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.transactions.AttemptContext;
import com.couchbase.transactions.AttemptContextReactive;
import com.couchbase.transactions.TransactionAttempt;
import com.couchbase.transactions.TransactionContext;
import com.couchbase.transactions.TransactionResult;
import com.couchbase.transactions.cleanup.ClusterData;
import com.couchbase.transactions.cleanup.TransactionsCleanup;
import com.couchbase.transactions.components.ATR;
import com.couchbase.transactions.components.ActiveTransactionRecord;
import com.couchbase.transactions.config.PerTransactionConfig;
import com.couchbase.transactions.config.PerTransactionConfigBuilder;
import com.couchbase.transactions.config.TransactionConfig;
import com.couchbase.transactions.deferred.TransactionSerializedContext;
import com.couchbase.transactions.error.TransactionCommitAmbiguous;
import com.couchbase.transactions.error.TransactionExpired;
import com.couchbase.transactions.error.TransactionFailed;
import com.couchbase.transactions.error.external.TransactionOperationFailed;
import com.couchbase.transactions.error.internal.ErrorClasses;
import com.couchbase.transactions.error.internal.TransactionOperationFailedBuilder;
import com.couchbase.transactions.forwards.Supported;
import com.couchbase.transactions.log.EventBusPersistedLogger;
import com.couchbase.transactions.log.PersistedLogWriter;
import com.couchbase.transactions.log.TransactionLogEvent;
import com.couchbase.transactions.support.AttemptContextFactory;
import com.couchbase.transactions.support.AttemptStates;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import com.couchbase.transactions.util.DebugUtil;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class TransactionsReactive {
    // Presumably the cap on attempts for one transaction; the retry policy built in
    // executeCreateRetryWhen is limited to the same value (1000) -- TODO confirm intent.
    static final int MAX_ATTEMPTS = 1000;
    // Cleanup component created in the constructor; also used to reach the cluster and its event bus.
    private final TransactionsCleanup cleanup;
    // Configuration supplied at construction time and handed to every transaction run by this instance.
    private final TransactionConfig config;
    // Builds the per-attempt contexts; not final because setAttemptContextFactory can replace it.
    private AttemptContextFactory attemptContextFactory;
    // Only assigned when the config provides a persistent logging collection; otherwise stays null.
    private EventBusPersistedLogger persistedLogger;

    /**
     * Package-private factory for this class; the constructor itself is private.
     *
     * @param cluster the cluster the transactions will run against
     * @param config  the transactions configuration
     * @return a newly constructed instance
     */
    static TransactionsReactive create(Cluster cluster, TransactionConfig config) {
        TransactionsReactive transactions = new TransactionsReactive(cluster, config);
        return transactions;
    }

    /**
     * Wires up the cleanup component, the attempt-context factory and, when the config
     * supplies a persistent logging collection, the event-bus backed persisted logger.
     *
     * @param cluster the cluster the transactions will run against
     * @param config  the transactions configuration
     * @throws NullPointerException if {@code cluster} or {@code config} is null; the message
     *                              names the offending argument
     */
    private TransactionsReactive(Cluster cluster, TransactionConfig config) {
        Objects.requireNonNull(cluster, "cluster must not be null");
        Objects.requireNonNull(config, "config must not be null");
        ClusterData clusterData = new ClusterData(cluster);
        this.config = config;
        this.attemptContextFactory = config.attemptContextFactory();
        this.cleanup = new TransactionsCleanup(this.config, clusterData);
        config.persistentLoggingCollection().ifPresent(collection -> {
            // NOTE(review): the meaning of 100000 (second PersistedLogWriter argument) is not
            // visible from this file -- presumably a size limit; confirm against PersistedLogWriter.
            PersistedLogWriter persistedLogWriter = new PersistedLogWriter((Collection)collection, 100000);
            this.persistedLogger = new EventBusPersistedLogger(cluster.environment().eventBus(), persistedLogWriter, config);
        });
    }

    /**
     * Assembles the full transaction pipeline around the supplied per-attempt logic:
     * implicit commit, attempt bookkeeping, pre-retry error handling, retry, post-retry
     * error conversion, failure logging and finally conversion to a {@link TransactionResult}.
     *
     * @param config           configuration consulted for cleanup and log-on-failure behaviour
     * @param overall          the transaction-wide context shared by all attempts
     * @param transactionLogic a Mono that runs one attempt and emits its attempt context
     * @return a Mono emitting the result of the transaction, or an error
     */
    private Mono<TransactionResult> executeTransaction(TransactionConfig config, TransactionContext overall, Mono<AttemptContextReactive> transactionLogic) {
        // NOTE(review): this timestamp is recorded upstream of retryWhen, so it appears to be
        // reset on every resubscription -- confirm that is what the "finished txn" log intends.
        AtomicReference<Long> subscribedAtNanos = new AtomicReference<Long>(0L);

        // One attempt: run the logic, commit if needed, record the attempt, and give the
        // pre-retry handler a chance to roll back when anything fails.
        Mono<AttemptContextReactive> singleAttempt = Mono.just(overall)
                .subscribeOn(Schedulers.elastic())
                .doOnSubscribe(subscription -> subscribedAtNanos.set(System.nanoTime()))
                .then(transactionLogic)
                .flatMap(this::executeImplicitCommit)
                .doOnNext(attempt -> this.executeAddAttemptAndCleanupRequest(config, overall, attempt))
                .onErrorResume(failure -> this.executeHandleErrorsPreRetry(config, overall, failure));

        // Retry policy, then translation of whatever error is left into the final outcome.
        Mono<AttemptContextReactive> retried = singleAttempt
                .retryWhen(this.executeCreateRetryWhen(overall))
                .onErrorResume(failure -> this.executeHandleErrorsPostRetry(overall, failure));

        Mono<AttemptContextReactive> logged = retried
                .doOnError(failure -> {
                    if (!config.logOnFailure() || config.logDirectly()) {
                        return;
                    }
                    // Replay the in-memory transaction log onto the cluster's event bus.
                    EventBus eventBus = this.cleanup.clusterData().cluster().environment().eventBus();
                    overall.LOGGER.logs().forEach(log -> eventBus.publish((Event)new TransactionLogEvent(config.logOnFailureLevel(), TransactionLogEvent.DEFAULT_CATEGORY, log.toString())));
                })
                .doOnSuccess(ignored -> overall.LOGGER.info("finished txn in %dms", (System.nanoTime() - subscribedAtNanos.get()) / 1000000L));

        return logged
                .single()
                .map(ignored -> TransactionsReactive.createResultFromContext(overall))
                .doFinally(signal -> overall.span().ifPresent(SpanWrapper::finish));
    }

    /**
     * Builds the retry policy applied between attempts: retry only when the failure is a
     * {@link TransactionOperationFailed} whose {@code retryTransaction()} is true, using
     * exponential backoff from 1ms to 100ms with random jitter, for at most
     * {@link #MAX_ATTEMPTS} retries.
     *
     * @param overall transaction-wide context, used here only for logging
     * @return the Reactor retry specification
     */
    private Retry executeCreateRetryWhen(TransactionContext overall) {
        Predicate<RetryContext> predicate = context -> {
            Throwable exception = context.exception();
            // Everything reaching this point should already have been converted; anything
            // else is reported as an internal error rather than silently retried.
            if (!(exception instanceof TransactionOperationFailed)) {
                throw new IllegalStateException("Non-TransactionOperationFailed '" + DebugUtil.dbg(exception) + "' received during retry, this is a bug");
            }
            TransactionOperationFailed e = (TransactionOperationFailed)exception;
            overall.LOGGER.info("TransactionOperationFailed retryTransaction=%s", e.retryTransaction());
            return e.retryTransaction();
        };
        // Use the class constant instead of repeating the literal 1000 so the limit lives in one place.
        return DefaultRetry.create(predicate)
                .exponentialBackoff(Duration.of(1L, ChronoUnit.MILLIS), Duration.of(100L, ChronoUnit.MILLIS))
                .doOnRetry(v -> overall.LOGGER.info("<>", "retrying transaction after backoff %dmillis", v.backoff().toMillis()))
                .jitter(Jitter.random())
                .retryMax(MAX_ATTEMPTS)
                .toReactorRetry();
    }

    /**
     * Finishes off a failed attempt before the retry policy sees the error: optionally rolls
     * the attempt back, optionally queues a cleanup request, records the attempt on the
     * overall context, and then re-raises an error for the retry stage.
     *
     * @param config  configuration consulted for whether regular cleanup is enabled and for expiry
     * @param overall the transaction-wide context
     * @param err     the error raised by the attempt
     * @return a Mono that always terminates with an error: the original error, an expiry error
     *         if the transaction has expired client-side, a rebuilt error if rollback itself
     *         failed, or an {@link IllegalStateException} (carrying {@code err} as its cause)
     *         when {@code err} is not a {@link TransactionOperationFailed}
     */
    private Mono<AttemptContextReactive> executeHandleErrorsPreRetry(TransactionConfig config, TransactionContext overall, Throwable err) {
        if (!(err instanceof TransactionOperationFailed)) {
            overall.LOGGER.warn("<>", "received non-TransactionOperationFailed error %s, unable to rollback as don't have context", err.getClass().getSimpleName());
            // Keep the original error as message and cause so it is not lost to callers and logs.
            return Mono.error(new IllegalStateException("Non-TransactionOperationFailed '" + DebugUtil.dbg(err) + "' received before retry, unable to rollback as don't have context", err));
        }
        TransactionOperationFailed e = (TransactionOperationFailed)err;
        AttemptContextReactive ctx = e.context();
        overall.LOGGER.info("<>", "finishing attempt off after error '%s'", e);

        // Decide whether a rollback is needed for this attempt.
        final Mono<Void> autoRollback;
        if (!e.autoRollbackAttempt()) {
            ctx.LOGGER.info(ctx.attemptId(), "has been told to skip auto-rollback");
            autoRollback = Mono.empty();
        } else if (ctx.state() == AttemptStates.NOT_STARTED && !ctx.queryMode()) {
            ctx.LOGGER.info(ctx.attemptId(), "told to auto-rollback but in NOT_STARTED state, so nothing to do - skipping rollback");
            autoRollback = Mono.empty();
        } else {
            ctx.LOGGER.info(ctx.attemptId(), "auto-rolling-back on error");
            autoRollback = ctx.rollbackInternal(false);
        }

        // Decide whether a cleanup request should be queued for this attempt.
        final Mono<Void> cleanupReq;
        if (!config.runRegularAttemptsCleanupThread()) {
            ctx.LOGGER.trace(ctx.attemptId(), "skipping addition of cleanup request on failure as regular cleanup disabled");
            cleanupReq = Mono.empty();
        } else {
            cleanupReq = Mono.fromRunnable(() -> this.addCleanupRequestForContext(ctx));
        }

        // Records the failed attempt on the overall context and closes the attempt's span.
        final Mono<Void> addAttempt = Mono.fromRunnable(() -> {
            TransactionAttempt ta = TransactionAttempt.createFromContext(ctx, Optional.of(err));
            overall.addAttempt(ta);
            ctx.LOGGER.info(ctx.attemptId(), "added attempt %s after error", ta);
            ctx.span().finish();
        });

        return autoRollback.onErrorResume(er -> {
            overall.LOGGER.info("<>", "rollback failed with '%s' %s, raising original error but with retryTransaction turned off", er.getClass().getSimpleName(), er.getMessage());
            // The error is rebuilt from the original's context, error class, final error and cause.
            // NOTE(review): the builder is presumably non-retrying by default, as the log states -- confirm.
            return cleanupReq.then(addAttempt).then(Mono.error((Throwable)TransactionOperationFailedBuilder.createError(e.context(), e.causingErrorClass()).raiseException(e.toRaise()).cause(e.getCause()).build()));
        }).then(cleanupReq).then(addAttempt).then(Mono.defer(() -> {
            // A retry is pointless if the transaction has expired in the meantime.
            if (e.retryTransaction() && overall.hasExpiredClientSide(config)) {
                overall.LOGGER.info("<>", "original error planned to retry transaction, but it has subsequently expired");
                return Mono.error((Throwable)TransactionOperationFailedBuilder.createError(ctx, ErrorClasses.FAIL_EXPIRY).doNotRollbackAttempt().raiseException(TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_EXPIRED).build());
            }
            overall.LOGGER.info("<>", "reraising original exception %s", DebugUtil.dbg(err));
            return Mono.error(err);
        }));
    }

    /**
     * Converts the error left after retries into the outcome seen by the caller.
     * A {@code TRANSACTION_FAILED_POST_COMMIT} failure is turned into success (the failing
     * attempt's context is emitted); every other failure becomes a {@link TransactionExpired},
     * {@link TransactionCommitAmbiguous} or plain {@link TransactionFailed}.
     *
     * @param overall the transaction-wide context, used to build the result attached to the error
     * @param err     the error that survived the retry stage
     * @return a Mono emitting the attempt context (post-commit case) or terminating with the final error;
     *         an {@link IllegalStateException} is raised if {@code err} is not a {@link TransactionOperationFailed}
     */
    private Mono<AttemptContextReactive> executeHandleErrorsPostRetry(TransactionContext overall, Throwable err) {
        if (!(err instanceof TransactionOperationFailed)) {
            String bugMessage = "Non-TransactionOperationFailed '" + DebugUtil.dbg(err) + "' received, this is a bug";
            return Mono.error(new IllegalStateException(bugMessage));
        }
        TransactionResult result = TransactionsReactive.createResultFromContext(overall);
        TransactionOperationFailed failure = (TransactionOperationFailed)err;

        if (failure.toRaise() == TransactionOperationFailed.FinalErrorToRaise.TRANSACTION_FAILED_POST_COMMIT) {
            failure.context().LOGGER.info(failure.context().attemptId(), "converted TRANSACTION_FAILED_POST_COMMIT to success, unstagingComplete() will be false");
            return Mono.just(failure.context());
        }

        TransactionFailed finalError;
        switch (failure.toRaise()) {
            case TRANSACTION_EXPIRED: {
                String expiredMessage = "Transaction has expired configured timeout of " + this.config.transactionExpirationTime().toMillis() + "msecs.  The transaction is not committed.";
                finalError = new TransactionExpired(failure.getCause(), result, expiredMessage);
                break;
            }
            case TRANSACTION_COMMIT_AMBIGUOUS: {
                String ambiguousMessage = "It is ambiguous whether the transaction committed";
                finalError = new TransactionCommitAmbiguous(failure.getCause(), result, ambiguousMessage);
                break;
            }
            default: {
                finalError = new TransactionFailed(failure.getCause(), result);
                break;
            }
        }
        // The arguments are passed as an explicit array, exactly as the original call did.
        failure.context().LOGGER.info(failure.context().attemptId(), "converted TransactionOperationFailed %s to final error %s", new Object[]{failure.toRaise(), finalError});
        return Mono.error(finalError);
    }

    /**
     * Bookkeeping for a successful attempt: records it on the overall context, queues a
     * cleanup request when regular cleanup is enabled, and finishes the attempt's span.
     *
     * @param config  configuration consulted for whether regular cleanup is enabled
     * @param overall the transaction-wide context that collects attempts
     * @param ctx     the attempt that just succeeded
     */
    private void executeAddAttemptAndCleanupRequest(TransactionConfig config, TransactionContext overall, AttemptContextReactive ctx) {
        TransactionAttempt attempt = TransactionAttempt.createFromContext(ctx, Optional.empty());
        overall.addAttempt(attempt);
        ctx.LOGGER.info(ctx.attemptId(), "added attempt %s after success", attempt);

        boolean cleanupEnabled = config.runRegularAttemptsCleanupThread();
        if (!cleanupEnabled) {
            ctx.LOGGER.trace(ctx.attemptId(), "skipping addition of cleanup request on success");
        } else {
            this.addCleanupRequestForContext(ctx);
        }

        ctx.span().finish();
    }

    /**
     * Commits the attempt on the caller's behalf when it is not yet done and has no
     * serialized form; otherwise passes the context through untouched.
     *
     * @param ctx the attempt whose logic has completed
     * @return a Mono emitting {@code ctx}; a commit failure is converted to a
     *         {@link TransactionOperationFailed} if it is not one already
     */
    private Mono<AttemptContextReactive> executeImplicitCommit(AttemptContextReactive ctx) {
        return Mono.defer(() -> {
            // Same short-circuit order as before: isDone() first, serialized() only if not done.
            boolean nothingToCommit = ctx.isDone() || ctx.serialized().isPresent();
            if (nothingToCommit) {
                return Mono.just(ctx);
            }
            ctx.LOGGER.trace(ctx.attemptId(), "doing implicit commit");
            return ctx.commit()
                    .then(Mono.just(ctx))
                    .onErrorResume(commitErr -> Mono.error((Throwable)TransactionOperationFailed.convertToOperationFailedIfNeeded(commitErr, ctx, TransactionsReactive.createResultFromContext(ctx.overall()))));
        });
    }

    /**
     * Asks the configured factory for a new attempt context.
     *
     * @param overall   the transaction-wide context; when null no context is created
     * @param config    configuration passed through to the factory
     * @param attemptId identifier for the new attempt
     * @return the new attempt context, or {@code null} if {@code overall} is null
     */
    AttemptContextReactive createAttemptContext(TransactionContext overall, TransactionConfig config, String attemptId) {
        if (overall == null) {
            return null;
        }
        return this.attemptContextFactory.create(overall, config, attemptId, this, overall.span());
    }

    /**
     * Runs the supplied reactive transaction logic inside a transaction, retrying attempts
     * as decided by the shared pipeline in {@code executeTransaction}.
     *
     * @param transactionLogic user logic invoked once per attempt with that attempt's context
     * @param perConfig        per-transaction configuration
     * @return a Mono emitting the transaction result, or terminating with the final error
     */
    public Mono<TransactionResult> run(Function<AttemptContextReactive, Mono<Void>> transactionLogic, PerTransactionConfig perConfig) {
        return Mono.defer(() -> {
            TransactionContext overall = new TransactionContext(this.cleanup.clusterData().cluster().environment().eventBus(), UUID.randomUUID().toString(), TransactionsReactive.now(), Duration.ZERO, this.config, perConfig);
            AtomicReference<Long> startTime = new AtomicReference<Long>(0L);
            Mono<AttemptContextReactive> ob = Mono.fromCallable(() -> {
                // A fresh random id per attempt; fromCallable re-runs this on every resubscription.
                String attemptId = UUID.randomUUID().toString();
                return this.createAttemptContext(overall, this.config, attemptId);
            }).flatMap(ctx -> {
                ctx.LOGGER.info("starting attempt %d/%s/%s", overall.numAttempts(), ctx.transactionId(), ctx.attemptId());
                ctx.LOGGER.info(TransactionsReactive.configDebug(this.config, perConfig));
                // Invoke the user lambda inside defer so that a synchronous throw (or a null
                // return) becomes an error signal handled by onErrorResume below. Called
                // directly, such a failure would skip the conversion and arrive downstream
                // as a non-TransactionOperationFailed, which cannot be rolled back.
                Mono<Void> result = Mono.defer(() -> transactionLogic.apply(ctx));
                return result.onErrorResume(err -> {
                    ctx.LOGGER.info(ctx.attemptId(), "caught exception '%s' in async, rethrowing", err);
                    this.logElidedStacktrace(ctx, err);
                    return Mono.error((Throwable)TransactionOperationFailed.convertToOperationFailedIfNeeded(err, ctx, TransactionsReactive.createResultFromContext(overall)));
                }).thenReturn(ctx);
            }).doOnSubscribe(v -> startTime.set(System.nanoTime())).doOnNext(v -> v.LOGGER.trace(v.attemptId(), "finished attempt %d in %sms", overall.numAttempts(), (System.nanoTime() - startTime.get()) / 1000000L));
            return this.executeTransaction(this.config, overall, ob);
        });
    }

    /**
     * Logs the stack trace of {@code err} at info level, leaving out frames whose textual
     * form starts with {@code reactor.}, {@code java.} or {@code com.couchbase.client.core}.
     *
     * @param ctx the attempt whose logger and attempt id are used
     * @param err the error whose stack trace is logged
     */
    private void logElidedStacktrace(AttemptContextReactive ctx, Throwable err) {
        for (StackTraceElement frame : err.getStackTrace()) {
            String rendered = frame.toString();
            boolean elided = rendered.startsWith("reactor.")
                    || rendered.startsWith("java.")
                    || rendered.startsWith("com.couchbase.client.core");
            if (!elided) {
                ctx.LOGGER.info(ctx.attemptId(), "          " + rendered);
            }
        }
    }

    /**
     * Renders a one-line summary of the global and per-transaction configuration for logging.
     *
     * @param config    the global transactions configuration
     * @param perConfig the per-transaction configuration
     * @return the summary line
     */
    private static String configDebug(TransactionConfig config, PerTransactionConfig perConfig) {
        // Operands are evaluated left to right, matching the order of the former append calls.
        return "config: "
                + "atrs=" + config.numAtrs()
                + ", expiry=" + config.transactionExpirationTime().toMillis()
                + "msecs durability=" + config.durabilityLevel()
                + " per-txn config="
                + " durability=" + perConfig.durabilityLevel()
                + ", supported=" + Supported.SUPPORTED;
    }

    /**
     * Runs the supplied transaction logic with a default-built per-transaction configuration.
     *
     * @param transactionLogic user logic invoked once per attempt with that attempt's context
     * @return a Mono emitting the transaction result, or terminating with the final error
     */
    public Mono<TransactionResult> run(Function<AttemptContextReactive, Mono<Void>> transactionLogic) {
        PerTransactionConfig defaults = PerTransactionConfigBuilder.create().build();
        return this.run(transactionLogic, defaults);
    }

    /**
     * Resumes a transaction from its serialized form without running any extra step first;
     * what happens afterwards is decided by the shared pipeline that {@code deferred} feeds.
     *
     * @param serialized the serialized transaction context to resume
     * @param perConfig  per-transaction configuration
     * @return a Mono emitting the transaction result, or terminating with the final error
     */
    @Stability.Volatile
    public Mono<TransactionResult> commit(TransactionSerializedContext serialized, PerTransactionConfig perConfig) {
        Function<AttemptContextReactive, Mono<Void>> noInitialStep = ctx -> Mono.empty();
        return this.deferred(serialized, perConfig, noInitialStep);
    }

    /**
     * Resumes a transaction from its serialized form and rolls it back as the first step.
     *
     * @param serialized the serialized transaction context to resume
     * @param perConfig  per-transaction configuration
     * @return a Mono emitting the transaction result, or terminating with the final error
     */
    @Stability.Volatile
    public Mono<TransactionResult> rollback(TransactionSerializedContext serialized, PerTransactionConfig perConfig) {
        Function<AttemptContextReactive, Mono<Void>> rollbackFirst = ctx -> ctx.rollback();
        return this.deferred(serialized, perConfig, rollbackFirst);
    }

    /**
     * Shared implementation behind {@code commit} and {@code rollback} of a serialized
     * transaction: reads the ATR named in the serialized context, derives how much time has
     * already elapsed from the ATR's CAS and the stored start time, rebuilds an attempt
     * context and runs {@code initial} on it through the shared pipeline.
     *
     * @param serialized the serialized transaction context
     * @param perConfig  per-transaction configuration
     * @param initial    the step to run on the rebuilt attempt context
     * @return a Mono emitting the transaction result; terminates with an
     *         {@link IllegalStateException} if the ATR cannot be found
     */
    @Stability.Volatile
    private Mono<TransactionResult> deferred(TransactionSerializedContext serialized, PerTransactionConfig perConfig, Function<AttemptContextReactive, Mono<Void>> initial) {
        JsonObject hydrated = JsonObject.fromJson((String)serialized.encodeAsString());
        String atrBucket = hydrated.getString("atrBucket");
        String atrScope = hydrated.getString("atrScope");
        String atrCollectionName = hydrated.getString("atrCollection");
        String atrId = hydrated.getString("atrId");
        ReactiveCollection atrCollection = this.cleanup.clusterData().getBucketFromName(atrBucket).scope(atrScope).collection(atrCollectionName);
        return ActiveTransactionRecord.getAtr(atrCollection, atrId, OptionsWrapperUtil.kvTimeoutNonMutating(this.config, atrCollection.core()), null).flatMap(atrOpt -> {
            if (!atrOpt.isPresent()) {
                return Mono.error((Throwable)new IllegalStateException(String.format("ATR %s/%s could not be found", atrBucket, atrId)));
            }
            ATR atr = (ATR)atrOpt.get();
            // The ATR's CAS is interpreted as the server's current time in nanoseconds.
            Duration currentTimeServer = Duration.ofNanos(atr.cas());
            Duration startTimeServer = Duration.ofMillis(hydrated.getLong("startTimeServerMillis"));
            Duration timeElapsed = currentTimeServer.minus(startTimeServer);
            // Use the shared now() helper, as run and runBlocking do, for the client start time.
            TransactionContext overall = new TransactionContext(this.cleanup.clusterData().cluster().environment().eventBus(), UUID.randomUUID().toString(), TransactionsReactive.now(), timeElapsed, this.config, perConfig);
            AtomicReference<Long> startTime = new AtomicReference<Long>(0L);
            overall.LOGGER.info("elapsed time = %dmsecs (ATR start time %dmsecs, current ATR time %dmsecs)", timeElapsed.toMillis(), startTimeServer.toMillis(), currentTimeServer.toMillis());
            Mono<AttemptContextReactive> ob = Mono.defer(() -> {
                AttemptContextReactive ctx = this.attemptContextFactory.createFrom(hydrated, overall, this.config, this);
                ctx.LOGGER.info("starting attempt %d/%s/%s", overall.numAttempts(), ctx.transactionId(), ctx.attemptId());
                ctx.LOGGER.info(TransactionsReactive.configDebug(this.config, perConfig));
                return initial.apply(ctx).subscribeOn(Schedulers.elastic()).onErrorResume(err -> {
                    ctx.LOGGER.info(ctx.attemptId(), "caught exception '%s' in deferred, rethrowing", err);
                    this.logElidedStacktrace(ctx, err);
                    return Mono.error((Throwable)TransactionOperationFailed.convertToOperationFailedIfNeeded(err, ctx, TransactionsReactive.createResultFromContext(overall)));
                }).doOnSubscribe(v -> startTime.set(System.nanoTime()))
                        // A Mono<Void> never emits onNext, so the "finished attempt" trace must be
                        // attached after thenReturn (as in run) or it can never fire.
                        .thenReturn(ctx)
                        .doOnNext(v -> ctx.LOGGER.trace(ctx.attemptId(), "finished attempt %d in %sms", overall.numAttempts(), (System.nanoTime() - startTime.get()) / 1000000L));
            });
            return this.executeTransaction(this.config, overall, ob);
        });
    }

    /**
     * Runs blocking transaction logic inside a transaction by wrapping each attempt's
     * context in a blocking {@link AttemptContext} and executing the logic on the elastic
     * scheduler, then feeding the attempt through the shared pipeline.
     *
     * @param txnLogic  blocking user logic invoked once per attempt
     * @param perConfig per-transaction configuration
     * @return a Mono emitting the transaction result, or terminating with the final error
     */
    Mono<TransactionResult> runBlocking(Consumer<AttemptContext> txnLogic, PerTransactionConfig perConfig) {
        return Mono.defer(() -> {
            TransactionContext overall = new TransactionContext(this.cleanup.clusterData().cluster().environment().eventBus(), UUID.randomUUID().toString(), TransactionsReactive.now(), Duration.ZERO, this.config, perConfig);
            AtomicReference<Long> startTime = new AtomicReference<Long>(0L);
            Mono<AttemptContextReactive> ob = Mono.defer(() -> {
                // A fresh random id per attempt; defer re-runs this on every resubscription.
                String attemptId = UUID.randomUUID().toString();
                AttemptContextReactive ctx = this.createAttemptContext(overall, this.config, attemptId);
                AttemptContext ctxBlocking = new AttemptContext(ctx);
                ctx.LOGGER.info("starting attempt %d/%s/%s", overall.numAttempts(), ctx.transactionId(), ctx.attemptId());
                ctx.LOGGER.info(TransactionsReactive.configDebug(this.config, perConfig));
                return Mono.fromRunnable(() -> txnLogic.accept(ctxBlocking)).subscribeOn(Schedulers.elastic()).onErrorResume(err -> {
                    ctx.LOGGER.info(ctx.attemptId(), "caught exception '%s' in runBlocking, rethrowing", err);
                    this.logElidedStacktrace(ctx, err);
                    return Mono.error((Throwable)TransactionOperationFailed.convertToOperationFailedIfNeeded(err, ctx, TransactionsReactive.createResultFromContext(overall)));
                }).doOnSubscribe(v -> startTime.set(System.nanoTime()))
                        // Mono.fromRunnable completes without emitting a value, so the
                        // "finished attempt" trace must be attached after thenReturn (as in run)
                        // or it can never fire.
                        .thenReturn(ctx)
                        .doOnNext(v -> ctx.LOGGER.trace(ctx.attemptId(), "finished attempt %d in %sms", overall.numAttempts(), (System.nanoTime() - startTime.get()) / 1000000L));
            });
            return this.executeTransaction(this.config, overall, ob);
        });
    }

    /**
     * @return the configuration this instance was constructed with
     */
    public TransactionConfig config() {
        return config;
    }

    /**
     * Captures the current {@link System#nanoTime()} reading as a {@link Duration}.
     * The value is only meaningful relative to other {@code nanoTime} readings.
     *
     * @return the current nanoTime reading wrapped in a Duration
     */
    private static Duration now() {
        long nanos = System.nanoTime();
        return Duration.ofNanos(nanos);
    }

    /**
     * @return the cleanup component created by the constructor
     */
    TransactionsCleanup cleanup() {
        return cleanup;
    }

    /**
     * Queues a cleanup request for the attempt unless there is nothing to clean up:
     * the attempt is in query mode, it has no ATR id or ATR collection, or it never started.
     *
     * @param ctx the attempt to (possibly) queue a cleanup request for
     */
    private void addCleanupRequestForContext(AttemptContextReactive ctx) {
        if (ctx.queryMode()) {
            ctx.LOGGER.trace(ctx.attemptId(), "Skipping cleanup request as in query mode");
            return;
        }

        boolean hasAtrEntry = ctx.atrId().isPresent() && ctx.atrCollection().isPresent();
        if (!hasAtrEntry) {
            ctx.LOGGER.trace(ctx.attemptId(), "Skipping cleanup request as no ATR entry to remove (due to no mutations)");
            return;
        }

        if (ctx.state() == AttemptStates.NOT_STARTED) {
            ctx.LOGGER.trace(ctx.attemptId(), "Skipping addition of cleanup request, as NOT_STARTED");
            return;
        }

        ctx.LOGGER.trace(ctx.attemptId(), "Adding cleanup request for %s/%s to run in %d msecs", ctx.atrCollection().get().name(), ctx.atrId().get(), this.config.transactionExpirationTime().toMillis());
        this.cleanup.add(ctx.createCleanupRequest());
    }

    /**
     * Snapshots the overall context into a {@link TransactionResult}: its attempts, logger,
     * time elapsed since the client-side start, transaction id and serialized form.
     *
     * @param overall the transaction-wide context to snapshot
     * @return the result built from the context's current state
     */
    private static TransactionResult createResultFromContext(TransactionContext overall) {
        long elapsedNanos = System.nanoTime() - overall.startTimeClient().toNanos();
        Duration elapsed = Duration.ofNanos(elapsedNanos);
        return new TransactionResult(
                overall.attempts(),
                overall.LOGGER,
                elapsed,
                overall.transactionId(),
                overall.serialized());
    }

    /**
     * Replaces the factory used to build attempt contexts. Marked internal and deprecated.
     * NOTE(review): presumably a testing hook -- confirm before relying on it.
     *
     * @param factory the factory to use for subsequent attempts
     */
    @Deprecated
    @Stability.Internal
    public void setAttemptContextFactory(AttemptContextFactory factory) {
        attemptContextFactory = factory;
    }
}

