/*
 * Decompiled with CFR 0.152.
 */
package ai.grakn.engine.loader;

import ai.grakn.engine.loader.Loader;
import ai.grakn.engine.loader.TransactionState;
import ai.grakn.engine.util.ConfigProperties;
import ai.grakn.graql.InsertQuery;
import ai.grakn.util.ErrorMessage;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import javax.xml.ws.http.HTTPException;
import mjson.Json;
import spark.utils.IOUtils;

public class DistributedLoader
extends Loader {
    private static ConfigProperties prop = ConfigProperties.getInstance();
    private static ExecutorService executor = Executors.newSingleThreadExecutor();
    private Future future;
    private long pollingFrequency;
    private String graphName;
    private int currentHost;
    private List<String> hosts;
    private Map<String, Semaphore> availability;
    private Map<String, Integer> jobsTerminated;
    private static final String POST = "http://%s:" + ConfigProperties.getInstance().getProperty("server.port") + "/transaction/new" + "?" + "graphName" + "=%s";
    private static final String GET = "http://%s:" + ConfigProperties.getInstance().getProperty("server.port") + "/transaction/loaderState";

    public DistributedLoader(String graphName, Collection<String> hosts) {
        this.hosts = Lists.newArrayList(hosts);
        this.currentHost = 0;
        this.setBatchSize(prop.getPropertyAsInt("blockingLoader.batch-size"));
        this.setPollingFrequency(prop.getPropertyAsLong("loader.polling-frequency"));
        this.setThreadsNumber(prop.getAvailableThreads());
        this.resetAvailabilityMap();
        this.resetJobsTerminated();
        this.graphName = graphName;
    }

    @Override
    public Loader setThreadsNumber(int number) {
        this.threadsNumber = number;
        this.resetAvailabilityMap();
        return this;
    }

    public void resetAvailabilityMap() {
        this.availability = new HashMap<String, Semaphore>();
        this.hosts.forEach(h -> this.availability.put((String)h, new Semaphore(this.threadsNumber * 3)));
    }

    public void resetJobsTerminated() {
        this.jobsTerminated = new HashMap<String, Integer>();
        this.hosts.forEach(h -> this.jobsTerminated.put((String)h, 0));
    }

    public void setPollingFrequency(long number) {
        this.pollingFrequency = number;
    }

    @Override
    public void waitToFinish() {
        this.flush();
        if (this.future != null) {
            try {
                this.future.get();
            }
            catch (InterruptedException | ExecutionException e) {
                this.LOG.error(e.getMessage());
            }
        }
    }

    @Override
    public void sendQueriesToLoader(Collection<InsertQuery> queries) {
        Json inserts = Json.array();
        queries.stream().map(Object::toString).forEach(arg_0 -> ((Json)inserts).add(arg_0));
        HttpURLConnection currentConn = this.acquireNextHost();
        this.executePost(currentConn, inserts.toString());
        int responseCode = this.getResponseCode(currentConn);
        if (responseCode != 201) {
            throw new HTTPException(responseCode);
        }
        this.loadingJobs.incrementAndGet();
        this.LOG.info("Transaction sent to host: " + this.hosts.get(this.currentHost));
        if (this.future == null) {
            this.startCheckingStatus();
        }
        currentConn.disconnect();
    }

    private String getHostState(String host) {
        HttpURLConnection connection = this.getHost(host, GET);
        String response = this.getResponseBody(connection);
        connection.disconnect();
        return response;
    }

    private boolean transactionsIsEmpty() {
        return this.loadingJobs.get() + this.enqueuedJobs.get() == 0;
    }

    private void startCheckingStatus() {
        this.future = executor.submit(this::checkForStatusLoop);
    }

    private void stopCheckingStatus() {
        executor.shutdownNow();
        executor = Executors.newSingleThreadExecutor();
        this.future = null;
    }

    public void checkForStatusLoop() {
        while (!this.transactionsIsEmpty()) {
            int runningQueued = 0;
            int runningLoading = 0;
            int runningError = 0;
            int runningFinished = 0;
            for (String host : this.availability.keySet()) {
                Json state = Json.read((String)this.getHostState(host));
                this.LOG.info("State from host [" + host + "]:");
                this.LOG.info(state.toString());
                int queued = state.at(TransactionState.State.QUEUED.name()).asInteger();
                int loading = state.at(TransactionState.State.LOADING.name()).asInteger();
                int error = state.at(TransactionState.State.ERROR.name()).asInteger();
                int finished = state.at(TransactionState.State.FINISHED.name()).asInteger();
                int terminated = finished + error;
                int permitsToRelease = terminated - this.jobsTerminated.get(host);
                this.availability.get(host).release(permitsToRelease);
                this.jobsTerminated.put(host, terminated);
                runningQueued += queued;
                runningLoading += loading;
                runningError += error;
                runningFinished += finished;
            }
            this.enqueuedJobs.set(runningQueued);
            this.loadingJobs.set(runningLoading);
            this.errorJobs.set(runningError);
            this.finishedJobs.set(runningFinished);
            this.printLoaderState();
            try {
                Thread.sleep(this.pollingFrequency);
            }
            catch (InterruptedException e) {
                this.LOG.error("Exception", (Throwable)e);
            }
        }
        this.stopCheckingStatus();
    }

    private HttpURLConnection acquireNextHost() {
        String host = this.nextHost();
        while (!this.availability.get(host).tryAcquire()) {
            host = this.nextHost();
        }
        return this.getHost(host, POST);
    }

    private String nextHost() {
        ++this.currentHost;
        if (this.currentHost == this.hosts.size()) {
            this.currentHost = 0;
        }
        return this.hosts.get(this.currentHost);
    }

    private HttpURLConnection getHost(String host, String format) {
        HttpURLConnection urlConn = null;
        try {
            String url = String.format(format, host, this.graphName);
            urlConn = (HttpURLConnection)new URL(url).openConnection();
            urlConn.setDoOutput(true);
        }
        catch (IOException e) {
            this.LOG.error("IOException", (Throwable)e);
        }
        return urlConn;
    }

    private String executePost(HttpURLConnection connection, String body) {
        try {
            connection.setRequestMethod("POST");
            connection.addRequestProperty("Content-Type", "application/POST");
            connection.setRequestProperty("Content-Length", Integer.toString(body.length()));
            connection.getOutputStream().write(body.getBytes("UTF8"));
            return connection.getResponseMessage();
        }
        catch (HTTPException e) {
            this.LOG.error(ErrorMessage.ERROR_IN_DISTRIBUTED_TRANSACTION.getMessage(new Object[]{connection.getURL().toString(), e.getStatusCode(), this.getResponseMessage(connection), body}));
        }
        catch (IOException e) {
            this.LOG.error(ErrorMessage.ERROR_COMMUNICATING_TO_HOST.getMessage(new Object[]{connection.getURL().toString()}));
        }
        return null;
    }

    private String getResponseMessage(HttpURLConnection connection) {
        try {
            return connection.getResponseMessage();
        }
        catch (IOException e) {
            this.LOG.error("IOException", (Throwable)e);
            return null;
        }
    }

    private String getResponseBody(HttpURLConnection connection) {
        try {
            return IOUtils.toString((InputStream)connection.getInputStream());
        }
        catch (IOException e) {
            this.LOG.error("IOException", (Throwable)e);
            return null;
        }
    }

    private int getResponseCode(HttpURLConnection connection) {
        try {
            return connection.getResponseCode();
        }
        catch (IOException e) {
            this.LOG.error("IOException", (Throwable)e);
            return 0;
        }
    }
}

