/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.extension;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.subscription.BackPressureStrategy;
import io.smallrye.mutiny.subscription.MultiEmitter;
import io.smallrye.reactive.messaging.extension.EmitterConfiguration;
import io.smallrye.reactive.messaging.extension.ThrowingEmitter;
import io.smallrye.reactive.messaging.helpers.BroadcastHelper;
import io.smallrye.reactive.messaging.i18n.ProviderExceptions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Metadata;
import org.eclipse.microprofile.reactive.messaging.OnOverflow;
import org.reactivestreams.Publisher;

public class EmitterImpl<T>
implements Emitter<T> {
    private final AtomicReference<MultiEmitter<? super Message<? extends T>>> internal = new AtomicReference();
    private final Multi<Message<? extends T>> publisher;
    private final String name;
    private final AtomicReference<Throwable> synchronousFailure = new AtomicReference();

    public EmitterImpl(EmitterConfiguration config, long defaultBufferSize) {
        Multi<Message<T>> tempPublisher;
        this.name = config.name;
        if (defaultBufferSize <= 0L) {
            throw ProviderExceptions.ex.illegalArgumentForDefaultBuffer();
        }
        Consumer<MultiEmitter<? super Message<? extends T>>> deferred = fe -> {
            MultiEmitter previous = this.internal.getAndSet((MultiEmitter<Message<T>>)fe);
            if (previous != null) {
                previous.complete();
            }
        };
        if (config.overflowBufferStrategy == null) {
            Multi multi = Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER);
            tempPublisher = this.getPublisherUsingBufferStrategy(defaultBufferSize, multi);
        } else {
            tempPublisher = this.getPublisherForStrategy(config.overflowBufferStrategy, config.overflowBufferSize, defaultBufferSize, deferred);
        }
        this.publisher = config.broadcast ? (Multi)BroadcastHelper.broadcastPublisher(tempPublisher, config.numberOfSubscriberBeforeConnecting).buildRs() : tempPublisher;
    }

    Multi<Message<? extends T>> getPublisherForStrategy(OnOverflow.Strategy overFlowStrategy, long bufferSize, long defaultBufferSize, Consumer<MultiEmitter<? super Message<? extends T>>> deferred) {
        switch (overFlowStrategy) {
            case BUFFER: {
                if (bufferSize > 0L) {
                    return ThrowingEmitter.create(deferred, bufferSize);
                }
                return ThrowingEmitter.create(deferred, defaultBufferSize);
            }
            case UNBOUNDED_BUFFER: {
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.BUFFER);
            }
            case THROW_EXCEPTION: {
                return ThrowingEmitter.create(deferred, 0L);
            }
            case DROP: {
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.DROP);
            }
            case FAIL: {
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.ERROR);
            }
            case LATEST: {
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.LATEST);
            }
            case NONE: {
                return Multi.createFrom().emitter(deferred, BackPressureStrategy.IGNORE);
            }
        }
        throw ProviderExceptions.ex.illegalArgumentForBackPressure(overFlowStrategy);
    }

    Multi<Message<? extends T>> getPublisherUsingBufferStrategy(long defaultBufferSize, Multi<Message<? extends T>> stream) {
        int size = (int)defaultBufferSize;
        return stream.on().overflow().buffer(size - 2).onFailure().invoke(t -> this.synchronousFailure.set((Throwable)t));
    }

    public Publisher<Message<? extends T>> getPublisher() {
        return this.publisher;
    }

    boolean isSubscribed() {
        return this.internal.get() != null;
    }

    public synchronized CompletionStage<Void> send(T msg) {
        if (msg == null) {
            throw ProviderExceptions.ex.illegalArgumentForNullValue();
        }
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        this.emit(Message.of(msg, (Metadata)Metadata.empty(), () -> {
            future.complete(null);
            return CompletableFuture.completedFuture(null);
        }, reason -> {
            future.completeExceptionally((Throwable)reason);
            return CompletableFuture.completedFuture(null);
        }));
        return future;
    }

    private synchronized void emit(Message<? extends T> message) {
        MultiEmitter<Message<T>> emitter = EmitterImpl.verify(this.internal, this.name);
        if (this.synchronousFailure.get() != null) {
            throw ProviderExceptions.ex.illegalStateForEmitter(this.synchronousFailure.get());
        }
        if (emitter.isCancelled()) {
            throw ProviderExceptions.ex.illegalStateForDownstreamCancel();
        }
        emitter.emit(message);
        if (this.synchronousFailure.get() != null) {
            throw ProviderExceptions.ex.illegalStateForEmitterWhileEmitting(this.synchronousFailure.get());
        }
    }

    public synchronized <M extends Message<? extends T>> void send(M msg) {
        if (msg == null) {
            throw ProviderExceptions.ex.illegalArgumentForNullValue();
        }
        this.emit(msg);
    }

    static <T> MultiEmitter<? super Message<? extends T>> verify(AtomicReference<MultiEmitter<? super Message<? extends T>>> reference, String name) {
        MultiEmitter<? super Message<? extends T>> emitter = reference.get();
        if (emitter == null) {
            throw ProviderExceptions.ex.illegalStateForNoSubscriber(name);
        }
        if (emitter.isCancelled()) {
            throw ProviderExceptions.ex.illegalStateForCancelledSubscriber(name);
        }
        return emitter;
    }

    public synchronized void complete() {
        EmitterImpl.verify(this.internal, this.name).complete();
    }

    public synchronized void error(Exception e) {
        if (e == null) {
            throw ProviderExceptions.ex.illegalArgumentForException("null");
        }
        EmitterImpl.verify(this.internal, this.name).fail((Throwable)e);
    }

    public synchronized boolean isCancelled() {
        MultiEmitter<? super Message<? extends T>> emitter = this.internal.get();
        return emitter == null || emitter.isCancelled();
    }

    public boolean hasRequests() {
        MultiEmitter<? super Message<? extends T>> emitter = this.internal.get();
        return !this.isCancelled() && emitter.requested() > 0L;
    }
}

