/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream;

import io.activej.common.Checks;
import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.promise.Promise;
import java.util.HashSet;
import java.util.Set;
import org.jetbrains.annotations.NotNull;

/**
 * A stream consumer whose downstream target can be replaced while it is running.
 *
 * <p>Incoming data is handed to the data acceptor of the <em>current</em>
 * {@link InternalSupplier}. Calling {@link #switchTo(StreamConsumer)} installs a
 * fresh internal supplier streaming to the given consumer and sends end-of-stream
 * to the previous one. Every started internal supplier is tracked in
 * {@code pendingAcknowledgements} until it is acknowledged; this consumer
 * acknowledges itself once that set becomes empty.
 *
 * @param <T> type of the streamed items
 */
public final class StreamConsumerSwitcher<T> extends AbstractStreamConsumer<T>
        implements WithInitializer<StreamConsumerSwitcher<T>> {

    /** Supplier currently receiving data; reset to {@code null} in {@link #onCleanup()}. */
    private InternalSupplier internalSupplier = new InternalSupplier();

    /** Internal suppliers that have started but have not been acknowledged yet. */
    private final Set<InternalSupplier> pendingAcknowledgements = new HashSet<>();

    private StreamConsumerSwitcher() {
    }

    /**
     * Creates a new switcher with an initial internal supplier that is not yet
     * streaming to any consumer.
     *
     * @param <T> type of the streamed items
     * @return a new {@code StreamConsumerSwitcher}
     */
    public static <T> StreamConsumerSwitcher<T> create() {
        return new StreamConsumerSwitcher<>();
    }

    /**
     * Redirects subsequent data to {@code consumer}.
     *
     * <p>The new internal supplier becomes current and starts streaming to
     * {@code consumer} <em>before</em> end-of-stream is sent to the previous one.
     *
     * @param consumer the consumer that should receive data from now on
     * @return the acknowledgement promise of the newly installed internal supplier
     * @throws IllegalStateException presumably, via {@code Checks.checkState}, when this
     *         consumer is already complete or has reached end-of-stream (exception type
     *         is defined by {@code Checks}, not visible here)
     */
    public Promise<Void> switchTo(@NotNull StreamConsumer<T> consumer) {
        Checks.checkState(!isComplete());
        Checks.checkState(!isEndOfStream());
        assert internalSupplier != null;

        InternalSupplier previous = internalSupplier;
        InternalSupplier next = new InternalSupplier();
        internalSupplier = next;

        // Order matters: wire up the new target first, then finish the old one.
        next.streamTo(consumer);
        previous.sendEndOfStream();

        return next.getAcknowledgement();
    }

    /**
     * @return the number of internal suppliers still awaiting acknowledgement
     */
    public int getPendingAcknowledgements() {
        return pendingAcknowledgements.size();
    }

    /** Resumes this consumer with the data acceptor of the current internal supplier. */
    @Override
    protected void onStarted() {
        resume(internalSupplier.getDataAcceptor());
    }

    /** Propagates end-of-stream to the current internal supplier. */
    @Override
    protected void onEndOfStream() {
        internalSupplier.sendEndOfStream();
    }

    /**
     * Closes the current internal supplier with {@code e}, then closes the consumer
     * of every internal supplier that is still awaiting acknowledgement.
     */
    @Override
    protected void onError(Exception e) {
        internalSupplier.closeEx(e);
        for (InternalSupplier pending : pendingAcknowledgements) {
            pending.getConsumer().closeEx(e);
        }
    }

    /** Drops the reference to the current internal supplier and forgets all pending ones. */
    @Override
    protected void onCleanup() {
        internalSupplier = null;
        pendingAcknowledgements.clear();
    }

    /**
     * Supplier that bridges this switcher to one concrete downstream consumer.
     * Flow-control callbacks are forwarded to the enclosing switcher only while
     * this instance is the current internal supplier.
     */
    private class InternalSupplier extends AbstractStreamSupplier<T> {

        private InternalSupplier() {
        }

        /** Registers this supplier as awaiting acknowledgement. */
        @Override
        protected void onStarted() {
            StreamConsumerSwitcher.this.pendingAcknowledgements.add(this);
        }

        /** Resumes the enclosing switcher with this supplier's data acceptor, if current. */
        @Override
        protected void onResumed() {
            if (StreamConsumerSwitcher.this.internalSupplier != this) {
                return;
            }
            StreamConsumerSwitcher.this.resume(getDataAcceptor());
        }

        /** Suspends the enclosing switcher, if this supplier is the current one. */
        @Override
        protected void onSuspended() {
            if (StreamConsumerSwitcher.this.internalSupplier != this) {
                return;
            }
            StreamConsumerSwitcher.this.suspend();
        }

        /**
         * Removes this supplier from the pending set and acknowledges the enclosing
         * switcher once no pending suppliers remain.
         */
        @Override
        protected void onAcknowledge() {
            Set<InternalSupplier> pending = StreamConsumerSwitcher.this.pendingAcknowledgements;
            pending.remove(this);
            if (pending.isEmpty()) {
                StreamConsumerSwitcher.this.acknowledge();
            }
        }

        /** Closes the enclosing switcher with the same exception. */
        @Override
        protected void onError(Exception e) {
            StreamConsumerSwitcher.this.closeEx(e);
        }
    }
}

