/*
 * Decompiled with CFR 0.152.
 */
package jaicore.search.algorithms.parallel.parallelexploration.distributed;

import ai.libs.jaicore.basic.sets.SetUtil;
import com.google.common.eventbus.EventBus;
import jaicore.search.algorithms.parallel.parallelexploration.distributed.DistributedComputationResult;
import jaicore.search.algorithms.parallel.parallelexploration.distributed.events.NodePassedToCoworkerEvent;
import jaicore.search.algorithms.parallel.parallelexploration.distributed.interfaces.DistributedSearchCommunicationLayer;
import jaicore.search.algorithms.parallel.parallelexploration.distributed.interfaces.DistributionSearchAdapter;
import jaicore.search.model.travesaltree.Node;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedSearchManager<T, A, V extends Comparable<V>> {
    private static final Logger logger = LoggerFactory.getLogger(DistributedSearchManager.class);
    private final DistributedSearchCommunicationLayer<T, A, V> communicationLayer;
    private final DistributionSearchAdapter<T, V> distAdapter;
    private final BlockingQueue<String> idleCoworkers = new LinkedBlockingQueue<String>();
    private final Set<String> pendingCoworkers = Collections.synchronizedSet(new HashSet());
    private final Map<String, Collection<Node<T, V>>> coworkerJobs = Collections.synchronizedMap(new HashMap());
    private final EventBus eventBus = new EventBus();
    private final List<Thread> auxThreads = new ArrayList<Thread>();

    public EventBus getEventBus() {
        return this.eventBus;
    }

    public DistributedSearchManager(DistributedSearchCommunicationLayer<T, A, V> communicationLayer, DistributionSearchAdapter<T, V> resultProcessor) {
        if (communicationLayer == null) {
            throw new IllegalArgumentException("communication layer must not be null!");
        }
        if (resultProcessor == null) {
            throw new IllegalArgumentException("result processor must not be null!");
        }
        this.communicationLayer = communicationLayer;
        this.distAdapter = resultProcessor;
        this.auxThreads.add(new CoworkerSynchronizerWorker());
        this.auxThreads.add(new JobWriterWorker());
        this.auxThreads.add(new ResultReaderWorker());
        for (Thread t : this.auxThreads) {
            t.start();
        }
    }

    private void detectNewCoworkers() {
        for (String newCoworker : this.communicationLayer.detectNewCoworkers()) {
            logger.info("Detected new coworker {}. Now trying to attach it ...", (Object)newCoworker);
            this.communicationLayer.attachCoworker(newCoworker);
            logger.info("Attached new coworker {}", (Object)newCoworker);
            this.idleCoworkers.add(newCoworker);
        }
    }

    private void detectUnattachedCoworkers() {
        for (String coworker : new ArrayList<String>(this.idleCoworkers)) {
            if (this.communicationLayer.isAttached(coworker)) continue;
            logger.info("Coworker {} was detached!", (Object)coworker);
            this.idleCoworkers.remove(coworker);
        }
        for (String coworker : new ArrayList<String>(this.coworkerJobs.keySet())) {
            if (this.communicationLayer.isAttached(coworker)) continue;
            Collection<Node<T, V>> job = this.coworkerJobs.get(coworker);
            this.coworkerJobs.remove(coworker);
            logger.warn("Busy coworker {} was detached. Resubmitting his job {}.", (Object)coworker, job);
        }
    }

    public int getNumberOfHelpers() {
        return this.idleCoworkers.size() + this.pendingCoworkers.size() + this.coworkerJobs.size();
    }

    public int getNumbetOfIdleCoworkers() {
        return this.idleCoworkers.size();
    }

    public int getNumbetOfPendingCoworkers() {
        return this.pendingCoworkers.size();
    }

    public boolean isBusy() {
        return !this.coworkerJobs.isEmpty();
    }

    public void shutdown() {
        for (String coworker : SetUtil.union((Collection[])new Collection[]{this.idleCoworkers, this.coworkerJobs.keySet()})) {
            this.coworkerJobs.remove(coworker);
            this.communicationLayer.detachCoworker(coworker);
        }
        for (Thread t : this.auxThreads) {
            logger.info("Shutting down {}.", (Object)t);
            t.interrupt();
            try {
                t.join();
                logger.info("Shutdown of {} complete.", (Object)t);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private class ResultReaderWorker
    extends Thread {
        private ResultReaderWorker() {
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    for (String busyCoworker : new ArrayList(DistributedSearchManager.this.coworkerJobs.keySet())) {
                        DistributedComputationResult result = DistributedSearchManager.this.communicationLayer.readResult(busyCoworker);
                        if (result == null) continue;
                        logger.info("Received result with {} open nodes and {} solution(s)", (Object)result.getOpen().size(), (Object)result.getSolutions().size());
                        DistributedSearchManager.this.distAdapter.processResult((Collection)DistributedSearchManager.this.coworkerJobs.get(busyCoworker), result);
                        DistributedSearchManager.this.coworkerJobs.remove(busyCoworker);
                        DistributedSearchManager.this.idleCoworkers.add(busyCoworker);
                    }
                    Thread.sleep(500L);
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private class JobWriterWorker
    extends Thread {
        private JobWriterWorker() {
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    Collection nodes = null;
                    while ((nodes = DistributedSearchManager.this.distAdapter.nextJob()) == null) {
                        Thread.sleep(1000L);
                    }
                    logger.info("Waiting for the next available coworker ...");
                    String coworker = (String)DistributedSearchManager.this.idleCoworkers.take();
                    DistributedSearchManager.this.pendingCoworkers.add(coworker);
                    logger.info("Assigning next job to {}", (Object)coworker);
                    DistributedSearchManager.this.pendingCoworkers.remove(coworker);
                    DistributedSearchManager.this.coworkerJobs.put(coworker, nodes);
                    DistributedSearchManager.this.idleCoworkers.remove(coworker);
                    DistributedSearchManager.this.communicationLayer.createNewJobForCoworker(coworker, nodes);
                    for (Node node : nodes) {
                        DistributedSearchManager.this.eventBus.post(new NodePassedToCoworkerEvent(node));
                    }
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    private class CoworkerSynchronizerWorker
    extends Thread {
        private CoworkerSynchronizerWorker() {
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    logger.info("Scanning for new/removed coworkers ...");
                    DistributedSearchManager.this.detectNewCoworkers();
                    DistributedSearchManager.this.detectUnattachedCoworkers();
                    Thread.sleep(1000L);
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }
}

