/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.protocol.blocking;

import io.netty.util.concurrent.FastThreadLocalThread;
import java.time.Duration;
import java.util.Map;
import javax.annotation.Nonnull;
import org.jetlinks.core.command.CommandSupport;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.DeviceMessageCodec;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.MessageEncodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.ValidateResult;
import org.jetlinks.core.monitor.Monitor;
import org.jetlinks.core.monitor.logger.Logger;
import org.jetlinks.core.monitor.tracer.Tracer;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.core.things.BlockingThingsDataManager;
import org.jetlinks.core.things.ThingsDataManager;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.supports.protocol.blocking.BlockingMessageCodecContext;
import org.jetlinks.supports.protocol.blocking.BlockingMessageDecodeContext;
import org.jetlinks.supports.protocol.blocking.BlockingMessageEncodeContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.ContextView;

/**
 * Base class for {@link DeviceMessageCodec} implementations that want to write their
 * encode/decode logic in a blocking style.
 *
 * <p>Subclasses implement {@link #upstream(BlockingMessageDecodeContext)} and
 * {@link #downstream(BlockingMessageEncodeContext)}. The {@link #decode} and {@link #encode}
 * entry points build the matching blocking context, invoke the subclass hook and then return
 * whatever the context's {@code runAsync()} produces. When the subscribing thread is one that
 * must not block (see {@link #isInNonBlocking()}), the hook is run on
 * {@link Schedulers#boundedElastic()} instead of the calling thread.
 */
