/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.extension;

import io.reactivex.BackpressureOverflowStrategy;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EmitterImpl<T>
implements Emitter<T> {
    private final AtomicReference<FlowableEmitter<Message<? extends T>>> internal = new AtomicReference();
    private final Flowable<Message<? extends T>> publisher;
    private static final Logger LOGGER = LoggerFactory.getLogger(EmitterImpl.class);
    private final String name;

    EmitterImpl(String name, String overFlowStrategy, long bufferSize, long defaultBufferSize) {
        this.name = name;
        if (defaultBufferSize <= 0L) {
            throw new IllegalArgumentException("The default buffer size must be strictly positive");
        }
        FlowableOnSubscribe deferred = fe -> {
            if (!this.internal.compareAndSet(null, fe.serialize())) {
                fe.onError((Throwable)new Exception("Emitter already created"));
            }
        };
        this.publisher = overFlowStrategy == null ? EmitterImpl.getPublisherUsingBufferStrategy(name, defaultBufferSize, Flowable.create((FlowableOnSubscribe)deferred, (BackpressureStrategy)BackpressureStrategy.BUFFER)) : EmitterImpl.getPublisherForStrategy(name, overFlowStrategy, bufferSize, defaultBufferSize, deferred);
    }

    static <T> Flowable<Message<? extends T>> getPublisherForStrategy(String name, String overFlowStrategy, long bufferSize, long defaultBufferSize, FlowableOnSubscribe<Message<? extends T>> deferred) {
        OnOverflow.Strategy strategy = OnOverflow.Strategy.valueOf((String)overFlowStrategy);
        switch (strategy) {
            case BUFFER: {
                Flowable p = Flowable.create(deferred, (BackpressureStrategy)BackpressureStrategy.BUFFER);
                if (bufferSize > 0L) {
                    return EmitterImpl.getPublisherUsingBufferStrategy(name, bufferSize, p);
                }
                return EmitterImpl.getPublisherUsingBufferStrategy(name, defaultBufferSize, p);
            }
            case UNBOUNDED_BUFFER: {
                return Flowable.create(deferred, (BackpressureStrategy)BackpressureStrategy.BUFFER);
            }
            case DROP: {
                return Flowable.create(deferred, (BackpressureStrategy)BackpressureStrategy.DROP);
            }
            case FAIL: {
                return Flowable.create(deferred, (BackpressureStrategy)BackpressureStrategy.ERROR);
            }
            case LATEST: {
                return Flowable.create(deferred, (BackpressureStrategy)BackpressureStrategy.LATEST);
            }
            case NONE: {
                return Flowable.create(deferred, (BackpressureStrategy)BackpressureStrategy.MISSING);
            }
        }
        throw new IllegalArgumentException("Invalid back pressure strategy: " + overFlowStrategy);
    }

    static <T> Flowable<Message<? extends T>> getPublisherUsingBufferStrategy(String name, long defaultBufferSize, Flowable<Message<? extends T>> stream) {
        return stream.onBackpressureBuffer(defaultBufferSize - 2L, () -> LOGGER.error("Buffer full for emitter {}", (Object)name), BackpressureOverflowStrategy.ERROR);
    }

    public Publisher<Message<? extends T>> getPublisher() {
        return this.publisher;
    }

    boolean isSubscribed() {
        return this.internal.get() != null;
    }

    public synchronized CompletionStage<Void> send(T msg) {
        if (msg == null) {
            throw new IllegalArgumentException("`null` is not a valid value");
        }
        FlowableEmitter<Message<? extends T>> emitter = EmitterImpl.verify(this.internal, this.name);
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        emitter.onNext((Object)Message.of(msg, () -> {
            future.complete(null);
            return future;
        }));
        return future;
    }

    public synchronized <M extends Message<? extends T>> void send(M msg) {
        if (msg == null) {
            throw new IllegalArgumentException("`null` is not a valid value");
        }
        FlowableEmitter<Message<? extends T>> emitter = EmitterImpl.verify(this.internal, this.name);
        emitter.onNext(msg);
    }

    static <T> FlowableEmitter<Message<? extends T>> verify(AtomicReference<FlowableEmitter<Message<? extends T>>> reference, String name) {
        FlowableEmitter<Message<? extends T>> emitter = reference.get();
        if (emitter == null) {
            throw new IllegalStateException("No subscriber found for the channel " + name);
        }
        if (emitter.isCancelled()) {
            throw new IllegalStateException("The subscription to " + name + " has been cancelled");
        }
        return emitter;
    }

    public synchronized void complete() {
        EmitterImpl.verify(this.internal, this.name).onComplete();
    }

    public synchronized void error(Exception e) {
        if (e == null) {
            throw new IllegalArgumentException("`null` is not a valid exception");
        }
        EmitterImpl.verify(this.internal, this.name).onError((Throwable)e);
    }

    public synchronized boolean isCancelled() {
        FlowableEmitter<Message<? extends T>> emitter = this.internal.get();
        return emitter == null || emitter.isCancelled();
    }

    public boolean isRequested() {
        FlowableEmitter<Message<? extends T>> emitter = this.internal.get();
        return !this.isCancelled() && emitter.requested() > 0L;
    }
}

