/*
 * Decompiled with CFR 0.152.
 */
package io.vlingo.wire.fdx.bidirectional.rsocket;

import io.rsocket.Payload;
import io.rsocket.util.ByteBufPayload;
import io.vlingo.actors.Logger;
import io.vlingo.common.pool.ElasticResourcePool;
import io.vlingo.wire.channel.RequestChannelConsumer;
import io.vlingo.wire.channel.RequestChannelConsumerProvider;
import io.vlingo.wire.channel.RequestResponseContext;
import io.vlingo.wire.channel.ResponseSenderChannel;
import io.vlingo.wire.message.ConsumerByteBuffer;
import io.vlingo.wire.message.ConsumerByteBufferPool;
import java.nio.ByteBuffer;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.UnicastProcessor;

/**
 * {@link RequestResponseContext} used for a single RSocket channel.
 *
 * <p>Inbound {@link Payload}s are copied into pooled {@link ConsumerByteBuffer}s and handed to the
 * {@link RequestChannelConsumer} obtained from the provider; outbound responses are emitted
 * through a {@link UnicastProcessor} that is exposed via {@link #processor()}.
 */
class RSocketChannelContext
implements RequestResponseContext<FluxSink<ConsumerByteBuffer>> {
    private final RequestChannelConsumer consumer;
    private final Logger logger;
    private final ConsumerByteBufferPool readBufferPool;
    private final UnicastProcessor<Payload> processor;

    /** Data registered through {@link #whenClosing(Object)}; {@code null} until then. */
    private Object closingData;
    /** Working data registered through {@link #consumerData(Object)}; {@code null} until then. */
    private Object consumerData;

    /**
     * Creates the context, obtaining its consumer from {@code consumerProvider} and creating its
     * own read buffer pool and response processor.
     *
     * @param consumerProvider supplies the {@link RequestChannelConsumer} that receives requests
     * @param maxBufferPoolSize value passed to {@code ElasticResourcePool.Config.of(..)} for the
     *     read buffer pool
     * @param maxMessageSize value passed to the {@link ConsumerByteBufferPool} constructor;
     *     presumably the capacity of each pooled buffer (confirm against the pool implementation)
     * @param logger used to report failures raised while closing
     */
    RSocketChannelContext(RequestChannelConsumerProvider consumerProvider, int maxBufferPoolSize, int maxMessageSize, Logger logger) {
        this.consumer = consumerProvider.requestChannelConsumer();
        this.logger = logger;
        this.readBufferPool = new ConsumerByteBufferPool(ElasticResourcePool.Config.of(maxBufferPoolSize), maxMessageSize);
        this.processor = UnicastProcessor.create();
    }

    /**
     * @return the processor into which {@link #respondWith(ConsumerByteBuffer)} emits responses
     */
    UnicastProcessor<Payload> processor() {
        return this.processor;
    }

    /**
     * Returns the working data last stored with {@link #consumerData(Object)}.
     *
     * @param <T> the type the caller expects; it is not verified here
     * @return the stored data, or {@code null} when none has been stored
     */
    @Override
    @SuppressWarnings("unchecked") // The stored value is an opaque Object; the caller chooses T.
    public <T> T consumerData() {
        return (T) this.consumerData;
    }

    /**
     * Stores {@code workingData} as this context's consumer data, replacing any previous value.
     *
     * @param workingData the data to remember
     * @param <T> the type of the data
     * @return the same {@code workingData} instance
     */
    @Override
    public <T> T consumerData(T workingData) {
        this.consumerData = workingData;
        return workingData;
    }

    /**
     * @return {@code true} when non-null consumer data is currently stored
     */
    @Override
    public boolean hasConsumerData() {
        return this.consumerData != null;
    }

    /**
     * @return always {@code null}; this context does not provide an id
     */
    @Override
    public String id() {
        return null;
    }

    /**
     * @return always {@code null}; responses go through {@link #respondWith(ConsumerByteBuffer)}
     *     rather than a {@link ResponseSenderChannel}
     */
    @Override
    public ResponseSenderChannel sender() {
        return null;
    }

    /**
     * Registers the data that is passed to the consumer when {@link #close()} runs.
     *
     * @param data the closing data; when {@code null}, {@link #close()} does nothing
     */
    @Override
    public void whenClosing(Object data) {
        this.closingData = data;
    }

    /**
     * Notifies the consumer that this context is closing, provided closing data was registered.
     *
     * <p>Any {@link Exception} raised by the consumer is logged and not propagated.
     */
    public void close() {
        if (this.closingData == null) {
            return;
        }
        try {
            this.consumer.closeWith(this, this.closingData);
        }
        catch (Exception e) {
            this.logger.error("Failed to close client channel because: " + e.getMessage(), e);
        }
    }

    /**
     * Copies the data of {@code request} into a buffer acquired from the read pool and hands the
     * flipped buffer to the consumer.
     *
     * <p>When copying or consuming fails, the buffer is released back to the pool and the failure
     * is rethrown. On success the buffer is not released here.
     * NOTE(review): presumably the consumer releases it once done; confirm against the consumer.
     *
     * @param request the inbound payload whose data is to be consumed
     */
    public void consume(Payload request) {
        ConsumerByteBuffer pooledBuffer = this.readBufferPool.acquire("RSocketChannelContext#consume");
        try {
            pooledBuffer.put(request.getData());
            this.consumer.consume(this, pooledBuffer.flip());
        }
        catch (Throwable t) {
            // Nobody else has taken the buffer on the failure path, so return it to the pool.
            pooledBuffer.release();
            throw t;
        }
    }

    /**
     * Closes this context via {@link #close()} and then disposes the response processor.
     */
    @Override
    public void abandon() {
        this.close();
        this.processor.dispose();
    }

    /**
     * Emits the content of {@code buffer} as a {@link Payload} on the response processor.
     *
     * <p>The buffer is not released by this method.
     * NOTE(review): whether the created payload shares memory with {@code buffer} depends on
     * {@code ByteBufPayload.create(ByteBuffer)}; confirm before releasing the buffer early.
     *
     * @param buffer the buffer holding the response bytes
     */
    @Override
    public void respondWith(ConsumerByteBuffer buffer) {
        // The argument must be typed as Payload: onNext on a UnicastProcessor<Payload> does not
        // accept an expression of static type Object.
        Payload response = ByteBufPayload.create(buffer.asByteBuffer());
        this.processor.onNext(response);
    }
}

