/*
 * Decompiled with CFR 0.152.
 */
package alluxio.client.block.stream;

import alluxio.client.block.stream.DataMessageClientResponseObserver;
import alluxio.client.block.stream.GrpcBlockingStream;
import alluxio.grpc.DataMessage;
import alluxio.grpc.DataMessageMarshaller;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.io.grpc.stub.StreamObserver;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;

/**
 * A {@link GrpcBlockingStream} whose messages may be accompanied by a {@link DataBuffer}.
 *
 * <p>When a marshaller is supplied for a direction, the buffer is handed to (requests) or
 * fetched from (responses) that marshaller instead of travelling inside the message object.
 * When the corresponding marshaller is {@code null}, the stream falls back to the plain
 * {@link GrpcBlockingStream} behavior for that direction.
 *
 * @param <ReqT> type of the request message
 * @param <ResT> type of the response message
 */
@NotThreadSafe
public class GrpcDataMessageBlockingStream<ReqT, ResT>
        extends GrpcBlockingStream<ReqT, ResT> {
    /** Marshaller that receives outgoing buffers; {@code null} disables buffer hand-off. */
    private final DataMessageMarshaller<ReqT> mRequestMarshaller;
    /** Marshaller that supplies incoming buffers; {@code null} disables data-message receive. */
    private final DataMessageMarshaller<ResT> mResponseMarshaller;

    /**
     * Creates the stream, wrapping the response observer in a
     * {@link DataMessageClientResponseObserver} before the RPC is started.
     *
     * @param rpcFunc function that starts the RPC from a response observer and returns the
     *        request observer
     * @param bufferSize forwarded unchanged to {@link GrpcBlockingStream}
     * @param description forwarded unchanged to {@link GrpcBlockingStream}
     * @param requestMarshaller marshaller for request buffers; this class treats {@code null}
     *        as "no request buffers"
     * @param responseMarshaller marshaller for response buffers; this class treats
     *        {@code null} as "no response buffers"
     */
    public GrpcDataMessageBlockingStream(
            Function<StreamObserver<ResT>, StreamObserver<ReqT>> rpcFunc,
            int bufferSize,
            String description,
            DataMessageMarshaller<ReqT> requestMarshaller,
            DataMessageMarshaller<ResT> responseMarshaller) {
        super(resObserver -> {
            // Fully parameterized so no raw types or unchecked casts are needed here.
            DataMessageClientResponseObserver<ReqT, ResT> newObserver =
                    new DataMessageClientResponseObserver<>(
                            resObserver, requestMarshaller, responseMarshaller);
            return rpcFunc.apply(newObserver);
        }, bufferSize, description);
        this.mRequestMarshaller = requestMarshaller;
        this.mResponseMarshaller = responseMarshaller;
    }

    /**
     * Receives the next response.
     *
     * <p>Without a response marshaller this delegates to the superclass. Otherwise the
     * response is received as a data message and passed through
     * {@link DataMessageMarshaller#combineData} before being returned.
     *
     * @param timeoutMs timeout passed to the underlying receive call
     * @return the response, or {@code null} if the underlying receive returned {@code null}
     * @throws IOException if the underlying receive fails
     */
    @Override
    public ResT receive(long timeoutMs) throws IOException {
        if (this.mResponseMarshaller == null) {
            return super.receive(timeoutMs);
        }
        DataMessage<ResT, DataBuffer> message = this.receiveDataMessage(timeoutMs);
        if (message == null) {
            return null;
        }
        return this.mResponseMarshaller.combineData(message);
    }

    /**
     * Receives the next response together with the buffer the response marshaller holds
     * for it.
     *
     * @param timeoutMs timeout passed to the underlying receive call
     * @return the response paired with the result of polling the marshaller for its buffer,
     *         or {@code null} if the underlying receive returned {@code null}
     * @throws IOException if the underlying receive fails
     * @throws NullPointerException if this stream has no response marshaller
     */
    public DataMessage<ResT, DataBuffer> receiveDataMessage(long timeoutMs) throws IOException {
        Preconditions.checkNotNull(this.mResponseMarshaller,
                "Cannot retrieve data message without a response marshaller.");
        ResT response = super.receive(timeoutMs);
        if (response == null) {
            return null;
        }
        return new DataMessage<>(response, this.mResponseMarshaller.pollBuffer(response));
    }

    /**
     * Sends a request message, first offering its buffer to the request marshaller when one
     * is configured.
     *
     * @param message the message and buffer to send
     * @param timeoutMs timeout passed to the underlying send call
     * @throws IOException if the underlying send fails
     */
    public void sendDataMessage(DataMessage<ReqT, DataBuffer> message, long timeoutMs)
            throws IOException {
        if (this.mRequestMarshaller != null) {
            this.mRequestMarshaller.offerBuffer(message.getBuffer(), message.getMessage());
        }
        super.send(message.getMessage(), timeoutMs);
    }

    /**
     * Drains remaining responses and waits for the stream to complete.
     *
     * <p>Without a response marshaller this delegates to the superclass. Otherwise responses
     * are received until the stream is canceled or a receive returns {@code null}; the buffer
     * of every response except the last one is released. The last response is combined via
     * the marshaller and used as the result only if the superclass call yields no value.
     * Note that {@code timeoutMs} is applied to each receive and to the final wait
     * individually, not to the method as a whole.
     *
     * @param timeoutMs timeout passed to each underlying receive/wait call
     * @return the superclass result if present, otherwise the last combined response, if any
     * @throws IOException if an underlying receive or wait fails
     */
    @Override
    public Optional<ResT> waitForComplete(long timeoutMs) throws IOException {
        if (this.mResponseMarshaller == null) {
            return super.waitForComplete(timeoutMs);
        }
        DataMessage<ResT, DataBuffer> message;
        DataMessage<ResT, DataBuffer> prevMessage = null;
        while (!this.isCanceled() && (message = this.receiveDataMessage(timeoutMs)) != null) {
            // Only the most recent message is kept; drop the buffer of the one it replaces.
            if (prevMessage != null && prevMessage.getBuffer() != null) {
                prevMessage.getBuffer().release();
            }
            prevMessage = message;
        }
        if (prevMessage == null) {
            // Nothing was received: there is no message to combine, so do not hand null to
            // the marshaller (mirrors the null guard in receive()).
            return super.waitForComplete(timeoutMs);
        }
        // NOTE(review): prevMessage's buffer is not released here; presumably combineData
        // takes ownership of it — confirm against the marshaller implementation.
        ResT result = this.mResponseMarshaller.combineData(prevMessage);
        return Optional.ofNullable(super.waitForComplete(timeoutMs).orElse(result));
    }
}

