/*
 * Decompiled with CFR 0.152.
 */
package hex.tree.xgboost.remote;

import hex.steam.SteamMessageSender;
import hex.steam.SteamMessenger;
import hex.tree.xgboost.XGBoostModel;
import hex.tree.xgboost.exec.RemoteXGBoostExecutor;
import java.io.IOException;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import org.apache.log4j.Logger;
import water.H2O;
import water.Job;
import water.Key;
import water.fvec.Frame;

public class SteamExecutorStarter
implements SteamMessenger {
    private static final Logger LOG = Logger.getLogger(SteamExecutorStarter.class);
    private static SteamExecutorStarter instance;
    private final Object sendingLock = new Object[0];
    private final Object clusterLock = new Object[0];
    private SteamMessageSender sender;
    private ClusterInfo cluster;
    private final Deque<Map<String, String>> receivedMessages = new LinkedList<Map<String, String>>();

    public static SteamExecutorStarter getInstance() {
        return instance;
    }

    public SteamExecutorStarter() {
        instance = this;
    }

    public RemoteXGBoostExecutor getRemoteExecutor(XGBoostModel model, Frame train, Job<XGBoostModel> job) throws IOException {
        ClusterInfo clusterInfo = this.ensureClusterStarted((Key<XGBoostModel>)model._key, job);
        return SteamExecutorStarter.makeExecutor(model, train, clusterInfo);
    }

    public void startCluster(Key<XGBoostModel> key, Job<XGBoostModel> job) throws IOException {
        this.ensureClusterStarted(key, job);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ClusterInfo ensureClusterStarted(Key<XGBoostModel> key, Job<XGBoostModel> job) throws IOException {
        Object object = this.clusterLock;
        synchronized (object) {
            if (this.cluster == null) {
                LOG.info((Object)("Starting external cluster for model " + key + "."));
                this.startCluster(job);
            } else {
                LOG.info((Object)("External cluster available, starting model " + key + " now."));
            }
            return this.cluster;
        }
    }

    private void startCluster(Job<XGBoostModel> job) throws IOException {
        this.clearMessages();
        Map<String, String> startRequest = this.makeStartRequest();
        this.sendMessage(startRequest);
        while (!job.stop_requested()) {
            Map<String, String> response = this.waitForMessage();
            if (response != null) {
                if ("started".equals(response.get("status"))) {
                    String remoteUri = response.get("uri");
                    String userName = response.get("user");
                    String password = response.get("password");
                    this.cluster = new ClusterInfo(remoteUri, userName, password);
                    LOG.info((Object)("External cluster started at " + remoteUri + "."));
                    break;
                }
                if ("starting".equals(response.get("status"))) {
                    LOG.info((Object)"Continuing to wait for external cluster to start.");
                    continue;
                }
                if ("failed".equals(response.get("status"))) {
                    throw new IllegalStateException("Failed to start external cluster: " + response.get("reason"));
                }
                throw new IllegalStateException("Unknown status received from steam: " + response.get("status") + ", reason:" + response.get("reason"));
            }
            throw new IllegalStateException("No response received from Steam.");
        }
    }

    private static RemoteXGBoostExecutor makeExecutor(XGBoostModel model, Frame train, ClusterInfo cluster) {
        return new RemoteXGBoostExecutor(model, train, cluster.uri, cluster.userName, cluster.password);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void clearMessages() {
        Deque<Map<String, String>> deque = this.receivedMessages;
        synchronized (deque) {
            this.receivedMessages.clear();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Map<String, String> waitForMessage() {
        int timeout = Integer.parseInt(H2O.getSysProperty((String)"steam.notification.timeout", (String)"20000"));
        Deque<Map<String, String>> deque = this.receivedMessages;
        synchronized (deque) {
            if (!this.receivedMessages.isEmpty()) {
                return this.receivedMessages.pop();
            }
            try {
                this.receivedMessages.wait(timeout);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (!this.receivedMessages.isEmpty()) {
                return this.receivedMessages.pop();
            }
            return null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onConnectionStateChange(SteamMessageSender sender) {
        Object object = this.sendingLock;
        synchronized (object) {
            this.sender = sender;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendMessage(Map<String, String> message) throws IOException {
        Object object = this.sendingLock;
        synchronized (object) {
            if (this.sender == null) {
                throw new IOException("Steam communication chanel is not open.");
            }
            this.sender.sendMessage(message);
        }
    }

    public void onMessage(Map<String, String> message) {
        if ("stopXGBoostClusterNotification".equals(message.get("_type"))) {
            this.handleStopRequest(message);
        } else if ("xgboostClusterStartNotification".equals(message.get("_type"))) {
            this.queueResponse(message);
        } else {
            LOG.debug((Object)("Ignoring message " + message.get("_id") + " " + message.get("_type")));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void queueResponse(Map<String, String> message) {
        Deque<Map<String, String>> deque = this.receivedMessages;
        synchronized (deque) {
            LOG.info((Object)("Received message response " + message.get("_id")));
            this.receivedMessages.add(message);
            this.receivedMessages.notifyAll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleStopRequest(Map<String, String> message) {
        LOG.info((Object)("Received stop request " + message.get("_id")));
        boolean xgBoostInProgress = this.isXGBoostInProgress();
        if (xgBoostInProgress) {
            LOG.info((Object)"Responding to stop request with allowed=false");
            this.sendStopResponse(message, false);
        } else {
            Object object = this.clusterLock;
            synchronized (object) {
                LOG.info((Object)"Responding to stop request with allowed=true");
                this.sendStopResponse(message, true);
                this.cluster = null;
            }
        }
    }

    private void sendStopResponse(Map<String, String> request, boolean allow) {
        try {
            this.sendMessage(this.makeStopConfirmation(request, allow));
        }
        catch (IOException e) {
            LOG.error((Object)"Failed to send stop cluster response.", (Throwable)e);
        }
    }

    private boolean isXGBoostInProgress() {
        return Arrays.stream(Job.jobs()).anyMatch(job -> job.isRunning() && job._result.get() instanceof XGBoostModel);
    }

    private Map<String, String> makeStartRequest() {
        HashMap<String, String> req = new HashMap<String, String>();
        req.put("_type", "startXGBoostCluster");
        req.put("_id", H2O.SELF.getIpPortString() + "_startXGBoost");
        return req;
    }

    private Map<String, String> makeStopConfirmation(Map<String, String> message, boolean allow) {
        HashMap<String, String> req = new HashMap<String, String>();
        req.put("_type", "stopXGBoostClusterConfirmation");
        req.put("_id", message.get("_id") + "_response");
        req.put("allowed", Boolean.toString(allow));
        return req;
    }

    private static class ClusterInfo {
        final String uri;
        final String userName;
        final String password;

        private ClusterInfo(String uri, String userName, String password) {
            this.uri = uri;
            this.userName = userName;
            this.password = password;
        }
    }
}

