/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.search.cluster;

import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.search.cluster.BaseNodeMonitor;
import com.yahoo.search.cluster.MonitorConfiguration;
import com.yahoo.search.cluster.NodeManager;
import com.yahoo.search.cluster.TrafficNodeMonitor;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.yolean.UncheckedInterruptedException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Keeps one {@link TrafficNodeMonitor} per monitored node and drives a background thread that
 * repeatedly asks the {@link NodeManager} to ping every monitored node.
 *
 * <p>Ping outcomes are reported back through {@link #failed} and {@link #responded}; when such a
 * report changes a monitor's known working state, the node manager is notified.
 *
 * <p>Locking as used in this class: the node bookkeeping ({@code nodesToRemove},
 * {@code nodesToUpdate}) is touched only from methods synchronized on this instance, while
 * {@code skipNextWait} is read and written only while holding the {@code nodeManager} monitor,
 * which is also the object the monitor thread waits on between ping rounds.
 *
 * @param <T> the type of the nodes being monitored
 */
public class ClusterMonitor<T> {
    private static final Logger log = Logger.getLogger(ClusterMonitor.class.getName());

    private final MonitorConfiguration configuration = new MonitorConfiguration();
    private final NodeManager<T> nodeManager;
    private final MonitorThread monitorThread;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** Monitors keyed by node, in insertion order. */
    private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new LinkedHashMap<>());

    /** Nodes dropped by the latest reconfiguration whose monitors have not yet been removed. */
    private final Set<T> nodesToRemove = new LinkedHashSet<>();

    /** Nodes from the latest reconfiguration that have not yet reported a ping outcome. */
    private final Set<T> nodesToUpdate = new LinkedHashSet<>();

    /** When set, the monitor thread skips its next wait; guarded by the nodeManager monitor. */
    private boolean skipNextWait = false;

    /**
     * Creates a monitor for the nodes handled by the given manager.
     *
     * @param manager the node manager which performs pings and receives state changes
     * @param startPingThread whether to start the monitor thread right away
     */
    public ClusterMonitor(NodeManager<T> manager, boolean startPingThread) {
        this.nodeManager = manager;
        this.monitorThread = new MonitorThread("search.clustermonitor." + manager.name());
        if (startPingThread) {
            this.monitorThread.start();
        }
    }

    /**
     * Replaces the monitored set of nodes with the given collection, then blocks until every
     * given node has reported a ping outcome and every dropped node has been removed.
     *
     * @param nodes the complete set of nodes to monitor from now on
     * @throws IllegalStateException if the monitor thread is not alive
     * @throws UncheckedInterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void reconfigure(Collection<T> nodes) {
        if (!this.monitorThread.isAlive()) {
            throw new IllegalStateException("monitor thread must be alive for reconfiguration");
        }
        this.nodesToUpdate.addAll(nodes);
        this.nodesToRemove.addAll(this.nodeMonitors.keySet());
        this.nodesToRemove.removeAll(nodes);
        for (T node : nodes) {
            if (!this.nodeMonitors.containsKey(node)) {
                this.add(node, true);
            }
        }
        // Wake the monitor thread so a ping round starts without waiting out the check interval.
        synchronized (this.nodeManager) {
            this.skipNextWait = true;
            this.nodeManager.notifyAll();
        }
        try {
            // wait() releases this instance's monitor, letting ping/failed/responded make progress.
            // NOTE(review): this loop does not look at 'closed'; if shutdown() runs while nodes are
            // still pending it may wait indefinitely - confirm whether callers can race these.
            while (!this.nodesToRemove.isEmpty() || !this.nodesToUpdate.isEmpty()) {
                this.wait(1L);
            }
        } catch (InterruptedException e) {
            throw new UncheckedInterruptedException(e, true);
        }
        this.nodeManager.pingIterationCompleted();
    }

    /** Starts the monitor thread unless it is currently alive. */
    public void start() {
        if (!this.monitorThread.isAlive()) {
            this.monitorThread.start();
        }
    }

    /** Returns the configuration instance shared with all node monitors created by this. */
    public MonitorConfiguration getConfiguration() {
        return this.configuration;
    }

    /** Returns whether {@link #shutdown()} has been called. */
    public boolean isClosed() {
        return this.closed.get();
    }

    /**
     * Starts monitoring the given node, replacing any existing monitor registered for it.
     *
     * @param node the node to monitor
     * @param internal passed on unchanged to the {@link TrafficNodeMonitor} constructor
     */
    public void add(T node, boolean internal) {
        this.nodeMonitors.put(node, new TrafficNodeMonitor<T>(node, this.configuration, internal));
    }

    /**
     * Records that the given node failed, notifying the node manager if this changes the
     * monitor's known working state.
     *
     * @param node the node which failed
     * @param error the error describing the failure
     */
    public synchronized void failed(T node, ErrorMessage error) {
        this.updateMonitoredNode(node, monitor -> monitor.failed(error), this.nodeManager::failed);
    }

    /**
     * Records that the given node responded, notifying the node manager if this changes the
     * monitor's known working state.
     *
     * @param node the node which responded
     */
    public synchronized void responded(T node) {
        this.updateMonitoredNode(node, TrafficNodeMonitor::responded, this.nodeManager::working);
    }

    /**
     * Applies a ping outcome to the monitor of the given node, if it is still being monitored.
     *
     * @param node the node the outcome belongs to
     * @param monitorUpdate the update to apply to the node's monitor
     * @param nodeUpdate invoked with the node if the update changed the known working state
     */
    private void updateMonitoredNode(T node, Consumer<TrafficNodeMonitor<T>> monitorUpdate, Consumer<T> nodeUpdate) {
        TrafficNodeMonitor<T> monitor = this.nodeMonitors.get(node);
        if (this.closed.get()) {
            monitor = null; // no state changes are propagated after shutdown
        }
        if (this.nodesToRemove.remove(node)) {
            // The node was dropped by a reconfiguration: complete its removal and ignore the outcome.
            this.nodeMonitors.remove(node);
            monitor = null;
        }
        if (monitor != null) {
            Boolean wasWorking = monitor.isKnownWorking();
            monitorUpdate.accept(monitor);
            // Compare by value (and null-safely) rather than by boxed reference identity.
            if (!Objects.equals(wasWorking, monitor.isKnownWorking())) {
                nodeUpdate.accept(node);
            }
        }
        this.nodesToUpdate.remove(node);
    }

    /**
     * Asks the node manager to ping every monitored node, completing the removal of nodes which
     * are pending removal instead of pinging them. Returns early, without signalling iteration
     * completion, if this monitor is closed during the round.
     *
     * @param executor the executor handed to the node manager for each ping
     */
    public synchronized void ping(Executor executor) {
        for (BaseNodeMonitor<T> monitor : this.nodeMonitors()) {
            if (this.closed.get()) {
                return;
            }
            T node = monitor.getNode();
            if (this.nodesToRemove.remove(node)) {
                this.nodeMonitors.remove(node);
                continue;
            }
            this.nodeManager.ping(this, node, executor);
        }
        this.nodeManager.pingIterationCompleted();
    }

    /** Returns an iterator over a snapshot of the current node monitors. */
    public Iterator<BaseNodeMonitor<T>> nodeMonitorIterator() {
        return this.nodeMonitors().iterator();
    }

    /** Returns an unmodifiable snapshot of the current node monitors. */
    public List<BaseNodeMonitor<T>> nodeMonitors() {
        return List.copyOf(this.nodeMonitors.values());
    }

    /**
     * Marks this monitor as closed, forgets all node monitors, wakes the monitor thread and waits
     * for it to finish. If the calling thread is interrupted while waiting, the wait is abandoned
     * and the thread's interrupt status is restored.
     */
    public void shutdown() {
        this.closed.set(true);
        synchronized (this) {
            this.nodeMonitors.clear();
        }
        synchronized (this.nodeManager) {
            this.skipNextWait = true;
            this.nodeManager.notifyAll();
        }
        try {
            if (this.monitorThread.isAlive()) {
                this.monitorThread.join();
            }
        } catch (InterruptedException e) {
            // Give up waiting, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /** Daemon thread which runs ping rounds until the enclosing monitor is closed. */
    private class MonitorThread extends Thread {
        MonitorThread(String name) {
            super(name);
            this.setDaemon(true);
        }

        /**
         * Runs ping rounds separated by a wait of the configured check interval on the node
         * manager's monitor, then shuts down the ping executor once the loop ends.
         */
        @Override
        public void run() {
            log.fine("Starting cluster monitor thread " + this.getName());
            ExecutorService pingExecutor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping"));
            while (!ClusterMonitor.this.closed.get()) {
                try {
                    log.finest("Activating ping");
                    ClusterMonitor.this.ping(pingExecutor);
                    synchronized (ClusterMonitor.this.nodeManager) {
                        if (!ClusterMonitor.this.skipNextWait) {
                            ClusterMonitor.this.nodeManager.wait(ClusterMonitor.this.configuration.getCheckInterval());
                        }
                        ClusterMonitor.this.skipNextWait = false;
                    }
                } catch (Throwable e) {
                    if (ClusterMonitor.this.closed.get() && e instanceof InterruptedException) {
                        break;
                    }
                    if (!(e instanceof Exception)) {
                        // Errors (non-Exception throwables) end the monitor thread.
                        log.log(Level.WARNING, "Error in monitor thread, will quit", e);
                        break;
                    }
                    log.log(Level.WARNING, "Exception in monitor thread", e);
                }
            }
            pingExecutor.shutdown();
            try {
                if (!pingExecutor.awaitTermination(10L, TimeUnit.SECONDS)) {
                    log.warning("Timeout waiting for ping executor to terminate");
                }
            } catch (InterruptedException e) {
                // Stop waiting for the executor; preserve the interrupt status.
                Thread.currentThread().interrupt();
            }
            log.fine("Stopped cluster monitor thread " + this.getName());
        }
    }
}

