/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.transport.local;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.transport.local.LocalSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

final class LocalDuplexConnection
implements DuplexConnection {
    private final LocalSocketAddress address;
    private final ByteBufAllocator allocator;
    private final Flux<ByteBuf> in;
    private final Mono<Void> onClose;
    private final UnboundedProcessor out;

    LocalDuplexConnection(String name, ByteBufAllocator allocator, Flux<ByteBuf> in, UnboundedProcessor out, Mono<Void> onClose) {
        this.address = new LocalSocketAddress(name);
        this.allocator = Objects.requireNonNull(allocator, "allocator must not be null");
        this.in = Objects.requireNonNull(in, "in must not be null");
        this.out = Objects.requireNonNull(out, "out must not be null");
        this.onClose = Objects.requireNonNull(onClose, "onClose must not be null");
    }

    public void dispose() {
        this.out.onComplete();
    }

    public boolean isDisposed() {
        return this.out.isDisposed();
    }

    public Mono<Void> onClose() {
        return this.onClose;
    }

    public Flux<ByteBuf> receive() {
        return this.in.transform(Operators.lift((__, actual) -> new ByteBufReleaserOperator((CoreSubscriber<? super ByteBuf>)actual, this)));
    }

    public void sendFrame(int streamId, ByteBuf frame) {
        if (streamId == 0) {
            this.out.onNextPrioritized(frame);
        } else {
            this.out.onNext(frame);
        }
    }

    public void sendErrorAndClose(RSocketErrorException e) {
        ByteBuf errorFrame = ErrorFrameCodec.encode((ByteBufAllocator)this.allocator, (int)0, (Throwable)e);
        this.out.onNext(errorFrame);
        this.dispose();
    }

    public ByteBufAllocator alloc() {
        return this.allocator;
    }

    public SocketAddress remoteAddress() {
        return this.address;
    }

    static class ByteBufReleaserOperator
    implements CoreSubscriber<ByteBuf>,
    Subscription,
    Fuseable.QueueSubscription<ByteBuf> {
        final CoreSubscriber<? super ByteBuf> actual;
        final LocalDuplexConnection parent;
        Subscription s;

        public ByteBufReleaserOperator(CoreSubscriber<? super ByteBuf> actual, LocalDuplexConnection parent) {
            this.actual = actual;
            this.parent = parent;
        }

        public void onSubscribe(Subscription s) {
            if (Operators.validate((Subscription)this.s, (Subscription)s)) {
                this.s = s;
                this.actual.onSubscribe((Subscription)this);
            }
        }

        public void onNext(ByteBuf buf) {
            try {
                this.actual.onNext((Object)buf);
            }
            finally {
                buf.release();
            }
        }

        public void onError(Throwable t) {
            this.parent.out.onError(t);
            this.actual.onError(t);
        }

        public void onComplete() {
            this.parent.out.onComplete();
            this.actual.onComplete();
        }

        public void request(long n) {
            this.s.request(n);
        }

        public void cancel() {
            this.s.cancel();
            this.parent.out.onComplete();
        }

        public int requestFusion(int requestedMode) {
            return 0;
        }

        public ByteBuf poll() {
            throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
        }

        public int size() {
            throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
        }

        public boolean isEmpty() {
            throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
        }

        public void clear() {
            throw new UnsupportedOperationException("Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.");
        }
    }
}

