/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.dataflow.util;

import com.fasterxml.jackson.core.Base64Variants;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.collect.Lists;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.hash.Funnels;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.hash.Hasher;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.hash.Hashing;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.io.CountingOutputStream;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.io.Files;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.Futures;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.ListenableFuture;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.util.BackOffAdapter;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.ZipFiles;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PackageUtil {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class);
    // Classpath-size threshold (1000 elements): stageClasspathElements logs a warning when the
    // number of elements to stage exceeds this value.
    private static final int SANE_CLASSPATH_SIZE = 1000;
    // Backoff policy used when retrying a failed upload in stageOnePackage: at most 4 retries,
    // starting from a 5 second initial backoff.
    private static final FluentBackoff BACKOFF_FACTORY = FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.standardSeconds((long)5L));
    // Used by stageOnePackage to recognise access-denied failures, which are not retried.
    private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();

    /**
     * Package-private no-argument constructor; it performs no initialization. Every other member
     * of this class is static.
     */
    PackageUtil() {
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Builds the staging metadata for a single classpath element.
     *
     * <p>The element's content is streamed exactly once through an MD5 hasher wrapped in a
     * counting stream, which yields both the content hash and the byte count. A directory is
     * streamed as a zip of its tree; a regular file is streamed as-is. The hash is encoded with
     * Jackson's {@code MODIFIED_FOR_URL} Base64 variant and embedded in the staged file name
     * produced by {@link #getUniqueContentName(File, String)}.
     *
     * @param source the file or directory to describe
     * @param stagingPath the staging location under which the staged file name is resolved
     * @param overridePackageName name to give the resulting {@link DataflowPackage}; when
     *     {@code null} the generated unique name is used instead
     * @return the size, hash, directory flag, target package and source path of {@code source}
     * @throws RuntimeException wrapping any {@link IOException} raised while reading the content
     */
    static PackageAttributes createPackageAttributes(File source, String stagingPath, @Nullable String overridePackageName) {
        boolean isDir = source.isDirectory();
        Hasher md5 = Hashing.md5().newHasher();
        try (CountingOutputStream counter = new CountingOutputStream(Funnels.asOutputStream(md5))) {
            // One pass over the content feeds both the hasher and the byte counter.
            if (isDir) {
                ZipFiles.zipDirectory(source, counter);
            } else {
                Files.asByteSource(source).copyTo(counter);
            }
            counter.flush();

            long byteCount = counter.getCount();
            String contentHash = Base64Variants.MODIFIED_FOR_URL.encode(md5.hash().asBytes());

            // Resolve the content-addressed file name against the staging location.
            String stagedName = PackageUtil.getUniqueContentName(source, contentHash);
            ResourceId stagingRoot = FileSystems.matchNewResource(stagingPath, true);
            String location = stagingRoot.resolve(stagedName, ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();

            DataflowPackage pkg = new DataflowPackage();
            if (overridePackageName == null) {
                pkg.setName(stagedName);
            } else {
                pkg.setName(overridePackageName);
            }
            pkg.setLocation(location);
            return new PackageAttributes(byteCount, contentHash, isDir, pkg, source.getPath());
        } catch (IOException e) {
            throw new RuntimeException("Package setup failure for " + source, e);
        }
    }

    /**
     * Computes {@link PackageAttributes} for each classpath element on the given executor and
     * waits for all of the computations to finish.
     *
     * <p>An element of the form {@code name=path} is split at the first {@code '='}: the left part
     * becomes the package name override and the right part the path. Elements whose path does not
     * exist are skipped with a warning.
     *
     * @param classpathElements paths, optionally prefixed with {@code name=}
     * @param stagingPath staging location passed through to
     *     {@link #createPackageAttributes(File, String, String)}
     * @param executorService executor on which each element is processed
     * @return attributes of every existing element, in iteration order of the input
     * @throws RuntimeException if the wait is interrupted (the interrupt flag is restored) or any
     *     computation fails (the failure's cause is attached)
     */
    private static List<PackageAttributes> computePackageAttributes(Collection<String> classpathElements, final String stagingPath, ListeningExecutorService executorService) {
        List<ListenableFuture<PackageAttributes>> pending = new ArrayList<>();
        for (String element : classpathElements) {
            // Split "name=path" at the first '='; a plain path has no override name.
            int separator = element.indexOf('=');
            final String overrideName = separator < 0 ? null : element.substring(0, separator);
            String path = separator < 0 ? element : element.substring(separator + 1);

            final File candidate = new File(path);
            if (!candidate.exists()) {
                LOG.warn("Skipping non-existent classpath element {} that was specified.", path);
                continue;
            }
            pending.add(executorService.submit(new Callable<PackageAttributes>() {
                @Override
                public PackageAttributes call() throws Exception {
                    return PackageUtil.createPackageAttributes(candidate, stagingPath, overrideName);
                }
            }));
        }
        try {
            return Futures.allAsList(pending).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while staging packages", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error while staging packages", e.getCause());
        }
    }

    /**
     * Opens a writable channel to {@code target}.
     *
     * @param target location to write to; resolved as a new resource with the directory flag
     *     set to {@code false}
     * @param createOptions options forwarded to {@code FileSystems.create}
     * @return a channel the caller is responsible for closing
     * @throws IOException if the channel cannot be created
     */
    private static WritableByteChannel makeWriter(String target, CreateOptions createOptions) throws IOException {
        ResourceId destination = FileSystems.matchNewResource(target, false);
        return FileSystems.create(destination, createOptions);
    }

    /**
     * Stages a single package to its target location, skipping the upload when a file of the same
     * size is already present there.
     *
     * <p>Uploads that fail with an {@link IOException} are retried using {@code BACKOFF_FACTORY},
     * sleeping on {@code retrySleeper} between attempts. Access-denied failures are never retried.
     *
     * @param attributes the package to stage (source path, expected size, target location)
     * @param numUploaded incremented once when this package is uploaded
     * @param numCached incremented once when the upload is skipped because the target exists
     * @param retrySleeper used to wait between upload attempts
     * @param createOptions options used when opening the target for writing
     * @throws RuntimeException wrapping the underlying failure if the package cannot be staged;
     *     if the failure is an interruption, the thread's interrupt flag is restored first
     */
    private static void stageOnePackage(PackageAttributes attributes, AtomicInteger numUploaded, AtomicInteger numCached, Sleeper retrySleeper, CreateOptions createOptions) {
        String source = attributes.getSourcePath();
        String target = attributes.getDataflowPackage().getLocation();
        try {
            try {
                // A file of identical size already at the target is treated as already staged.
                long remoteLength = FileSystems.matchSingleFileSpec(target).sizeBytes();
                if (remoteLength == attributes.getSize()) {
                    LOG.debug("Skipping classpath element already staged: {} at {}", attributes.getSourcePath(), target);
                    numCached.incrementAndGet();
                    return;
                }
            } catch (FileNotFoundException ignored) {
                // Nothing is staged at the target yet; fall through and upload.
            }

            BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
            while (true) {
                try {
                    LOG.debug("Uploading classpath element {} to {}", source, target);
                    try (WritableByteChannel writer = PackageUtil.makeWriter(target, createOptions)) {
                        PackageUtil.copyContent(source, writer);
                    }
                    numUploaded.incrementAndGet();
                    break;
                } catch (IOException e) {
                    if (ERROR_EXTRACTOR.accessDenied(e)) {
                        // Retrying cannot fix a permissions problem, so fail immediately.
                        String errorMessage = String.format("Uploaded failed due to permissions error, will NOT retry staging of classpath %s. Please verify credentials are valid and that you have write access to %s. Stale credentials can be resolved by executing 'gcloud auth application-default login'.", source, target);
                        LOG.error(errorMessage);
                        throw new IOException(errorMessage, e);
                    }
                    long sleep = backoff.nextBackOffMillis();
                    if (sleep == BackOff.STOP) {
                        // Retry budget exhausted.
                        LOG.error("Upload failed, will NOT retry staging of classpath: {}", source, e);
                        throw e;
                    }
                    LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}", source, e);
                    retrySleeper.sleep(sleep);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Could not stage classpath element: " + source, e);
        } catch (Exception e) {
            throw new RuntimeException("Could not stage classpath element: " + source, e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stages the given classpath elements using a dedicated fixed pool of 32 threads and the
     * default {@link Sleeper}.
     *
     * <p>Delegates to the five-argument overload; the pool is shut down when that call returns
     * or throws.
     *
     * @param classpathElements paths (optionally {@code name=path}) of the elements to stage
     * @param stagingPath staging location
     * @param createOptions options used when creating the staged files
     * @return the packages describing the staged elements
     */
    static List<DataflowPackage> stageClasspathElements(Collection<String> classpathElements, String stagingPath, CreateOptions createOptions) {
        ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(32));
        try {
            return PackageUtil.stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT, pool, createOptions);
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Stages the given classpath elements to {@code stagingPath} and returns the packages that
     * describe them.
     *
     * <p>Attributes of all existing elements are computed first; uploads are then submitted to
     * {@code executorService} ordered by {@link PackageUploadOrder} (largest first), and this
     * method blocks until every upload has finished. A warning is logged when more than
     * {@code SANE_CLASSPATH_SIZE} elements are supplied.
     *
     * @param classpathElements paths (optionally {@code name=path}) of the elements to stage
     * @param stagingPath staging location; must not be {@code null}
     * @param retrySleeper sleeper used between upload retries
     * @param executorService executor used for both attribute computation and uploads; it is not
     *     shut down by this method
     * @param createOptions options used when creating the staged files
     * @return the packages of all existing elements, in the order their attributes were computed
     *     (not the upload order)
     * @throws IllegalArgumentException if {@code stagingPath} is {@code null}
     * @throws RuntimeException if the wait is interrupted (the interrupt flag is restored) or any
     *     upload fails (the failure's cause is attached)
     */
    static List<DataflowPackage> stageClasspathElements(Collection<String> classpathElements, String stagingPath, final Sleeper retrySleeper, ListeningExecutorService executorService, final CreateOptions createOptions) {
        LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to prepare for execution.", classpathElements.size());
        if (classpathElements.size() > SANE_CLASSPATH_SIZE) {
            LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically copies to all workers. Having this many entries on your classpath may be indicative of an issue in your pipeline. You may want to consider trimming the classpath to necessary dependencies only, using --filesToStage pipeline option to override what files are being staged, or bundling several dependencies into one.", classpathElements.size());
        }
        Preconditions.checkArgument(stagingPath != null, "Can't stage classpath elements because no staging location has been provided");

        // Private, mutable copy: it is re-sorted below without affecting the returned order.
        List<PackageAttributes> packageAttributes = new ArrayList<>(PackageUtil.computePackageAttributes(classpathElements, stagingPath, executorService));

        // Capture the result in computed order before sorting for upload.
        List<DataflowPackage> packages = Lists.newArrayListWithExpectedSize(packageAttributes.size());
        for (PackageAttributes attributes : packageAttributes) {
            packages.add(attributes.getDataflowPackage());
        }

        Collections.sort(packageAttributes, new PackageUploadOrder());

        final AtomicInteger numUploaded = new AtomicInteger(0);
        final AtomicInteger numCached = new AtomicInteger(0);
        List<ListenableFuture<?>> futures = new ArrayList<>(packageAttributes.size());
        for (final PackageAttributes attributes : packageAttributes) {
            futures.add(executorService.submit(new Runnable() {
                @Override
                public void run() {
                    PackageUtil.stageOnePackage(attributes, numUploaded, numCached, retrySleeper, createOptions);
                }
            }));
        }
        try {
            Futures.allAsList(futures).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while staging packages", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Error while staging packages", e.getCause());
        }
        LOG.info("Staging files complete: {} files cached, {} files newly uploaded", numCached.get(), numUploaded.get());
        return packages;
    }

    /**
     * Derives the staged file name for a classpath element from its base name and content hash.
     *
     * <p>The result is {@code <name>-<hash>} followed by {@code .jar} for a directory, by the
     * original extension for a file that has one, and by nothing otherwise.
     *
     * @param classpathElement the file or directory being staged
     * @param contentHash hash of the element's content, embedded verbatim in the name
     * @return the staged file name
     */
    static String getUniqueContentName(File classpathElement, String contentHash) {
        String absolutePath = classpathElement.getAbsolutePath();
        String baseName = Files.getNameWithoutExtension(absolutePath);
        String extension = Files.getFileExtension(absolutePath);

        StringBuilder name = new StringBuilder(baseName).append("-").append(contentHash);
        if (classpathElement.isDirectory()) {
            // Directories always get the .jar suffix.
            name.append(".jar");
        } else if (!extension.isEmpty()) {
            name.append(".").append(extension);
        }
        return name.toString();
    }

    /**
     * Writes the content of a classpath element to {@code outputChannel}: a regular file is
     * copied byte-for-byte, a directory is written as a zip of its tree.
     *
     * <p>The channel is not closed here.
     *
     * @param classpathElement path of the file or directory to copy
     * @param outputChannel destination channel
     * @throws IOException if reading the source or writing to the channel fails
     */
    private static void copyContent(String classpathElement, WritableByteChannel outputChannel) throws IOException {
        File element = new File(classpathElement);
        if (!element.isDirectory()) {
            Files.asByteSource(element).copyTo(Channels.newOutputStream(outputChannel));
            return;
        }
        ZipFiles.zipDirectory(element, Channels.newOutputStream(outputChannel));
    }

    /**
     * Immutable holder for what is known about one classpath element to be staged: its content
     * size and hash, whether it is a directory, where it comes from, and the
     * {@link DataflowPackage} describing its staged name and location.
     *
     * <p>All fields are final, so an instance can be handed to other threads once constructed.
     * The referenced {@link DataflowPackage} itself is returned as-is, without copying.
     */
    static class PackageAttributes {
        private final boolean directory;
        private final long size;
        private final String hash;
        private final String sourcePath;
        private final DataflowPackage dataflowPackage;

        /**
         * @param size number of content bytes
         * @param hash content hash; must not be {@code null}
         * @param directory whether the source is a directory
         * @param dataflowPackage target package; must not be {@code null}
         * @param sourcePath path of the source; must not be {@code null}
         * @throws NullPointerException if {@code hash}, {@code sourcePath} or
         *     {@code dataflowPackage} is {@code null}
         */
        public PackageAttributes(long size, String hash, boolean directory, DataflowPackage dataflowPackage, String sourcePath) {
            this.size = size;
            this.hash = Objects.requireNonNull(hash, "hash");
            this.directory = directory;
            this.sourcePath = Objects.requireNonNull(sourcePath, "sourcePath");
            this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage");
        }

        /** Returns the package holding the staged name and location. */
        public DataflowPackage getDataflowPackage() {
            return this.dataflowPackage;
        }

        /** Returns whether the source is a directory. */
        public boolean isDirectory() {
            return this.directory;
        }

        /** Returns the number of content bytes. */
        public long getSize() {
            return this.size;
        }

        /** Returns the content hash. */
        public String getHash() {
            return this.hash;
        }

        /** Returns the path of the source file or directory. */
        public String getSourcePath() {
            return this.sourcePath;
        }
    }

    /**
     * Orders packages for upload: larger packages first, with ties broken by ascending hash.
     */
    private static class PackageUploadOrder
    implements Comparator<PackageAttributes> {
        private PackageUploadOrder() {
        }

        @Override
        public int compare(PackageAttributes o1, PackageAttributes o2) {
            // Arguments are swapped to sort by size descending; Long.compare avoids the overflow
            // that subtracting two longs can produce.
            int bySizeDescending = Long.compare(o2.getSize(), o1.getSize());
            if (bySizeDescending != 0) {
                return bySizeDescending;
            }
            return o1.getHash().compareTo(o2.getHash());
        }
    }
}

