/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.rest.broadcaster;

import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.kylin.common.logging.SetLogCategory;
import org.apache.kylin.common.persistence.transaction.BroadcastEventReadyNotifier;
import org.apache.kylin.common.util.AddressUtil;
import org.apache.kylin.common.util.ClusterConstant;
import org.apache.kylin.common.util.DaemonThreadFactory;
import org.apache.kylin.common.util.NamedThreadFactory;
import org.apache.kylin.rest.broadcaster.BroadcastEventHandler;
import org.apache.kylin.rest.cluster.ClusterManager;
import org.apache.kylin.rest.response.ServerInfoResponse;
import org.apache.kylin.tool.restclient.RestClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Queues broadcast events and fans each one out to the cluster nodes selected by the
 * event's broadcast scope.
 *
 * <p>Events are enqueued through {@link #announce(BroadcastEventReadyNotifier)}, taken off the
 * queue by a single polling thread ({@link #consumeEvent()}), and delivered per node on a fixed
 * pool of handler threads: the local node is served through the registered
 * {@link BroadcastEventHandler}, every other node through a cached {@link RestClient}.
 *
 * <p>Lifecycle: {@link #start()} begins polling; {@link #close()} stops polling and shuts both
 * executors down, after which the instance cannot be started again.
 */
@Component
public class Broadcaster
implements Closeable {
    private static final Logger logger = LoggerFactory.getLogger(Broadcaster.class);
    private final ClusterManager clusterManager;
    private final ExecutorService eventPollExecutor;
    private final ExecutorService eventHandlerExecutor;
    private final BlockingQueue<Runnable> runnableQueue;
    private final BlockingQueue<BroadcastEventReadyNotifier> eventQueue;
    private final ConcurrentHashMap<String, RestClient> restClientMap;
    private volatile boolean isRunning;
    private volatile BroadcastEventHandler handler;

    /**
     * Creates the broadcaster with empty queues, a 10-thread handler pool and a single polling
     * thread. Nothing is polled until {@link #start()} is called.
     *
     * @param clusterManager source of the cached server list used to resolve target nodes
     */
    @Autowired
    public Broadcaster(ClusterManager clusterManager) {
        this.clusterManager = clusterManager;
        this.runnableQueue = new LinkedBlockingQueue<>();
        this.eventQueue = new LinkedBlockingQueue<>();
        this.restClientMap = new ConcurrentHashMap<>();
        // DiscardPolicy: a task rejected by the pool (e.g. after shutdown) is dropped silently.
        this.eventHandlerExecutor = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, this.runnableQueue, new DaemonThreadFactory("BroadcastEvent-handler"), new ThreadPoolExecutor.DiscardPolicy());
        this.eventPollExecutor = Executors.newSingleThreadExecutor(new NamedThreadFactory("BroadcastEvent-poll"));
    }

    /**
     * Marks the broadcaster as running and schedules {@link #consumeEvent()} on the polling
     * thread. Must not be called after {@link #close()}, which terminates the executors.
     */
    public void start() {
        this.isRunning = true;
        this.eventPollExecutor.submit(this::consumeEvent);
    }

    /**
     * Sets the handler used to deliver events addressed to the local node.
     *
     * @param handler the local handler; replaces any previously registered one
     */
    public void register(BroadcastEventHandler handler) {
        this.handler = handler;
    }

    /**
     * Removes the local handler; events addressed to the local node are then skipped.
     */
    public void unregister() {
        this.handler = null;
    }

    /**
     * Enqueues an event for broadcasting unless an equal event is already waiting in the queue.
     *
     * @param event the event to broadcast
     */
    public void announce(BroadcastEventReadyNotifier event) {
        if (this.eventQueue.contains(event)) {
            try (SetLogCategory ignored = new SetLogCategory("schedule")) {
                logger.debug("broadcast event queue has contain this event: {}", event);
            }
            return;
        }
        if (!this.eventQueue.offer(event)) {
            logger.warn("unable to send broadcast ");
        }
    }

    /**
     * Polling loop: blocks on the event queue and handles events one at a time until the
     * broadcaster is stopped or the polling thread is interrupted.
     */
    public void consumeEvent() {
        try (SetLogCategory ignored = new SetLogCategory("schedule")) {
            while (this.isRunning) {
                BroadcastEventReadyNotifier notifier = this.eventQueue.take();
                this.handleEvent(notifier);
            }
        }
        catch (InterruptedException e) {
            if (this.isRunning) {
                logger.error("consume broadcast event fail: ", e);
            } else {
                // close() interrupts the blocked take(); that is a normal stop, not a failure.
                logger.info("broadcast event consumer stopped");
            }
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Delivers one event to every target node in parallel and waits up to five seconds for all
     * deliveries to finish.
     */
    private void handleEvent(BroadcastEventReadyNotifier notifier) {
        try {
            Set<String> notifyNodes = this.getBroadcastNodes(notifier);
            if (notifyNodes.isEmpty()) {
                logger.debug("no need broadcast the event {} to other node.", notifier);
                return;
            }
            CountDownLatch latch = new CountDownLatch(notifyNodes.size());
            String identity = AddressUtil.getLocalInstance();
            for (String node : notifyNodes) {
                this.eventHandlerExecutor.submit(() -> {
                    try {
                        if (identity.equals(node)) {
                            // Read the volatile field once so unregister() cannot null it between check and use.
                            BroadcastEventHandler localHandler = this.handler;
                            if (localHandler != null) {
                                localHandler.handleLocally(notifier);
                            }
                        } else {
                            this.remoteHandle(node, notifier);
                        }
                        logger.info("Broadcast to {} notify.", node);
                    }
                    catch (Exception e) {
                        // submit() would otherwise bury runtime failures in the discarded Future.
                        logger.warn("Failed to notify {}.", node, e);
                    }
                    finally {
                        latch.countDown();
                    }
                });
            }
            if (!latch.await(5L, TimeUnit.SECONDS)) {
                logger.warn("Failed to broadcast due to timeout. current BroadcastEvent-handler task num {}", this.runnableQueue.size());
            }
        }
        catch (InterruptedException e) {
            logger.warn("Thread interrupted");
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            logger.warn("failed to broadcast", e);
        }
    }

    /**
     * Sends the event to a remote node, creating and caching that node's {@link RestClient} on
     * first use.
     *
     * @throws IOException if the remote notification fails
     */
    private void remoteHandle(String node, BroadcastEventReadyNotifier notifier) throws IOException {
        this.restClientMap.computeIfAbsent(node, RestClient::new).notify(notifier);
    }

    /**
     * Resolves the hosts an event must be sent to: maps the event's broadcast scope to server
     * modes, then drops the local instance when the event does not need to be broadcast to self.
     */
    private Set<String> getBroadcastNodes(BroadcastEventReadyNotifier notifier) {
        Set<String> nodes;
        switch (notifier.getBroadcastScope()) {
            case LEADER_NODES: {
                nodes = this.getNodesByModes(ClusterConstant.ServerModeEnum.ALL, ClusterConstant.ServerModeEnum.JOB);
                break;
            }
            case ALL_NODES: {
                nodes = this.getNodesByModes(ClusterConstant.ServerModeEnum.ALL);
                break;
            }
            case JOB_NODES: {
                nodes = this.getNodesByModes(ClusterConstant.ServerModeEnum.JOB);
                break;
            }
            case QUERY_NODES: {
                nodes = this.getNodesByModes(ClusterConstant.ServerModeEnum.QUERY);
                break;
            }
            case QUERY_AND_ALL: {
                nodes = this.getNodesByModes(ClusterConstant.ServerModeEnum.QUERY, ClusterConstant.ServerModeEnum.ALL);
                break;
            }
            default: {
                nodes = this.getNodesByModes(ClusterConstant.ServerModeEnum.ALL, ClusterConstant.ServerModeEnum.JOB, ClusterConstant.ServerModeEnum.QUERY);
            }
        }
        if (!notifier.needBroadcastSelf()) {
            String identity = AddressUtil.getLocalInstance();
            return nodes.stream().filter(node -> !node.equals(identity)).collect(Collectors.toSet());
        }
        return nodes;
    }

    /**
     * Returns the hosts of all cached servers whose mode is one of the given modes.
     *
     * @param serverModeEnums modes to match; {@code null} entries are ignored
     * @return matching hosts; empty when no modes are given or no servers are cached
     */
    private Set<String> getNodesByModes(ClusterConstant.ServerModeEnum ... serverModeEnums) {
        if (ArrayUtils.isEmpty(serverModeEnums)) {
            return Collections.emptySet();
        }
        Set<String> serverModeNameSets = Stream.of(serverModeEnums).filter(Objects::nonNull).map(ClusterConstant.ServerModeEnum::getName).collect(Collectors.toSet());
        List<ServerInfoResponse> nodes = this.clusterManager.getServersFromCache();
        if (CollectionUtils.isEmpty(nodes)) {
            logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config");
            return Sets.newHashSet();
        }
        return nodes.stream().filter(node -> serverModeNameSets.contains(node.getMode())).map(ServerInfoResponse::getHost).collect(Collectors.toSet());
    }

    /**
     * Stops polling and shuts down both executors. The polling thread is interrupted so it does
     * not stay blocked on the event queue; in-flight handler tasks are interrupted and queued
     * ones are dropped. The broadcaster cannot be restarted afterwards.
     */
    @Override
    public void close() {
        // Clear the flag first so the interrupted poll loop recognises this as a normal stop.
        this.isRunning = false;
        this.eventPollExecutor.shutdownNow();
        this.eventHandlerExecutor.shutdownNow();
    }
}

