/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.twister2;

import edu.iu.dsc.tws.api.JobConfig;
import edu.iu.dsc.tws.api.Twister2Job;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
import edu.iu.dsc.tws.api.scheduler.Twister2JobState;
import edu.iu.dsc.tws.api.tset.sets.TSet;
import edu.iu.dsc.tws.api.tset.sets.batch.BatchTSet;
import edu.iu.dsc.tws.local.LocalSubmitter;
import edu.iu.dsc.tws.rsched.core.ResourceAllocator;
import edu.iu.dsc.tws.rsched.job.Twister2Submitter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded;
import org.apache.beam.runners.core.construction.resources.PipelineResources;
import org.apache.beam.runners.twister2.BeamBatchWorker;
import org.apache.beam.runners.twister2.Twister2PipelineExecutionEnvironment;
import org.apache.beam.runners.twister2.Twister2PipelineOptions;
import org.apache.beam.runners.twister2.Twister2PipelineResult;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.FileStagingOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;

public class Twister2Runner
extends PipelineRunner<PipelineResult> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = Logger.getLogger(Twister2Runner.class.getName());
    private static final @UnknownKeyFor @NonNull @Initialized String SIDEINPUTS = "sideInputs";
    private static final @UnknownKeyFor @NonNull @Initialized String LEAVES = "leaves";
    private static final @UnknownKeyFor @NonNull @Initialized String GRAPH = "graph";
    private final @UnknownKeyFor @NonNull @Initialized Twister2PipelineOptions options;

    protected Twister2Runner(@UnknownKeyFor @NonNull @Initialized Twister2PipelineOptions options) {
        this.options = options;
    }

    public static @UnknownKeyFor @NonNull @Initialized Twister2Runner fromOptions(@UnknownKeyFor @NonNull @Initialized PipelineOptions options) {
        return new Twister2Runner((Twister2PipelineOptions)PipelineOptionsValidator.validate(Twister2PipelineOptions.class, (PipelineOptions)options));
    }

    public @UnknownKeyFor @NonNull @Initialized PipelineResult run(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline) {
        Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(this.options);
        LOG.info("Translating pipeline to Twister2 program.");
        pipeline.replaceAll(Twister2Runner.getDefaultOverrides());
        if (!ExperimentalOptions.hasExperiment((PipelineOptions)pipeline.getOptions(), (String)"beam_fn_api")) {
            SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary((Pipeline)pipeline);
        }
        env.translate(pipeline);
        this.setupSystem(this.options);
        HashMap<String, Object> configMap = new HashMap<String, Object>();
        JobConfig jobConfig = new JobConfig();
        if (this.isLocalMode(this.options)) {
            this.options.setParallelism(1);
            configMap.put(SIDEINPUTS, this.extractNames(env.getSideInputs()));
            configMap.put(LEAVES, this.extractNames(env.getLeaves()));
            configMap.put(GRAPH, env.getTSetGraph());
            configMap.put("twister2.network.buffer.size", 32000);
            configMap.put("twister2.network.sendBuffer.count", this.options.getParallelism());
            LOG.warning("Twister2 Local Mode currently only supports single worker");
        } else {
            jobConfig.put((Object)SIDEINPUTS, this.extractNames(env.getSideInputs()));
            jobConfig.put((Object)LEAVES, this.extractNames(env.getLeaves()));
            jobConfig.put((Object)GRAPH, (Object)env.getTSetGraph());
        }
        Config config = ResourceAllocator.loadConfig(configMap);
        int workers = this.options.getParallelism();
        Twister2Job twister2Job = Twister2Job.newBuilder().setJobName(this.options.getJobName()).setWorkerClass(BeamBatchWorker.class).addComputeResource((double)this.options.getWorkerCPUs(), this.options.getRamMegaBytes(), workers).setConfig(jobConfig).build();
        Twister2JobState jobState = this.isLocalMode(this.options) ? LocalSubmitter.submitJob((Twister2Job)twister2Job, (Config)config) : Twister2Submitter.submitJob((Twister2Job)twister2Job, (Config)config);
        Twister2PipelineResult result = new Twister2PipelineResult(jobState);
        return result;
    }

    private @UnknownKeyFor @NonNull @Initialized boolean isLocalMode(@UnknownKeyFor @NonNull @Initialized Twister2PipelineOptions options) {
        return options.getTwister2Home() == null || "".equals(options.getTwister2Home());
    }

    public @UnknownKeyFor @NonNull @Initialized PipelineResult runTest(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline) {
        Twister2PipelineExecutionEnvironment env = new Twister2PipelineExecutionEnvironment(this.options);
        LOG.info("Translating pipeline to Twister2 program.");
        pipeline.replaceAll(Twister2Runner.getDefaultOverrides());
        if (!ExperimentalOptions.hasExperiment((PipelineOptions)pipeline.getOptions(), (String)"beam_fn_api")) {
            SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary((Pipeline)pipeline);
        }
        env.translate(pipeline);
        this.setupSystemTest(this.options);
        HashMap<String, Object> configMap = new HashMap<String, Object>();
        configMap.put(SIDEINPUTS, this.extractNames(env.getSideInputs()));
        configMap.put(LEAVES, this.extractNames(env.getLeaves()));
        configMap.put(GRAPH, env.getTSetGraph());
        configMap.put("twister2.network.buffer.size", 32000);
        configMap.put("twister2.network.sendBuffer.count", this.options.getParallelism());
        Config config = ResourceAllocator.loadConfig(configMap);
        JobConfig jobConfig = new JobConfig();
        int workers = this.options.getParallelism();
        Twister2Job twister2Job = Twister2Job.newBuilder().setJobName(this.options.getJobName()).setWorkerClass(BeamBatchWorker.class).addComputeResource((double)this.options.getWorkerCPUs(), this.options.getRamMegaBytes(), workers).setConfig(jobConfig).build();
        Twister2JobState jobState = LocalSubmitter.submitJob((Twister2Job)twister2Job, (Config)config);
        Twister2PipelineResult result = new Twister2PipelineResult(jobState);
        if (result.state == PipelineResult.State.FAILED) {
            throw new RuntimeException("Pipeline execution failed", jobState.getCause());
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setupSystem(@UnknownKeyFor @NonNull @Initialized Twister2PipelineOptions options) {
        this.prepareFilesToStage(options);
        this.zipFilesToStage(options);
        System.setProperty("cluster_type", options.getClusterType());
        System.setProperty("job_file", options.getJobFileZip());
        System.setProperty("job_type", options.getJobType());
        if (this.isLocalMode(options)) {
            System.setProperty("twister2_home", System.getProperty("java.io.tmpdir"));
            System.setProperty("config_dir", System.getProperty("java.io.tmpdir") + "/conf/");
        } else {
            String[] filesList;
            System.setProperty("twister2_home", options.getTwister2Home());
            System.setProperty("config_dir", options.getTwister2Home() + "/conf/");
            File cDir = new File(System.getProperty("config_dir"), options.getClusterType());
            for (String file : filesList = new String[]{"core.yaml", "network.yaml", "data.yaml", "resource.yaml", "task.yaml"}) {
                File toCheck = new File(cDir, file);
                if (toCheck.exists()) continue;
                throw new Twister2RuntimeException("Couldn't find " + file + " in config directory specified.");
            }
            FileInputStream fis = null;
            try {
                fis = new FileInputStream(new File(cDir, "logger.properties"));
                LogManager.getLogManager().readConfiguration(fis);
                fis.close();
            }
            catch (IOException e) {
                LOG.warning("Couldn't load logging configuration");
            }
            finally {
                if (fis != null) {
                    try {
                        fis.close();
                    }
                    catch (IOException e) {
                        LOG.info(e.getMessage());
                    }
                }
            }
        }
    }

    private void setupSystemTest(@UnknownKeyFor @NonNull @Initialized Twister2PipelineOptions options) {
        this.prepareFilesToStage(options);
        this.zipFilesToStage(options);
        System.setProperty("cluster_type", options.getClusterType());
        System.setProperty("twister2_home", System.getProperty("java.io.tmpdir"));
        System.setProperty("job_file", options.getJobFileZip());
        System.setProperty("job_type", options.getJobType());
    }

    private @UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized String> extractNames(@UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized TSet> leaves) {
        HashSet<String> results = new HashSet<String>();
        for (TSet leaf : leaves) {
            results.add(leaf.getId());
        }
        return results;
    }

    private @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized String> extractNames(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized BatchTSet<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?>> sideInputs) {
        LinkedHashMap<String, String> results = new LinkedHashMap<String, String>();
        for (Map.Entry<String, BatchTSet<?>> entry : sideInputs.entrySet()) {
            results.put(entry.getKey(), entry.getValue().getId());
        }
        return results;
    }

    private void prepareFilesToStage(@UnknownKeyFor @NonNull @Initialized Twister2PipelineOptions options) {
        PipelineResources.prepareFilesForStaging((FileStagingOptions)options);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void zipFilesToStage(@UnknownKeyFor @NonNull @Initialized Twister2PipelineOptions options) {
        File zipFile = null;
        HashSet<String> jarSet = new HashSet<String>();
        List filesToStage = options.getFilesToStage();
        ArrayList<String> trimmed = new ArrayList<String>();
        for (String file : filesToStage) {
            if (file.contains("/org/twister2")) continue;
            trimmed.add(file);
        }
        FileInputStream fis = null;
        try {
            zipFile = File.createTempFile("twister2-", ".zip");
            FileOutputStream fos = new FileOutputStream(zipFile);
            ZipOutputStream zipOut = new ZipOutputStream(fos);
            zipOut.putNextEntry(new ZipEntry("lib/"));
            for (String srcFile : trimmed) {
                int length;
                File fileToZip = new File(srcFile);
                if (jarSet.contains(fileToZip.getName())) continue;
                jarSet.add(fileToZip.getName());
                fis = new FileInputStream(fileToZip);
                ZipEntry zipEntry = new ZipEntry("lib/" + fileToZip.getName());
                zipOut.putNextEntry(zipEntry);
                byte[] bytes = new byte[1024];
                while ((length = fis.read(bytes)) >= 0) {
                    zipOut.write(bytes, 0, length);
                }
                fis.close();
            }
            zipOut.close();
            fos.close();
            zipFile.deleteOnExit();
        }
        catch (FileNotFoundException e) {
            LOG.info(e.getMessage());
        }
        catch (IOException e) {
            LOG.info(e.getMessage());
        }
        finally {
            if (fis != null) {
                try {
                    fis.close();
                }
                catch (IOException e) {
                    LOG.info(e.getMessage());
                }
            }
        }
        if (zipFile != null) {
            options.setJobFileZip(zipFile.getPath());
        }
    }

    private static @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized PTransformOverride> getDefaultOverrides() {
        ImmutableList overrides = ImmutableList.builder().add((Object)PTransformOverride.of((PTransformMatcher)PTransformMatchers.splittableParDo(), (PTransformOverrideFactory)new SplittableParDo.OverrideFactory())).add((Object)PTransformOverride.of((PTransformMatcher)PTransformMatchers.urnEqualTo((String)"beam:transform:sdf_process_keyed_elements:v1"), (PTransformOverrideFactory)new SplittableParDoNaiveBounded.OverrideFactory())).build();
        return overrides;
    }
}

