/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.protocol.blocking;

import io.netty.util.concurrent.FastThreadLocalThread;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.jetlinks.core.defaults.BlockingDeviceOperator;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.codec.MessageCodecContext;
import org.jetlinks.core.monitor.Monitor;
import org.jetlinks.core.monitor.logger.Logger;
import org.jetlinks.core.utils.Reactors;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

class BlockingMessageCodecContext<T extends MessageCodecContext> {
    final Monitor monitor;
    final T context;
    final Duration timeout;
    final ContextView ctx;
    private Queue<Mono<?>> async;

    @Nullable
    public BlockingDeviceOperator getDevice() {
        return this.wrapBlocking(this.context.getDevice());
    }

    @Nullable
    public BlockingDeviceOperator getDevice(String deviceId) {
        return this.wrapBlocking((DeviceOperator)this.await(this.context.getDevice(deviceId)));
    }

    @Nonnull
    public Mono<DeviceOperator> getDeviceAsync(String deviceId) {
        return this.context.getDevice(deviceId);
    }

    public <R> R await(Mono<R> async) {
        return (R)Reactors.await((Mono)async.contextWrite(this.ctx), (Duration)this.timeout);
    }

    public synchronized void async(Mono<?> async) {
        if (this.async == null) {
            this.async = (Queue)Queues.unboundedMultiproducer().get();
        }
        this.async.add(async);
    }

    <V> Mono<V> runAsync() {
        Queue<Mono<?>> async = this.async;
        if (async == null) {
            return Mono.empty();
        }
        return new AsyncRunner(async);
    }

    public Monitor monitor() {
        return this.monitor;
    }

    public Logger logger() {
        return this.monitor.logger();
    }

    private BlockingDeviceOperator wrapBlocking(DeviceOperator device) {
        if (device == null) {
            return null;
        }
        return new BlockingDeviceOperator(device, this.timeout, this.ctx);
    }

    public BlockingMessageCodecContext(Monitor monitor, T context, Duration timeout, ContextView ctx) {
        this.monitor = monitor;
        this.context = context;
        this.timeout = timeout;
        this.ctx = ctx;
    }

    private static class AsyncRunner<V>
    extends Mono<V>
    implements Subscription {
        private final Queue<Mono<?>> async;
        private volatile CoreSubscriber<? super V> actual;
        private static final AtomicReferenceFieldUpdater<AsyncRunner, BaseSubscriber> PENDING = AtomicReferenceFieldUpdater.newUpdater(AsyncRunner.class, BaseSubscriber.class, "pending");
        private volatile BaseSubscriber<Object> pending;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void subscribe(@Nonnull CoreSubscriber<? super V> actual) {
            AsyncRunner asyncRunner = this;
            synchronized (asyncRunner) {
                if (this.actual != null) {
                    actual.onError((Throwable)Exceptions.duplicateOnSubscribeException());
                    return;
                }
                this.actual = actual;
            }
            this.actual.onSubscribe((Subscription)this);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void darin() {
            Mono<?> task;
            Queue<Mono<?>> queue = this.async;
            synchronized (queue) {
                task = this.async.poll();
            }
            if (task == null) {
                this.actual.onComplete();
                return;
            }
            BaseSubscriber<Object> subscriber = new BaseSubscriber<Object>(){

                @Nonnull
                public Context currentContext() {
                    return actual.currentContext();
                }

                protected void hookOnError(@Nonnull Throwable throwable) {
                    actual.onError(throwable);
                }

                protected void hookFinally(@Nonnull SignalType type) {
                    if (type == SignalType.ON_COMPLETE) {
                        this.darin();
                    }
                }
            };
            PENDING.set(this, (BaseSubscriber)subscriber);
            Thread thread = Thread.currentThread();
            if (Schedulers.isNonBlockingThread((Thread)thread) || thread instanceof FastThreadLocalThread) {
                task.subscribe((CoreSubscriber)subscriber);
            } else {
                Schedulers.parallel().schedule(() -> AsyncRunner.lambda$darin$0(task, (BaseSubscriber)subscriber));
            }
        }

        public void request(long n) {
            this.darin();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cancel() {
            Queue<Mono<?>> queue = this.async;
            synchronized (queue) {
                this.async.clear();
            }
            BaseSubscriber pending = PENDING.getAndSet(this, null);
            if (pending != null) {
                pending.cancel();
            }
        }

        public AsyncRunner(Queue<Mono<?>> async) {
            this.async = async;
        }

        private static /* synthetic */ void lambda$darin$0(Mono task, BaseSubscriber subscriber) {
            task.subscribe((CoreSubscriber)subscriber);
        }
    }
}