public abstract class BlockingDeviceMessageCodec
implements DeviceMessageCodec {
    /** Timeout returned by {@link #getBlockingTimeout()} unless a subclass overrides it. */
    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30L);

    /** Service context used to look up monitors and services. */
    protected final ServiceContext context;

    /** Transport reported by {@link #getSupportTransport()}. */
    protected final Transport transport;

    /**
     * @param context   service context used to look up monitors and services
     * @param transport transport reported by {@link #getSupportTransport()}
     */
    public BlockingDeviceMessageCodec(ServiceContext context, Transport transport) {
        this.context = context;
        this.transport = transport;
    }

    /**
     * @return the transport passed to the constructor
     */
    public final Transport getSupportTransport() {
        return this.transport;
    }

    /**
     * @return the logger of the service context's default monitor
     */
    protected final Logger logger() {
        return this.context.getMonitor().logger();
    }

    /**
     * @param deviceId id used to look up the monitor
     * @return the logger of the monitor obtained for {@code deviceId}
     */
    protected final Logger logger(String deviceId) {
        return this.context.getMonitor(deviceId).logger();
    }

    /**
     * @return the tracer of the service context's default monitor
     */
    protected final Tracer tracer() {
        return this.context.getMonitor().tracer();
    }

    /**
     * @param deviceId id used to look up the monitor
     * @return the tracer of the monitor obtained for {@code deviceId}
     */
    protected final Tracer tracer(String deviceId) {
        return this.context.getMonitor(deviceId).tracer();
    }

    /**
     * Decode hook, called once per {@link #decode} subscription with a freshly created context.
     *
     * @param context blocking decode context wrapping the original {@link MessageDecodeContext}
     */
    protected abstract void upstream(BlockingMessageDecodeContext context);

    /**
     * Encode hook, called once per {@link #encode} subscription with a freshly created context.
     *
     * @param context blocking encode context wrapping the original {@link MessageEncodeContext}
     */
    protected abstract void downstream(BlockingMessageEncodeContext context);

    /**
     * Timeout handed to {@link #await(Mono)}, to the blocking contexts and to the
     * {@link BlockingThingsDataManager} created by {@link #getDataManager()}.
     *
     * @return 30 seconds by default; subclasses may override
     */
    protected Duration getBlockingTimeout() {
        return DEFAULT_TIMEOUT;
    }

    /**
     * Waits for {@code mono} via {@link Reactors#await}, using {@link #getBlockingTimeout()}
     * as the timeout.
     *
     * @param mono the publisher to wait for
     * @param <T>  result type
     * @return the value returned by {@code Reactors.await}
     */
    protected <T> T await(Mono<T> mono) {
        return (T) Reactors.await(mono, this.getBlockingTimeout());
    }

    /**
     * Looks up a {@link CommandSupport} service by id.
     *
     * @param serviceId id of the service to look up
     * @return the registered command service
     * @throws UnsupportedOperationException if the service context has no such service
     */
    protected final CommandSupport getCommandService(String serviceId) {
        return (CommandSupport) this.context
            .getService(serviceId, CommandSupport.class)
            .orElseThrow(() -> new UnsupportedOperationException("unsupported commandService:" + serviceId));
    }

    /**
     * Wraps the registered {@link ThingsDataManager} in a {@link BlockingThingsDataManager}
     * configured with {@link #getBlockingTimeout()}. A new wrapper is created on every call.
     *
     * @return a blocking view of the things data manager
     * @throws UnsupportedOperationException if no {@link ThingsDataManager} is registered
     */
    protected final BlockingThingsDataManager getDataManager() {
        return this.context
            .getService(ThingsDataManager.class)
            .map(manager -> new BlockingThingsDataManager(manager, this.getBlockingTimeout()))
            .orElseThrow(() -> new UnsupportedOperationException("unsupported service:ThingsDataManager"));
    }

    /**
     * Validates {@code value} against the type declared for {@code property} in
     * {@code metadata} and, on success, stores the validated value in {@code container}.
     *
     * <p>Nothing is stored when the property is not declared in the metadata (logged at info
     * level) or when validation fails (logged at warn level).
     *
     * @param deviceId  id used to pick the logger
     * @param metadata  metadata to look the property up in
     * @param property  property id
     * @param value     raw value to validate
     * @param container map receiving {@code property -> validated value}
     */
    protected void validateAndPutProperty(String deviceId, DeviceMetadata metadata, String property, Object value, Map<String, Object> container) {
        PropertyMetadata prop = metadata.getPropertyOrNull(property);
        if (prop == null) {
            this.logger(deviceId).info("message.undefined_property_metadata", new Object[]{property, value});
            return;
        }
        DataType type = prop.getValueType();
        ValidateResult validate = type.validate(value);
        if (!validate.isSuccess()) {
            this.logger(deviceId).warn("message.invalid_property_value", new Object[]{property, value, validate.getErrorMsg()});
            return;
        }
        // Store the value returned by the validator rather than the raw input.
        container.put(property, validate.getValue());
    }

    /**
     * Runs {@link #upstream(BlockingMessageDecodeContext)} for the given context.
     *
     * <p>The work is deferred until subscription. If the subscribing thread must not block,
     * the context creation and the {@code upstream} call are moved to
     * {@link Schedulers#boundedElastic()}; otherwise they run inline on the subscribing thread.
     *
     * @param context the decode context supplied by the caller
     * @return the publisher produced by the blocking context's {@code runAsync()}
     */
    @Nonnull
    public final Flux<Message> decode(@Nonnull MessageDecodeContext context) {
        return Flux.deferContextual(ctx -> {
            Monitor monitor = this.monitorFor(context.getDevice());
            if (this.isInNonBlocking()) {
                // Blocking is not allowed here: run the hook on a scheduler that tolerates it.
                return Mono.fromCallable(() -> {
                    BlockingMessageDecodeContext decodeContext = new BlockingMessageDecodeContext(monitor, context, this.getBlockingTimeout(), ctx);
                    this.upstream(decodeContext);
                    return decodeContext;
                }).flatMap(BlockingMessageCodecContext::runAsync).subscribeOn(Schedulers.boundedElastic());
            }
            BlockingMessageDecodeContext decodeContext = new BlockingMessageDecodeContext(monitor, context, this.getBlockingTimeout(), ctx);
            this.upstream(decodeContext);
            return decodeContext.runAsync();
        });
    }

    /**
     * Runs {@link #downstream(BlockingMessageEncodeContext)} for the given context.
     *
     * <p>The work is deferred until subscription. If the subscribing thread must not block,
     * the context creation and the {@code downstream} call are moved to
     * {@link Schedulers#boundedElastic()}; otherwise they run inline on the subscribing thread.
     *
     * @param context the encode context supplied by the caller
     * @return the publisher produced by the blocking context's {@code runAsync()}
     */
    @Nonnull
    public final Flux<EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
        return Flux.deferContextual(ctx -> {
            Monitor monitor = this.monitorFor(context.getDevice());
            if (this.isInNonBlocking()) {
                // Blocking is not allowed here: run the hook on a scheduler that tolerates it.
                return Mono.fromCallable(() -> {
                    BlockingMessageEncodeContext encodeContext = new BlockingMessageEncodeContext(monitor, context, this.getBlockingTimeout(), ctx);
                    this.downstream(encodeContext);
                    return encodeContext;
                }).flatMap(BlockingMessageCodecContext::runAsync).subscribeOn(Schedulers.boundedElastic());
            }
            BlockingMessageEncodeContext encodeContext = new BlockingMessageEncodeContext(monitor, context, this.getBlockingTimeout(), ctx);
            this.downstream(encodeContext);
            return encodeContext.runAsync();
        });
    }

    /**
     * Tells whether the current thread is one on which the codec hooks must not be run inline.
     *
     * @return {@code true} if Reactor reports the current thread as non-blocking or the thread
     *         is a Netty {@link FastThreadLocalThread}
     */
    protected boolean isInNonBlocking() {
        Thread thread = Thread.currentThread();
        return Schedulers.isNonBlockingThread(thread) || thread instanceof FastThreadLocalThread;
    }

    /** Picks the device-specific monitor when a device is known, else the default monitor. */
    private Monitor monitorFor(DeviceOperator device) {
        return device == null
            ? this.context.getMonitor()
            : this.context.getMonitor(device.getDeviceId());
    }
}

