/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import io.scalecube.services.annotations.ServiceMethod;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import lombok.Generated;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.CommonDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.rpc.RpcManager;
import org.jetlinks.core.rpc.RpcService;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.supports.cluster.AbstractDeviceOperationBroker;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;

public class RpcDeviceOperationBroker
extends AbstractDeviceOperationBroker {
    /** Logger for this broker. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RpcDeviceOperationBroker.class);

    /** Used to register the local RPC service and to look up the services of other servers. */
    private final RpcManager rpcManager;

    /** Queried by {@code getDeviceState} to check whether a device is alive. */
    private final DeviceSessionManager sessionManager;

    /**
     * Unicast sink, backed by an unbounded multi-producer queue, that carries messages to the
     * subscriber of {@code handleSendToDeviceMessage(String)}.
     */
    private final Sinks.Many<Message> sendToDevice = Sinks.unsafe()
        .many()
        .unicast()
        .onBackpressureBuffer((Queue)Queues.unboundedMultiproducer().get());

    /**
     * Requests received over RPC that still wait for a reply, keyed by their await key.
     * Entries expire five minutes after being written; an expired entry is only logged at debug level.
     */
    private final Map<AbstractDeviceOperationBroker.AwaitKey, RepayableDeviceMessage<?>> awaits = CacheBuilder.newBuilder()
        .expireAfterWrite(Duration.ofMinutes(5L))
        .removalListener(removal -> {
            if (removal.getCause() != RemovalCause.EXPIRED) {
                return;
            }
            try {
                log.debug("discard await reply message[{}] message,{}", removal.getKey(), removal.getValue());
            }
            catch (Throwable ignored) {
                // best effort: a failure while logging the discarded entry is deliberately dropped
            }
        })
        .build()
        .asMap();

    /** Handlers registered through {@code handleSendToDeviceMessage(String, Function)}. */
    private final List<Function<Message, Mono<Void>>> handler = new CopyOnWriteArrayList<Function<Message, Mono<Void>>>();

    /** All message types, indexed by ordinal; {@code decode} uses it to resolve the type byte written by {@code encode}. */
    static MessageType[] types = MessageType.values();

    public RpcDeviceOperationBroker(RpcManager rpcManager, DeviceSessionManager sessionManager) {
        this.rpcManager = rpcManager;
        this.sessionManager = sessionManager;
        rpcManager.registerService((Object)new ServiceImpl());
        Schedulers.parallel().schedulePeriodically(this::checkExpires, 10L, 10L, TimeUnit.SECONDS);
    }

    /**
     * Reports the state of each requested device by asking the session manager whether it is alive.
     * The emitted state byte is {@code 1} when the device is alive and {@code -1} otherwise.
     *
     * @param deviceGatewayServerId not used by this implementation
     * @param deviceIdList          ids of the devices to check
     * @return one {@link DeviceStateInfo} per device for which the liveness check emits a value
     */
    @Override
    public Flux<DeviceStateInfo> getDeviceState(String deviceGatewayServerId, Collection<String> deviceIdList) {
        Function<String, Mono<DeviceStateInfo>> stateOf = id -> this.sessionManager
            .checkAlive(id, false)
            .map(alive -> {
                byte state = alive ? (byte)1 : (byte)-1;
                return new DeviceStateInfo(id, state);
            });

        if (deviceIdList.size() != 1) {
            return Flux.fromIterable(deviceIdList).flatMap(stateOf);
        }
        // single device: skip the flatMap machinery and convert the Mono directly
        String onlyId = deviceIdList.iterator().next();
        return stateOf.apply(onlyId).flux();
    }

    /**
     * No-op in this broker: the supplied mapper is never stored or invoked, and the returned
     * handle comes from {@code Disposables.disposed()}.
     *
     * @param serverId    ignored
     * @param stateMapper ignored
     * @return the {@link Disposable} produced by {@code Disposables.disposed()}
     */
    @Override
    public Disposable handleGetDeviceState(String serverId, Function<Publisher<String>, Flux<DeviceStateInfo>> stateMapper) {
        Disposable nothingToDispose = Disposables.disposed();
        return nothingToDispose;
    }

    /**
     * Sends messages to the server that owns the device connection.
     * <p>
     * If the target is the current server, every message is handled locally and the result is
     * {@code Reactors.ALWAYS_ONE}. Otherwise each message is tagged with the {@code sendFrom}
     * header (the current server id), encoded into a buffer and passed to the target server's
     * {@link Service}. The per-message results are summed: {@code Reactors.ALWAYS_ONE} when the
     * service was found, {@code Reactors.ALWAYS_ZERO} when the lookup was empty.
     *
     * @param deviceGatewayServerId id of the server that should deliver the messages
     * @param message               messages to send
     * @return the count described above
     */
    @Override
    public Mono<Integer> send(String deviceGatewayServerId, Publisher<? extends Message> message) {
        boolean targetIsLocal = this.rpcManager.currentServerId().equals(deviceGatewayServerId);
        if (!targetIsLocal) {
            return Flux.from(message).flatMap(msg -> {
                msg.addHeader(Headers.sendFrom, (Object)this.rpcManager.currentServerId());
                ByteBuf encoded = this.encode((Message)msg);
                // the wrapper is handed to the RPC layer; the real buffer is released in doFinally
                ByteBuf shielded = Unpooled.unreleasableBuffer((ByteBuf)encoded);
                return this.rpcManager
                    .getService(deviceGatewayServerId, Service.class)
                    .flatMap(remote -> remote.send(shielded).then(Reactors.ALWAYS_ONE))
                    .switchIfEmpty(Reactors.ALWAYS_ZERO)
                    .doFinally(signal -> ReferenceCountUtil.release((Object)encoded));
            }).reduce((Object)0, Integer::sum);
        }
        return Flux.from(message)
            .flatMap(this::handleSendToDevice)
            .then(Reactors.ALWAYS_ONE);
    }

    /**
     * Local delivery entry point used by {@code send} when the target is the current server;
     * delegates to {@code doSendToDevice(Message)}.
     *
     * @param message message to deliver
     * @return completion of the local delivery
     */
    private Mono<Void> handleSendToDevice(Message message) {
        Mono<Void> delivery = this.doSendToDevice(message);
        return delivery;
    }

    /**
     * Registers a handler that receives every message passed to {@code doSendToDevice}.
     *
     * @param serverId not used by this implementation
     * @param handler  callback invoked with each message
     * @return a handle that removes the handler again when disposed
     */
    public Disposable handleSendToDeviceMessage(String serverId, Function<Message, Mono<Void>> handler) {
        List<Function<Message, Mono<Void>>> registry = this.handler;
        registry.add(handler);
        return () -> {
            registry.remove(handler);
        };
    }

    /**
     * Records a request in {@code awaits} so that its reply can be matched to it later.
     * Only {@link RepayableDeviceMessage}s whose {@code sendAndForget} header is absent or false are recorded.
     *
     * @param message message received over RPC
     */
    private void addAwaitReplyKey(Message message) {
        if (!(message instanceof RepayableDeviceMessage)) {
            return;
        }
        boolean fireAndForget = message.getHeader(Headers.sendAndForget).orElse(false).booleanValue();
        if (fireAndForget) {
            return;
        }
        RepayableDeviceMessage repayable = (RepayableDeviceMessage)message;
        this.awaits.put(this.getAwaitReplyKey((DeviceMessage)repayable), repayable);
    }

    /**
     * Delivers a message to the local consumers: the {@code sendToDevice} sink subscriber and/or
     * the registered handlers.
     * <ul>
     *   <li>No sink subscriber and no handler: logs a warning and replies with {@code ErrorCode.SYSTEM_ERROR}.</li>
     *   <li>Sink has a subscriber: the message is emitted into the sink; if the emit throws, an error reply is sent
     *       and the handlers are skipped.</li>
     *   <li>Afterwards the message is passed to the registered handlers; a handler error is turned into an error reply.</li>
     * </ul>
     *
     * @param message message to deliver
     * @return completion of the delivery (or of the error reply)
     */
    private Mono<Void> doSendToDevice(Message message) {
        // presumably copies the current trace context into the message headers before delivery — confirm against TraceHolder
        return TraceHolder.writeContextTo((Object)message, Message::addHeader).flatMap(msg -> {
            // nobody is listening at all: answer the sender with a system error instead of dropping silently
            if (this.sendToDevice.currentSubscriberCount() == 0 && this.handler.isEmpty()) {
                log.warn("no handler for message {}", msg);
                return this.doReply(this.createReply((Message)msg).error(ErrorCode.SYSTEM_ERROR));
            }
            // note: the subscriber count is read a second time here, so it may differ from the check above
            if (this.sendToDevice.currentSubscriberCount() != 0) {
                try {
                    this.sendToDevice.emitNext(msg, Reactors.emitFailureHandler());
                }
                catch (Throwable err) {
                    // emit failed: report the failure to the sender and do not call the handlers
                    return this.doReply(this.createReply((Message)msg).error(err));
                }
            }
            // NOTE(review): this error path goes through reply(...) and the outer `message`, while the paths above
            // use doReply(...) and `msg` — confirm the difference is intentional.
            return this.doSendToDevice((Message)msg, this.handler).onErrorResume(error -> this.reply(this.createReply(message).error(error)).then());
        });
    }

    /**
     * Passes the message to the given handlers. A single handler is invoked directly; any other
     * number of handlers is invoked one after another, in list order.
     *
     * @param message  message to hand over
     * @param handlers handlers to invoke
     * @return completion once the handlers have completed
     */
    private Mono<Void> doSendToDevice(Message message, List<Function<Message, Mono<Void>>> handlers) {
        if (handlers.size() != 1) {
            return Flux.fromIterable(handlers)
                .concatMap(h -> (Publisher)h.apply(message))
                .then();
        }
        Function<Message, Mono<Void>> soleHandler = handlers.get(0);
        return soleHandler.apply(message);
    }

    /**
     * Builds a reply object for the given message.
     * A {@link RepayableDeviceMessage} supplies its own reply via {@code newReply()}. Any other
     * message gets a {@link CommonDeviceMessageReply} carrying the message id, plus the device id
     * when the message is a {@link DeviceMessage}.
     *
     * @param message message to reply to
     * @return a new reply for that message
     */
    private DeviceMessageReply createReply(Message message) {
        if (message instanceof RepayableDeviceMessage) {
            RepayableDeviceMessage repayable = (RepayableDeviceMessage)message;
            return repayable.newReply();
        }
        if (!(message instanceof DeviceMessage)) {
            return new CommonDeviceMessageReply().messageId(message.getMessageId());
        }
        DeviceMessage deviceMessage = (DeviceMessage)message;
        return new CommonDeviceMessageReply()
            .deviceId(deviceMessage.getDeviceId())
            .messageId(message.getMessageId());
    }

    /**
     * Broadcast sending is not implemented by this broker: the publisher is never subscribed
     * and {@code Reactors.ALWAYS_ZERO} is returned.
     *
     * @param message ignored
     * @return {@code Reactors.ALWAYS_ZERO}
     */
    @Override
    public Mono<Integer> send(Publisher<? extends BroadcastMessage> message) {
        Mono<Integer> nothingSent = Reactors.ALWAYS_ZERO;
        return nothingSent;
    }

    /**
     * Exposes the {@code sendToDevice} sink as a flux of messages to deliver.
     * The sink is unicast, so it supports a single subscriber.
     *
     * @param serverId not used by this implementation
     * @return flux view of the {@code sendToDevice} sink
     */
    @Override
    public Flux<Message> handleSendToDeviceMessage(String serverId) {
        Flux<Message> outbound = this.sendToDevice.asFlux();
        return outbound;
    }

    /**
     * Sends a reply back over RPC.
     * <p>
     * If the reply matches a request recorded in {@code awaits}, that entry is removed and the
     * reply goes only to the server named in the request's {@code sendFrom} header. Otherwise it
     * is sent to every {@link Service} known to the RPC manager.
     * <p>
     * The reply is encoded lazily, once per subscription. The buffer is therefore allocated only
     * when the returned {@code Mono} is actually subscribed and is released when that subscription
     * terminates. A {@code Mono} that is never subscribed allocates nothing, and a re-subscription
     * gets its own fresh buffer. Encoding failures are delivered as an error signal.
     *
     * @param reply reply to deliver
     * @return completion once every targeted service has processed the reply
     */
    @Override
    protected Mono<Void> doReply(DeviceMessageReply reply) {
        // resolved eagerly and exactly once: removing the await entry must not be repeated on re-subscription
        RepayableDeviceMessage<?> request = this.awaits.remove(this.getAwaitReplyKey((DeviceMessage)reply));
        String serviceId = null;
        if (request != null) {
            serviceId = request.getHeader(Headers.sendFrom).orElse(null);
        }
        Flux<Service> serviceFlux = serviceId != null
            ? this.rpcManager.getService(serviceId, Service.class).flux()
            : this.rpcManager.getServices(Service.class).map(RpcService::service);
        return Mono.defer(() -> {
            ByteBuf buf = this.encode((Message)reply);
            // the wrapper is handed to the RPC layer; the real buffer is released in doFinally below
            ByteBuf unreleasableBuffer = Unpooled.unreleasableBuffer((ByteBuf)buf);
            return serviceFlux
                .flatMap(service -> service.reply(unreleasableBuffer))
                .then()
                .doFinally(ignore -> ReferenceCountUtil.release((Object)buf));
        });
    }

    /**
     * Wraps a buffer in an object input stream. The underlying {@link ByteBufInputStream} is
     * created with its second constructor argument set to {@code true} (release the buffer on close).
     *
     * @param input buffer to read from
     * @return object input reading from the buffer
     */
    protected ObjectInput createInput(ByteBuf input) {
        InputStream bufferStream = (InputStream)new ByteBufInputStream(input, true);
        return new ObjectInputStream(bufferStream);
    }

    /**
     * Wraps a buffer in an object output stream that writes into it.
     *
     * @param output buffer to write to
     * @return object output writing into the buffer
     */
    protected ObjectOutput createOutput(ByteBuf output) {
        OutputStream bufferStream = (OutputStream)new ByteBufOutputStream(output);
        return new ObjectOutputStream(bufferStream);
    }

    /**
     * Decodes a message produced by {@code encode}.
     * <p>
     * The first byte is the ordinal of the {@link MessageType}. If the type can create a device
     * message ({@code forDevice()} returns non-null), that instance reads its own state from the
     * stream. Otherwise the message is read with {@code SerializeUtils.readObject}.
     * <p>
     * This method takes ownership of {@code buf}. On the normal path the buffer is released when
     * the input is closed, because the default {@code createInput} builds a release-on-close
     * stream. If {@code createInput} itself fails, there is no stream to close, so the buffer is
     * released here instead of leaking.
     *
     * @param buf buffer holding the encoded message
     * @return the decoded message
     */
    private Message decode(ByteBuf buf) {
        ObjectInput opened;
        try {
            opened = this.createInput(buf);
        }
        catch (Throwable e) {
            // no stream was returned, so nothing will ever close (and thereby release) the buffer
            ReferenceCountUtil.safeRelease((Object)buf);
            throw e;
        }
        try (ObjectInput input = opened) {
            MessageType type = types[input.readByte()];
            DeviceMessage msg = type.forDevice();
            if (msg != null) {
                msg.readExternal(input);
                return msg;
            }
            return (Message)SerializeUtils.readObject((ObjectInput)input);
        }
    }

    /**
     * Encodes a message into a newly allocated buffer.
     * <p>
     * Layout: one byte with the ordinal of the message type, followed by the message body. The
     * body is written by the message itself when its type reports {@code iSupportDevice()},
     * otherwise by {@code SerializeUtils.writeObject}. If anything fails, the buffer is released
     * before the failure is rethrown.
     *
     * @param message message to encode
     * @return buffer holding the encoded message; the caller is responsible for releasing it
     */
    private ByteBuf encode(Message message) {
        ByteBuf target = ByteBufAllocator.DEFAULT.buffer();
        try (ObjectOutput out = this.createOutput(target);){
            MessageType type = message.getMessageType();
            out.writeByte(type.ordinal());
            if (!type.iSupportDevice()) {
                SerializeUtils.writeObject((Object)message, (ObjectOutput)out);
            } else {
                message.writeExternal(out);
            }
        }
        catch (Throwable failure) {
            // nobody else holds the buffer yet, so it must be freed here
            ReferenceCountUtil.safeRelease((Object)target);
            throw failure;
        }
        return target;
    }

    /**
     * RPC endpoint of this broker, registered with the RPC manager in the broker constructor.
     * Both methods decode the incoming buffer eagerly, before the returned {@code Mono} is subscribed.
     */
    private class ServiceImpl
    implements Service {
        private ServiceImpl() {
        }

        /**
         * Receives a message sent from another server, records it as awaiting a reply when
         * applicable, and delivers it locally.
         */
        @Override
        public Mono<Void> send(ByteBuf payload) {
            RpcDeviceOperationBroker broker = RpcDeviceOperationBroker.this;
            Message incoming = broker.decode(payload);
            broker.addAwaitReplyKey(incoming);
            return broker.doSendToDevice(incoming);
        }

        /**
         * Receives a reply from another server and passes it to {@code handleReply}.
         * Decoded messages that are not a {@link DeviceMessageReply} are ignored.
         */
        @Override
        public Mono<Void> reply(ByteBuf buf) {
            RpcDeviceOperationBroker broker = RpcDeviceOperationBroker.this;
            Message incoming = broker.decode(buf);
            if (!(incoming instanceof DeviceMessageReply)) {
                return Mono.empty();
            }
            broker.handleReply((DeviceMessageReply)incoming);
            return Mono.empty();
        }
    }

    /**
     * RPC contract used between brokers on different servers. Both methods take a buffer
     * holding a message in the format written by {@code encode}.
     */
    @io.scalecube.services.annotations.Service
    public interface Service {
        /**
         * Delivers an encoded message to the server hosting this service.
         *
         * @param var1 encoded message
         * @return completion of the delivery
         */
        @ServiceMethod
        Mono<Void> send(ByteBuf var1);

        /**
         * Delivers an encoded reply to the server hosting this service.
         *
         * @param var1 encoded reply
         * @return completion once the reply has been handled
         */
        @ServiceMethod
        Mono<Void> reply(ByteBuf var1);
    }
}

