/*
 * Decompiled with CFR 0.152.
 */
package org.noear.socketd.transport.stream.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.noear.socketd.exception.SocketDException;
import org.noear.socketd.exception.SocketDTimeoutException;
import org.noear.socketd.transport.core.MessageInternal;
import org.noear.socketd.transport.core.Reply;
import org.noear.socketd.transport.stream.RequestStream;
import org.noear.socketd.transport.stream.impl.StreamBase;
import org.noear.socketd.utils.IoConsumer;

/**
 * {@link RequestStream} implementation backed by a {@link CompletableFuture}
 * that is completed by {@link #onReply} or failed by {@link #onError}.
 *
 * <p>Callers can either block for the reply with {@link #await()} or register
 * a callback with {@link #thenReply(IoConsumer)}.
 */
public class RequestStreamImpl
        extends StreamBase<RequestStream>
        implements RequestStream {
    /** Holds the reply (or the failure) for this request. */
    private final CompletableFuture<Reply> future = new CompletableFuture<>();

    /**
     * Creates a request stream.
     *
     * @param sid     stream id, forwarded to {@code StreamBase}
     * @param timeout wait limit used by {@link #await()}, in milliseconds
     */
    public RequestStreamImpl(String sid, long timeout) {
        // NOTE(review): the literal 1 is presumably the number of expected
        // replies for a request stream - confirm against StreamBase.
        super(sid, 1, timeout);
    }

    /**
     * @return {@code true} once the underlying future has completed, whether
     *         with a reply or with an error
     */
    @Override
    public boolean isDone() {
        return this.future.isDone();
    }

    /**
     * Notifies the registered error handler (if any) and fails the future.
     *
     * @param error the failure to report
     */
    @Override
    public void onError(Throwable error) {
        try {
            if (this.doOnError != null) {
                this.doOnError.accept(error);
            }
        } finally {
            // Fail the future even when the handler itself throws, so a thread
            // blocked in await() is released instead of waiting for the timeout.
            this.future.completeExceptionally(error);
        }
    }

    /**
     * Completes the future with the received reply.
     *
     * @param reply the reply message
     */
    @Override
    public void onReply(MessageInternal reply) {
        this.future.complete(reply);
    }

    /**
     * Blocks until the reply arrives, the stream fails, or the timeout elapses.
     *
     * @return the reply
     * @throws SocketDTimeoutException if no reply arrives within {@code timeout()} milliseconds
     * @throws SocketDException        if the request failed or the waiting thread was
     *                                 interrupted; a {@code SocketDException} reported through
     *                                 {@link #onError} is rethrown as-is, any other failure
     *                                 is wrapped as the cause
     */
    @Override
    public Reply await() {
        try {
            return this.future.get(this.timeout(), TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            throw new SocketDTimeoutException("Request reply timeout > " + this.timeout() + ", sid=" + this.sid());
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still
            // observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            throw new SocketDException("Request failed, sid=" + this.sid(), e);
        }
        catch (Throwable e) {
            // Failures delivered through the future arrive wrapped in an
            // ExecutionException; report the underlying cause instead.
            Throwable failure = e instanceof ExecutionException ? e.getCause() : e;
            if (failure instanceof SocketDException) {
                throw (SocketDException) failure;
            }
            throw new SocketDException("Request failed, sid=" + this.sid(), failure);
        }
    }

    /**
     * Registers a callback that receives the reply when the future completes
     * normally. Anything thrown by the callback is routed to {@link #onError}.
     *
     * @param onReply the reply consumer
     * @return this stream, for chaining
     */
    @Override
    public RequestStream thenReply(IoConsumer<Reply> onReply) {
        this.future.thenAccept(reply -> {
            try {
                onReply.accept(reply);
            }
            catch (Throwable e) {
                this.onError(e);
            }
        });
        return this;
    }
}

