/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.CacheBuilder;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.CacheLoader;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.cache.LoadingCache;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.DirectExecutionContext;
import org.apache.beam.runners.direct.DoFnLifecycleManager;
import org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator;
import org.apache.beam.runners.direct.DoFnLifecycleManagers;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ParDoEvaluator;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TransformEvaluatorFactory} that builds {@link ParDoEvaluator}s wrapped in
 * {@link DoFnLifecycleManagerRemovingTransformEvaluator}s.
 *
 * <p>One {@link DoFnLifecycleManager} is cached per {@link AppliedPTransform}; the cache is
 * populated on demand by the {@link CacheLoader} handed to the constructor.
 *
 * @param <InputT> element type of the main input consumed by the evaluators
 * @param <OutputT> element type of the main output produced by the evaluators
 */
final class ParDoEvaluatorFactory<InputT, OutputT>
implements TransformEvaluatorFactory {
    private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class);

    /** Lifecycle managers keyed by the applied transform they were loaded for. */
    private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones;
    private final EvaluationContext evaluationContext;
    private final PipelineOptions options;
    private final ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory;

    /**
     * Creates a factory.
     *
     * @param evaluationContext context used to resolve step names and execution contexts
     * @param runnerFactory factory passed through to {@link ParDoEvaluator#create}
     * @param doFnCacheLoader loader that produces the {@link DoFnLifecycleManager} for a transform
     * @param options pipeline options passed through to {@link ParDoEvaluator#create}
     */
    ParDoEvaluatorFactory(EvaluationContext evaluationContext, ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory, CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> doFnCacheLoader, PipelineOptions options) {
        this.evaluationContext = evaluationContext;
        this.options = options;
        this.runnerFactory = runnerFactory;
        this.fnClones = CacheBuilder.newBuilder().build(doFnCacheLoader);
    }

    /**
     * Returns a {@link CacheLoader} that builds a {@link DoFnLifecycleManager} from the
     * {@link DoFn} that {@link ParDoTranslation#getDoFn} extracts from the applied transform.
     */
    static CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> basicDoFnCacheLoader() {
        return new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>(){

            @Override
            public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> application) throws Exception {
                return DoFnLifecycleManager.of(ParDoTranslation.getDoFn(application));
            }
        };
    }

    /**
     * Builds an evaluator for {@code application} over {@code inputBundle}.
     *
     * <p>The interface is expressed with wildcards, so the arguments are narrowed to this
     * factory's {@code InputT}/{@code OutputT} with unchecked casts before delegating to
     * {@link #createEvaluator}.
     *
     * @param application the applied transform to evaluate
     * @param inputBundle the bundle supplying the main input collection and key
     * @return the evaluator, viewed as a {@code TransformEvaluator<T>}
     * @throws Exception if evaluator construction fails
     */
    @Override
    public <T> TransformEvaluator<T> forApplication(AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
        // The casts below cannot be verified at runtime because of erasure; each suppression is
        // scoped to the single declaration that needs it.
        @SuppressWarnings("unchecked")
        AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> typedApplication = (AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?>) application;
        @SuppressWarnings("unchecked")
        PCollection<InputT> mainInput = (PCollection<InputT>) inputBundle.getPCollection();
        @SuppressWarnings("unchecked")
        TupleTag<OutputT> mainOutputTag = (TupleTag<OutputT>) ParDoTranslation.getMainOutputTag(application);
        @SuppressWarnings("unchecked")
        TransformEvaluator<T> evaluator = (TransformEvaluator<T>) this.createEvaluator(typedApplication, mainInput, inputBundle.getKey(), ParDoTranslation.getSideInputs(application), mainOutputTag, ParDoTranslation.getAdditionalOutputTags(application).getAll());
        return evaluator;
    }

    /**
     * Hands every cached {@link DoFnLifecycleManager} to
     * {@link DoFnLifecycleManagers#removeAllFromManagers}.
     *
     * @throws Exception if removal fails
     */
    @Override
    public void cleanup() throws Exception {
        DoFnLifecycleManagers.removeAllFromManagers(this.fnClones.asMap().values());
    }

    /**
     * Creates a {@link ParDoEvaluator} for the given transform and wraps it so that it is
     * associated with the transform's cached {@link DoFnLifecycleManager}.
     *
     * @param application the applied transform to evaluate
     * @param mainInput the main input collection
     * @param inputBundleKey key of the input bundle, used to look up the execution context
     * @param sideInputs side input views of the transform
     * @param mainOutputTag tag of the main output
     * @param additionalOutputTags tags of the additional outputs
     * @return the wrapped evaluator
     * @throws Exception if evaluator construction fails
     */
    DoFnLifecycleManagerRemovingTransformEvaluator<InputT> createEvaluator(AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application, PCollection<InputT> mainInput, StructuralKey<?> inputBundleKey, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags) throws Exception {
        String stepName = this.evaluationContext.getStepName(application);
        DirectExecutionContext.DirectStepContext stepContext = this.evaluationContext.getExecutionContext(application, inputBundleKey).getStepContext(stepName);
        DoFnLifecycleManager fnManager = this.fnClones.getUnchecked(application);
        return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(this.createParDoEvaluator(application, inputBundleKey, mainInput, sideInputs, mainOutputTag, additionalOutputTags, stepContext, fnManager.get(), fnManager), fnManager);
    }

    /**
     * Delegates to {@link ParDoEvaluator#create}. If creation throws, {@code fnManager.remove()}
     * is invoked before the original exception is rethrown; a failure during that removal is
     * logged and attached to the original exception as a suppressed exception.
     *
     * @param application the applied transform to evaluate
     * @param key key of the input bundle
     * @param mainInput the main input collection; its windowing strategy is passed on
     * @param sideInputs side input views of the transform
     * @param mainOutputTag tag of the main output
     * @param additionalOutputTags tags of the additional outputs
     * @param stepContext step context for the transform
     * @param fn the {@link DoFn} instance to run
     * @param fnManager manager asked to remove its {@link DoFn} if creation fails
     * @return the newly created evaluator
     * @throws Exception the exception raised by {@link ParDoEvaluator#create}, if any
     */
    ParDoEvaluator<InputT> createParDoEvaluator(AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application, StructuralKey<?> key, PCollection<InputT> mainInput, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, DirectExecutionContext.DirectStepContext stepContext, DoFn<InputT, OutputT> fn, DoFnLifecycleManager fnManager) throws Exception {
        try {
            return ParDoEvaluator.create(this.evaluationContext, this.options, stepContext, application, mainInput.getWindowingStrategy(), fn, key, sideInputs, mainOutputTag, additionalOutputTags, ParDoEvaluatorFactory.pcollections(application.getOutputs()), this.runnerFactory);
        }
        catch (Exception e) {
            try {
                fnManager.remove();
            }
            catch (Exception removalException) {
                LOG.error("Exception encountered while cleaning up in ParDo evaluator construction", removalException);
                // Keep the construction failure as the primary exception.
                e.addSuppressed(removalException);
            }
            throw e;
        }
    }

    /**
     * Copies {@code outputs} into a new map, casting each value to {@link PCollection}.
     *
     * @param outputs output values keyed by tag; every value must be a {@link PCollection}
     * @return a new mutable map with the same keys and the values viewed as collections
     * @throws ClassCastException if any value is not a {@link PCollection}
     */
    static Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) {
        Map<TupleTag<?>, PCollection<?>> pcs = new HashMap<>();
        for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
            pcs.put(output.getKey(), (PCollection<?>) output.getValue());
        }
        return pcs;
    }
}

