/*
 * Decompiled with CFR 0.152.
 */
package forklift.consumer;

import forklift.Forklift;
import forklift.classloader.CoreClassLoaders;
import forklift.classloader.RunAsClassLoader;
import forklift.concurrent.Executors;
import forklift.consumer.Consumer;
import forklift.consumer.ConsumerService;
import forklift.consumer.ConsumerThread;
import forklift.deployment.Deployment;
import forklift.deployment.DeploymentEvents;
import forklift.source.SourceI;
import forklift.source.SourceUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DeploymentEvents} implementation that starts the services and consumer
 * threads of a {@link Deployment} when it is deployed and stops them again when
 * it is undeployed.
 *
 * <p>Both event handlers are {@code synchronized} on this instance, so the
 * bookkeeping maps are only touched by one deploy/undeploy call at a time.
 */
public class ConsumerDeploymentEvents implements DeploymentEvents {
    private static final Logger log = LoggerFactory.getLogger(ConsumerDeploymentEvents.class);

    /** Maximum time, in milliseconds, to wait for each consumer thread to finish on undeploy. */
    private static final long JOIN_TIMEOUT_MS = 60000L;

    /** Consumer threads started for each deployment. */
    private final Map<Deployment, List<ConsumerThread>> deployments = new HashMap<>();
    /** Services started for each deployment. */
    private final Map<Deployment, List<ConsumerService>> serviceDeployments = new HashMap<>();
    private final Forklift forklift;
    private final ExecutorService executor;

    /**
     * Creates a handler that runs consumer threads on the supplied executor.
     *
     * @param forklift the Forklift instance handed to every {@link Consumer} created here
     * @param executor the executor each {@link ConsumerThread} is submitted to
     */
    public ConsumerDeploymentEvents(Forklift forklift, ExecutorService executor) {
        this.forklift = forklift;
        this.executor = executor;
    }

    /**
     * Creates a handler that uses the executor returned by
     * {@code Executors.newCoreThreadPool("consumer-deployment-events")}.
     *
     * @param forklift the Forklift instance handed to every {@link Consumer} created here
     */
    public ConsumerDeploymentEvents(Forklift forklift) {
        this(forklift, Executors.newCoreThreadPool("consumer-deployment-events"));
    }

    /**
     * Starts every service of the deployment, then creates and submits one
     * {@link ConsumerThread} per source found on each consumer class.
     *
     * <p>A service that fails to start is logged and skipped; the remaining
     * services and all consumers are still processed.
     *
     * @param deployment the deployment whose services and consumers should be started
     */
    @Override
    public synchronized void onDeploy(Deployment deployment) {
        log.info("Deploying: {}", deployment);
        final List<ConsumerThread> threads = new ArrayList<>();
        final List<ConsumerService> services = new ArrayList<>();

        // Register the (still empty) lists before starting anything, so that whatever
        // was already started stays tracked -- and can be stopped by onUndeploy --
        // even if a later step throws.
        this.deployments.put(deployment, threads);
        this.serviceDeployments.put(deployment, services);

        // Services are instantiated and started under the deployment's class loader.
        // The Class<?> casts are retained because the element types of getServices()
        // and getConsumers() are declared outside this file.
        RunAsClassLoader.run(deployment.getClassLoader(), () -> deployment.getServices().forEach(s -> {
            try {
                log.info("Starting service {}", s);
                final ConsumerService service = new ConsumerService((Class<?>) s);
                service.onDeploy();
                services.add(service);
            } catch (Exception e) {
                log.error("Failed to start service {}", s, e);
            }
        }));

        deployment.getConsumers().forEach(consumerClass -> {
            final List<SourceI> sources = SourceUtil.getSourcesAsList(consumerClass);
            sources.forEach(source -> {
                log.info("Found source {} on {}", source, consumerClass);
                final Consumer consumer = new Consumer(
                        (Class<?>) consumerClass, this.forklift, deployment.getClassLoader(), source, sources);
                consumer.setServices(services);
                final ConsumerThread thread = new ConsumerThread(consumer);
                threads.add(thread);
                this.executor.submit(thread);
            });
        });
    }

    /**
     * Shuts down the consumer threads and services recorded for the deployment
     * and unregisters the deployment's class loader.
     *
     * <p>Each thread is given up to {@value #JOIN_TIMEOUT_MS} ms to finish. If the
     * calling thread is interrupted while waiting, the remaining teardown still
     * runs and the interrupt status is restored before this method returns.
     *
     * @param deployment the deployment being removed
     */
    @Override
    public synchronized void onUndeploy(Deployment deployment) {
        log.info("Undeploying: {}", deployment);

        final boolean interrupted = stopThreads(this.deployments.remove(deployment));
        stopServices(this.serviceDeployments.remove(deployment));
        CoreClassLoaders.getInstance().unregister(deployment.getClassLoader());

        if (interrupted) {
            // Hand the interrupt back to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Requests shutdown of each thread and waits for it to finish.
     *
     * @param threads the threads to stop; may be {@code null} when nothing was recorded
     * @return {@code true} if an {@link InterruptedException} was caught while waiting
     */
    private boolean stopThreads(List<ConsumerThread> threads) {
        boolean interrupted = false;
        if (threads == null) {
            return interrupted;
        }
        for (ConsumerThread t : threads) {
            t.shutdown();
            try {
                t.join(JOIN_TIMEOUT_MS);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    interrupted = true;
                }
                log.warn("Failed waiting for consumer thread {} to stop", t, e);
            }
        }
        return interrupted;
    }

    /**
     * Invokes {@code onUndeploy} on each service; a failure is logged and does not
     * prevent the remaining services from being undeployed.
     *
     * @param services the services to stop; may be {@code null} when nothing was recorded
     */
    private void stopServices(List<ConsumerService> services) {
        if (services == null) {
            return;
        }
        for (ConsumerService s : services) {
            try {
                s.onUndeploy();
            } catch (Exception e) {
                log.warn("Failed to undeploy service {}", s, e);
            }
        }
    }
}

