/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.transactions.util;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.transactions.AttemptContextReactive;
import com.couchbase.transactions.log.TransactionLogger;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

@Stability.Internal
public class MonoBridge<T> {
    private final String dbg;
    @Nullable
    private final TransactionLogger logger;
    private boolean done;
    private final Object syncer;
    private final Sinks.One<T> actual = Sinks.one();
    private final Mono<T> external = this.actual.asMono();

    public MonoBridge(Mono<T> feedFrom, String dbg, AttemptContextReactive syncer, @Nullable TransactionLogger logger) {
        this.dbg = dbg;
        this.logger = logger;
        this.syncer = syncer;
        feedFrom.onErrorResume(err -> {
            AttemptContextReactive attemptContextReactive = syncer;
            synchronized (attemptContextReactive) {
                if (!this.done) {
                    if (logger != null) {
                        logger.info("", "MB: [%s] propagating err %s", dbg, err.toString());
                    }
                    this.actual.tryEmitError(err).orThrow();
                } else if (logger != null) {
                    logger.info("", "MB: [%s] skipping err propagating as done", dbg);
                }
                return Mono.empty();
            }
        }).subscribe(next -> {
            if (!this.done) {
                if (logger != null) {
                    logger.info("", "MB: [%s] propagating next", dbg);
                }
                this.actual.tryEmitValue(next).orThrow();
            } else if (logger != null) {
                logger.info("", "MB: [%s] skipping next propagating as done", dbg);
            }
        }, err -> {
            throw new IllegalStateException("Should not reach MonoBridge error producer");
        }, () -> {
            if (!this.done) {
                if (logger != null) {
                    logger.info("", "MB: [%s] propagating complete", dbg);
                }
                this.actual.tryEmitValue(null);
            } else if (logger != null) {
                logger.info("", "MB: [%s] skipping complete propagating as done", dbg);
            }
        });
        this.external.doOnCancel(() -> {
            if (logger != null) {
                logger.info("", "MB: [%s] is cancelled", dbg);
            }
            this.done = true;
        }).doOnTerminate(() -> {
            if (logger != null) {
                logger.info("", "MB: [%s] is errored or complete", dbg);
            }
            this.done = true;
        });
    }

    public Mono<T> external() {
        return this.external;
    }
}

