/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream.processor;

import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.processor.StreamTransformer;

public final class StreamBuffer<T>
implements StreamTransformer<T, T>,
WithInitializer<StreamBuffer<T>> {
    private static final boolean CHECK = Checks.isEnabled(StreamBuffer.class);
    private static final boolean NULLIFY_ON_TAKE_OUT = ApplicationSettings.getBoolean(StreamBuffer.class, (String)"nullifyOnTakeOut", (boolean)true);
    private final Input input;
    private final Output output;
    private final Object[] elements;
    private int tail;
    private int head;
    private final int bufferMinSize;
    private final int bufferMaxSize;
    private final StreamDataAcceptor<T> toBuffer;

    private StreamBuffer(int bufferMinSize, int bufferMaxSize) {
        Checks.checkArgument((bufferMaxSize > 0 && bufferMinSize >= 0 ? 1 : 0) != 0);
        this.bufferMinSize = bufferMinSize;
        this.bufferMaxSize = bufferMaxSize;
        this.elements = new Object[1 << 32 - Integer.numberOfLeadingZeros(this.bufferMaxSize - 1)];
        this.input = new Input();
        this.output = new Output();
        this.toBuffer = item -> {
            this.doAdd(item);
            if (this.size() >= bufferMaxSize) {
                this.input.suspend();
                this.output.flush();
            }
        };
        this.input.getAcknowledgement().whenException(this.output::closeEx);
        this.output.getAcknowledgement().whenResult(this.input::acknowledge).whenException(this.input::closeEx);
    }

    public static <T> StreamBuffer<T> create(int bufferMinSize, int bufferMaxSize) {
        return new StreamBuffer<T>(bufferMinSize, bufferMaxSize);
    }

    public boolean isSaturated() {
        return this.size() >= this.bufferMaxSize;
    }

    public boolean isExhausted() {
        return this.size() <= this.bufferMinSize;
    }

    public boolean isEmpty() {
        return this.tail == this.head;
    }

    public int size() {
        return this.tail - this.head;
    }

    private void doAdd(T value) {
        this.elements[this.tail++ & this.elements.length - 1] = value;
    }

    @Override
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    @Override
    public StreamSupplier<T> getOutput() {
        return this.output;
    }

    private void sync() {
        if (this.size() >= this.bufferMaxSize) {
            this.input.suspend();
        } else if (this.size() <= this.bufferMinSize) {
            if (this.isEmpty() && this.output.isReady()) {
                this.input.resume(this.output.getDataAcceptor());
            } else {
                this.input.resume(this.toBuffer);
            }
        }
    }

    private final class Input
    extends AbstractStreamConsumer<T> {
        private Input() {
        }

        @Override
        protected void onStarted() {
            StreamBuffer.this.sync();
        }

        @Override
        protected void onEndOfStream() {
            StreamBuffer.this.output.flush();
        }
    }

    private final class Output
    extends AbstractStreamSupplier<T> {
        private Output() {
        }

        @Override
        protected void onResumed() {
            this.flush();
        }

        void flush() {
            StreamDataAcceptor<Object> acceptor;
            int head = StreamBuffer.this.head;
            int tail = StreamBuffer.this.tail;
            while ((acceptor = this.getDataAcceptor()) != null && head != tail) {
                int pos = head++ & StreamBuffer.this.elements.length - 1;
                Object item = StreamBuffer.this.elements[pos];
                if (NULLIFY_ON_TAKE_OUT) {
                    ((StreamBuffer)StreamBuffer.this).elements[pos] = null;
                }
                acceptor.accept(item);
            }
            if (CHECK) {
                Checks.checkState((tail == StreamBuffer.this.tail ? 1 : 0) != 0, (Object)"New items have been added to buffer while flushing");
            }
            StreamBuffer.this.head = head;
            if (StreamBuffer.this.isEmpty() && StreamBuffer.this.input.isEndOfStream()) {
                this.sendEndOfStream();
            }
            StreamBuffer.this.sync();
        }

        @Override
        protected void onSuspended() {
            StreamBuffer.this.sync();
        }
    }
}

