/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.wire.fdx.bidirectional.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.DefaultPayload;
import io.vlingo.actors.Logger;
import io.vlingo.common.pool.ElasticResourcePool;
import io.vlingo.wire.channel.ResponseChannelConsumer;
import io.vlingo.wire.fdx.bidirectional.ClientRequestResponseChannel;
import io.vlingo.wire.message.ConsumerByteBuffer;
import io.vlingo.wire.message.ConsumerByteBufferPool;
import io.vlingo.wire.node.Address;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.function.Function;
import reactor.core.publisher.EmitterProcessor;
import reactor.retry.Retry;

/**
 * A {@link ClientRequestResponseChannel} that sends requests and receives responses over a
 * single RSocket {@code requestChannel} interaction.
 *
 * <p>The socket is connected lazily: {@link #requestWith(ByteBuffer)} and {@link #probeChannel()}
 * both attempt to (re)connect when no usable socket is available. Outgoing messages are pushed
 * through {@link #publisher}; incoming payloads are forwarded to the {@link ResponseChannelConsumer}
 * supplied at construction time.
 */
public class RSocketClientChannel
implements ClientRequestResponseChannel {
    private final EmitterProcessor<Payload> publisher = EmitterProcessor.create();
    private final Logger logger;
    private final ChannelResponseHandler responseHandler;
    private final Address address;
    private final Duration connectionTimeout;
    private final ClientTransport transport;
    // Written both by callers of this class and by the asynchronous onClose callback registered in
    // prepareChannel(); volatile so each side observes the other's latest write.
    // NOTE(review): the callback presumably runs on a transport thread — confirm.
    private volatile RSocket channelSocket;

    /**
     * Creates a channel with a default connection timeout of 100 milliseconds.
     *
     * @param clientTransport   the transport used to connect to the server
     * @param address           the remote address; used in log messages
     * @param consumer          the consumer that receives every response payload
     * @param maxBufferPoolSize the size passed to the read buffer pool configuration
     * @param maxMessageSize    the message size passed to the read buffer pool
     * @param logger            the logger for diagnostics
     */
    public RSocketClientChannel(ClientTransport clientTransport, Address address, ResponseChannelConsumer consumer, int maxBufferPoolSize, int maxMessageSize, Logger logger) {
        this(clientTransport, address, consumer, maxBufferPoolSize, maxMessageSize, logger, Duration.ofMillis(100L));
    }

    /**
     * Creates a channel with an explicit connection timeout.
     *
     * @param clientTransport   the transport used to connect to the server
     * @param address           the remote address; used in log messages
     * @param consumer          the consumer that receives every response payload
     * @param maxBufferPoolSize the size passed to the read buffer pool configuration
     * @param maxMessageSize    the message size passed to the read buffer pool
     * @param logger            the logger for diagnostics
     * @param connectionTimeout the timeout applied to each connection attempt
     */
    public RSocketClientChannel(ClientTransport clientTransport, Address address, ResponseChannelConsumer consumer, int maxBufferPoolSize, int maxMessageSize, Logger logger, Duration connectionTimeout) {
        this.logger = logger;
        this.address = address;
        this.connectionTimeout = connectionTimeout;
        this.responseHandler = new ChannelResponseHandler(consumer, maxBufferPoolSize, maxMessageSize, logger);
        this.transport = clientTransport;
    }

    /**
     * Disposes the current socket, if any, and forgets it. Errors raised while disposing are
     * logged and not propagated. A later request or probe attempts to reconnect.
     */
    @Override
    public void close() {
        // Snapshot the field so the null check and the dispose act on the same instance.
        final RSocket socket = this.channelSocket;
        if (socket != null && !socket.isDisposed()) {
            try {
                socket.dispose();
            }
            catch (Throwable t) {
                this.logger.error("Unexpected error on closing channel socket", t);
            }
        }
        this.channelSocket = null;
    }

    /**
     * Sends the remaining bytes of {@code buffer} to the server, connecting first if necessary.
     * The bytes are copied, so the caller may reuse {@code buffer} once this method returns.
     * When no usable socket is available the message is dropped and a debug message is logged.
     *
     * @param buffer the buffer whose remaining bytes are sent; its position is advanced to its limit
     */
    @Override
    public void requestWith(ByteBuffer buffer) {
        this.prepareChannel();
        final RSocket socket = this.channelSocket;
        if (socket != null && !socket.isDisposed()) {
            // Only the remaining bytes are copied, so size the copy by remaining(), not capacity().
            final ByteBuffer data = ByteBuffer.allocate(buffer.remaining());
            data.put(buffer);
            data.flip();
            this.publisher.onNext(DefaultPayload.create(data));
        } else {
            this.logger.debug("RSocket client channel for {} not ready. Message dropped", this.address);
        }
    }

    /**
     * Attempts to (re)connect the channel if it is not currently usable.
     */
    @Override
    public void probeChannel() {
        this.prepareChannel();
    }

    /**
     * Connects a new socket when there is none or the current one is disposed, then opens the
     * request channel on it and registers a close callback. Any failure is logged and leaves the
     * channel closed; nothing is thrown to the caller.
     */
    private void prepareChannel() {
        try {
            final RSocket current = this.channelSocket;
            if (current != null && !current.isDisposed()) {
                return;
            }

            // Blocks the calling thread until connected, failed, or connectionTimeout elapsed.
            final RSocket socket = RSocketConnector.create()
                    .payloadDecoder(PayloadDecoder.ZERO_COPY)
                    .connect(this.transport)
                    .timeout(this.connectionTimeout)
                    .doOnError(throwable -> this.logger.error("Failed to create RSocket client channel for address {}", this.address, throwable))
                    .block();
            this.channelSocket = socket;
            if (socket == null) {
                return;
            }

            // ApplicationErrorException is retried; any other error terminates the subscription
            // and is logged as unrecoverable.
            socket.requestChannel(this.publisher)
                    .retryWhen(Retry.anyOf(ApplicationErrorException.class)
                            .doOnRetry(retryContext -> this.logger.debug("RSocket client channel for address {} received a retryable error", this.address, retryContext.exception())))
                    .subscribe(
                            this.responseHandler::handle,
                            throwable -> this.logger.error("RSocket client channel for address {} received unrecoverable error", this.address, throwable));
            this.logger.info("RSocket client channel opened for address {}", this.address);

            socket.onClose()
                    .doFinally(signalType -> {
                        this.logger.info("RSocket client channel for address {} is closed", this.address);
                        // Only tear down if this socket is still the current one; otherwise a late
                        // close notification for an old socket would dispose its replacement.
                        if (this.channelSocket == socket) {
                            this.close();
                        }
                    })
                    .subscribe(
                            ignored -> {},
                            throwable -> this.logger.error("Unexpected error on closing RSocket client channel socket for address {}", this.address, throwable));
        }
        catch (Throwable t) {
            this.logger.warn("Failed to create RSocket client channel for address {}", this.address, t);
            this.close();
        }
    }

    /**
     * Copies each incoming payload into a pooled buffer and hands it to the consumer.
     */
    private static class ChannelResponseHandler {
        private final ResponseChannelConsumer consumer;
        private final Logger logger;
        private final ConsumerByteBufferPool readBufferPool;

        private ChannelResponseHandler(ResponseChannelConsumer consumer, int maxBufferPoolSize, int maxMessageSize, Logger logger) {
            this.consumer = consumer;
            this.readBufferPool = new ConsumerByteBufferPool(ElasticResourcePool.Config.of(maxBufferPoolSize), maxMessageSize);
            this.logger = logger;
        }

        /**
         * Copies the payload data into a pooled buffer and passes it to the consumer. The payload
         * is always released. The pooled buffer is released here only when copying or consuming
         * fails.
         * NOTE(review): on success the consumer is presumably responsible for releasing the
         * pooled buffer — confirm against the consumer contract.
         *
         * @param payload the incoming payload
         */
        private void handle(Payload payload) {
            try {
                // Acquire inside the outer try so the payload is released even if acquire throws.
                final ConsumerByteBuffer pooledBuffer = this.readBufferPool.acquire("RSocketClientChannel#ChannelResponseHandler#handle");
                try {
                    final ByteBuffer payloadData = payload.getData();
                    this.consumer.consume(pooledBuffer.put(payloadData).flip());
                }
                catch (Throwable e) {
                    this.logger.error("Unexpected error reading incoming payload", e);
                    pooledBuffer.release();
                }
            }
            finally {
                payload.release();
            }
        }
    }
}

