/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.twister2;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.tset.TBase;
import edu.iu.dsc.tws.api.tset.sets.StorableTBase;
import edu.iu.dsc.tws.api.tset.sets.TSet;
import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
import edu.iu.dsc.tws.tset.TBaseGraph;
import edu.iu.dsc.tws.tset.env.BatchTSetEnvironment;
import edu.iu.dsc.tws.tset.env.TSetEnvironment;
import edu.iu.dsc.tws.tset.links.BaseTLink;
import edu.iu.dsc.tws.tset.sets.BaseTSet;
import edu.iu.dsc.tws.tset.sets.BuildableTSet;
import edu.iu.dsc.tws.tset.sets.batch.CachedTSet;
import edu.iu.dsc.tws.tset.sets.batch.ComputeTSet;
import edu.iu.dsc.tws.tset.sets.batch.SinkTSet;
import edu.iu.dsc.tws.tset.worker.BatchTSetIWorker;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.twister2.translators.functions.DoFnFunction;
import org.apache.beam.runners.twister2.translators.functions.Twister2SinkFunction;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

/**
 * Twister2 batch worker that runs a Beam pipeline which has already been translated into a TSet
 * graph.
 *
 * <p>The worker reads three entries from the environment's {@link Config}: the TSet graph
 * ({@code "graph"}), a map from side-input key to the id of the TSet producing it
 * ({@code "sideInputs"}), and the set of ids of the leaf TSets ({@code "leaves"}). It installs the
 * graph on the environment, looks the referenced nodes up by id, points every graph node at the
 * current environment and finally evaluates one sink per leaf.
 */
public class BeamBatchWorker implements Serializable, BatchTSetIWorker {
    private static final @UnknownKeyFor @NonNull @Initialized String SIDEINPUTS = "sideInputs";
    private static final @UnknownKeyFor @NonNull @Initialized String LEAVES = "leaves";
    private static final @UnknownKeyFor @NonNull @Initialized String GRAPH = "graph";

    /** Side-input key mapped to the TSet that produces it; populated by {@link #setupTSets}. */
    private @UnknownKeyFor @NonNull @Initialized HashMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized BatchTSet<?>> sideInputDataSets;

    /** Leaf TSets of the pipeline; populated by {@link #setupTSets}. */
    private @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized TSet> leaves;

    /**
     * Entry point invoked with the batch environment: restores the graph from the config, resolves
     * side inputs and leaves, rebinds the graph nodes to {@code env} and runs the pipeline.
     *
     * @param env environment whose config carries the {@code "graph"}, {@code "sideInputs"} and
     *     {@code "leaves"} entries
     * @throws ClassCastException if a config entry does not have the expected type
     */
    @Override
    public void execute(@UnknownKeyFor @NonNull @Initialized BatchTSetEnvironment env) {
        Config config = env.getConfig();
        // The config stores plain objects, so the generic parameters cannot be verified here.
        @SuppressWarnings("unchecked")
        LinkedHashMap<String, String> sideInputIds =
            (LinkedHashMap<String, String>) config.get(SIDEINPUTS);
        @SuppressWarnings("unchecked")
        Set<String> leaveIds = (Set<String>) config.get(LEAVES);
        TBaseGraph graph = (TBaseGraph) config.get(GRAPH);

        env.settBaseGraph(graph);
        this.setupTSets(env, sideInputIds, leaveIds);
        this.resetEnv(env, graph);
        this.executePipeline(env);
    }

    /**
     * Points every node of {@code graph} at {@code env}.
     *
     * @param env environment to install on each node
     * @param graph graph whose nodes are updated
     * @throws IllegalStateException if a node is neither a {@link BaseTSet} nor a {@link BaseTLink}
     */
    private void resetEnv(@UnknownKeyFor @NonNull @Initialized BatchTSetEnvironment env, @UnknownKeyFor @NonNull @Initialized TBaseGraph graph) {
        for (TBase node : graph.getNodes()) {
            if (node instanceof BaseTSet) {
                ((BaseTSet) node).setTSetEnv((TSetEnvironment) env);
            } else if (node instanceof BaseTLink) {
                ((BaseTLink) node).setTSetEnv((TSetEnvironment) env);
            } else {
                throw new IllegalStateException("node must be either of type BaseTSet or BaseTLink");
            }
        }
    }

