/*
 * Decompiled with CFR 0.152.
 */
package com.linecorp.armeria.common.stream;

import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.common.stream.AbortingSubscriber;
import com.linecorp.armeria.common.stream.CancelledSubscriptionException;
import com.linecorp.armeria.common.stream.NeverInvokedSubscriber;
import com.linecorp.armeria.common.stream.NoopSubscription;
import com.linecorp.armeria.common.stream.SignalLengthGetter;
import com.linecorp.armeria.common.stream.StreamMessage;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.internal.shaded.guava.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public abstract class AbstractStreamMessageDuplicator<T, U extends StreamMessage<T>>
implements SafeCloseable {
    private static final CompletableFuture[] EMPTY_FUTURES = new CompletableFuture[0];
    private final StreamMessageProcessor<T> processor;
    private final EventExecutor duplicatorExecutor;

    protected AbstractStreamMessageDuplicator(U publisher, SignalLengthGetter<? super T> signalLengthGetter, @Nullable EventExecutor executor, long maxSignalLength) {
        Objects.requireNonNull(publisher, "publisher");
        Objects.requireNonNull(signalLengthGetter, "signalLengthGetter");
        Preconditions.checkArgument(maxSignalLength >= 0L, "maxSignalLength: %s (expected: >= 0)", maxSignalLength);
        this.duplicatorExecutor = executor != null ? executor : (EventExecutor)RequestContext.mapCurrent(RequestContext::eventLoop, () -> CommonPools.workerGroup().next());
        this.processor = new StreamMessageProcessor(publisher, signalLengthGetter, this.duplicatorExecutor, maxSignalLength);
    }

    public U duplicateStream() {
        return this.duplicateStream(false);
    }

    public U duplicateStream(boolean lastStream) {
        if (!this.processor.isDuplicable()) {
            throw new IllegalStateException("duplicator is closed or last downstream is added.");
        }
        return this.doDuplicateStream(new ChildStreamMessage<T>(this, this.processor, lastStream));
    }

    protected abstract U doDuplicateStream(StreamMessage<T> var1);

    protected EventExecutor duplicatorExecutor() {
        return this.duplicatorExecutor;
    }

    @Override
    public void close() {
        this.processor.close();
    }

    static class SignalQueue {
        private static final AtomicIntegerFieldUpdater<SignalQueue> lastRemovalRequestedOffsetUpdater = AtomicIntegerFieldUpdater.newUpdater(SignalQueue.class, "lastRemovalRequestedOffset");
        private final SignalLengthGetter<Object> signalLengthGetter;
        @Nullable
        volatile Object[] elements;
        private volatile int head;
        private volatile int tail;
        private volatile int size;
        private int headOffset;
        private volatile int lastRemovalRequestedOffset;

        SignalQueue(SignalLengthGetter<Object> signalLengthGetter) {
            this.signalLengthGetter = signalLengthGetter;
            this.elements = new Object[16];
        }

        int addAndRemoveIfRequested(Object o) {
            Objects.requireNonNull(o);
            int removedLength = 0;
            if (this.headOffset < this.lastRemovalRequestedOffset) {
                removedLength = this.removeElements();
            }
            int t = this.tail;
            this.elements[t] = o;
            ++this.size;
            this.tail = t + 1 & this.elements.length - 1;
            if (this.tail == this.head) {
                this.doubleCapacity();
            }
            return removedLength;
        }

        private int removeElements() {
            int removalRequestedOffset = this.lastRemovalRequestedOffset;
            int numElementsToBeRemoved = removalRequestedOffset - this.headOffset;
            int bitMask = this.elements.length - 1;
            int oldHead = this.head;
            int removedLength = 0;
            for (int numRemovals = 0; numRemovals < numElementsToBeRemoved; ++numRemovals) {
                int index = oldHead + numRemovals & bitMask;
                Object o = this.elements[index];
                if (!(o instanceof CloseEvent)) {
                    removedLength += this.signalLengthGetter.length(o);
                }
                ReferenceCountUtil.safeRelease((Object)o);
                this.elements[index] = null;
            }
            this.head = oldHead + numElementsToBeRemoved & bitMask;
            this.headOffset = removalRequestedOffset;
            this.size -= numElementsToBeRemoved;
            return removedLength;
        }

        private void doubleCapacity() {
            assert (this.head == this.tail);
            int h = this.head;
            Object[] elements = this.elements;
            int n = elements.length;
            int r = n - h;
            int newCapacity = n << 1;
            if (newCapacity < 0) {
                throw new IllegalStateException("published more than Integer.MAX_VALUE signals.");
            }
            Object[] a = new Object[newCapacity];
            int hOffset = this.headOffset;
            if ((hOffset & newCapacity - 1) == (hOffset & n - 1)) {
                System.arraycopy(elements, h, a, h, r);
                System.arraycopy(elements, 0, a, n, h);
                this.tail += n;
            } else {
                System.arraycopy(elements, h, a, h + n, r);
                System.arraycopy(elements, 0, a, 0, h);
                this.head = h + n;
            }
            this.elements = a;
        }

        Object get(int offset) {
            int head = this.head;
            int tail = this.tail;
            int length = this.elements.length;
            int convertedIndex = offset & length - 1;
            Preconditions.checkState(this.size > 0, "queue is empty");
            Preconditions.checkArgument(head < tail ? head <= convertedIndex && convertedIndex < tail : head <= convertedIndex && convertedIndex < length || 0 <= convertedIndex && convertedIndex < tail, "offset: %s is invalid. head: %s, tail: %s, capacity: %s ", (Object)offset, (Object)head, (Object)tail, (Object)length);
            Preconditions.checkArgument(offset >= this.lastRemovalRequestedOffset, "offset: %s is invalid. (expected: >= lastRemovalRequestedOffset: %s)", offset, this.lastRemovalRequestedOffset);
            return this.elements[convertedIndex];
        }

        void requestRemovalAheadOf(int offset) {
            int oldLastRemovalRequestedOffset;
            do {
                if ((oldLastRemovalRequestedOffset = this.lastRemovalRequestedOffset) < offset) continue;
                return;
            } while (!lastRemovalRequestedOffsetUpdater.compareAndSet(this, oldLastRemovalRequestedOffset, offset));
        }

        int size() {
            return this.size;
        }

        void clear() {
            Object[] oldElements = this.elements;
            if (oldElements == null) {
                return;
            }
            this.elements = null;
            int t = this.tail;
            for (int i = this.head; i < t; ++i) {
                ReferenceCountUtil.safeRelease((Object)oldElements[i]);
            }
        }
    }

    private static final class CloseEvent {
        static final CloseEvent SUCCESSFUL_CLOSE = new CloseEvent(null);
        @Nullable
        private final Throwable cause;

        CloseEvent(@Nullable Throwable cause) {
            this.cause = cause;
        }

        public String toString() {
            if (this.cause == null) {
                return "CloseEvent";
            }
            return "CloseEvent(" + this.cause + ')';
        }
    }

    static class DownstreamSubscription<T>
    implements Subscription {
        private static final int REQUEST_REMOVAL_THRESHOLD = 50;
        static final AtomicLongFieldUpdater<DownstreamSubscription> demandUpdater = AtomicLongFieldUpdater.newUpdater(DownstreamSubscription.class, "demand");
        private static final AtomicReferenceFieldUpdater<DownstreamSubscription, Throwable> cancelledOrAbortedUpdater = AtomicReferenceFieldUpdater.newUpdater(DownstreamSubscription.class, Throwable.class, "cancelledOrAborted");
        private final StreamMessage<T> streamMessage;
        private Subscriber<? super T> subscriber;
        private final StreamMessageProcessor<T> processor;
        private final EventExecutor executor;
        private final boolean withPooledObjects;
        final boolean lastSubscription;
        private boolean invokedOnSubscribe;
        private volatile long demand;
        @Nullable
        private volatile Throwable cancelledOrAborted;
        private volatile int offset;
        private long cumulativeDemand;
        private boolean inOnNext;

        DownstreamSubscription(ChildStreamMessage<T> streamMessage, Subscriber<? super T> subscriber, StreamMessageProcessor<T> processor, EventExecutor executor, boolean withPooledObjects, boolean lastSubscription) {
            this.streamMessage = streamMessage;
            this.subscriber = subscriber;
            this.processor = processor;
            this.executor = executor;
            this.withPooledObjects = withPooledObjects;
            this.lastSubscription = lastSubscription;
        }

        CompletableFuture<Void> completionFuture() {
            return this.streamMessage.completionFuture();
        }

        Subscriber<? super T> subscriber() {
            return this.subscriber;
        }

        void clearSubscriber() {
            this.subscriber = NeverInvokedSubscriber.get();
        }

        void invokeOnSubscribe() {
            if (this.invokedOnSubscribe) {
                return;
            }
            this.invokedOnSubscribe = true;
            if (this.executor.inEventLoop()) {
                this.subscriber.onSubscribe((Subscription)this);
            } else {
                this.executor.execute(() -> this.subscriber.onSubscribe((Subscription)this));
            }
        }

        public void request(long n) {
            block2: {
                long newDemand;
                long oldDemand;
                if (n <= 0L) {
                    IllegalArgumentException cause = new IllegalArgumentException("n: " + n + " (expected: > 0, see Reactive Streams specification rule 3.9)");
                    this.processor.unsubscribe(this, cause);
                    return;
                }
                this.accumulateDemand(n);
                this.processor.requestDemand(this.cumulativeDemand);
                while (!demandUpdater.compareAndSet(this, oldDemand, newDemand = (oldDemand = this.demand) >= Long.MAX_VALUE - n ? Long.MAX_VALUE : oldDemand + n)) {
                }
                if (oldDemand != 0L) break block2;
                this.signal();
            }
        }

        private void accumulateDemand(long n) {
            this.cumulativeDemand = n == Long.MAX_VALUE || Long.MAX_VALUE - n >= this.cumulativeDemand ? Long.MAX_VALUE : (this.cumulativeDemand += n);
        }

        void signal() {
            if (this.executor.inEventLoop()) {
                this.doSignal();
            } else {
                this.executor.execute(this::doSignal);
            }
        }

        private void doSignal() {
            SignalQueue signals = this.processor.signals();
            while (this.doSignalSingle(signals)) {
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private boolean doSignalSingle(SignalQueue signals) {
            long demand;
            if (this.inOnNext) {
                return false;
            }
            if (this.cancelledOrAborted != null) {
                this.processor.unsubscribe(this, this.cancelledOrAborted);
                return false;
            }
            if (this.offset == this.processor.upstreamOffset) {
                return false;
            }
            Object signal = signals.get(this.offset);
            if (signal instanceof CloseEvent) {
                ++this.offset;
                this.processor.unsubscribe(this, ((CloseEvent)signal).cause);
                return false;
            }
            while ((demand = this.demand) != 0L) {
                if (demand != Long.MAX_VALUE && !demandUpdater.compareAndSet(this, demand, demand - 1L)) continue;
                ++this.offset;
                Object obj = signal;
                ReferenceCountUtil.touch((Object)obj);
                try {
                    if (this.withPooledObjects) {
                        if (obj instanceof ByteBufHolder) {
                            obj = DownstreamSubscription.retainedDuplicate((ByteBufHolder)obj);
                        } else if (obj instanceof ByteBuf) {
                            obj = DownstreamSubscription.retainedDuplicate((ByteBuf)obj);
                        }
                    } else if (obj instanceof ByteBufHolder) {
                        obj = DownstreamSubscription.copy((ByteBufHolder)obj);
                    } else if (obj instanceof ByteBuf) {
                        obj = DownstreamSubscription.copy((ByteBuf)obj);
                    }
                }
                catch (Throwable thrown) {
                    this.processor.unsubscribe(this, thrown);
                    return false;
                }
                if (this.processor.isLastDownstreamAdded() && ++this.processor.downstreamSignaledCounter >= 50) {
                    this.processor.downstreamSignaledCounter = 0;
                    int minOffset = Integer.MAX_VALUE;
                    for (DownstreamSubscription s : ((StreamMessageProcessor)this.processor).downstreamSubscriptions) {
                        minOffset = Math.min(minOffset, s.offset);
                    }
                    this.processor.signals().requestRemovalAheadOf(minOffset);
                }
                this.inOnNext = true;
                try {
                    this.subscriber.onNext(obj);
                }
                finally {
                    this.inOnNext = false;
                }
                return true;
            }
            return false;
        }

        public void cancel() {
            if (cancelledOrAbortedUpdater.compareAndSet(this, null, CancelledSubscriptionException.get())) {
                this.signal();
            }
        }

        void abort() {
            if (cancelledOrAbortedUpdater.compareAndSet(this, null, AbortedStreamException.get())) {
                this.signal();
            }
        }

        private static <T> T retainedDuplicate(ByteBufHolder o) {
            return (T)o.replace(o.content().retainedDuplicate());
        }

        private static <T> T retainedDuplicate(ByteBuf o) {
            return (T)o.retainedDuplicate();
        }

        private static <T> T copy(ByteBufHolder o) {
            return (T)o.replace(Unpooled.copiedBuffer((ByteBuf)o.content()));
        }

        private static <T> T copy(ByteBuf o) {
            return (T)Unpooled.copiedBuffer((ByteBuf)o);
        }
    }

    private static class ChildStreamMessage<T>
    implements StreamMessage<T> {
        private static final AtomicReferenceFieldUpdater<ChildStreamMessage, DownstreamSubscription> subscriptionUpdater = AtomicReferenceFieldUpdater.newUpdater(ChildStreamMessage.class, DownstreamSubscription.class, "subscription");
        private final AbstractStreamMessageDuplicator<T, ?> parent;
        private final StreamMessageProcessor<T> processor;
        private final boolean lastStream;
        @Nullable
        private volatile DownstreamSubscription<T> subscription;
        private final CompletableFuture<Void> completionFuture = new CompletableFuture();

        ChildStreamMessage(AbstractStreamMessageDuplicator<T, ?> parent, StreamMessageProcessor<T> processor, boolean lastStream) {
            this.parent = parent;
            this.processor = processor;
            this.lastStream = lastStream;
        }

        @Override
        public boolean isOpen() {
            return this.processor.upstream().isOpen() && !this.completionFuture.isDone();
        }

        @Override
        public boolean isEmpty() {
            if (this.isOpen()) {
                return false;
            }
            return this.processor.upstream().isEmpty();
        }

        @Override
        public CompletableFuture<Void> completionFuture() {
            return this.completionFuture;
        }

        @Override
        public void subscribe(Subscriber<? super T> subscriber) {
            Objects.requireNonNull(subscriber, "subscriber");
            this.subscribe(subscriber, this.parent.duplicatorExecutor(), false);
        }

        @Override
        public void subscribe(Subscriber<? super T> subscriber, boolean withPooledObjects) {
            Objects.requireNonNull(subscriber, "subscriber");
            this.subscribe0(subscriber, this.parent.duplicatorExecutor(), withPooledObjects);
        }

        @Override
        public void subscribe(Subscriber<? super T> subscriber, EventExecutor executor) {
            this.subscribe(subscriber, executor, false);
        }

        @Override
        public void subscribe(Subscriber<? super T> subscriber, EventExecutor executor, boolean withPooledObjects) {
            Objects.requireNonNull(subscriber, "subscriber");
            Objects.requireNonNull(executor, "executor");
            this.subscribe0(subscriber, executor, withPooledObjects);
        }

        private void subscribe0(Subscriber<? super T> subscriber, EventExecutor executor, boolean withPooledObjects) {
            DownstreamSubscription<? super T> subscription = new DownstreamSubscription<T>(this, subscriber, this.processor, executor, withPooledObjects, this.lastStream);
            if (!subscriptionUpdater.compareAndSet(this, null, subscription)) {
                ChildStreamMessage.failLateSubscriber(executor, subscriber, this.subscription.subscriber());
                return;
            }
            this.processor.subscribe(subscription);
        }

        private static void failLateSubscriber(EventExecutor executor, Subscriber<?> lateSubscriber, Subscriber<?> oldSubscriber) {
            RuntimeException cause = oldSubscriber instanceof AbortingSubscriber ? AbortedStreamException.get() : new IllegalStateException("subscribed by other subscriber already");
            executor.execute(() -> {
                lateSubscriber.onSubscribe((Subscription)NoopSubscription.INSTANCE);
                lateSubscriber.onError(cause);
            });
        }

        @Override
        public void abort() {
            DownstreamSubscription<T> currentSubscription = this.subscription;
            if (currentSubscription != null) {
                currentSubscription.abort();
                return;
            }
            DownstreamSubscription newSubscription = new DownstreamSubscription(this, AbortingSubscriber.get(), this.processor, (EventExecutor)ImmediateEventExecutor.INSTANCE, false, false);
            if (subscriptionUpdater.compareAndSet(this, null, newSubscription)) {
                newSubscription.completionFuture().completeExceptionally(AbortedStreamException.get());
            } else {
                this.subscription.abort();
            }
        }
    }

    static class StreamMessageProcessor<T>
    implements Subscriber<T> {
        private static final AtomicLongFieldUpdater<StreamMessageProcessor> requestedDemandUpdater = AtomicLongFieldUpdater.newUpdater(StreamMessageProcessor.class, "requestedDemand");
        private final StreamMessage<T> upstream;
        private final SignalQueue signals;
        private final SignalLengthGetter<Object> signalLengthGetter;
        private final EventExecutor processorExecutor;
        private final int maxSignalLength;
        private int signalLength;
        private final Set<DownstreamSubscription<T>> downstreamSubscriptions = Collections.newSetFromMap(new ConcurrentHashMap());
        volatile int downstreamSignaledCounter;
        volatile int upstreamOffset;
        private volatile long requestedDemand;
        @Nullable
        private volatile Subscription upstreamSubscription;
        private volatile State state = State.DUPLICABLE;

        StreamMessageProcessor(StreamMessage<T> upstream, SignalLengthGetter<?> signalLengthGetter, EventExecutor executor, long maxSignalLength) {
            this.upstream = upstream;
            this.signalLengthGetter = signalLengthGetter;
            this.processorExecutor = executor;
            this.maxSignalLength = maxSignalLength == 0L || maxSignalLength > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int)maxSignalLength;
            this.signals = new SignalQueue(this.signalLengthGetter);
            upstream.subscribe(this, this.processorExecutor, true);
        }

        StreamMessage<T> upstream() {
            return this.upstream;
        }

        SignalQueue signals() {
            return this.signals;
        }

        public void onSubscribe(Subscription s) {
            this.upstreamSubscription = s;
            if (this.processorExecutor.inEventLoop()) {
                this.downstreamSubscriptions.forEach(DownstreamSubscription::invokeOnSubscribe);
            } else {
                this.processorExecutor.execute(() -> this.downstreamSubscriptions.forEach(DownstreamSubscription::invokeOnSubscribe));
            }
        }

        public void onNext(T obj) {
            this.pushSignal(obj);
        }

        public void onError(Throwable cause) {
            if (cause == null) {
                cause = new IllegalStateException("onError() was invoked with null cause.");
            }
            this.pushSignal(new CloseEvent(cause));
        }

        public void onComplete() {
            this.pushSignal(CloseEvent.SUCCESSFUL_CLOSE);
        }

        private void pushSignal(Object obj) {
            if (this.processorExecutor.inEventLoop()) {
                this.doPushSignal(obj);
            } else {
                this.processorExecutor.execute(() -> this.doPushSignal(obj));
            }
        }

        private void doPushSignal(Object obj) {
            int dataLength;
            if (this.state == State.CLOSED) {
                ReferenceCountUtil.safeRelease((Object)obj);
                return;
            }
            if (!(obj instanceof CloseEvent) && (dataLength = this.signalLengthGetter.length(obj)) > 0) {
                int allowedMaxSignalLength = this.maxSignalLength - this.signalLength;
                if (dataLength > allowedMaxSignalLength) {
                    this.upstream.abort();
                    throw new IllegalStateException("signal length greater than the maxSignalLength: " + this.maxSignalLength);
                }
                this.signalLength += dataLength;
            }
            try {
                int removedLength = this.signals.addAndRemoveIfRequested(obj);
                this.signalLength -= removedLength;
            }
            catch (IllegalStateException e) {
                this.upstream.abort();
                throw e;
            }
            ++this.upstreamOffset;
            if (!this.downstreamSubscriptions.isEmpty()) {
                this.downstreamSubscriptions.forEach(DownstreamSubscription::signal);
            }
        }

        void subscribe(DownstreamSubscription<T> subscription) {
            if (this.processorExecutor.inEventLoop()) {
                this.doSubscribe(subscription);
            } else {
                this.processorExecutor.execute(() -> this.doSubscribe(subscription));
            }
        }

        private void doSubscribe(DownstreamSubscription<T> subscription) {
            if (this.state != State.DUPLICABLE) {
                throw new IllegalStateException("duplicator is closed or last downstream is added.");
            }
            this.downstreamSubscriptions.add(subscription);
            if (subscription.lastSubscription) {
                this.state = State.LAST_DOWNSTREAM_ADDED;
            }
            if (this.upstreamSubscription != null) {
                if (this.processorExecutor.inEventLoop()) {
                    subscription.invokeOnSubscribe();
                } else {
                    this.processorExecutor.execute(subscription::invokeOnSubscribe);
                }
            }
        }

        void unsubscribe(DownstreamSubscription<T> subscription, @Nullable Throwable cause) {
            if (this.processorExecutor.inEventLoop()) {
                this.doUnsubscribe(subscription, cause);
            } else {
                this.processorExecutor.execute(() -> this.doUnsubscribe(subscription, cause));
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void doUnsubscribe(DownstreamSubscription<T> subscription, @Nullable Throwable cause) {
            if (!this.downstreamSubscriptions.remove(subscription)) {
                return;
            }
            Subscriber<T> subscriber = subscription.subscriber();
            subscription.clearSubscriber();
            CompletableFuture<Void> completionFuture = subscription.completionFuture();
            if (cause == null) {
                try {
                    subscriber.onComplete();
                }
                finally {
                    completionFuture.complete(null);
                    this.doCleanupIfLastSubscription();
                }
                return;
            }
            try {
                if (!(cause instanceof CancelledSubscriptionException)) {
                    subscriber.onError(cause);
                }
            }
            finally {
                completionFuture.completeExceptionally(cause);
                this.doCleanupIfLastSubscription();
            }
        }

        private void doCleanupIfLastSubscription() {
            if (this.isLastDownstreamAdded() && this.downstreamSubscriptions.isEmpty() && this.state == State.LAST_DOWNSTREAM_ADDED) {
                this.state = State.CLOSED;
                this.upstream.abort();
                this.signals.clear();
            }
        }

        void requestDemand(long cumulativeDemand) {
            long currentRequested;
            while (cumulativeDemand > (currentRequested = this.requestedDemand)) {
                if (!requestedDemandUpdater.compareAndSet(this, currentRequested, cumulativeDemand)) continue;
                this.upstreamSubscription.request(cumulativeDemand - currentRequested);
                break;
            }
        }

        boolean isDuplicable() {
            return this.state == State.DUPLICABLE;
        }

        boolean isLastDownstreamAdded() {
            return this.state == State.LAST_DOWNSTREAM_ADDED;
        }

        void close() {
            if (this.processorExecutor.inEventLoop()) {
                this.doClose();
            } else {
                this.processorExecutor.execute(this::doClose);
            }
        }

        void doClose() {
            if (this.state != State.CLOSED) {
                this.state = State.CLOSED;
                this.upstream.abort();
                this.doCleanup();
            }
        }

        private void doCleanup() {
            ArrayList completionFutures = new ArrayList(this.downstreamSubscriptions.size());
            this.downstreamSubscriptions.forEach(s -> {
                s.abort();
                CompletableFuture<Void> future = s.completionFuture();
                completionFutures.add(future);
            });
            this.downstreamSubscriptions.clear();
            CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(completionFutures.toArray(EMPTY_FUTURES));
            allDoneFuture.whenComplete((unused1, unused2) -> this.signals.clear());
        }

        private static enum State {
            DUPLICABLE,
            LAST_DOWNSTREAM_ADDED,
            CLOSED;

        }
    }
}

