/*
 * Decompiled with CFR 0.152.
 */
package at.qubic.api.network;

import at.qubic.api.QubicRequest;
import at.qubic.api.domain.MessageType;
import at.qubic.api.domain.QubicMessage;
import at.qubic.api.network.Node;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import lombok.Generated;
import org.apache.commons.codec.binary.Hex;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;

/**
 * Low-level request/response operations against a single {@link Node}.
 *
 * <p>Every operation opens a new connection through {@code node.getClient()}, writes the
 * serialized request and (for the {@code sendAndReceive*} variants) decodes the inbound
 * buffers into {@link QubicMessage}s. Failures are logged via
 * {@link #handleNodeCallError(Node, Throwable)} and then propagated to the subscriber.
 */
public class BasicNodeOperations {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BasicNodeOperations.class);

    /** Timeout applied to the response stream of {@code sendAndReceive}. */
    private static final Duration RESPONSE_TIMEOUT = Duration.ofSeconds(5L);

    /**
     * Sends the request and returns only the first message of the response stream.
     *
     * @param node    node to connect to
     * @param request publisher of the request to send
     * @return the first message emitted by {@link #sendAndReceive(Node, Mono)}, or empty if none
     */
    public Mono<QubicMessage> sendAndReceiveNext(Node node, Mono<QubicRequest> request) {
        return this.sendAndReceive(node, request).next();
    }

    /**
     * Sends the request and returns only the first message of the response stream,
     * using the given message type to delimit the response.
     *
     * @param node        node to connect to
     * @param request     publisher of the request to send
     * @param messageType response type passed on to {@link #sendAndReceive(Node, Mono, MessageType)}
     * @return the first message emitted by the underlying response stream, or empty if none
     */
    public Mono<QubicMessage> sendAndReceiveNext(Node node, Mono<? extends QubicRequest> request, MessageType messageType) {
        return this.sendAndReceive(node, request, messageType).next();
    }

    /**
     * Sends the request and emits response messages until a message of type
     * {@link MessageType#END_RESPONSE} is seen.
     *
     * @param node    node to connect to
     * @param request publisher of the request to send
     * @return the response messages; see {@link #sendAndReceive(Node, Mono, MessageType)}
     */
    public Flux<QubicMessage> sendAndReceive(Node node, Mono<QubicRequest> request) {
        return this.sendAndReceive(node, request, null);
    }

    /**
     * Connects to the node, sends the request and decodes the inbound buffers into messages.
     *
     * <p>Every decoded message is handed to {@code node.updatePublicPeers(..)}. Leading
     * messages of type {@link MessageType#EXCHANGE_PUBLIC_PEERS} are skipped. After that,
     * messages are emitted while they have the requested {@code type}; if {@code type} is
     * {@code null}, messages are emitted until one of type {@link MessageType#END_RESPONSE}
     * arrives (that message itself is not emitted).
     *
     * <p>The stream fails with a {@link TimeoutException} if no message is emitted within
     * {@link #RESPONSE_TIMEOUT}. Errors are logged and then propagated.
     *
     * @param node    node to connect to
     * @param request publisher of the request to send
     * @param type    expected response type, or {@code null} to read until {@code END_RESPONSE}
     * @return the matching response messages
     */
    public Flux<QubicMessage> sendAndReceive(Node node, Mono<? extends QubicRequest> request, MessageType type) {
        Predicate<QubicMessage> takeWhilePredicate = type == null
                ? BasicNodeOperations.isType(MessageType.END_RESPONSE).negate()
                : BasicNodeOperations.isType(type);
        return node.getClient().connect()
                .flatMapMany(conn -> BasicNodeOperations.exchangeMessages(conn, this.convertToMessageBytes(request))
                        .map(this::readMessage)
                        .doOnNext(node::updatePublicPeers)
                        .skipWhile(BasicNodeOperations.isType(MessageType.EXCHANGE_PUBLIC_PEERS))
                        .takeWhile(takeWhilePredicate))
                .timeout(RESPONSE_TIMEOUT)
                .doOnError(e -> this.handleNodeCallError(node, e));
    }

    /**
     * Connects to the node with a handler that writes the serialized request; no response
     * is read by this method.
     *
     * @param node           node to connect to
     * @param requestMessage publisher of the request to send
     * @return the connection produced by the client's {@code connect()} call; this method
     *         does not dispose it
     */
    public Mono<? extends Connection> send(Node node, Mono<QubicRequest> requestMessage) {
        Mono<byte[]> requestBytes = requestMessage
                .map(req -> req.toMessage().toBytes())
                .doOnNext(bytes -> log.debug("Sending bytes: [{}]", Hex.encodeHexString(bytes)));
        return node.getClient()
                .handle((in, out) -> out.sendByteArray(requestBytes))
                .connect()
                .doOnError(e -> this.handleNodeCallError(node, e));
    }

    /**
     * Logs a failed node call. Timeouts and connection failures are logged as warnings
     * with the message only; everything else is logged as an error including the throwable.
     */
    private void handleNodeCallError(Node node, Throwable err) {
        if (err instanceof TimeoutException || err instanceof ConnectException) {
            log.warn("Node [{}] call errored: {}", node.getName(), err.getMessage());
        } else {
            log.error("Node [{}] call errored.", node.getName(), err);
        }
    }

    /**
     * Writes the request bytes to the connection and then streams the inbound data.
     *
     * <p>The connection is disposed when the returned flux finishes for any reason.
     * {@code doFinally} is used instead of {@code doOnTerminate} because callers cancel
     * this stream ({@code takeWhile}, {@code next()}, {@code timeout}) and
     * {@code doOnTerminate} is not invoked on cancellation. It is attached to the whole
     * send+receive pipeline so that a failed send releases the connection as well.
     */
    private static Flux<ByteBuffer> exchangeMessages(Connection conn, Mono<byte[]> requestBytes) {
        return conn.outbound()
                .sendByteArray(requestBytes)
                .then()
                .thenMany(conn.inbound().receive().asByteBuffer())
                .doFinally(signal -> conn.dispose());
    }

    /**
     * Decodes one message from the buffer.
     *
     * <p>If bytes are left over after decoding, they are drained from the buffer and
     * logged (hex encoded) as a warning; they are not parsed any further.
     */
    private QubicMessage readMessage(ByteBuffer buf) {
        QubicMessage msg = QubicMessage.fromBytes(buf);
        // Cast keeps the single-argument SLF4J overload regardless of the payload's static type.
        log.trace("Payload: {}", (Object) msg.getPayload());
        if (buf.hasRemaining()) {
            int surplus = buf.remaining();
            byte[] remaining = new byte[surplus];
            // ByteBuffer.get(byte[]) returns the buffer itself, so drain first and log the array.
            buf.get(remaining);
            log.warn("Received [{}] bytes more than expected. Message: {}, remaining: {}.",
                    surplus, msg, Hex.encodeHexString(remaining));
        }
        if (BasicNodeOperations.isType(MessageType.END_RESPONSE).test(msg)) {
            log.debug("Received header with type [{}].", MessageType.END_RESPONSE);
        }
        return msg;
    }

    /** Serializes the request to its wire bytes and logs them (hex encoded) at debug level. */
    private Mono<byte[]> convertToMessageBytes(Mono<? extends QubicRequest> request) {
        return request
                .map(req -> req.toMessage().toBytes())
                .doOnNext(bytes -> log.debug("Sending bytes [{}]", Hex.encodeHexString(bytes)));
    }

    /** Returns a predicate that matches messages whose header type is {@code message}. */
    private static Predicate<QubicMessage> isType(MessageType message) {
        return qubicMessage -> qubicMessage.getHeader().getType() == message;
    }
}