    /**
     * Resolves the graph's sources, the side-input TSets and the leaf TSets by node id and stores
     * the latter two in this worker's fields.
     *
     * @param env environment that owns the graph
     * @param sideInputIds side-input key mapped to the id of the TSet that produces it
     * @param leaveIds ids of the leaf TSets
     */
    private void setupTSets(@UnknownKeyFor @NonNull @Initialized BatchTSetEnvironment env, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> sideInputIds, @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> leaveIds) {
        this.sideInputDataSets = new LinkedHashMap<>();
        this.leaves = new HashSet<>();
        TBaseGraph graph = env.getGraph();

        // Replace each source with the node the graph returns for the same id.
        // NOTE(review): presumably this makes the source set reference the same instances as the
        // graph's node lookup -- confirm against TBaseGraph.
        Set<BuildableTSet> newSources = new HashSet<>();
        for (BuildableTSet source : graph.getSources()) {
            newSources.add((BuildableTSet) graph.getNodeById(source.getId()));
        }
        graph.setSources(newSources);

        // Insertion order of sideInputIds is kept because the target map is a LinkedHashMap.
        for (Map.Entry<String, String> entry : sideInputIds.entrySet()) {
            BatchTSet<?> sideInput = (BatchTSet<?>) graph.getNodeById(entry.getValue());
            this.sideInputDataSets.put(entry.getKey(), sideInput);
        }

        for (String leafId : leaveIds) {
            this.leaves.add((TSet) graph.getNodeById(leafId));
        }
    }

    /**
     * Caches every side input and then evaluates one sink per leaf.
     *
     * <p>Side inputs are handled in the iteration order of the side-input map. Before a side input
     * is cached, the side inputs cached so far are wired into its upstream {@link DoFnFunction}
     * nodes, so a side input can only consume side inputs that precede it in that order.
     *
     * @param env environment used to run each sink
     * @throws IllegalStateException if an upstream {@link DoFnFunction} asks for a side-input key
     *     that has not been cached (see {@link #addInputs})
     */
    public void executePipeline(@UnknownKeyFor @NonNull @Initialized BatchTSetEnvironment env) {
        Map<String, CachedTSet> sideInputTSets = new HashMap<>();
        for (Map.Entry<String, BatchTSet<?>> side : this.sideInputDataSets.entrySet()) {
            BatchTSet<?> sideTSet = side.getValue();
            this.addInputs((BaseTSet) sideTSet, sideInputTSets);
            CachedTSet cached = (CachedTSet) sideTSet.cache();
            sideInputTSets.put(side.getKey(), cached);
        }
        for (TSet leaf : this.leaves) {
            SinkTSet sinkTSet = (SinkTSet) leaf.direct().sink(new Twister2SinkFunction());
            this.addInputs((BaseTSet) sinkTSet, sideInputTSets);
            this.eval(env, sinkTSet);
        }
    }

    /**
     * Walks the graph upstream from {@code sinkTSet} and registers the required cached side inputs
     * on every {@link ComputeTSet} whose compute function is a {@link DoFnFunction}.
     *
     * <p>Does nothing when {@code sideInputTSets} is empty. Each node instance is expanded at most
     * once, even when it is reachable through several paths.
     *
     * @param sinkTSet node at which the upstream traversal starts
     * @param sideInputTSets cached side inputs by key
     * @throws IllegalStateException if a {@link DoFnFunction} requires a key missing from
     *     {@code sideInputTSets}
     */
    private void addInputs(@UnknownKeyFor @NonNull @Initialized BaseTSet sinkTSet, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized CachedTSet> sideInputTSets) {
        if (sideInputTSets.isEmpty()) {
            return;
        }
        TBaseGraph graph = sinkTSet.getTBaseGraph();

        // Identity-based so that only the very same node instance is ever skipped.
        Set<TBase> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        // Holds TBase rather than BaseTSet: predecessors are not limited to BaseTSet nodes.
        Deque<TBase> pending = new ArrayDeque<>();
        visited.add(sinkTSet);
        pending.add(sinkTSet);

        while (!pending.isEmpty()) {
            TBase currNode = pending.remove();
            for (TBase predecessor : graph.getPredecessors(currNode)) {
                if (visited.add(predecessor)) {
                    pending.add(predecessor);
                }
            }

            if (!(currNode instanceof ComputeTSet)) {
                continue;
            }
            ComputeTSet computeTSet = (ComputeTSet) currNode;
            Object computeFunc = computeTSet.getComputeFunc();
            if (!(computeFunc instanceof DoFnFunction)) {
                continue;
            }

            Set<String> sideInputKeys = ((DoFnFunction) computeFunc).getSideInputKeys();
            for (String sideInputKey : sideInputKeys) {
                if (!sideInputTSets.containsKey(sideInputKey)) {
                    throw new IllegalStateException("Side input not found for key " + sideInputKey);
                }
                computeTSet.addInput(sideInputKey, (StorableTBase) sideInputTSets.get(sideInputKey));
            }
        }
    }

    /**
     * Runs the given sink on the environment.
     *
     * @param env environment that executes the sink
     * @param tSet sink to run
     */
    public void eval(@UnknownKeyFor @NonNull @Initialized BatchTSetEnvironment env, @UnknownKeyFor @NonNull @Initialized SinkTSet<?> tSet) {
        env.run(tSet);
    }
}

