/*
 * Decompiled with CFR 0.152.
 */
package org.jesterj.ingest.model.impl;

import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.util.Supplier;
import org.jesterj.ingest.model.Active;
import org.jesterj.ingest.model.ConfiguredBuildable;
import org.jesterj.ingest.model.DocStatusChange;
import org.jesterj.ingest.model.Document;
import org.jesterj.ingest.model.DocumentProcessor;
import org.jesterj.ingest.model.NextSteps;
import org.jesterj.ingest.model.Plan;
import org.jesterj.ingest.model.Router;
import org.jesterj.ingest.model.Status;
import org.jesterj.ingest.model.Step;
import org.jesterj.ingest.model.impl.DocumentImpl;
import org.jesterj.ingest.model.impl.NamedBuilder;
import org.jesterj.ingest.processors.NoOpProcessor;
import org.jesterj.ingest.routers.RouterBase;

public class StepImpl
implements Step {
    // Class-wide logger.
    private static final Logger log = LogManager.getLogger();
    // Separator inserted between a destination name and the name of a step it was reached through
    // (see appendUpstreamDuplicatingSplitDestinationNamesAndAdd).
    public static final String VIA = "<-via->";
    // Cache of compiled patterns keyed by step name; populated lazily by getPatternForStep.
    private static final Map<String, Pattern> stepNameInDestinationPatterns = new ConcurrentHashMap<String, Pattern>();
    // NOTE(review): not referenced by any visible code; presumably used by run(), whose body failed to decompile.
    private final DocumentConsumer documentConsumer = new DocumentConsumer();
    // Backing queue; the queue/collection methods of this class delegate to it. Assigned by the Builder.
    private LinkedBlockingQueue<Document> queue;
    // Configured via Builder.batchSize; Builder.build uses it as the queue capacity (50 when not positive).
    private int batchSize;
    // Downstream steps keyed by step name, in the order they were added by Builder.addNextStep.
    private final LinkedHashMap<String, Step> nextSteps = new LinkedHashMap();
    // Set to true in activate() and false in deactivate(); offer(Document) and put(Document) refuse documents when false.
    private volatile boolean active;
    private String stepName;
    // Consulted by getNextSteps(Document) only when there is more than one next step.
    private Router router;
    // Defaults to a NoOpProcessor; replaced by the deferred action registered in Builder.withProcessor.
    private volatile DocumentProcessor processor = new NoOpProcessor();
    // Thread created and started by activate(); cleared by deactivate().
    private volatile Thread worker;
    // Monitor held while the worker reference is created or cleared.
    private final Object WORKER_LOCK = new Object();
    private Plan plan;
    // Builder actions postponed until executeDeferred() is called.
    private final List<Runnable> deferred = new ArrayList<Runnable>();
    // Milliseconds deactivate() waits in Thread.join before interrupting the worker.
    private int shutdownTimeout = 100;
    // Upstream steps, registered through addPredecessor.
    private final List<Step> priorSteps = new ArrayList<Step>();
    // Lazily computed cache for getDownstreamOutputSteps().
    private volatile Set<Step> outputSteps;
    // Monitor held while outputSteps is computed.
    private final Object OUTPUT_STEP_LIST_LOCK = new Object();
    // Lazily computed cache for getOutputDestinationNames().
    private volatile Set<String> outputDestinationNames;
    // Monitor held while outputDestinationNames is computed.
    private final Object OUTPUT_DEST_NAMES_LOCK = new Object();

    /**
     * Package-private constructor; instances are configured through {@link Builder}.
     */
    StepImpl() {
        // intentionally empty: all state is populated by the builder
    }

    /**
     * Returns the (cached) pattern matching destination names that belong to the given step:
     * either exactly the step name, or the step name followed by {@link #VIA} and any suffix.
     * <p>
     * The step name is quoted before being embedded in the expression, so names containing
     * regular-expression metacharacters (for example {@code .} or {@code (}) are matched literally.
     *
     * @param name the step name; must not be {@code null} (the cache is a {@code ConcurrentHashMap})
     * @return the compiled pattern for that step name
     */
    public static Pattern getPatternForStep(String name) {
        return stepNameInDestinationPatterns.computeIfAbsent(
                name,
                key -> Pattern.compile("^" + Pattern.quote(key) + "($|" + VIA + ".*$)"));
    }

    /**
     * Delegates to the backing queue.
     *
     * @return the backing queue's spliterator
     */
    @Override
    public Spliterator<Document> spliterator() {
        return queue.spliterator();
    }

    /**
     * Delegates to the backing queue.
     *
     * @return {@code true} when the backing queue holds no documents
     */
    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    /**
     * Delegates to the backing queue's {@code element()}.
     *
     * @return the head of the backing queue, without removing it
     */
    @Override
    public Document element() {
        return queue.element();
    }

    /**
     * Delegates to the backing queue's timed {@code poll}.
     *
     * @param timeout how long to wait
     * @param unit    unit of {@code timeout}
     * @return whatever the backing queue returns for the timed poll
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public Document poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }

    /**
     * Delegates to the backing queue.
     *
     * @return a parallel stream over the backing queue
     */
    @Override
    public Stream<Document> parallelStream() {
        return queue.parallelStream();
    }

    /**
     * Delegates to the backing queue's blocking {@code take}.
     *
     * @return the document removed from the head of the backing queue
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public Document take() throws InterruptedException {
        return queue.take();
    }

    /**
     * Removes every document from the backing queue.
     */
    @Override
    public void clear() {
        queue.clear();
    }

    /**
     * Delegates to the backing queue.
     *
     * @return the backing queue's iterator
     */
    @Override
    public Iterator<Document> iterator() {
        return queue.iterator();
    }

    /**
     * Not supported on steps.
     *
     * @param c ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean containsAll(Collection<?> c) {
        final String reason = "bulk operations not supported for steps";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Delegates to the backing queue's typed {@code toArray}.
     *
     * @param a   the array passed through to the backing queue
     * @param <T> component type of the array
     * @return the array returned by the backing queue
     */
    @Override
    public <T> T[] toArray(T[] a) {
        return queue.toArray(a);
    }

    /**
     * Not supported on steps.
     * <p>
     * The message previously read "bulk operations supported", contradicting the exception
     * being thrown; it now matches the wording used by {@code containsAll}.
     *
     * @param c ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean addAll(Collection<? extends Document> c) {
        throw new UnsupportedOperationException("bulk operations not supported for steps");
    }

    /**
     * Delegates to the backing queue.
     *
     * @return the remaining capacity reported by the backing queue
     */
    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }

    /**
     * Delegates to the backing queue.
     *
     * @return a sequential stream over the backing queue
     */
    @Override
    public Stream<Document> stream() {
        return queue.stream();
    }

    /**
     * Trace-logs the offer (document id, step name and a lazily rendered call stack) and then
     * performs a timed offer on the backing queue. Unlike {@link #offer(Document)} this variant
     * does not consult the active flag.
     *
     * @param document the document to enqueue
     * @param timeout  how long to wait for space
     * @param unit     unit of {@code timeout}
     * @return the result of the backing queue's timed offer
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean offer(Document document, long timeout, TimeUnit unit) throws InterruptedException {
        // The suppliers are only evaluated by the logger when trace output is actually produced.
        Supplier<String> callSite =
                () -> Arrays.asList(new RuntimeException().getStackTrace()).toString().replaceAll(",", "\n");
        log.trace("{} offered (timeout) to {} at {}", document::getId, this::getName, callSite);
        return queue.offer(document, timeout, unit);
    }

    /**
     * Offers a document to the backing queue, but only while this step is active.
     *
     * @param document the document to enqueue
     * @return {@code false} immediately when the step is inactive; otherwise the result of the
     *         backing queue's non-blocking offer
     */
    @Override
    public boolean offer(Document document) {
        if (!active) {
            return false;
        }
        // The suppliers are only evaluated by the logger when trace output is actually produced.
        Supplier<String> callSite =
                () -> Arrays.asList(new RuntimeException().getStackTrace()).toString().replaceAll(",", "\n");
        log.trace("{} offered to {} at {}", document::getId, this::getName, callSite);
        return queue.offer(document);
    }

    /**
     * Delegates to the backing queue's non-blocking {@code poll}.
     *
     * @return whatever the backing queue returns
     */
    @Override
    public Document poll() {
        return queue.poll();
    }

    /**
     * Delegates to the backing queue's bounded {@code drainTo}.
     *
     * @param c           destination collection
     * @param maxElements upper bound passed through to the backing queue
     * @return the count returned by the backing queue
     */
    @Override
    public int drainTo(Collection<? super Document> c, int maxElements) {
        return queue.drainTo(c, maxElements);
    }

    /**
     * Not supported on steps.
     * <p>
     * The message previously read "bulk operations supported", contradicting the exception
     * being thrown; it now matches the wording used by {@code containsAll}.
     *
     * @param c ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean retainAll(Collection<?> c) {
        throw new UnsupportedOperationException("bulk operations not supported for steps");
    }

    /**
     * Trace-logs the put and then, only while this step is active, performs a blocking put on
     * the backing queue. When the step is inactive the document is not enqueued and the method
     * returns normally.
     *
     * @param document the document to enqueue
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    @Override
    public void put(Document document) throws InterruptedException {
        // The suppliers are only evaluated by the logger when trace output is actually produced.
        Supplier<String> callSite =
                () -> Arrays.asList(new RuntimeException().getStackTrace()).toString().replaceAll(",", "\n");
        log.trace("{} put to {} at {}", document::getId, this::getName, callSite);
        if (!active) {
            return;
        }
        queue.put(document);
    }

    /**
     * Delegates to the backing queue's {@code peek}.
     *
     * @return whatever the backing queue returns
     */
    @Override
    public Document peek() {
        return queue.peek();
    }

    /**
     * Delegates to the backing queue.
     *
     * @return the number of documents reported by the backing queue
     */
    @Override
    public int size() {
        return queue.size();
    }

    /**
     * Delegates to the backing queue.
     *
     * @param o the object to look for
     * @return the backing queue's answer
     */
    @Override
    public boolean contains(Object o) {
        return queue.contains(o);
    }

    /**
     * Delegates to the backing queue's {@code remove(Object)}.
     *
     * @param o the object to remove
     * @return the backing queue's answer
     */
    @Override
    public boolean remove(Object o) {
        return queue.remove(o);
    }

    /**
     * Delegates to the backing queue's {@code removeAll}. Note that, unlike {@code addAll},
     * {@code containsAll} and {@code retainAll}, this bulk operation is passed through rather
     * than rejected.
     *
     * @param c the elements to remove
     * @return the backing queue's answer
     */
    @Override
    public boolean removeAll(Collection<?> c) {
        return queue.removeAll(c);
    }

    /**
     * Trace-logs the add and then delegates to the backing queue's {@code add}. The active flag
     * is not consulted here.
     *
     * @param document the document to enqueue
     * @return the backing queue's answer
     */
    @Override
    public boolean add(Document document) {
        // The suppliers are only evaluated by the logger when trace output is actually produced.
        Supplier<String> callSite =
                () -> Arrays.asList(new RuntimeException().getStackTrace()).toString().replaceAll(",", "\n");
        log.trace("{} added to {} at {}", document::getId, this::getName, callSite);
        return queue.add(document);
    }

    /**
     * Applies the action to the contents of the backing queue via its {@code forEach}.
     *
     * @param action the action passed through to the backing queue
     */
    @Override
    public void forEach(Consumer<? super Document> action) {
        queue.forEach(action);
    }

    /**
     * Delegates to the backing queue's no-argument {@code remove()}.
     *
     * @return the document removed from the head of the backing queue
     */
    @Override
    public Document remove() {
        return queue.remove();
    }

    /**
     * Delegates to the backing queue.
     *
     * @return the array produced by the backing queue
     */
    @Override
    public Object[] toArray() {
        return queue.toArray();
    }

    /**
     * Delegates to the backing queue's {@code removeIf}.
     *
     * @param filter predicate passed through to the backing queue
     * @return the backing queue's answer
     */
    @Override
    public boolean removeIf(Predicate<? super Document> filter) {
        return queue.removeIf(filter);
    }

    /**
     * Delegates to the backing queue's unbounded {@code drainTo}.
     *
     * @param c destination collection
     * @return the count returned by the backing queue
     */
    @Override
    public int drainTo(Collection<? super Document> c) {
        return queue.drainTo(c);
    }

    /**
     * @return the batch size configured on this step
     */
    @Override
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Determines where the given document goes after this step.
     *
     * @param doc the document being routed
     * @return {@code null} when this step has no next steps; a {@code NextSteps} wrapping the
     *         only next step when there is exactly one; otherwise whatever the router's
     *         {@code route(doc)} returns
     */
    @Override
    public NextSteps getNextSteps(Document doc) {
        final int fanOut = nextSteps.size();
        if (fanOut == 0) {
            log.trace("No next steps for {} from {}", doc.getId(), getName());
            return null;
        }
        if (fanOut > 1) {
            // More than one candidate: the router decides.
            log.trace("Routing among next steps {} for {}({}) from {} ",
                    () -> getNextSteps().keySet(), doc::getId, doc::getOrigination, this::getName);
            return router.route(doc);
        }
        // Exactly one candidate: no router involvement.
        log.trace("Single next step {} for {} from {}",
                () -> getNextSteps().keySet(), doc::getId, this::getName);
        Step only = nextSteps.values().iterator().next();
        return new NextSteps(doc, only);
    }

    /**
     * @return the plan assigned through {@code setPlan}, or {@code null} if none was assigned
     */
    @Override
    public Plan getPlan() {
        return plan;
    }

    /**
     * Records the plan this step belongs to.
     *
     * @param plan the plan later returned by {@code getPlan()}
     */
    void setPlan(Plan plan) {
        this.plan = plan;
    }

    /**
     * Starts this step. If there is no worker thread, or the existing one is no longer alive,
     * a new daemon thread running this step is created, named
     * {@code jj-worker-<stepName>-<currentTimeMillis>}, the step is flagged active and the
     * thread is started. If a live worker already exists nothing but logging happens.
     */
    @Override
    public synchronized void activate() {
        log.info("Starting {} ", getName());
        Thread existing = worker;
        boolean needsWorker = existing == null || !existing.isAlive();
        if (needsWorker) {
            synchronized (WORKER_LOCK) {
                log.info("Starting new thread for {} ", getName());
                Thread fresh = new Thread(this);
                worker = fresh;
                fresh.setName("jj-worker-" + stepName + "-" + System.currentTimeMillis());
                fresh.setDaemon(true);
                // The flag is raised before start() so the new thread already sees the step as active.
                active = true;
                fresh.start();
                log.info("started {} ({})", fresh.getName(), Long.valueOf(fresh.getId()));
            }
        }
        log.info("Started step {} ", getName());
    }

    /**
     * Stops this step: clears the active flag, empties the backing queue, detaches the worker
     * thread and waits up to {@code shutdownTimeout} milliseconds for it to finish, interrupting
     * it if it is still alive afterwards.
     * <p>
     * If the calling thread is itself interrupted while waiting, the condition is logged and the
     * caller's interrupt status is restored so that code further up the stack can react to it
     * (previously the interrupt was silently consumed).
     */
    @Override
    public synchronized void deactivate() {
        log.info("Deactivating step {}", getName());
        active = false;
        queue.clear();
        if (worker == null) {
            return;
        }
        Thread workerShuttingDown;
        synchronized (WORKER_LOCK) {
            workerShuttingDown = worker;
            worker = null;
        }
        if (workerShuttingDown == null) {
            return;
        }
        try {
            // Note: Thread.join(0) waits indefinitely, so a zero shutdownTimeout means "no limit".
            workerShuttingDown.join(shutdownTimeout);
            if (workerShuttingDown.isAlive()) {
                log.warn("{} was slow shutting down, interrupting..", getName());
                workerShuttingDown.interrupt();
            }
        } catch (InterruptedException e) {
            log.error("Thread on which shutdown was was interrupted while shutting down {}", getName());
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return the current value of the active flag (raised by {@code activate()}, cleared by
     *         {@code deactivate()})
     */
    @Override
    public boolean isActive() {
        return active;
    }

    /**
     * Forwards the document via {@code pushToNextIfNotDropped}.
     *
     * @param doc the document to forward
     */
    @Override
    public void sendToNext(Document doc) {
        pushToNextIfNotDropped(doc);
    }

    /**
     * Returns the destination names reachable from this step, computing and caching them on
     * first use. For every downstream output step, the step's name is used as the base
     * destination name and handed to
     * {@code appendUpstreamDuplicatingSplitDestinationNamesAndAdd}, which fills the result set.
     *
     * @return the cached set of destination names
     */
    @Override
    public Set<String> getOutputDestinationNames() {
        Set<String> names = outputDestinationNames;
        if (names == null) {
            synchronized (OUTPUT_DEST_NAMES_LOCK) {
                names = outputDestinationNames;
                if (names == null) {
                    Set<String> collected = new HashSet<>();
                    for (Step outputStep : getDownstreamOutputSteps()) {
                        // A fresh path set is used for each output step's upward walk.
                        appendUpstreamDuplicatingSplitDestinationNamesAndAdd(
                                outputStep.getName(), outputStep, this, collected, new HashSet<>());
                    }
                    // Publish only after the set has been fully populated.
                    outputDestinationNames = collected;
                    names = collected;
                }
            }
        }
        return names;
    }

    /**
     * Walks upstream from {@code currentStep} through every chain of prior steps. Whenever a
     * prior step has a deterministic router that emits more than one copy, the destination name
     * is extended with {@link #VIA} plus the name of the step just left. When a step without
     * prior steps is reached, the accumulated destination name is added to {@code destinations}
     * if {@code referenceStep} lies on the path walked so far.
     * <p>
     * {@code pathElements} holds the names of the steps on the path currently being explored.
     * The current step is now removed from it on every exit, including when a step without
     * priors is reached. Previously that early return left the root's name in the set, so a
     * later walk ending at a different root could wrongly appear to pass through an earlier
     * one, making the result depend on traversal order.
     *
     * @param destinationName destination name accumulated so far
     * @param currentStep     step currently being visited
     * @param referenceStep   step whose presence on the path qualifies the destination
     * @param destinations    result set that receives qualifying destination names
     * @param pathElements    names of the steps on the path currently being explored
     */
    private void appendUpstreamDuplicatingSplitDestinationNamesAndAdd(String destinationName, Step currentStep, Step referenceStep, Set<String> destinations, Set<String> pathElements) {
        pathElements.add(currentStep.getName());
        List<Step> priors = currentStep.getPriorSteps();
        if (priors == null || priors.isEmpty()) {
            // Reached the top of a path: record the name only if the reference step was on it.
            if (pathElements.contains(referenceStep.getName())) {
                destinations.add(destinationName);
            }
        } else {
            for (Step prior : priors) {
                Router priorRouter = prior.getRouter();
                boolean duplicatingSplit = priorRouter != null
                        && priorRouter.isDeterministic()
                        && priorRouter.getNumberOfOutputCopies() > 1;
                String nameForPrior = duplicatingSplit
                        ? destinationName + VIA + currentStep.getName()
                        : destinationName;
                appendUpstreamDuplicatingSplitDestinationNamesAndAdd(nameForPrior, prior, referenceStep, destinations, pathElements);
            }
        }
        // Always unwind, so sibling paths never see steps that are not on their own path.
        pathElements.remove(currentStep.getName());
    }

    /**
     * Returns the output steps at or below this step, computing and caching them on first use.
     * <ul>
     *   <li>With no next steps, this step must be potent or idempotent and is the sole result.</li>
     *   <li>Otherwise the result is the union of the downstream output steps of each distinct
     *       next step, plus this step itself when {@code isOutputStep()} is true.</li>
     * </ul>
     * The set is assembled in a local variable and only assigned to the cache field once
     * complete. Previously an empty set was assigned first and filled afterwards, so a thread on
     * the unsynchronised fast path could observe and return an incomplete set.
     *
     * @return the cached set of downstream output steps
     * @throws RuntimeException if this step has no next steps and its processor is neither
     *                          potent nor idempotent
     */
    @Override
    public Set<Step> getDownstreamOutputSteps() {
        Set<Step> cached = outputSteps;
        if (cached != null) {
            return cached;
        }
        synchronized (OUTPUT_STEP_LIST_LOCK) {
            cached = outputSteps;
            if (cached != null) {
                return cached;
            }
            Set<Step> computed = new HashSet<>();
            if (nextSteps.isEmpty()) {
                if (!processor.isPotent() && !processor.isIdempotent()) {
                    throw new RuntimeException("Detected terminal step that does not produce an output!. Final step on any path must be potent or idempotent");
                }
                computed.add(this);
            } else {
                // De-duplicate next steps so each distinct step is asked only once.
                for (Step next : new HashSet<>(nextSteps.values())) {
                    computed.addAll(next.getDownstreamOutputSteps());
                }
                if (isOutputStep()) {
                    computed.add(this);
                }
            }
            // Publish only the fully built set.
            outputSteps = computed;
            return computed;
        }
    }

    /**
     * @return {@code true} when the processor is potent, or when it is idempotent and this step
     *         has no next steps
     */
    @Override
    public boolean isOutputStep() {
        // Parentheses make the operator precedence of the original expression explicit.
        return processor.isPotent() || (processor.isIdempotent() && nextSteps.isEmpty());
    }

    /**
     * @return the live map of next steps keyed by step name (not a copy)
     */
    @Override
    public LinkedHashMap<String, Step> getNextSteps() {
        return nextSteps;
    }

    /**
     * Selects the next steps through which at least one of the document's incomplete output
     * destinations can be reached, preserving the order of the next-steps map.
     * <p>
     * The document's incomplete destinations are read once and placed in a set. Previously the
     * array was re-fetched and re-wrapped in a list for every destination name of every next
     * step, and searched linearly each time.
     *
     * @param d the document whose incomplete output destinations are consulted
     * @return a new insertion-ordered map containing the eligible next steps
     */
    @Override
    public LinkedHashMap<String, Step> getEligibleNextSteps(Document d) {
        Set<String> incomplete = new HashSet<>(Arrays.asList(d.getIncompleteOutputDestinations()));
        return nextSteps.entrySet().stream()
                .filter(candidate -> candidate.getValue().getOutputDestinationNames().stream()
                        .anyMatch(incomplete::contains))
                .collect(Collectors.toMap(
                        Map.Entry::getKey, Map.Entry::getValue, (first, second) -> first, LinkedHashMap::new));
    }

    /**
     * @return {@code true} if at least one prior step reports itself active
     */
    @Override
    public boolean isActivePriorSteps() {
        for (Step prior : getPriorSteps()) {
            if (prior.isActive()) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the live list of prior steps (not a copy)
     */
    @Override
    public List<Step> getPriorSteps() {
        return priorSteps;
    }

    /**
     * Forwards the document via {@code pushToNextIfOk} unless its pending status change drops it
     * for every remaining destination, in which case only the document status is reported.
     * <p>
     * The document counts as fully dropped when the status change is {@code DROPPED} and either
     * no specific destinations are listed, or every incomplete output destination of the
     * document is among the listed ones.
     *
     * @param document the document leaving this step
     */
    void pushToNextIfNotDropped(Document document) {
        DocStatusChange change = document.getStatusChange();
        String docId = document.getId();
        boolean droppedEverywhere;
        if (change == null) {
            log.trace("No Status change for {}", docId);
            droppedEverywhere = false;
        } else {
            log.trace("Status change for {} = {}", docId, change);
            if (change.getStatus() != Status.DROPPED) {
                droppedEverywhere = false;
            } else {
                droppedEverywhere = true;
                log.trace("Status was DROPPED for {}", docId);
                Collection<String> dropped = change.getSpecificDestinations();
                if (dropped != null && !dropped.isEmpty()) {
                    log.trace("Had dropped destinations: {} for {}", dropped, docId);
                    for (String destination : document.getIncompleteOutputDestinations()) {
                        log.trace("SAW STATUS:{} and change:{} for {}",
                                () -> document.getStatus(destination), change::getStatus, () -> docId);
                        // Non-short-circuit on purpose: every destination is checked.
                        droppedEverywhere &= dropped.contains(destination);
                    }
                }
            }
        }
        if (droppedEverywhere) {
            log.trace("Dropping:{}", docId);
            document.reportDocStatus();
        } else {
            log.trace("Pushing on:{}", docId);
            pushToNextIfOk(document, false);
        }
    }

    /**
     * Validates the routing outcome for a document leaving this step and either forwards it to
     * the selected next steps or, when there are none, finalises it here.
     * <p>
     * When {@code getNextSteps(document)} yields a result, the document is marked indexed (only
     * if the processor was not skipped and is potent or idempotent), its status is reported and
     * it is handed to {@code pushToNext}. When it yields {@code null}, a series of sanity checks
     * runs and the document is marked indexed and reported unless its status for the first
     * incomplete destination is already {@code BATCHED} or {@code INDEXING}.
     *
     * @param document         the document leaving this step
     * @param processorSkipped when {@code true}, the document is not marked indexed before being
     *                         forwarded
     * @throws RuntimeException on any of the inconsistent states detected below; every exception
     *                          is logged with the step name and rethrown
     */
    void pushToNextIfOk(Document document, boolean processorSkipped) {
        try {
            String id = document.getId();
            log.trace("starting push to next if ok {} for {}", (Object)this.getName(), (Object)id);
            NextSteps next = this.getNextSteps(document);
            log.trace("Found {} next steps", next == null ? "(null)" : Integer.valueOf(next.size()));
            // No remaining destinations and nowhere to go: nothing downstream could complete this document.
            if (document.getIncompleteOutputDestinations().length < 1 && this.getNextSteps().isEmpty()) {
                throw new RuntimeException("Critical failure! No down stream step on Document after routing. This is likely to be a bug in JesterJ, please report an issue in the project issue tracker. Current Step:" + this.getName() + " Document:" + document + " Router class:" + (Serializable)(this.getRouter() == null ? "(no router)" : this.getRouter().getClass()));
            }
            if (next == null) {
                // No next step was selected: only acceptable on a potent or idempotent step.
                if (!this.getProcessor().isPotent() && !this.getProcessor().isIdempotent()) {
                    if (this.nextSteps.isEmpty()) {
                        throw new RuntimeException("Your plan is misconfigured. you have dangling steps that have no external outputs. The final step in each branch must be either POTENT or IDEMPOTENT. Note that a step thatincrements a custom metric that can be externally observed somehow should be marked POTENT.");
                    }
                    throw new RuntimeException("Your router failed to select a destination. This is a bug in the routerimplementation. If it is a standard JesterJ router, please report an issue in the project issue tracker. Remaining incomplete steps:" + document.listIncompleteOutputSteps() + " Current Step:" + this.getName() + " Document:" + document);
                }
                if (document.getIncompleteOutputDestinations().length > 1 && this.nextSteps.isEmpty()) {
                    throw new RuntimeException("Critical failure! JesterJ calculated more than one down stream step on a document at a final step. This is likely to be a bug in JesterJ, please report an issue in the project issue tracker. Remaining incomplete steps:" + document.listIncompleteOutputSteps() + " Current Step:" + this.getName() + " Document:" + document);
                }
                // NOTE(review): index 0 is read without a length check; an empty array is only ruled out
                // above when nextSteps is empty — confirm it cannot be empty when nextSteps is non-empty.
                String incompleteOutputStep = document.getIncompleteOutputDestinations()[0];
                // NOTE(review): same condition as the check at the top of this branch, which always throws
                // when true, so this looks unreachable unless the processor changes concurrently.
                if (!this.getProcessor().isPotent() && !this.getProcessor().isIdempotent()) {
                    throw new RuntimeException("Somehow we have a destination output step, at the last step, but the last stepis not POTENT or IDEMPOTENT, or the name doesn't match the current step! Our Name:" + this.getName() + " Expected destination:" + incompleteOutputStep);
                }
                if (!this.getOutputDestinationNames().contains(incompleteOutputStep)) {
                    throw new RuntimeException("We reached a valid final step, but it does not have the expected step name, This is likely to be a bug in JesterJ please report an issue in the project issue tracker. Named valid:" + this.getOutputDestinationNames() + " Name expected:" + incompleteOutputStep);
                }
                Status currStat = document.getStatus(document.getIncompleteOutputDestinations()[0]);
                // Documents already BATCHED or INDEXING are left untouched here.
                if (!Status.BATCHED.equals(currStat) && !Status.INDEXING.equals(currStat)) {
                    this.markIndexed((DocumentImpl)document);
                    document.reportDocStatus();
                }
            } else {
                if (!processorSkipped && (this.getProcessor().isPotent() || this.getProcessor().isIdempotent())) {
                    this.markIndexed((DocumentImpl)document);
                }
                document.reportDocStatus();
                this.pushToNext(next);
            }
            log.trace("completing push to next if ok {} for {}", (Object)this.getName(), (Object)id);
        }
        catch (Exception e) {
            // Log which step failed, then let the exception propagate unchanged.
            log.error("Exception caught, exiting from step {}", (Object)this.getName());
            throw e;
        }
    }

    /**
     * @return the router assigned to this step, or {@code null} if none has been set
     */
    @Override
    public Router getRouter() {
        return router;
    }

    /**
     * Sets the document's status to {@code INDEXED}, passing this step's name as the message
     * argument.
     *
     * @param document the document that finished processing in this step
     */
    void markIndexed(DocumentImpl document) {
        log.trace("{} finished processing {}", getName(), document.getId());
        Serializable[] messageArgs = new Serializable[]{getName()};
        document.setStatus(Status.INDEXED, "Last available step {} completed OK,", messageArgs);
    }

    /**
     * Delivers the routed document(s) to their destination steps.
     * <p>
     * With exactly one remaining destination a blocking push is used and its result is not
     * inspected. Otherwise non-blocking pushes are attempted in rounds until
     * {@code next.remaining()} is empty; entries carrying an exception are marked {@code FAIL}
     * and reported instead of being pushed.
     *
     * @param next the routing result to deliver
     */
    void pushToNext(NextSteps next) {
        List<Map.Entry<Step, NextSteps.StepStatusHolder>> pending = next.remaining();
        if (pending.size() == 1) {
            pushToStep(pending.get(0), true);
            return;
        }
        while (!pending.isEmpty()) {
            for (Map.Entry<Step, NextSteps.StepStatusHolder> candidate : next.remaining()) {
                Step target = candidate.getKey();
                if (candidate.getValue().getException() == null) {
                    next.update(target, pushToStep(candidate, false));
                } else {
                    next.update(target, NextSteps.StepStatus.FAIL);
                    reportException(candidate, "Failed to clone document when sending to multiple steps");
                }
            }
            // Re-query so entries that were sent or failed drop out of the next round.
            pending = next.remaining();
        }
    }

    /**
     * Pushes the entry's document to the entry's step.
     * <p>
     * In blocking mode {@code step.put} is used; the result is {@code SENT} on normal return and
     * {@code FAIL} if the put is interrupted or throws. In non-blocking mode {@code step.offer}
     * is used and the result is {@code SENT} or {@code RETRY} depending on its answer.
     * <p>
     * When the blocking put is interrupted, the thread's interrupt status is now restored before
     * returning {@code FAIL}, so the code driving this worker can still observe the interruption
     * (previously it was silently consumed).
     *
     * @param entry the destination step paired with the holder of the document to send
     * @param block {@code true} to block until the destination accepts the document
     * @return the outcome of the push
     * @throws RuntimeException if the entry's step is {@code null}
     */
    private NextSteps.StepStatus pushToStep(Map.Entry<Step, NextSteps.StepStatusHolder> entry, boolean block) {
        Step step = entry.getKey();
        Document document = entry.getValue().getDoc();
        String name = step == null ? "null step name" : step.getName();
        log.trace("Pushing to {} DocId:{} Statuses:{}", (Object) name, (Object) document.getId(), (Object) document.dumpStatus());
        if (step == null) {
            throw new RuntimeException("Attempted to route to a null step");
        }
        log.trace("starting put ( {} into {} )", (Object) getName(), (Object) name);
        if (!block) {
            return step.offer(document) ? NextSteps.StepStatus.SENT : NextSteps.StepStatus.RETRY;
        }
        try {
            step.put(document);
            log.trace("completed put ( {} into {} )", (Object) getName(), (Object) name);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return NextSteps.StepStatus.FAIL;
        } catch (Exception e) {
            String message = "Exception while offering to " + name + ". Exception message:{}";
            reportException(entry, message, e);
            return NextSteps.StepStatus.FAIL;
        }
        return NextSteps.StepStatus.SENT;
    }

    /**
     * Worker-thread entry point ({@code activate()} starts a thread with this step as its target).
     * <p>
     * NOTE(review): the real body is missing. The decompiler could not reconstruct this method
     * (see the trace below), so as written it throws immediately. It must be restored from the
     * original source or bytecode before this class can be used.
     *
     * @throws IllegalStateException always, in this decompiled form
     */
    @Override
    public void run() {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Publishes the plan's name and version into the logging {@code ThreadContext} under the
     * keys {@code JJ_PLAN_NAME} and {@code JJ_PLAN_VERSION}.
     */
    void addStepContext() {
        ThreadContext.put("JJ_PLAN_NAME", getPlan().getName());
        ThreadContext.put("JJ_PLAN_VERSION", String.valueOf(getPlan().getVersion()));
    }

    /**
     * Removes the {@code JJ_PLAN_NAME} and {@code JJ_PLAN_VERSION} keys from the logging
     * {@code ThreadContext}.
     */
    void removeStepContext() {
        ThreadContext.remove("JJ_PLAN_NAME");
        ThreadContext.remove("JJ_PLAN_VERSION");
    }

    /**
     * @return the name given to this step through the builder
     */
    @Override
    public String getName() {
        return stepName;
    }

    /**
     * @return the logger shared by all instances of this class
     */
    protected Logger getLogger() {
        return log;
    }

    /**
     * Marks the entry's document as {@code ERROR}, reports its status and logs the failure.
     * <p>
     * The failure cause is taken from the entry's holder. If the holder carries none, the first
     * {@code Throwable} found in {@code params} is used instead. Previously a holder without an
     * exception caused a {@code NullPointerException} here, which is the situation when
     * {@code pushToStep} reports an exception it caught itself. If no cause can be found at
     * all, the status message consists of {@code message} alone.
     * <p>
     * Interruptions are logged at debug level, everything else at error level.
     *
     * @param entry   the destination step paired with the holder of the affected document
     * @param message status message prefix
     * @param params  arguments stored alongside the status message; may contain the cause
     */
    protected void reportException(Map.Entry<Step, NextSteps.StepStatusHolder> entry, String message, Object ... params) {
        DocumentImpl doc = (DocumentImpl) entry.getValue().getDoc();
        Throwable cause = entry.getValue().getException();
        if (cause == null && params != null) {
            // Fall back to an exception handed over as a message parameter.
            for (Object param : params) {
                if (param instanceof Throwable) {
                    cause = (Throwable) param;
                    break;
                }
            }
        }
        String errorMsg;
        if (cause == null) {
            errorMsg = message;
        } else {
            StringWriter buff = new StringWriter();
            cause.printStackTrace(new PrintWriter(buff));
            errorMsg = message + " " + cause.getMessage() + "\n" + buff;
        }
        // The whole params array is passed as a single message argument, as before.
        doc.setStatus(Status.ERROR, errorMsg, new Serializable[]{params});
        doc.reportDocStatus();
        if (cause instanceof InterruptedException) {
            log.debug("Step interrupted!", cause);
        } else {
            log.error("Step Exception!", cause);
        }
    }

    /**
     * Runs every deferred builder action, in the order they were registered.
     */
    @Override
    public void executeDeferred() {
        deferred.forEach(action -> action.run());
    }

    /**
     * Registers an action to be run later by {@code executeDeferred()}.
     *
     * @param builderAction the action to postpone
     */
    @Override
    public void addDeferred(Runnable builderAction) {
        deferred.add(builderAction);
    }

    /**
     * @return the processor currently assigned to this step
     */
    public DocumentProcessor getProcessor() {
        return processor;
    }

    /**
     * Records a step that feeds into this one.
     *
     * @param obj the upstream step to append to the prior-steps list
     */
    @Override
    public void addPredecessor(StepImpl obj) {
        priorSteps.add(obj);
    }

    /**
     * @return this step's name
     */
    @Override
    public String toString() {
        return getName();
    }

    /**
     * Fluent builder for {@link StepImpl}.
     * <p>
     * Router and processor construction are deferred: they are registered on the step under
     * construction and executed by {@link #build()}. After {@code build()} the builder holds a
     * fresh, unconfigured step.
     * <p>
     * Changes from the decompiled form: raw types are replaced by parameterised ones;
     * {@link #batchSize(int)} no longer throws for non-positive sizes but applies the same
     * default capacity as {@link #build()}; and the deferred router action binds to the step
     * captured when {@link #routingBy} was called rather than re-reading the builder's current
     * object when the action eventually runs.
     */
    public static class Builder
    extends NamedBuilder<StepImpl> {
        /** Queue capacity used when no positive batch size has been configured. */
        private static final int DEFAULT_QUEUE_CAPACITY = 50;

        private StepImpl obj;

        /**
         * Creates the step under construction, but only when this exact class is being
         * instantiated (not a subclass).
         */
        public Builder() {
            if (this.whoAmI() == this.getClass()) {
                this.obj = new StepImpl();
            }
        }

        /**
         * @return the class that declares this method, obtained via an anonymous class's
         *         enclosing method
         */
        private Class<?> whoAmI() {
            return new Object(){}.getClass().getEnclosingMethod().getDeclaringClass();
        }

        /**
         * @return the step currently under construction
         */
        @Override
        protected StepImpl getObj() {
            return this.obj;
        }

        /**
         * Sets the batch size and allocates a queue of matching capacity.
         *
         * @param size desired batch size; non-positive values are stored as given but the queue
         *             is allocated with the default capacity, mirroring {@link #build()}
         * @return this builder
         */
        public Builder batchSize(int size) {
            StepImpl target = this.getObj();
            target.batchSize = size;
            target.queue = new LinkedBlockingQueue<>(size > 0 ? size : DEFAULT_QUEUE_CAPACITY);
            return this;
        }

        /**
         * @param stepName the name for the step under construction
         * @return this builder
         */
        public Builder named(String stepName) {
            this.getObj().stepName = stepName;
            return this;
        }

        /**
         * @param millis milliseconds {@code deactivate()} waits for the worker before
         *               interrupting it
         * @return this builder
         */
        public Builder withShutdownWait(int millis) {
            this.getObj().shutdownTimeout = millis;
            return this;
        }

        /**
         * Registers a deferred action that builds the router for the step under construction.
         *
         * @param router builder for the router
         * @return this builder
         */
        public Builder routingBy(RouterBase.Builder<? extends Router> router) {
            StepImpl currObj = this.getObj();
            // Bind to the captured step so the action stays correct even if it runs after build()
            // has swapped in a fresh object.
            currObj.addDeferred(() -> {
                currObj.router = router.forStep(currObj).build();
            });
            return this;
        }

        /**
         * Registers a deferred action that builds the processor for the step under construction.
         *
         * @param processor builder for the processor
         * @return this builder
         */
        public Builder withProcessor(ConfiguredBuildable<? extends DocumentProcessor> processor) {
            StepImpl currObj = this.getObj();
            currObj.addDeferred(() -> {
                currObj.processor = (DocumentProcessor)processor.build();
            });
            return this;
        }

        /**
         * @return the name currently set on the step under construction
         */
        public String getStepName() {
            return this.getObj().stepName;
        }

        /**
         * Runs the deferred actions, allocates the step's queue (capacity = batch size, or the
         * default when the batch size is not positive), resets this builder to a fresh step and
         * returns the finished one.
         *
         * @return the configured step
         */
        @Override
        public StepImpl build() {
            StepImpl object = this.getObj();
            object.executeDeferred();
            int batchSize = object.batchSize;
            object.queue = new LinkedBlockingQueue<>(batchSize > 0 ? batchSize : DEFAULT_QUEUE_CAPACITY);
            this.obj = new StepImpl();
            return object;
        }

        /**
         * Links the given step as a successor of the step under construction, registering the
         * back-reference on the successor first.
         *
         * @param step the downstream step
         */
        void addNextStep(Step step) {
            step.addPredecessor(this.getObj());
            this.getObj().nextSteps.put(step.getName(), step);
        }
    }

    private class DocumentConsumer
    implements Consumer<DocumentImpl> {
        private DocumentConsumer() {
        }

        @Override
        public void accept(DocumentImpl document) {
            Document[] documents;
            try {
                String[] incompleteOutputSteps;
                log.trace("DOC CONSUMER START");
                String[] stringArray = incompleteOutputSteps = document.getIncompleteOutputDestinations();
                int n = stringArray.length;
                for (int i = 0; i < n; ++i) {
                    String incompleteOutputStep = stringArray[i];
                    if (document.getStatus(incompleteOutputStep) != Status.ERROR && document.getStatus(incompleteOutputStep) != Status.DROPPED && document.getStatus(incompleteOutputStep) != Status.DEAD) continue;
                    log.fatal("ATTEMPTED TO CONSUME {}} DOCUMENT!!", (Object)document.getStatus(incompleteOutputStep));
                    log.fatal("offending doc:{}", (Object)document.getId());
                    log.fatal("This is a bug in JesterJ");
                    log.fatal((Object)new RuntimeException("Bad Doc Status:" + document.getStatus(incompleteOutputStep)));
                    Thread.dumpStack();
                    System.exit(9999);
                }
                String p1 = StepImpl.this.processor == null ? "null" : StepImpl.this.processor.getName();
                log.trace("accepting {}({}), sending to {} in {}", (Object)document.getId(), (Object)document.getOrigination(), (Object)p1, (Object)StepImpl.this.getName());
                documents = StepImpl.this.processor.processDocument(document);
                log.trace("finished {}({}), was sent to {} in {}", (Object)document.getId(), (Object)document.getOrigination(), (Object)p1, (Object)StepImpl.this.getName());
            }
            catch (Exception e) {
                log.warn("Exception processing step", (Throwable)e);
                document.stepStarted(StepImpl.this);
                document.setStatus(Status.ERROR, "Exception while processing document in {}. Message:{}", new Serializable[]{StepImpl.this.getName(), e.getMessage()});
                document.reportDocStatus();
                return;
            }
            if (documents != null) {
                for (Document documentResult : documents) {
                    ((DocumentImpl)documentResult).stepStarted(StepImpl.this);
                    StepImpl.this.pushToNextIfNotDropped(documentResult);
                }
            }
            log.trace("DOC CONSUMER END");
        }
    }
}

