/*
 * Decompiled with CFR 0.152.
 */
package jaicore.search.algorithms.parallel.parallelexploration.distributed;

import ai.libs.jaicore.basic.FileUtil;
import jaicore.search.algorithms.parallel.parallelexploration.distributed.DistributedComputationResult;
import jaicore.search.algorithms.parallel.parallelexploration.distributed.interfaces.DistributedSearchCommunicationLayer;
import jaicore.search.algorithms.parallel.parallelexploration.distributed.interfaces.SerializableGraphGenerator;
import jaicore.search.algorithms.parallel.parallelexploration.distributed.interfaces.SerializableNodeEvaluator;
import jaicore.search.algorithms.standard.bestfirst.nodeevaluation.INodeEvaluator;
import jaicore.search.model.travesaltree.Node;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DistributedSearchCommunicationLayer} that exchanges all information through files in a
 * single shared folder.
 *
 * <p>The following files are used inside the communication folder (where {@code <id>} is a
 * coworker id):
 * <ul>
 *   <li>{@code register-<id>}: created by {@link #register(String)}, consumed by
 *       {@link #detectNewCoworkers()}</li>
 *   <li>{@code attach-<id>}: created by {@link #attachCoworker(String)}, removed by
 *       {@link #detachCoworker(String)}</li>
 *   <li>{@code job-<id>}: written by {@link #createNewJobForCoworker(String, Collection)}, consumed
 *       by the coworker folder observer and handed out via {@link #nextJob(String)}</li>
 *   <li>{@code results-<id>}: written by {@link #reportResult(String, DistributedComputationResult)},
 *       consumed by {@link #readResult(String)}</li>
 *   <li>{@code graphgen.ser} / {@code nodeeval.ser}: serialized graph generator / node evaluator</li>
 * </ul>
 *
 * <p>The internal bookkeeping collections are concurrent because they are accessed both by the
 * observer thread and by the threads calling the public methods. As a consequence, coworker ids
 * must not be {@code null}.
 *
 * <p>NOTE(review): job and result files are read with Java native deserialization; the folder
 * must only be writable by trusted parties.
 *
 * @param <T> node (state) type of the search
 * @param <A> action type of the search
 * @param <V> node evaluation type
 */
public class FolderBasedDistributedSearchCommunicationLayer<T, A, V extends Comparable<V>>
implements DistributedSearchCommunicationLayer<T, A, V> {
    private static final Logger logger = LoggerFactory.getLogger(FolderBasedDistributedSearchCommunicationLayer.class);

    private static final String REGISTER_PREFIX = "register-";
    private static final String ATTACH_PREFIX = "attach-";
    private static final String JOB_PREFIX = "job-";
    private static final String RESULTS_PREFIX = "results-";

    /** Pause between two scans of the folder by the coworker observer. */
    private static final long POLL_INTERVAL_MS = 500L;
    private static final int MAX_RESULT_READ_ATTEMPTS = 4;
    private static final long RESULT_RETRY_DELAY_MS = 100L;
    private static final int MAX_JOB_READ_ATTEMPTS = 10;
    private static final long JOB_RETRY_DELAY_MS = 500L;

    private final Set<String> knownCoworkers = ConcurrentHashMap.newKeySet();
    private final Path communicationFolder;
    /** Semaphores on which {@link #register(String)} blocks until the coworker has been attached. */
    private final Map<String, Semaphore> registerTickets = new ConcurrentHashMap<>();
    private final Map<String, BlockingQueue<Collection<Node<T, V>>>> jobQueues = new ConcurrentHashMap<>();
    /** Observer started in master mode; it currently has nothing to do. */
    private final Thread masterFolderObserver = new Thread(() -> { });
    /** Observer started in coworker mode; see {@link #observeCoworkerFolder()}. */
    private final Thread coworkerFolderObserver = new Thread(this::observeCoworkerFolder);

    /**
     * Creates the layer and starts the observer thread matching the given role.
     *
     * @param communicationFolder folder through which all files are exchanged
     * @param isMaster {@code true} to start the master observer, {@code false} to start the
     *        coworker observer
     */
    public FolderBasedDistributedSearchCommunicationLayer(Path communicationFolder, boolean isMaster) {
        this.communicationFolder = communicationFolder;
        if (isMaster) {
            this.masterFolderObserver.start();
        } else {
            this.coworkerFolderObserver.start();
        }
    }

    /**
     * Deletes every regular file below the communication folder whose name does not contain
     * {@code "register"}. Failures are logged and do not abort the cleanup.
     */
    @Override
    public void init() {
        try (Stream<Path> paths = Files.walk(this.communicationFolder)) {
            paths.forEach(file -> {
                try {
                    if (Files.isRegularFile(file) && !file.getFileName().toString().contains("register")) {
                        logger.info("Deleting {}", file.getFileName());
                        Files.delete(file);
                    }
                }
                catch (IOException | SecurityException e) {
                    logger.error("Could not delete " + file, e);
                }
            });
        }
        catch (IOException e) {
            logger.error("Could not traverse communication folder " + this.communicationFolder, e);
        }
    }

    /**
     * Scans the communication folder for {@code register-<id>} files, deletes them and remembers
     * the corresponding coworkers (including a job queue for each of them).
     *
     * @return the ids of the coworkers whose registration file was found in this scan
     */
    @Override
    public Collection<String> detectNewCoworkers() {
        Collection<String> newCoworkers = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(this.communicationFolder)) {
            paths.filter(Files::isRegularFile).forEach(file -> {
                String fileName = file.getFileName().toString();
                if (!fileName.startsWith(REGISTER_PREFIX)) {
                    return;
                }
                String coworkerId = fileName.substring(REGISTER_PREFIX.length());
                logger.info("Recognized coworker {}", coworkerId);
                try {
                    Files.delete(file);
                }
                catch (IOException | SecurityException e) {
                    logger.error("Could not delete registration file " + file, e);
                }
                newCoworkers.add(coworkerId);
            });
        }
        catch (IOException e) {
            logger.error("Could not traverse communication folder " + this.communicationFolder, e);
        }
        for (String newCoworker : newCoworkers) {
            this.knownCoworkers.add(newCoworker);
            this.queueFor(newCoworker);
        }
        return newCoworkers;
    }

    /**
     * Removes the {@code attach-<id>} marker of the given coworker; does nothing if it is absent.
     *
     * @param coworker id of the coworker to detach
     */
    @Override
    public void detachCoworker(String coworker) {
        File marker = this.fileInFolder(ATTACH_PREFIX + coworker);
        try {
            Files.deleteIfExists(marker.toPath());
        }
        catch (IOException e) {
            logger.error("Could not delete " + marker, e);
        }
    }

    /**
     * Serializes the given nodes into the {@code job-<id>} file of the coworker.
     *
     * @param coworkerId id of the coworker that shall process the nodes
     * @param nodesToBeSolved nodes to be written as the job
     */
    @Override
    public void createNewJobForCoworker(String coworkerId, Collection<Node<T, V>> nodesToBeSolved) {
        File target = this.fileInFolder(JOB_PREFIX + coworkerId);
        logger.info("Writing job for {}: {}", coworkerId, nodesToBeSolved);
        try {
            this.writeViaTempFile(target, nodesToBeSolved);
        }
        catch (IOException e) {
            logger.error("Could not write job file " + target, e);
        }
    }

    /**
     * Reads and deletes the {@code results-<id>} file of the given coworker, retrying a few times
     * if reading or deleting fails.
     *
     * @param coworker id of the coworker whose results shall be read
     * @return the deserialized result, or {@code null} if there is no results file or it could
     *         not be read
     */
    @Override
    public DistributedComputationResult<T, V> readResult(String coworker) {
        File file = this.fileInFolder(RESULTS_PREFIX + coworker);
        if (!file.exists()) {
            return null;
        }
        logger.info("Found results from coworker " + coworker);
        DistributedComputationResult<T, V> result = null;
        for (int attempt = 1; attempt <= MAX_RESULT_READ_ATTEMPTS; ++attempt) {
            try {
                logger.info("Reading file " + file.getAbsolutePath() + " ...");
                @SuppressWarnings("unchecked")
                DistributedComputationResult<T, V> read = (DistributedComputationResult<T, V>)this.readSerializedObject(file);
                result = read;
                logger.info("done");
                // The stream is already closed here, so the delete cannot be blocked by our own handle.
                Files.delete(file.toPath());
                return result;
            }
            catch (IOException | ClassNotFoundException e) {
                logger.warn("Attempt " + attempt + " to read and delete " + file.getName() + " failed", e);
                try {
                    Thread.sleep(RESULT_RETRY_DELAY_MS);
                }
                catch (InterruptedException interrupted) {
                    // This method cannot propagate the interruption, so keep the flag and stop retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        logger.error("Failed to read and/or delete file " + file.getName());
        return result;
    }

    /**
     * Announces the coworker by creating its {@code register-<id>} file and blocks until the
     * coworker folder observer sees the matching {@code attach-<id>} marker. A stale attach marker
     * is removed first.
     *
     * @param coworker id of the coworker to register
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public void register(String coworker) throws InterruptedException {
        if (this.isAttached(coworker)) {
            this.detachCoworker(coworker);
        }
        Semaphore ticket = new Semaphore(0);
        this.registerTickets.put(coworker, ticket);
        try {
            File registration = this.fileInFolder(REGISTER_PREFIX + coworker);
            registration.createNewFile();
            ticket.acquire();
            this.knownCoworkers.add(coworker);
        }
        catch (IOException e) {
            logger.error("Could not create registration file for " + coworker, e);
        }
        finally {
            // No-op on success (the observer already removed it); cleans up after failure/interruption.
            this.registerTickets.remove(coworker, ticket);
        }
    }

    /**
     * Deletes the {@code register-<id>} file of the given coworker if it exists.
     *
     * @param coworker id of the coworker to unregister
     */
    @Override
    public void unregister(String coworker) {
        File registration = this.fileInFolder(REGISTER_PREFIX + coworker);
        try {
            if (registration.exists()) {
                logger.info("Deleting {}", registration.getAbsolutePath());
                Files.delete(registration.toPath());
            }
        }
        catch (IOException e) {
            logger.error("Could not delete " + registration, e);
        }
    }

    /**
     * Blocks until a job for the given coworker has been picked up from the folder.
     *
     * @param coworker id of the coworker asking for work
     * @return the nodes of the next job
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override
    public Collection<Node<T, V>> nextJob(String coworker) throws InterruptedException {
        return this.queueFor(coworker).take();
    }

    /**
     * Serializes the given result into the {@code results-<id>} file of the coworker.
     *
     * @param coworker id of the reporting coworker
     * @param result result to be written
     */
    @Override
    public void reportResult(String coworker, DistributedComputationResult<T, V> result) {
        File target = this.fileInFolder(RESULTS_PREFIX + coworker);
        try {
            this.writeViaTempFile(target, result);
        }
        catch (IOException e) {
            logger.error("Could not write result file " + target, e);
        }
    }

    /**
     * Creates the {@code attach-<id>} marker for the given coworker.
     *
     * @param coworker id of the coworker to attach
     */
    @Override
    public void attachCoworker(String coworker) {
        File marker = this.fileInFolder(ATTACH_PREFIX + coworker);
        try {
            marker.createNewFile();
        }
        catch (IOException e) {
            logger.error("Could not create " + marker, e);
        }
    }

    /**
     * @param coworker id of the coworker to check
     * @return {@code true} iff the {@code attach-<id>} marker of the coworker exists
     */
    @Override
    public boolean isAttached(String coworker) {
        return this.fileInFolder(ATTACH_PREFIX + coworker).exists();
    }

    /** Serializes the graph generator to {@code graphgen.ser} in the communication folder. */
    @Override
    public void setGraphGenerator(SerializableGraphGenerator<T, A> generator) throws Exception {
        FileUtil.serializeObject(generator, this.communicationFolder.toAbsolutePath() + "/graphgen.ser");
    }

    /** Serializes the node evaluator to {@code nodeeval.ser} in the communication folder. */
    @Override
    public void setNodeEvaluator(SerializableNodeEvaluator<T, V> evaluator) throws Exception {
        FileUtil.serializeObject(evaluator, this.communicationFolder.toAbsolutePath() + "/nodeeval.ser");
    }

    /** Reads the graph generator back from {@code graphgen.ser} in the communication folder. */
    @Override
    @SuppressWarnings("unchecked")
    public SerializableGraphGenerator<T, A> getGraphGenerator() throws Exception {
        return (SerializableGraphGenerator<T, A>)FileUtil.unserializeObject(this.communicationFolder.toAbsolutePath() + "/graphgen.ser");
    }

    /** Reads the node evaluator back from {@code nodeeval.ser} in the communication folder. */
    @Override
    @SuppressWarnings("unchecked")
    public INodeEvaluator<T, V> getNodeEvaluator() throws Exception {
        return (INodeEvaluator<T, V>)FileUtil.unserializeObject(this.communicationFolder.toAbsolutePath() + "/nodeeval.ser");
    }

    /** Interrupts both observer threads. */
    @Override
    public void close() {
        this.masterFolderObserver.interrupt();
        this.coworkerFolderObserver.interrupt();
    }

    /**
     * Body of the coworker observer thread: until interrupted, picks up job files of all known
     * coworkers, releases the registration tickets of coworkers that have been attached, and
     * then sleeps for the poll interval.
     */
    private void observeCoworkerFolder() {
        try {
            while (!Thread.interrupted()) {
                for (String coworker : this.knownCoworkers) {
                    this.readCoworkersJob(coworker);
                }
                this.releaseTicketsOfAttachedCoworkers();
                Thread.sleep(POLL_INTERVAL_MS);
            }
        }
        catch (InterruptedException e) {
            logger.info("Shutting down folder listener.");
        }
    }

    /** Wakes up every pending {@link #register(String)} call whose coworker is attached by now. */
    private void releaseTicketsOfAttachedCoworkers() {
        // Removing during iteration is safe because registerTickets is a ConcurrentHashMap.
        for (Map.Entry<String, Semaphore> ticket : this.registerTickets.entrySet()) {
            if (this.isAttached(ticket.getKey()) && this.registerTickets.remove(ticket.getKey(), ticket.getValue())) {
                ticket.getValue().release();
            }
        }
    }

    /**
     * Reads the {@code job-<id>} file of the given coworker (if present), deletes it and enqueues
     * its content. I/O failures are retried a limited number of times; a job that cannot be
     * deserialized because of a missing class is not retried within the same call.
     */
    private void readCoworkersJob(String coworker) {
        File jobFile = this.fileInFolder(JOB_PREFIX + coworker);
        if (!jobFile.exists()) {
            return;
        }
        for (int attempt = 1; attempt <= MAX_JOB_READ_ATTEMPTS; ++attempt) {
            try {
                @SuppressWarnings("unchecked")
                Collection<Node<T, V>> nodes = (Collection<Node<T, V>>)this.readSerializedObject(jobFile);
                Files.delete(jobFile.toPath());
                // The queue may not exist yet if nextJob has not been called for this coworker.
                this.queueFor(coworker).add(nodes);
                return;
            }
            catch (ClassNotFoundException e) {
                // Retrying immediately cannot help: the class is missing on this side.
                logger.error("Cannot deserialize job file " + jobFile + " because a class is missing.", e);
                return;
            }
            catch (IOException e) {
                logger.error("Error reading file " + jobFile + ", waiting 500ms and retrying.", e);
                try {
                    Thread.sleep(JOB_RETRY_DELAY_MS);
                }
                catch (InterruptedException interrupted) {
                    // Preserve the interruption so that the observer loop terminates.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        logger.info("Giving up reading the job of " + coworker);
    }

    /** Returns the job queue of the given coworker, creating it on first use. */
    private BlockingQueue<Collection<Node<T, V>>> queueFor(String coworker) {
        return this.jobQueues.computeIfAbsent(coworker, id -> new LinkedBlockingQueue<>());
    }

    /** Returns the file with the given name directly inside the communication folder. */
    private File fileInFolder(String fileName) {
        return new File(this.communicationFolder.toAbsolutePath() + "/" + fileName);
    }

    /**
     * Serializes the payload into {@code <target>.tmp} and, once the stream is closed, moves it
     * onto the target (replacing an existing file), so that the target name only ever refers to
     * a completely written file.
     */
    private void writeViaTempFile(File target, Object payload) throws IOException {
        File tmp = new File(target.getAbsolutePath() + ".tmp");
        try (ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(tmp)))) {
            out.writeObject(payload);
        }
        Files.move(tmp.toPath(), target.toPath(), StandardCopyOption.REPLACE_EXISTING);
    }

    /** Deserializes the single object stored in the given file and closes the stream in any case. */
    private Object readSerializedObject(File file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))) {
            return in.readObject();
        }
    }
}

