/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.wire.fdx.outbound.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.DefaultPayload;
import io.vlingo.common.Completes;
import io.vlingo.common.Scheduler;
import io.vlingo.wire.fdx.outbound.ManagedOutboundChannel;
import io.vlingo.wire.node.Address;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class RSocketOutboundChannel
implements ManagedOutboundChannel {
    private static final Logger logger = LoggerFactory.getLogger(RSocketOutboundChannel.class);
    private final Scheduler scheduler = new Scheduler();
    private final Address address;
    private final Duration connectionTimeout;
    private final ClientTransport transport;
    private RSocket clientSocket;

    public RSocketOutboundChannel(Address address, ClientTransport clientTransport, io.vlingo.actors.Logger logger) {
        this(address, clientTransport, Duration.ofMillis(100L), logger);
    }

    public RSocketOutboundChannel(Address address, ClientTransport clientTransport, Duration connectionTimeout, io.vlingo.actors.Logger logger) {
        this.address = address;
        this.connectionTimeout = connectionTimeout;
        this.transport = clientTransport;
    }

    @Override
    public void close() {
        if (this.clientSocket != null && !this.clientSocket.isDisposed()) {
            try {
                this.clientSocket.dispose();
            }
            catch (Throwable t) {
                logger.error("Unexpected error when closing outbound channel", t);
            }
        }
        this.clientSocket = null;
    }

    @Override
    public Completes<Void> writeAsync(ByteBuffer buffer) {
        Completes result = Completes.using((Scheduler)this.scheduler);
        this.writeAsyncInternal(buffer).subscribe(arg_0 -> ((Completes)result).with(arg_0), t -> result.failed());
        return result;
    }

    @Override
    public void write(ByteBuffer buffer) {
        this.writeAsyncInternal(buffer).block();
    }

    private Mono<Void> writeAsyncInternal(ByteBuffer buffer) {
        return this.prepareSocket().map(socket -> {
            if (socket.isDisposed()) {
                logger.warn("RSocket outbound channel for {} is closed. Message dropped", (Object)this.address);
                return Mono.empty();
            }
            Payload payload = DefaultPayload.create((ByteBuffer)buffer);
            return socket.fireAndForget(payload).doFinally(signalType -> logger.trace("Message sent to {}", (Object)this.address)).doOnError(throwable -> logger.error("Failed write to {}, because: {}", new Object[]{this.address, throwable.getMessage(), throwable}));
        }).orElseGet(() -> {
            logger.debug("RSocket outbound channel for {} not ready. Message dropped", (Object)this.address);
            return Mono.empty();
        });
    }

    private Optional<RSocket> prepareSocket() {
        if (this.clientSocket == null || this.clientSocket.isDisposed()) {
            try {
                this.clientSocket = (RSocket)RSocketConnector.create().payloadDecoder(PayloadDecoder.ZERO_COPY).connect(this.transport).timeout(this.connectionTimeout, Mono.error((Throwable)new TimeoutException("Timeout establishing connection for " + this.address))).block();
                logger.info("RSocket outbound channel opened for {}", (Object)this.address);
                this.clientSocket.onClose().doFinally(signalType -> {
                    logger.info("RSocket outbound channel for {} is closed", (Object)this.address);
                    this.close();
                }).subscribe(ignored -> {}, throwable -> logger.error("Unexpected error on closing outbound channel", throwable));
            }
            catch (Throwable t) {
                logger.warn("Failed to create RSocket outbound channel for {}, because {}", (Object)this.address, (Object)t.getMessage());
                this.close();
                return Optional.empty();
            }
        }
        return Optional.ofNullable(this.clientSocket);
    }
}

