/*
 * Decompiled with CFR 0.152.
 */
package io.reactivesocket.transport.local;

import io.reactivesocket.DuplexConnection;
import io.reactivesocket.Frame;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

public class LocalDuplexConnection
implements DuplexConnection {
    private final Flux<Frame> in;
    private final Subscriber<Frame> out;
    private final MonoProcessor<Void> closeNotifier;

    public LocalDuplexConnection(Flux<Frame> in, Subscriber<Frame> out, MonoProcessor<Void> closeNotifier) {
        this.in = in;
        this.out = out;
        this.closeNotifier = closeNotifier;
    }

    public Mono<Void> send(Publisher<Frame> frames) {
        return Flux.from(frames).concatMap(this::sendOne).then();
    }

    public Mono<Void> sendOne(Frame frame) {
        return Mono.fromRunnable(() -> this.out.onNext((Object)frame));
    }

    public Flux<Frame> receive() {
        return this.in;
    }

    public Mono<Void> close() {
        return Mono.defer(() -> {
            this.out.onComplete();
            this.closeNotifier.onComplete();
            return this.closeNotifier;
        });
    }

    public Mono<Void> onClose() {
        return this.closeNotifier;
    }

    public double availability() {
        return this.closeNotifier.isDisposed() ? 0.0 : 1.0;
    }
}

