/*
 * Decompiled with CFR 0.152.
 */
package io.activej.datastream.stats;

import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamConsumer;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.processor.StreamTransformer;
import io.activej.datastream.stats.StreamStats;

/**
 * A pass-through {@link StreamTransformer} that reports stream lifecycle events
 * (started, resumed, suspended, end of stream, error) to a {@link StreamStats}
 * instance while handing the output's data acceptor directly to the input.
 *
 * <p>NOTE(review): the data acceptor is forwarded as-is, so {@code stats} is only
 * told about lifecycle events in this class and never observes individual items.
 * If {@code StreamStats} offers a hook for wrapping the acceptor, confirm whether
 * it should be applied here.
 *
 * @param <T> type of the items flowing through the forwarder
 */
public class StreamStatsForwarder<T>
implements StreamTransformer<T, T>,
WithInitializer<StreamStatsForwarder<T>> {
    private final Input input;
    private final Output output;
    private final StreamStats<T> stats;

    /**
     * Set once the first failure has been passed to {@link #stats}. Input and output
     * are cross-wired to close each other on failure, so the same exception can reach
     * both {@code onError} hooks; this flag keeps it from being counted twice.
     * Not synchronized: presumably all stream callbacks run on a single thread,
     * like the rest of this class assumes - TODO confirm.
     */
    private boolean errorReported;

    /**
     * Creates the forwarder and links the two sides together.
     *
     * @param stats receiver of the lifecycle notifications
     */
    private StreamStatsForwarder(StreamStats<T> stats) {
        this.stats = stats;
        this.input = new Input();
        this.output = new Output();
        // A failure of the input closes the output with the same exception.
        this.input.getAcknowledgement().whenException(this.output::closeEx);
        // The output's acknowledgement result acknowledges the input;
        // a failure of the output closes the input with the same exception.
        this.output.getAcknowledgement().whenResult(this.input::acknowledge).whenException(this.input::closeEx);
    }

    /**
     * Static factory for a forwarder reporting to the given stats object.
     *
     * @param stats receiver of the lifecycle notifications
     * @param <T>   type of the items flowing through the forwarder
     * @return a new forwarder
     */
    public static <T> StreamStatsForwarder<T> create(StreamStats<T> stats) {
        return new StreamStatsForwarder<>(stats);
    }

    /**
     * @return the consuming side of this forwarder
     */
    @Override
    public StreamConsumer<T> getInput() {
        return this.input;
    }

    /**
     * @return the supplying side of this forwarder
     */
    @Override
    public StreamSupplier<T> getOutput() {
        return this.output;
    }

    /**
     * Passes a failure to {@link #stats} at most once per forwarder, no matter
     * which side ({@link Input} or {@link Output}) observes it first.
     *
     * @param e the failure to report
     */
    private void reportErrorOnce(Exception e) {
        if (this.errorReported) {
            return;
        }
        this.errorReported = true;
        this.stats.onError(e);
    }

    /**
     * Consuming side: notifies the stats object about start, end of stream and errors.
     */
    protected final class Input
    extends AbstractStreamConsumer<T> {
        protected Input() {
        }

        /**
         * Reports the start and resumes this consumer with whatever data acceptor
         * the output currently exposes.
         */
        @Override
        protected void onStarted() {
            StreamStatsForwarder.this.stats.onStarted();
            this.resume(StreamStatsForwarder.this.output.getDataAcceptor());
        }

        /**
         * Reports the end of stream and forwards it to the output.
         */
        @Override
        protected void onEndOfStream() {
            StreamStatsForwarder.this.stats.onEndOfStream();
            StreamStatsForwarder.this.output.sendEndOfStream();
        }

        /**
         * Reports the failure unless the output side has already reported one.
         */
        @Override
        protected void onError(Exception e) {
            StreamStatsForwarder.this.reportErrorOnce(e);
        }
    }

    /**
     * Supplying side: notifies the stats object about resume/suspend and errors,
     * and mirrors the resume/suspend state onto the input.
     */
    protected final class Output
    extends AbstractStreamSupplier<T> {
        protected Output() {
        }

        /**
         * Reports the resume and resumes the input with this supplier's data acceptor.
         */
        @Override
        protected void onResumed() {
            StreamStatsForwarder.this.stats.onResume();
            StreamStatsForwarder.this.input.resume(this.getDataAcceptor());
        }

        /**
         * Reports the suspension and suspends the input.
         */
        @Override
        protected void onSuspended() {
            StreamStatsForwarder.this.stats.onSuspend();
            StreamStatsForwarder.this.input.suspend();
        }

        /**
         * Reports the failure unless the input side has already reported one.
         */
        @Override
        protected void onError(Exception e) {
            StreamStatsForwarder.this.reportErrorOnce(e);
        }
    }
}

