/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.runners.direct.AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.CommittedResult;
import org.apache.beam.runners.direct.CompletionCallback;
import org.apache.beam.runners.direct.DirectGraph;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ModelEnforcementFactory;
import org.apache.beam.runners.direct.PipelineExecutor;
import org.apache.beam.runners.direct.RootProviderRegistry;
import org.apache.beam.runners.direct.StepAndKey;
import org.apache.beam.runners.direct.TransformEvaluatorRegistry;
import org.apache.beam.runners.direct.TransformExecutor;
import org.apache.beam.runners.direct.TransformExecutorService;
import org.apache.beam.runners.direct.TransformExecutorServices;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Optional;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheBuilder;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheLoader;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.LoadingCache;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.RemovalListener;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.RemovalNotification;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.repackaged.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.runners.direct.repackaged.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.beam.runners.direct.repackaged.javax.annotation.Nullable;
import org.apache.beam.runners.direct.repackaged.runners.core.KeyedWorkItem;
import org.apache.beam.runners.direct.repackaged.runners.core.KeyedWorkItems;
import org.apache.beam.runners.direct.repackaged.runners.core.TimerInternals;
import org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformTranslation;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ExecutorServiceParallelExecutor
implements PipelineExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
    private final int targetParallelism;
    private final ExecutorService executorService;
    private final DirectGraph graph;
    private final RootProviderRegistry rootProviderRegistry;
    private final TransformEvaluatorRegistry registry;
    private final Map<String, Collection<ModelEnforcementFactory>> transformEnforcements;
    private final EvaluationContext evaluationContext;
    private final LoadingCache<StepAndKey, TransformExecutorService> executorServices;
    private final Queue<ExecutorUpdate> allUpdates;
    private final BlockingQueue<VisibleExecutorUpdate> visibleUpdates;
    private final TransformExecutorService parallelExecutorService;
    private final CompletionCallback defaultCompletionCallback;
    private final ConcurrentMap<AppliedPTransform<?, ?, ?>, ConcurrentLinkedQueue<CommittedBundle<?>>> pendingRootBundles;
    private final AtomicReference<ExecutorState> state = new AtomicReference<ExecutorState>(ExecutorState.QUIESCENT);
    private final AtomicLong outstandingWork = new AtomicLong();
    private AtomicReference<PipelineResult.State> pipelineState = new AtomicReference<PipelineResult.State>(PipelineResult.State.RUNNING);

    public static ExecutorServiceParallelExecutor create(int targetParallelism, DirectGraph graph, RootProviderRegistry rootProviderRegistry, TransformEvaluatorRegistry registry, Map<String, Collection<ModelEnforcementFactory>> transformEnforcements, EvaluationContext context) {
        return new ExecutorServiceParallelExecutor(targetParallelism, graph, rootProviderRegistry, registry, transformEnforcements, context);
    }

    private ExecutorServiceParallelExecutor(int targetParallelism, DirectGraph graph, RootProviderRegistry rootProviderRegistry, TransformEvaluatorRegistry registry, Map<String, Collection<ModelEnforcementFactory>> transformEnforcements, EvaluationContext context) {
        this.targetParallelism = targetParallelism;
        this.executorService = Executors.newFixedThreadPool(targetParallelism, new ThreadFactoryBuilder().setThreadFactory(MoreExecutors.platformThreadFactory()).setNameFormat("direct-runner-worker").build());
        this.graph = graph;
        this.rootProviderRegistry = rootProviderRegistry;
        this.registry = registry;
        this.transformEnforcements = transformEnforcements;
        this.evaluationContext = context;
        this.executorServices = CacheBuilder.newBuilder().weakValues().removalListener(this.shutdownExecutorServiceListener()).build(this.serialTransformExecutorServiceCacheLoader());
        this.allUpdates = new ConcurrentLinkedQueue<ExecutorUpdate>();
        this.visibleUpdates = new LinkedBlockingQueue<VisibleExecutorUpdate>();
        this.parallelExecutorService = TransformExecutorServices.parallel(this.executorService);
        this.defaultCompletionCallback = new TimerIterableCompletionCallback(Collections.emptyList());
        this.pendingRootBundles = new ConcurrentHashMap();
    }

    private CacheLoader<StepAndKey, TransformExecutorService> serialTransformExecutorServiceCacheLoader() {
        return new CacheLoader<StepAndKey, TransformExecutorService>(){

            @Override
            public TransformExecutorService load(StepAndKey stepAndKey) throws Exception {
                return TransformExecutorServices.serial(ExecutorServiceParallelExecutor.this.executorService);
            }
        };
    }

    private RemovalListener<StepAndKey, TransformExecutorService> shutdownExecutorServiceListener() {
        return new RemovalListener<StepAndKey, TransformExecutorService>(){

            @Override
            public void onRemoval(RemovalNotification<StepAndKey, TransformExecutorService> notification) {
                TransformExecutorService service = (TransformExecutorService)notification.getValue();
                if (service != null) {
                    service.shutdown();
                }
            }
        };
    }

    @Override
    public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
        int numTargetSplits = Math.max(3, this.targetParallelism);
        for (AppliedPTransform<?, ?, ?> root : roots) {
            ConcurrentLinkedQueue pending = new ConcurrentLinkedQueue();
            try {
                Collection<CommittedBundle<?>> initialInputs = this.rootProviderRegistry.getInitialInputs(root, numTargetSplits);
                pending.addAll(initialInputs);
            }
            catch (Exception e) {
                throw UserCodeException.wrap((Throwable)e);
            }
            this.pendingRootBundles.put(root, pending);
        }
        this.evaluationContext.initialize(this.pendingRootBundles);
        MonitorRunnable monitorRunnable = new MonitorRunnable();
        this.executorService.submit(monitorRunnable);
    }

    private void scheduleConsumption(AppliedPTransform<?, ?, ?> consumer, CommittedBundle<?> bundle, CompletionCallback onComplete) {
        this.evaluateBundle(consumer, bundle, onComplete);
    }

    private <T> void evaluateBundle(AppliedPTransform<?, ?, ?> transform, CommittedBundle<T> bundle, CompletionCallback onComplete) {
        TransformExecutorService transformExecutor;
        if (this.isKeyed((PValue)bundle.getPCollection())) {
            StepAndKey stepAndKey = StepAndKey.of(transform, bundle.getKey());
            transformExecutor = this.executorServices.getUnchecked(stepAndKey);
        } else {
            transformExecutor = this.parallelExecutorService;
        }
        Collection enforcements = MoreObjects.firstNonNull(this.transformEnforcements.get(PTransformTranslation.urnForTransform(transform.getTransform())), Collections.emptyList());
        TransformExecutor<T> callable = TransformExecutor.create(this.evaluationContext, this.registry, enforcements, bundle, transform, onComplete, transformExecutor);
        this.outstandingWork.incrementAndGet();
        if (!this.pipelineState.get().isTerminal()) {
            transformExecutor.schedule(callable);
        }
    }

    private boolean isKeyed(PValue pvalue) {
        return this.evaluationContext.isKeyed(pvalue);
    }

    private void scheduleConsumers(ExecutorUpdate update) {
        CommittedBundle<?> bundle = update.getBundle().get();
        for (AppliedPTransform<?, ?, ?> consumer : update.getConsumers()) {
            this.scheduleConsumption(consumer, bundle, this.defaultCompletionCallback);
        }
    }

    @Override
    public PipelineResult.State waitUntilFinish(Duration duration) throws Exception {
        Instant completionTime = duration.equals((Object)Duration.ZERO) ? new Instant(Long.MAX_VALUE) : Instant.now().plus((ReadableDuration)duration);
        VisibleExecutorUpdate update = null;
        while (Instant.now().isBefore((ReadableInstant)completionTime) && (update == null || this.isTerminalStateUpdate(update))) {
            update = this.visibleUpdates.poll(25L, TimeUnit.MILLISECONDS);
            if (update == null && this.pipelineState.get().isTerminal()) {
                return this.pipelineState.get();
            }
            if (update == null || !update.thrown.isPresent()) continue;
            Throwable thrown = (Throwable)update.thrown.get();
            if (thrown instanceof Exception) {
                throw (Exception)thrown;
            }
            if (thrown instanceof Error) {
                throw (Error)thrown;
            }
            throw new Exception("Unknown Type of Throwable", thrown);
        }
        return this.pipelineState.get();
    }

    @Override
    public PipelineResult.State getPipelineState() {
        return this.pipelineState.get();
    }

    private boolean isTerminalStateUpdate(VisibleExecutorUpdate update) {
        return update.getNewState() != null || !update.getNewState().isTerminal();
    }

    @Override
    public void stop() {
        this.shutdownIfNecessary(PipelineResult.State.CANCELLED);
        while (!this.visibleUpdates.offer(VisibleExecutorUpdate.cancelled())) {
            this.visibleUpdates.poll();
        }
    }

    private void shutdownIfNecessary(PipelineResult.State newState) {
        if (!newState.isTerminal()) {
            return;
        }
        LOG.debug("Pipeline has terminated. Shutting down.");
        this.pipelineState.compareAndSet(PipelineResult.State.RUNNING, newState);
        this.executorServices.invalidateAll();
        this.executorServices.cleanUp();
        this.parallelExecutorService.shutdown();
        this.executorService.shutdown();
        try {
            this.registry.cleanup();
        }
        catch (Exception e) {
            this.visibleUpdates.add(VisibleExecutorUpdate.fromException(e));
        }
    }

    private static enum ExecutorState {
        ACTIVE,
        PROCESSING,
        QUIESCING,
        QUIESCENT;

    }

    private class MonitorRunnable
    implements Runnable {
        private final String runnableName;
        private boolean exceptionThrown;

        private MonitorRunnable() {
            this.runnableName = String.format("%s$%s-monitor", ExecutorServiceParallelExecutor.this.evaluationContext.getPipelineOptions().getAppName(), ExecutorServiceParallelExecutor.class.getSimpleName());
            this.exceptionThrown = false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            String oldName = Thread.currentThread().getName();
            Thread.currentThread().setName(this.runnableName);
            try {
                boolean noWorkOutstanding = ExecutorServiceParallelExecutor.this.outstandingWork.get() == 0L;
                ExecutorState startingState = (ExecutorState)((Object)ExecutorServiceParallelExecutor.this.state.get());
                if (startingState == ExecutorState.ACTIVE) {
                    ExecutorServiceParallelExecutor.this.state.compareAndSet(ExecutorState.ACTIVE, ExecutorState.PROCESSING);
                } else if (startingState == ExecutorState.PROCESSING && noWorkOutstanding) {
                    ExecutorServiceParallelExecutor.this.state.compareAndSet(ExecutorState.PROCESSING, ExecutorState.QUIESCING);
                } else if (startingState == ExecutorState.QUIESCING && noWorkOutstanding) {
                    ExecutorServiceParallelExecutor.this.state.compareAndSet(ExecutorState.QUIESCING, ExecutorState.QUIESCENT);
                }
                this.fireTimers();
                ArrayList<ExecutorUpdate> updates = new ArrayList<ExecutorUpdate>();
                ExecutorUpdate pendingUpdate = (ExecutorUpdate)ExecutorServiceParallelExecutor.this.allUpdates.poll();
                while (pendingUpdate != null) {
                    updates.add(pendingUpdate);
                    pendingUpdate = (ExecutorUpdate)ExecutorServiceParallelExecutor.this.allUpdates.poll();
                }
                for (ExecutorUpdate update : updates) {
                    this.applyUpdate(noWorkOutstanding, startingState, update);
                }
                this.addWorkIfNecessary();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.error("Monitor died due to being interrupted");
                while (!ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromException(e))) {
                    ExecutorServiceParallelExecutor.this.visibleUpdates.poll();
                }
            }
            catch (Exception t) {
                LOG.error("Monitor thread died due to exception", (Throwable)t);
                while (!ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromException(t))) {
                    ExecutorServiceParallelExecutor.this.visibleUpdates.poll();
                }
            }
            finally {
                if (!this.shouldShutdown()) {
                    ExecutorServiceParallelExecutor.this.executorService.submit(this);
                }
                Thread.currentThread().setName(oldName);
            }
        }

        private void applyUpdate(boolean noWorkOutstanding, ExecutorState startingState, ExecutorUpdate update) {
            LOG.debug("Executor Update: {}", (Object)update);
            if (update.getBundle().isPresent()) {
                if (ExecutorState.ACTIVE == startingState || ExecutorState.PROCESSING == startingState && noWorkOutstanding) {
                    ExecutorServiceParallelExecutor.this.scheduleConsumers(update);
                } else {
                    ExecutorServiceParallelExecutor.this.allUpdates.offer(update);
                }
            } else if (update.getException().isPresent()) {
                Preconditions.checkState(ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.fromException(update.getException().get())), "VisibleUpdates should always be able to receive an offered update");
                this.exceptionThrown = true;
            }
        }

        private void fireTimers() throws Exception {
            try {
                for (WatermarkManager.FiredTimers transformTimers : ExecutorServiceParallelExecutor.this.evaluationContext.extractFiredTimers()) {
                    Collection<TimerInternals.TimerData> delivery = transformTimers.getTimers();
                    KeyedWorkItem work = KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery);
                    CommittedBundle bundle = ExecutorServiceParallelExecutor.this.evaluationContext.createKeyedBundle(transformTimers.getKey(), (PCollection)Iterables.getOnlyElement(transformTimers.getTransform().getInputs().values())).add(WindowedValue.valueInGlobalWindow(work)).commit(ExecutorServiceParallelExecutor.this.evaluationContext.now());
                    ExecutorServiceParallelExecutor.this.scheduleConsumption(transformTimers.getTransform(), bundle, new TimerIterableCompletionCallback(delivery));
                    ExecutorServiceParallelExecutor.this.state.set(ExecutorState.ACTIVE);
                }
            }
            catch (Exception e) {
                LOG.error("Internal Error while delivering timers", (Throwable)e);
                throw e;
            }
        }

        private boolean shouldShutdown() {
            PipelineResult.State nextState = PipelineResult.State.UNKNOWN;
            if (this.exceptionThrown) {
                nextState = PipelineResult.State.FAILED;
            } else if (ExecutorServiceParallelExecutor.this.evaluationContext.isDone()) {
                ExecutorServiceParallelExecutor.this.visibleUpdates.offer(VisibleExecutorUpdate.finished());
                nextState = PipelineResult.State.DONE;
            }
            ExecutorServiceParallelExecutor.this.shutdownIfNecessary(nextState);
            return ((PipelineResult.State)ExecutorServiceParallelExecutor.this.pipelineState.get()).isTerminal();
        }

        private void addWorkIfNecessary() {
            if (ExecutorServiceParallelExecutor.this.state.get() == ExecutorState.QUIESCENT) {
                for (Map.Entry pendingRootEntry : ExecutorServiceParallelExecutor.this.pendingRootBundles.entrySet()) {
                    ArrayList<CommittedBundle> bundles = new ArrayList<CommittedBundle>();
                    while (!((ConcurrentLinkedQueue)pendingRootEntry.getValue()).isEmpty()) {
                        CommittedBundle bundle = (CommittedBundle)((ConcurrentLinkedQueue)pendingRootEntry.getValue()).poll();
                        bundles.add(bundle);
                    }
                    for (CommittedBundle bundle : bundles) {
                        ExecutorServiceParallelExecutor.this.scheduleConsumption((AppliedPTransform)pendingRootEntry.getKey(), bundle, ExecutorServiceParallelExecutor.this.defaultCompletionCallback);
                        ExecutorServiceParallelExecutor.this.state.set(ExecutorState.ACTIVE);
                    }
                }
            }
        }
    }

    private static class VisibleExecutorUpdate {
        private final Optional<? extends Throwable> thrown;
        @Nullable
        private final PipelineResult.State newState;

        public static VisibleExecutorUpdate fromException(Exception e) {
            return new VisibleExecutorUpdate(null, e);
        }

        public static VisibleExecutorUpdate fromError(Error err) {
            return new VisibleExecutorUpdate(PipelineResult.State.FAILED, err);
        }

        public static VisibleExecutorUpdate finished() {
            return new VisibleExecutorUpdate(PipelineResult.State.DONE, null);
        }

        public static VisibleExecutorUpdate cancelled() {
            return new VisibleExecutorUpdate(PipelineResult.State.CANCELLED, null);
        }

        private VisibleExecutorUpdate(PipelineResult.State newState, @Nullable Throwable exception) {
            this.thrown = Optional.fromNullable(exception);
            this.newState = newState;
        }

        public PipelineResult.State getNewState() {
            return this.newState;
        }
    }

    static abstract class ExecutorUpdate {
        ExecutorUpdate() {
        }

        public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle, Collection<AppliedPTransform<?, ?, ?>> consumers) {
            return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(Optional.of(bundle), consumers, Optional.absent());
        }

        public static ExecutorUpdate fromException(Exception e) {
            return new AutoValue_ExecutorServiceParallelExecutor_ExecutorUpdate(Optional.absent(), Collections.emptyList(), Optional.of(e));
        }

        public abstract Optional<? extends CommittedBundle<?>> getBundle();

        public abstract Collection<AppliedPTransform<?, ?, ?>> getConsumers();

        public abstract Optional<? extends Exception> getException();
    }

    private class TimerIterableCompletionCallback
    implements CompletionCallback {
        private final Iterable<TimerInternals.TimerData> timers;

        protected TimerIterableCompletionCallback(Iterable<TimerInternals.TimerData> timers) {
            this.timers = timers;
        }

        @Override
        public final CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult<?> result) {
            CommittedResult committedResult = ExecutorServiceParallelExecutor.this.evaluationContext.handleResult(inputBundle, this.timers, result);
            for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
                ExecutorServiceParallelExecutor.this.allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle, ExecutorServiceParallelExecutor.this.graph.getPerElementConsumers((PValue)outputBundle.getPCollection())));
            }
            Optional<CommittedBundle<?>> unprocessedInputs = committedResult.getUnprocessedInputs();
            if (unprocessedInputs.isPresent()) {
                if (inputBundle.getPCollection() == null) {
                    ((ConcurrentLinkedQueue)ExecutorServiceParallelExecutor.this.pendingRootBundles.get(result.getTransform())).offer(unprocessedInputs.get());
                } else {
                    ExecutorServiceParallelExecutor.this.allUpdates.offer(ExecutorUpdate.fromBundle(unprocessedInputs.get(), Collections.singleton(committedResult.getTransform())));
                }
            }
            if (!committedResult.getProducedOutputTypes().isEmpty()) {
                ExecutorServiceParallelExecutor.this.state.set(ExecutorState.ACTIVE);
            }
            ExecutorServiceParallelExecutor.this.outstandingWork.decrementAndGet();
            return committedResult;
        }

        @Override
        public void handleEmpty(AppliedPTransform<?, ?, ?> transform) {
            ExecutorServiceParallelExecutor.this.outstandingWork.decrementAndGet();
        }

        @Override
        public final void handleException(CommittedBundle<?> inputBundle, Exception e) {
            ExecutorServiceParallelExecutor.this.allUpdates.offer(ExecutorUpdate.fromException(e));
            ExecutorServiceParallelExecutor.this.outstandingWork.decrementAndGet();
        }

        @Override
        public void handleError(Error err) {
            ExecutorServiceParallelExecutor.this.visibleUpdates.add(VisibleExecutorUpdate.fromError(err));
        }
    }
}

