/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.pipes.internal;

import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.jcr.RepositoryException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestType;
import org.apache.sling.distribution.DistributionResponse;
import org.apache.sling.distribution.Distributor;
import org.apache.sling.distribution.SimpleDistributionRequest;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.apache.sling.pipes.BasePipe;
import org.apache.sling.pipes.ExecutionResult;
import org.apache.sling.pipes.OutputWriter;
import org.apache.sling.pipes.Pipe;
import org.apache.sling.pipes.PipeBindings;
import org.apache.sling.pipes.PipeBuilder;
import org.apache.sling.pipes.PipeExecutor;
import org.apache.sling.pipes.Plumber;
import org.apache.sling.pipes.PlumberMXBean;
import org.apache.sling.pipes.internal.ContainerPipe;
import org.apache.sling.pipes.internal.JsonWriter;
import org.apache.sling.pipes.internal.ManifoldPipe;
import org.apache.sling.pipes.internal.PipeBuilderImpl;
import org.apache.sling.pipes.internal.PipeMonitor;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(service={Plumber.class, JobConsumer.class}, property={"job.topics=org/apache/sling/pipes/topic"})
@Designate(ocd=Configuration.class)
public class PlumberImpl
implements Plumber,
JobConsumer,
PlumberMXBean {
    /** Logger bound to the runtime class of this component. */
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    /** Same value as the default declared by {@code Configuration#bufferSize()}. */
    public static final int DEFAULT_BUFFER_SIZE = 1000;
    /** Name of the property tested by {@link #MONITORED_PIPES_QUERY}. */
    protected static final String PN_MONITORED = "monitored";
    /** XPath query selecting every container pipe resource that carries the {@code monitored} property. */
    protected static final String MONITORED_PIPES_QUERY = String.format("//element(*,nt:base)[@sling:resourceType='%s' and @%s]", "slingPipes/container", "monitored");
    /** Format of the JMX object names built in {@code toggleJmxRegistration}; {@code %s} receives the bean name. */
    protected static final String MBEAN_NAME_FORMAT = "org.apache.sling.pipes:name=%s";
    /** Resource type to pipe implementation class; created in {@code activate} and filled through {@code registerPipe}. */
    Map<String, Class<? extends BasePipe>> registry;
    /** Job topic used by {@code executeAsync} to queue executions; the component subscribes to the same topic. */
    public static final String SLING_EVENT_TOPIC = "org/apache/sling/pipes/topic";
    /** OSGi configuration received in {@code activate}. */
    private Configuration configuration;
    /** Single-entry map keyed by {@code sling.service.subservice}, or {@code null} when no service user is configured. */
    private Map serviceUser;
    /** User ids accepted by {@code executeAsync(ResourceResolver, String, Map)}. */
    private List<String> allowedUsers;
    /** Pipe path to its monitor; replaced as a whole by {@code refreshMonitoredPipes}. */
    private Map<String, PipeMonitor> monitoredPipes;
    /** Optional distributor; when absent, {@code persist} skips the distribution step. */
    @Reference(policy=ReferencePolicy.DYNAMIC, cardinality=ReferenceCardinality.OPTIONAL)
    protected volatile Distributor distributor = null;
    /** Job manager used to queue asynchronous pipe executions. */
    @Reference
    JobManager jobManager;
    /** Factory used to open service resource resolvers. */
    @Reference
    ResourceResolverFactory factory;

    /**
     * Initialises the component from its OSGi configuration: stores the configuration, derives the
     * service-user map and the list of allowed users, builds the pipe registry, registers this
     * component as an MBean and finally collects the monitored pipes.
     *
     * @param configuration the plumber configuration
     */
    @Activate
    public void activate(Configuration configuration) {
        this.configuration = configuration;

        String serviceUserName = configuration.serviceUser();
        if (serviceUserName == null) {
            serviceUser = null;
        } else {
            serviceUser = Collections.singletonMap("sling.service.subservice", serviceUserName);
        }
        allowedUsers = Arrays.asList(configuration.authorizedUsers());

        // the registry must exist before any pipe type gets registered
        registry = new HashMap<>();
        registerPipes();

        toggleJmxRegistration(this, PlumberMXBean.class.getName(), true);
        refreshMonitoredPipes();
    }

    /**
     * Registers the container and manifold pipes, then every pipe type advertised through a
     * {@link PipeExecutor} annotation on a method declared by {@link PipeBuilder}.
     */
    protected void registerPipes() {
        registerPipe("slingPipes/container", ContainerPipe.class);
        registerPipe("slingPipes/manifold", ManifoldPipe.class);
        Arrays.stream(PipeBuilder.class.getDeclaredMethods())
                .map(builderMethod -> builderMethod.getAnnotation(PipeExecutor.class))
                .filter(annotation -> annotation != null)
                .forEach(annotation -> registerPipe(annotation.resourceType(), annotation.pipeClass()));
    }

    /**
     * @return the service-user map built at activation, or {@code null} when no service user is configured
     */
    @Override
    public Map getServiceUser() {
        return serviceUser;
    }

    /**
     * Unregisters the plumber MBean and the MBean of every monitored pipe.
     */
    @Deactivate
    public void deactivate() {
        toggleJmxRegistration(null, PlumberMXBean.class.getName(), false);
        if (monitoredPipes == null) {
            return;
        }
        monitoredPipes.keySet().forEach(monitoredPath -> toggleJmxRegistration(null, monitoredPath, false));
    }

    /**
     * Registers or unregisters an MBean on the platform MBean server. Nothing happens when the bean
     * is already in the requested state; any failure is logged and not propagated.
     *
     * @param instance bean to register, unused when unregistering
     * @param name     value inserted in {@link #MBEAN_NAME_FORMAT} to build the object name
     * @param register {@code true} to register, {@code false} to unregister
     */
    private void toggleJmxRegistration(Object instance, String name, boolean register) {
        try {
            MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
            ObjectName objectName = ObjectName.getInstance(String.format(MBEAN_NAME_FORMAT, name));
            boolean alreadyRegistered = mBeanServer.isRegistered(objectName);
            if (register) {
                if (!alreadyRegistered) {
                    mBeanServer.registerMBean(instance, objectName);
                }
            } else if (alreadyRegistered) {
                mBeanServer.unregisterMBean(objectName);
            }
        } catch (Exception e) {
            log.error("unable to toggle mbean {} registration", name, e);
        }
    }

    /**
     * Builds the pipe configured by the given resource, without parent bindings.
     *
     * @param resource pipe configuration resource
     * @return the pipe, or {@code null} when it could not be built
     */
    @Override
    public Pipe getPipe(Resource resource) {
        PipeBindings noUpperBindings = null;
        return getPipe(resource, noUpperBindings);
    }

    /**
     * Instantiates the pipe class registered for the resource type of the given configuration
     * resource, through its {@code (Plumber, Resource, PipeBindings)} constructor.
     *
     * @param resource      pipe configuration resource
     * @param upperBindings bindings handed over to the pipe constructor, may be {@code null}
     * @return the pipe, or {@code null} when the resource is null, its type is not registered,
     *         or the instantiation failed (the failure is logged)
     */
    @Override
    public Pipe getPipe(Resource resource, PipeBindings upperBindings) {
        if (resource == null || !registry.containsKey(resource.getResourceType())) {
            log.error("Pipe configuration resource is either null, or its type is not registered");
            return null;
        }
        try {
            Class<? extends BasePipe> implementation = registry.get(resource.getResourceType());
            return implementation
                    .getDeclaredConstructor(Plumber.class, Resource.class, PipeBindings.class)
                    .newInstance(this, resource, upperBindings);
        } catch (Exception e) {
            log.error("Unable to properly instantiate the pipe configured in {}", resource.getPath(), e);
            return null;
        }
    }

    /**
     * Queues an asynchronous execution on behalf of the user of the given resolver, provided that
     * user is one of the configured authorized users.
     *
     * @param resolver resolver whose user id is checked
     * @param path     path of the pipe configuration
     * @param bindings additional bindings for the execution
     * @return the queued job, or {@code null} when the user is not authorized
     */
    @Override
    public Job executeAsync(ResourceResolver resolver, String path, Map bindings) {
        boolean authorized = allowedUsers.contains(resolver.getUserID());
        return authorized ? executeAsync(path, bindings) : null;
    }

    /**
     * Queues an asynchronous execution of the pipe configured at the given path.
     * <p>
     * A missing or blank service user is reported in the log; the job is queued regardless, as
     * before. The check is null-safe because {@code activate} leaves the service-user map
     * {@code null} when no service user is configured.
     *
     * @param path     path of the pipe configuration
     * @param bindings additional bindings for the execution
     * @return the job returned by the job manager
     */
    @Override
    public Job executeAsync(String path, Map bindings) {
        String subService = serviceUser == null ? null : (String) serviceUser.get("sling.service.subservice");
        if (StringUtils.isBlank(subService)) {
            log.error("please configure plumber service user");
        }
        Map<String, Object> props = new HashMap<>();
        props.put("path", path);
        props.put("additionalBindings", bindings);
        return jobManager.addJob(SLING_EVENT_TOPIC, props);
    }

    /**
     * Resolves the pipe configured at {@code path} and executes it.
     *
     * @param resolver           resolver used to read the configuration and to persist changes
     * @param path               path of the pipe configuration
     * @param additionalBindings extra bindings, may be {@code null}
     * @param writer             writer receiving the output
     * @param save               whether remaining changes are persisted at the end
     * @return the execution result
     * @throws Exception when no pipe can be built from {@code path}, or when the execution fails
     */
    @Override
    public ExecutionResult execute(ResourceResolver resolver, String path, Map additionalBindings, OutputWriter writer, boolean save) throws Exception {
        Pipe pipe = getPipe(resolver.getResource(path));
        if (pipe != null) {
            return execute(resolver, pipe, additionalBindings, writer, save);
        }
        throw new Exception("unable to build pipe based on configuration at " + path);
    }

    /**
     * Executes the given pipe and collects its output.
     * <p>
     * Steps, in order: merge the additional bindings into the pipe bindings; refuse a content
     * modifying, non dry-run pipe when the {@code readOnly} binding is true; call the pipe's
     * {@code before} hook; refuse to run when the pipe status says it is already running; write the
     * {@code started} status and commit; iterate over the output, recording errors and results and
     * persisting along the way; persist once more at the end when {@code save} is set and the pipe
     * modifies content.
     * <p>
     * Whatever the outcome, the {@code finished} status is written and committed, the {@code after}
     * hook is called and, on failure, the monitor (if any) is told so.
     *
     * @param resolver           resolver used to persist changes and status
     * @param pipe               pipe to execute
     * @param additionalBindings extra bindings, may be {@code null}
     * @param writer             writer receiving the output
     * @param save               whether remaining changes are persisted at the end
     * @return the execution result
     * @throws Exception when the pipe is refused or its execution fails
     */
    @Override
    public ExecutionResult execute(ResourceResolver resolver, Pipe pipe, Map additionalBindings, OutputWriter writer, boolean save) throws Exception {
        boolean success = false;
        PipeMonitor monitor = null;
        long start = System.currentTimeMillis();
        try {
            if (additionalBindings != null) {
                pipe.getBindings().addBindings(additionalBindings);
                boolean readOnly = additionalBindings.containsKey("readOnly")
                        && ((Boolean) additionalBindings.get("readOnly")).booleanValue();
                if (readOnly && pipe.modifiesContent() && !pipe.isDryRun()) {
                    throw new Exception("This pipe modifies content, you should use a POST request");
                }
            }
            log.debug("[{}] before execution hook is called", pipe);
            pipe.before();
            log.info("[{}] execution starts, save ({})", pipe, save);
            Resource confResource = pipe.getResource();
            writer.setPipe(pipe);
            if (isRunning(confResource)) {
                // NOTE(review): the cleanup below still writes the "finished" status in this case,
                // which clears the status of the execution that is actually running - confirm intended.
                throw new RuntimeException("Pipe is already running");
            }
            monitor = monitoredPipes.get(confResource.getPath());
            writeStatus(pipe, "started");
            resolver.commit();
            if (monitor != null) {
                monitor.starts();
            }
            ExecutionResult result = new ExecutionResult(writer);
            Iterator<Resource> output = pipe.getOutput();
            while (output.hasNext()) {
                Resource resource = output.next();
                checkError(pipe, result);
                if (resource == null) {
                    continue;
                }
                log.debug("[{}] retrieved {}", pipe.getName(), resource.getPath());
                result.addResultItem(resource);
                persist(resolver, pipe, result, resource);
            }
            // an error may have been raised after the last output item
            checkError(pipe, result);
            if (save && pipe.modifiesContent()) {
                persist(resolver, pipe, result, null);
            }
            writer.ends();
            if (monitor != null) {
                monitor.ends();
                monitor.setLastResult(result);
            }
            success = true;
            return result;
        } finally {
            // single cleanup path, run on success and on failure alike
            writeStatus(pipe, "finished");
            resolver.commit();
            long length = System.currentTimeMillis() - start;
            String time = length < 1000L ? length + "ms" : length / 1000L + "s";
            log.info("[{}] done executing in {}.", pipe.getName(), time);
            log.debug("[{}] after execution hook is called", pipe);
            pipe.after();
            if (!success && monitor != null) {
                monitor.failed();
            }
        }
    }

    /**
     * Moves the pending error of the pipe bindings, if any, to the execution result.
     *
     * @param pipe   pipe whose bindings are polled
     * @param result result receiving the error
     */
    protected void checkError(Pipe pipe, ExecutionResult result) {
        String pendingError = pipe.getBindings().popCurrentError();
        if (StringUtils.isBlank(pendingError)) {
            return;
        }
        result.addError(pendingError);
    }

    /**
     * Commits pending changes when the pipe modifies content, the resolver has changes and the pipe
     * is not a dry run, and either this is the final save ({@code currentResource == null}) or the
     * result size is a multiple of the buffer size.
     * <p>
     * After the commit: on the final save the current paths are distributed when a distributor and a
     * distribution agent are available; the current path set is emptied once the result is larger
     * than the buffer; the configured sleep is applied.
     * <p>
     * A non-positive configured buffer size falls back to {@link #DEFAULT_BUFFER_SIZE}; a size of
     * zero would otherwise make the modulo fail with an {@link ArithmeticException}.
     *
     * @param resolver        resolver holding the changes
     * @param pipe            pipe being executed
     * @param result          result collected so far
     * @param currentResource resource just produced, or {@code null} for the final save
     * @throws Exception when the commit, the distribution or the sleep fails
     */
    protected void persist(ResourceResolver resolver, Pipe pipe, ExecutionResult result, Resource currentResource) throws Exception {
        if (!pipe.modifiesContent() || !resolver.hasChanges() || pipe.isDryRun()) {
            return;
        }
        int configuredBufferSize = configuration.bufferSize();
        long bufferSize = configuredBufferSize > 0 ? configuredBufferSize : DEFAULT_BUFFER_SIZE;
        boolean finalSave = currentResource == null;
        if (!finalSave && result.size() % bufferSize != 0L) {
            return;
        }

        log.info("[{}] saving changes...", pipe.getName());
        writeStatus(pipe, finalSave ? "finished" : currentResource.getPath());
        resolver.commit();

        if (finalSave && distributor != null && StringUtils.isNotBlank(pipe.getDistributionAgent())) {
            log.info("a distribution agent is configured, will try to distribute the changes");
            SimpleDistributionRequest request = new SimpleDistributionRequest(DistributionRequestType.ADD, true, result.getCurrentPathSet().toArray(new String[0]));
            DistributionResponse response = distributor.distribute(pipe.getDistributionAgent(), resolver, (DistributionRequest) request);
            log.info("distribution response : {}", response);
        }
        if (result.size() > bufferSize) {
            result.emptyCurrentSet();
        }

        long sleep = configuration.sleep();
        if (sleep > 0L) {
            log.debug("sleeping for {}ms", sleep);
            try {
                Thread.sleep(sleep);
            } catch (InterruptedException e) {
                // keep the interruption visible to the caller's thread before propagating it
                Thread.currentThread().interrupt();
                throw e;
            }
        }
    }

    /**
     * Associates a resource type with a pipe implementation, replacing any previous association.
     *
     * @param type      resource type of the pipe configuration
     * @param pipeClass implementation to instantiate for that type
     */
    @Override
    public void registerPipe(String type, Class<? extends BasePipe> pipeClass) {
        registry.put(type, pipeClass);
    }

    /**
     * @param type resource type to look up
     * @return {@code true} when a pipe implementation is registered for {@code type}
     */
    @Override
    public boolean isTypeRegistered(String type) {
        return registry.containsKey(type);
    }

    /**
     * Writes the {@code status} and {@code statusModified} properties on the pipe's configuration
     * resource. The change is not committed here.
     * <p>
     * Nothing is written for a blank status. When the configuration resource is missing or cannot
     * be adapted to a {@link ModifiableValueMap}, a warning is logged and nothing is written; this
     * avoids a {@link NullPointerException} on the adapted map.
     *
     * @param pipe   pipe whose status is updated
     * @param status new status
     * @throws RepositoryException declared for callers; not thrown by the statements visible here
     */
    protected void writeStatus(Pipe pipe, String status) throws RepositoryException {
        if (StringUtils.isBlank(status)) {
            return;
        }
        Resource pipeResource = pipe.getResource();
        ModifiableValueMap properties = pipeResource == null ? null : pipeResource.adaptTo(ModifiableValueMap.class);
        if (properties == null) {
            log.warn("unable to write status {} for pipe {}, its configuration resource is not modifiable", status, pipe);
            return;
        }
        properties.put("status", status);
        // a new GregorianCalendar is already set to the current time
        properties.put("statusModified", new GregorianCalendar());
    }

    /**
     * Reads the status stored under the {@code status} child of the pipe resource.
     *
     * @param pipeResource pipe configuration resource
     * @return the stored status, or {@code finished} when there is none or it is blank
     */
    @Override
    public String getStatus(Resource pipeResource) {
        Resource statusResource = pipeResource.getChild("status");
        if (statusResource == null) {
            return "finished";
        }
        String storedStatus = statusResource.adaptTo(String.class);
        return StringUtils.isNotBlank(storedStatus) ? storedStatus : "finished";
    }

    /**
     * @param resolver resolver handed over to the builder
     * @return a new pipe builder bound to this plumber
     */
    @Override
    public PipeBuilder newPipe(ResourceResolver resolver) {
        return new PipeBuilderImpl(resolver, this);
    }

    /**
     * @param pipeResource pipe configuration resource
     * @return {@code true} when the status of the pipe is anything but {@code finished}
     */
    @Override
    public boolean isRunning(Resource pipeResource) {
        String currentStatus = getStatus(pipeResource);
        return !"finished".equals(currentStatus);
    }

    /**
     * Job consumer entry point: executes the pipe whose path is stored in the {@code path} job
     * property, with the bindings stored in {@code additionalBindings}, using a service resource
     * resolver and a {@link JsonWriter}. Changes are saved.
     *
     * @param job job queued by {@code executeAsync}
     * @return {@code OK} when the execution completed, {@code FAILED} when the resolver could not
     *         be obtained or the execution threw (both are logged)
     */
    @Override
    public JobConsumer.JobResult process(Job job) {
        try (ResourceResolver resolver = factory.getServiceResourceResolver(serviceUser)) {
            String path = (String) job.getProperty("path");
            Map bindings = (Map) job.getProperty("additionalBindings");
            OutputWriter writer = new JsonWriter();
            writer.starts();
            execute(resolver, path, bindings, writer, true);
            return JobConsumer.JobResult.OK;
        } catch (LoginException e) {
            log.error("unable to retrieve resolver for executing scheduled pipe", e);
        } catch (Exception e) {
            log.error("failed to execute the pipe", e);
        }
        return JobConsumer.JobResult.FAILED;
    }

    /**
     * Rebuilds the map of monitored pipes and aligns the JMX registrations with it: beans of pipes
     * that are no longer monitored are unregistered, then every currently monitored pipe is
     * registered under its path.
     */
    @Override
    public void refreshMonitoredPipes() {
        Map<String, PipeMonitor> refreshed = new HashMap<>();
        for (PipeMonitor pipeMonitor : getMonitoredPipes()) {
            refreshed.put(pipeMonitor.getPath(), pipeMonitor);
        }
        if (monitoredPipes != null) {
            // paths monitored before this refresh but not any more
            for (String previousPath : monitoredPipes.keySet()) {
                if (!refreshed.containsKey(previousPath)) {
                    toggleJmxRegistration(null, previousPath, false);
                }
            }
        }
        monitoredPipes = refreshed;
        // NOTE(review): toggleJmxRegistration skips names that are already registered, so a pipe
        // that was monitored before keeps its previous bean in JMX while this map holds the new
        // one - confirm intended.
        for (Map.Entry<String, PipeMonitor> entry : refreshed.entrySet()) {
            toggleJmxRegistration(entry.getValue(), entry.getKey(), true);
        }
    }

    /**
     * Queries the repository, with a service resource resolver, for the pipes matching
     * {@link #MONITORED_PIPES_QUERY} and wraps each of them in a {@link PipeMonitor}.
     * <p>
     * Resources from which no pipe can be built are skipped with a warning. Failures are logged and
     * whatever was collected so far is returned.
     *
     * @return the monitors found; empty when no service user is configured
     */
    protected Collection<PipeMonitor> getMonitoredPipes() {
        List<PipeMonitor> beans = new ArrayList<>();
        if (serviceUser == null) {
            log.warn("no service user configured, pipes can't be monitored");
            return beans;
        }
        try (ResourceResolver resolver = factory.getServiceResourceResolver(serviceUser)) {
            Iterator<Resource> candidates = resolver.findResources(MONITORED_PIPES_QUERY, "xpath");
            while (candidates.hasNext()) {
                Resource candidate = candidates.next();
                Pipe pipe = getPipe(candidate);
                if (pipe == null) {
                    // getPipe already logged the reason
                    log.warn("unable to build the pipe configured in {}, it will not be monitored", candidate.getPath());
                    continue;
                }
                beans.add(new PipeMonitor(this, pipe));
            }
        } catch (LoginException e) {
            log.error("unable to retrieve resolver for collecting exposed pipes", e);
        } catch (Exception e) {
            log.error("failed to collect the monitored pipes", e);
        }
        return beans;
    }

    /**
     * OSGi metatype configuration of the plumber, handed to {@code activate}.
     */
    @ObjectClassDefinition(name="Apache Sling Pipes : Plumber configuration")
    public static @interface Configuration {
        /** Modulus applied by {@code persist} to the result size to decide on an intermediate commit. */
        @AttributeDefinition(description="Number of iterations after which plumber should saves a pipe execution")
        public int bufferSize() default 1000;

        /** Pause, in milliseconds, applied by {@code persist} after a commit; not applied unless greater than zero. */
        @AttributeDefinition(description="Number of milliseconds of sleep after each persistence")
        public long sleep() default 0L;

        /** Sub-service name stored in the service-user map; no default is declared. */
        @AttributeDefinition(description="Name of service user, with appropriate rights, that will be used for async execution")
        public String serviceUser();

        /** User ids accepted by {@code executeAsync(ResourceResolver, String, Map)}. */
        @AttributeDefinition(description="Users allowed to register async pipes")
        public String[] authorizedUsers() default {"admin"};
    }
}

