/*
 * Decompiled with CFR 0.152.
 */
package io.activej.rpc.protocol;

import io.activej.async.exception.AsyncCloseException;
import io.activej.common.MemSize;
import io.activej.csp.ChannelConsumer;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelSupplier;
import io.activej.csp.process.frames.ChannelFrameDecoder;
import io.activej.csp.process.frames.ChannelFrameEncoder;
import io.activej.csp.process.frames.FrameFormat;
import io.activej.datastream.AbstractStreamConsumer;
import io.activej.datastream.AbstractStreamSupplier;
import io.activej.datastream.StreamDataAcceptor;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.net.socket.tcp.AsyncTcpSocket;
import io.activej.rpc.protocol.RpcMessage;
import io.activej.serializer.BinarySerializer;
import java.time.Duration;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

public final class RpcStream {
    private final ChannelDeserializer<RpcMessage> deserializer;
    private final ChannelSerializer<RpcMessage> serializer;
    private Listener listener;
    private final AbstractStreamConsumer<RpcMessage> internalConsumer = new AbstractStreamConsumer<RpcMessage>(){};
    private final AbstractStreamSupplier<RpcMessage> internalSupplier = new AbstractStreamSupplier<RpcMessage>(){

        protected void onResumed() {
            RpcStream.this.deserializer.updateDataAcceptor();
            RpcStream.this.listener.onSenderReady((StreamDataAcceptor<RpcMessage>)this.getDataAcceptor());
        }

        protected void onSuspended() {
            if (RpcStream.this.server) {
                RpcStream.this.deserializer.updateDataAcceptor();
            }
            RpcStream.this.listener.onSenderSuspended();
        }
    };
    private final boolean server;
    private final AsyncTcpSocket socket;

    public RpcStream(AsyncTcpSocket socket, BinarySerializer<RpcMessage> messageSerializer, MemSize initialBufferSize, Duration autoFlushInterval, @Nullable FrameFormat frameFormat, boolean server) {
        this.server = server;
        this.socket = socket;
        ChannelSerializer serializer = ChannelSerializer.create(messageSerializer).withInitialBufferSize(initialBufferSize).withAutoFlushInterval(autoFlushInterval).withSerializationErrorHandler((message, e) -> this.listener.onSerializationError((RpcMessage)message, (Exception)e));
        ChannelDeserializer deserializer = ChannelDeserializer.create(messageSerializer);
        if (frameFormat != null) {
            ChannelFrameDecoder decompressor = ChannelFrameDecoder.create((FrameFormat)frameFormat);
            ChannelFrameEncoder compressor = ChannelFrameEncoder.create((FrameFormat)frameFormat);
            ChannelSupplier.ofSocket((AsyncTcpSocket)socket).bindTo((ChannelInput)decompressor.getInput());
            decompressor.getOutput().bindTo(deserializer.getInput());
            serializer.getOutput().bindTo(compressor.getInput());
            compressor.getOutput().set(ChannelConsumer.ofSocket((AsyncTcpSocket)socket));
        } else {
            ChannelSupplier.ofSocket((AsyncTcpSocket)socket).bindTo(deserializer.getInput());
            serializer.getOutput().set(ChannelConsumer.ofSocket((AsyncTcpSocket)socket));
        }
        deserializer.streamTo(this.internalConsumer);
        this.deserializer = deserializer;
        this.serializer = serializer;
    }

    public void setListener(Listener listener) {
        this.listener = listener;
        this.deserializer.getEndOfStream().whenResult(listener::onReceiverEndOfStream).whenException(listener::onReceiverError);
        this.serializer.getAcknowledgement().whenException(listener::onSenderError);
        this.internalSupplier.streamTo(this.serializer);
        this.internalConsumer.resume((StreamDataAcceptor)this.listener);
    }

    public void receiverSuspend() {
        this.internalConsumer.suspend();
    }

    public void receiverResume() {
        this.internalConsumer.resume((StreamDataAcceptor)this.listener);
    }

    public void sendEndOfStream() {
        this.internalSupplier.sendEndOfStream();
    }

    public void close() {
        this.closeEx((Exception)new AsyncCloseException("RPC Channel Closed"));
    }

    public void closeEx(@NotNull Exception e) {
        this.socket.closeEx(e);
        this.serializer.closeEx(e);
        this.deserializer.closeEx(e);
    }

    public static interface Listener
    extends StreamDataAcceptor<RpcMessage> {
        public void onReceiverEndOfStream();

        public void onReceiverError(@NotNull Exception var1);

        public void onSenderError(@NotNull Exception var1);

        public void onSerializationError(RpcMessage var1, @NotNull Exception var2);

        public void onSenderReady(@NotNull StreamDataAcceptor<RpcMessage> var1);

        public void onSenderSuspended();
    }
}

