/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.ipc;

import java.util.concurrent.atomic.AtomicInteger;
import org.jetlinks.supports.ipc.IpcResponse;
import org.jetlinks.supports.ipc.ResponseType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

class IpcRequestHandler<RES>
implements Disposable {
    private static final Logger log = LoggerFactory.getLogger(IpcRequestHandler.class);
    EmitterProcessor<RES> processor = EmitterProcessor.create((int)Integer.MAX_VALUE);
    Disposable.Composite disposable = Disposables.composite();
    FluxSink<RES> sink = this.processor.sink();
    private final AtomicInteger seqInc = new AtomicInteger();
    private final AtomicInteger totalSeq = new AtomicInteger(-1);

    IpcRequestHandler() {
    }

    Mono<RES> handleRequest() {
        return this.processor.next().doFinally(s -> this.disposable.dispose());
    }

    Flux<RES> handleStream() {
        return this.processor.doFinally(s -> this.disposable.dispose());
    }

    void complete() {
        if (this.processor.isDisposed()) {
            log.debug("handler is disposed");
        }
        this.processor.onComplete();
        this.sink.complete();
    }

    synchronized void handle(IpcResponse<RES> res) {
        if (res.hasResult()) {
            this.sink.next(res.getResult());
        }
        if (res.hasError()) {
            this.error(res.getError());
        } else {
            int seq;
            int resSeq = res.getSeq();
            int n = seq = resSeq < 0 ? -1 : this.seqInc.getAndIncrement();
            if (res.getType() == ResponseType.complete) {
                if (resSeq < 0 || seq > resSeq || this.totalSeq.get() != -1) {
                    this.complete();
                } else {
                    log.debug("ipc response complete early,seq[{}],total[{}]", (Object)this.seqInc, (Object)resSeq);
                    this.totalSeq.set(resSeq);
                }
            } else {
                int total = this.totalSeq.get();
                if (total >= 0 && seq + 1 >= total) {
                    this.complete();
                }
            }
        }
    }

    void error(Throwable err) {
        this.sink.error(err);
    }

    IpcRequestHandler<RES> doOnDispose(Disposable disposable) {
        this.disposable.add(disposable);
        return this;
    }

    public void dispose() {
        this.complete();
    }
}

