/*
 * Decompiled with CFR 0.152.
 */
package io.druid.indexer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.InputSupplier;
import com.google.common.io.OutputSupplier;
import com.metamx.common.FileUtils;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.RetryUtils;
import com.metamx.common.logger.Logger;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.Jobby;
import io.druid.indexer.updater.HadoopDruidConverterConfig;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.joda.time.DateTime;

public class JobHelper {
    private static final Logger log = new Logger(JobHelper.class);
    private static final int NUM_RETRIES = 8;
    private static final int SECONDS_BETWEEN_RETRIES = 2;
    private static final int DEFAULT_FS_BUFFER_SIZE = 262144;
    private static final Pattern SNAPSHOT_JAR = Pattern.compile(".*\\-SNAPSHOT(-selfcontained)?\\.jar$");

    /**
     * Resolves the distributed classpath directory for a base location given as a string.
     *
     * @param path base location, converted to a Hadoop {@link Path}
     * @return the {@code classpath} child of the base location
     */
    public static Path distributedClassPath(String path) {
        final Path base = new Path(path);
        return JobHelper.distributedClassPath(base);
    }

    /**
     * Resolves the distributed classpath directory beneath {@code base}.
     *
     * @param base base location
     * @return a path naming the {@code classpath} child of {@code base}
     */
    public static Path distributedClassPath(Path base) {
        final String childName = "classpath";
        return new Path(base, childName);
    }

    /**
     * Logs in from a Kerberos keytab when both a principal and a keytab are configured in
     * {@code HadoopDruidIndexerConfig.HADOOP_KERBEROS_CONFIG} and Hadoop security is enabled.
     * The login is skipped when the current user already holds Kerberos credentials under the
     * configured principal name.
     *
     * @param config indexer configuration (not read by this method; the Kerberos settings come
     *               from the static {@code HADOOP_KERBEROS_CONFIG})
     * @throws ISE if the keytab login fails with an {@link IOException}
     */
    public static void authenticate(HadoopDruidIndexerConfig config) {
        final String principal = HadoopDruidIndexerConfig.HADOOP_KERBEROS_CONFIG.getPrincipal();
        final String keytab = HadoopDruidIndexerConfig.HADOOP_KERBEROS_CONFIG.getKeytab();
        if (Strings.isNullOrEmpty(principal) || Strings.isNullOrEmpty(keytab)) {
            // Nothing to do without both pieces of the Kerberos identity.
            return;
        }

        UserGroupInformation.setConfiguration(new Configuration());
        if (!UserGroupInformation.isSecurityEnabled()) {
            return;
        }

        try {
            final boolean alreadyAuthenticated =
                UserGroupInformation.getCurrentUser().hasKerberosCredentials()
                && UserGroupInformation.getCurrentUser().getUserName().equals(principal);
            if (!alreadyAuthenticated) {
                log.info("trying to authenticate user [%s] with keytab [%s]", principal, keytab);
                UserGroupInformation.loginUserFromKeytab(principal, keytab);
            }
        }
        catch (IOException e) {
            throw new ISE(e, "Failed to authenticate user principal [%s] with keytab [%s]", principal, keytab);
        }
    }

    /**
     * Uploads the jars found on the classpath to the job's file system and adds them to the
     * job's classpath.
     *
     * <p>The classpath is read from the {@code druid.hadoop.internal.classpath} system property,
     * falling back to {@code java.class.path}. Nothing is uploaded when the file system of
     * {@code distributedClassPath} is a {@link LocalFileSystem}. Only entries whose file name
     * ends in {@code .jar} are considered. Snapshot jars are handled by
     * {@link #addSnapshotJarToClassPath}; all other jars by {@link #addJarToClassPath}.
     * Each jar is processed through {@code RetryUtils.retry} with {@link #shouldRetryPredicate()}.
     *
     * @param distributedClassPath  directory for non-snapshot jars
     * @param intermediateClassPath directory for snapshot jars and in-flight uploads
     * @param job                   job whose configuration and classpath are used
     * @throws IOException if the file system cannot be obtained
     */
    public static void setupClasspath(final Path distributedClassPath, final Path intermediateClassPath, final Job job) throws IOException {
        final String internalClasspath = System.getProperty("druid.hadoop.internal.classpath");
        final String effectiveClasspath =
            internalClasspath != null ? internalClasspath : System.getProperty("java.class.path");
        final String[] classpathEntries = effectiveClasspath.split(File.pathSeparator);

        final FileSystem fs = distributedClassPath.getFileSystem(job.getConfiguration());
        if (fs instanceof LocalFileSystem) {
            return;
        }

        for (String classpathEntry : classpathEntries) {
            final File jarFile = new File(classpathEntry);
            if (jarFile.getName().endsWith(".jar")) {
                final Callable<Boolean> addJar = new Callable<Boolean>() {
                    @Override
                    public Boolean call() throws Exception {
                        if (JobHelper.isSnapshot(jarFile)) {
                            JobHelper.addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job);
                        } else {
                            JobHelper.addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job);
                        }
                        return true;
                    }
                };
                try {
                    RetryUtils.retry(addJar, JobHelper.shouldRetryPredicate(), NUM_RETRIES);
                }
                catch (Exception e) {
                    throw Throwables.propagate(e);
                }
            }
        }
    }

    /**
     * Builds the predicate used to decide whether a failed operation is worth retrying.
     *
     * @return a predicate that is true when the throwable, or any throwable in its cause chain,
     *         is an {@link IOException}; false for {@code null}
     */
    public static final Predicate<Throwable> shouldRetryPredicate() {
        return new Predicate<Throwable>() {
            @Override
            public boolean apply(Throwable input) {
                // Walk the cause chain from the outermost throwable inwards.
                for (Throwable link = input; link != null; link = link.getCause()) {
                    if (link instanceof IOException) {
                        return true;
                    }
                }
                return false;
            }
        };
    }

    /**
     * Makes sure {@code jarFile} is present under {@code distributedClassPath} and adds it to the
     * job's classpath.
     *
     * <p>When the jar is not already at its final location it is uploaded to
     * {@code intermediateClassPath} first and then renamed into place. A rename failure is
     * tolerated if the final file exists afterwards. The intermediate file is always deleted
     * if it is still present.
     *
     * @param jarFile               local jar to publish
     * @param distributedClassPath  final directory for the jar (created if missing)
     * @param intermediateClassPath staging directory for the upload
     * @param fs                    file system holding both directories
     * @param job                   job whose classpath receives the jar
     * @throws IOException if the jar could not be put in place, or if cleaning up the
     *                     intermediate file failed; secondary failures are attached as
     *                     suppressed exceptions
     */
    static void addJarToClassPath(File jarFile, Path distributedClassPath, Path intermediateClassPath, FileSystem fs, Job job) throws IOException {
        fs.mkdirs(distributedClassPath);
        final Path hdfsPath = new Path(distributedClassPath, jarFile.getName());
        if (!fs.exists(hdfsPath)) {
            final Path intermediateHdfsPath = new Path(intermediateClassPath, jarFile.getName());
            JobHelper.uploadJar(jarFile, intermediateHdfsPath, fs);
            IOException exception = null;
            try {
                log.info("Renaming jar to path[%s]", hdfsPath);
                fs.rename(intermediateHdfsPath, hdfsPath);
                if (!fs.exists(hdfsPath)) {
                    throw new IOException(String.format("File does not exist even after moving from[%s] to [%s]", intermediateHdfsPath, hdfsPath));
                }
            }
            catch (IOException e) {
                // The failure only matters if the final file is still missing.
                try {
                    if (!fs.exists(hdfsPath)) {
                        log.error(e, "IOException while Renaming jar file");
                        exception = e;
                    }
                }
                catch (IOException e1) {
                    e.addSuppressed(e1);
                    exception = e;
                }
            }
            finally {
                try {
                    if (fs.exists(intermediateHdfsPath)) {
                        fs.delete(intermediateHdfsPath, false);
                    }
                }
                catch (IOException e) {
                    // Never add an exception to itself: Throwable.addSuppressed rejects
                    // self-suppression with an IllegalArgumentException.
                    if (exception == null) {
                        exception = e;
                    } else {
                        exception.addSuppressed(e);
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        }
        job.addFileToClassPath(hdfsPath);
    }

    /**
     * Adds a snapshot jar to the job's classpath, uploading it directly into
     * {@code intermediateClassPath} when no file of that name exists there yet.
     *
     * @param jarFile               local snapshot jar
     * @param intermediateClassPath directory that receives the jar
     * @param fs                    file system holding {@code intermediateClassPath}
     * @param job                   job whose classpath receives the jar
     * @throws IOException if the existence check, upload, or classpath registration fails
     */
    static void addSnapshotJarToClassPath(File jarFile, Path intermediateClassPath, FileSystem fs, Job job) throws IOException {
        final Path target = new Path(intermediateClassPath, jarFile.getName());
        final boolean alreadyUploaded = fs.exists(target);
        if (!alreadyUploaded) {
            JobHelper.uploadJar(jarFile, target, fs);
        }
        job.addFileToClassPath(target);
    }

    /**
     * Copies the contents of a local jar file to {@code path} on the given file system.
     *
     * @param jarFile local file to read
     * @param path    destination path, created via {@code fs.create(path)}
     * @param fs      destination file system
     * @throws IOException if reading the jar or writing the destination fails
     */
    static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException {
        log.info("Uploading jar to path[%s]", path);
        final InputSupplier<FileInputStream> localJar = com.google.common.io.Files.newInputStreamSupplier(jarFile);
        final OutputSupplier<OutputStream> remoteJar = new OutputSupplier<OutputStream>() {
            @Override
            public OutputStream getOutput() throws IOException {
                return fs.create(path);
            }
        };
        ByteStreams.copy(localJar, remoteJar);
    }

    /**
     * Tells whether a jar's file name matches the {@code SNAPSHOT_JAR} pattern.
     *
     * @param jarFile jar to inspect; only its file name is used
     * @return true when the whole file name matches the snapshot pattern
     */
    static boolean isSnapshot(File jarFile) {
        final String jarName = jarFile.getName();
        return SNAPSHOT_JAR.matcher(jarName).matches();
    }

    /**
     * Copies {@code hadoop.}-prefixed system properties into the job's configuration.
     *
     * @param job job whose configuration is updated in place
     */
    public static void injectSystemProperties(Job job) {
        final Configuration jobConf = job.getConfiguration();
        JobHelper.injectSystemProperties(jobConf);
    }

    /**
     * Copies every system property whose name starts with {@code hadoop.} into {@code conf},
     * using the property name with that prefix removed as the configuration key.
     *
     * @param conf configuration to update in place
     * @return the same {@code conf} instance, for chaining
     */
    public static Configuration injectSystemProperties(Configuration conf) {
        final String prefix = "hadoop.";
        for (String propName : System.getProperties().stringPropertyNames()) {
            if (propName.startsWith(prefix)) {
                final String confKey = propName.substring(prefix.length());
                conf.set(confKey, System.getProperty(propName));
            }
        }
        return conf;
    }

    /**
     * Authenticates and then builds a throwaway job to which the configured job properties and
     * input paths are added, so that problems with them surface as an exception here.
     *
     * @param config indexer configuration supplying the data source, intervals, job properties
     *               and input paths
     */
    public static void ensurePaths(HadoopDruidIndexerConfig config) {
        JobHelper.authenticate(config);
        try {
            final Configuration baseConf = new Configuration();
            final String jobName = String.format("%s-determine_partitions-%s", config.getDataSource(), config.getIntervals());
            final Job job = Job.getInstance(baseConf, jobName);

            job.getConfiguration().set("io.sort.record.percent", "0.19");
            JobHelper.injectSystemProperties(job);
            config.addJobProperties(job);
            config.addInputPaths(job);
        }
        catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Runs the jobs in order until one reports failure, then optionally removes the
     * intermediate working path.
     *
     * <p>Jobs after the first failed one are not run. The intermediate path is deleted
     * unless the tuning config asks to leave it, and, after a failure, only when
     * cleanup-on-failure is set. A failed deletion is logged and otherwise ignored.
     *
     * @param jobs   jobs to run, in order
     * @param config indexer configuration supplying the tuning config and intermediate path
     * @return {@code true} when every job succeeded
     * @throws ISE if a job reported failure
     */
    public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config) {
        String failedMessage = null;
        for (Jobby job : jobs) {
            // Short-circuit keeps later jobs from running once one has failed.
            if (failedMessage == null && !job.run()) {
                failedMessage = String.format("Job[%s] failed!", job.getClass());
            }
        }

        final boolean leaveIntermediate = config.getSchema().getTuningConfig().isLeaveIntermediate();
        boolean cleanup = false;
        if (!leaveIntermediate) {
            cleanup = failedMessage == null
                      || config.getSchema().getTuningConfig().isCleanupOnFailure().booleanValue();
        }

        if (cleanup) {
            final Path workingPath = config.makeIntermediatePath();
            log.info("Deleting path[%s]", workingPath);
            try {
                final Configuration cleanupConf = JobHelper.injectSystemProperties(new Configuration());
                workingPath.getFileSystem(cleanupConf).delete(workingPath, true);
            }
            catch (IOException e) {
                log.error(e, "Failed to cleanup path[%s]", workingPath);
            }
        }

        if (failedMessage != null) {
            throw new ISE(failedMessage);
        }
        return true;
    }

    /**
     * Zips the merged segment directory into {@code segmentBasePath}, renames the zip to
     * {@code index.zip}, and writes {@code descriptor.json} next to it.
     *
     * <p>The zip is first written to {@code index.zip.<taskAttemptId>} through a retrying
     * proxy. The load spec of the returned segment is derived from the scheme of the output
     * file system.
     *
     * @param segmentTemplate segment whose load spec, size and binary version are filled in
     * @param configuration   Hadoop configuration used to obtain the output file system
     * @param progressable    progress callback handed to the file system and zip routines
     * @param taskAttemptID   task attempt whose id makes the temporary zip name unique
     * @param mergedBase      local directory containing the merged segment files
     * @param segmentBasePath output directory for the segment
     * @return the finished segment descriptor
     * @throws IOException if zipping, renaming, or writing the descriptor fails
     * @throws IAE         if the output file system scheme is not recognized
     */
    public static DataSegment serializeOutIndex(DataSegment segmentTemplate, Configuration configuration, final Progressable progressable, TaskAttemptID taskAttemptID, final File mergedBase, Path segmentBasePath) throws IOException {
        final FileSystem outputFS = FileSystem.get(segmentBasePath.toUri(), configuration);
        final Path tmpPath = new Path(segmentBasePath, String.format("index.zip.%d", taskAttemptID.getId()));
        final AtomicLong zippedBytes = new AtomicLong(0L);

        final DataPusher rawZipPusher = new DataPusher() {
            @Override
            public long push() throws IOException {
                try (FSDataOutputStream zipOut = outputFS.create(tmpPath, true, DEFAULT_FS_BUFFER_SIZE, progressable)) {
                    zippedBytes.set(JobHelper.zipAndCopyDir(mergedBase, zipOut, progressable));
                    zipOut.flush();
                }
                catch (IOException | RuntimeException exception) {
                    log.error(exception, "Exception in retry loop");
                    throw exception;
                }
                // The byte count travels through zippedBytes; the return value is unused.
                return -1L;
            }
        };
        final RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS);
        final DataPusher zipPusher = (DataPusher) RetryProxy.create(DataPusher.class, rawZipPusher, retryPolicy);
        zipPusher.push();
        log.info("Zipped %,d bytes to [%s]", zippedBytes.get(), tmpPath.toUri());

        final Path finalIndexZipFilePath = new Path(segmentBasePath, "index.zip");
        final Map<String, Object> loadSpec = JobHelper.loadSpecForIndexZip(outputFS, finalIndexZipFilePath.toUri());

        final DataSegment finalSegment = segmentTemplate
            .withLoadSpec(loadSpec)
            .withSize(zippedBytes.get())
            .withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase));

        final boolean renamed = JobHelper.renameIndexFiles(outputFS, tmpPath, finalIndexZipFilePath);
        if (!renamed) {
            throw new IOException(String.format("Unable to rename [%s] to [%s]", tmpPath.toUri().toString(), finalIndexZipFilePath.toUri().toString()));
        }

        final Path descriptorPath = new Path(segmentBasePath, "descriptor.json");
        JobHelper.writeSegmentDescriptor(outputFS, finalSegment, descriptorPath, progressable);
        return finalSegment;
    }

    /** Builds the segment load spec matching the scheme of the output file system. */
    private static Map<String, Object> loadSpecForIndexZip(FileSystem outputFS, URI indexOutURI) {
        switch (outputFS.getScheme()) {
            case "hdfs":
            case "viewfs":
            case "gs":
                return ImmutableMap.<String, Object>of("type", "hdfs", "path", indexOutURI.toString());
            case "s3":
            case "s3n":
                // The key is the URI path without its leading slash.
                return ImmutableMap.<String, Object>of("type", "s3_zip", "bucket", indexOutURI.getHost(), "key", indexOutURI.getPath().substring(1));
            case "file":
                return ImmutableMap.<String, Object>of("type", "local", "path", indexOutURI.getPath());
            default:
                throw new IAE("Unknown file system scheme [%s]", outputFS.getScheme());
        }
    }

    /**
     * Serializes {@code segment} as JSON to {@code descriptorPath} through a retrying proxy.
     * An existing descriptor at that path is deleted before the new one is written.
     *
     * @param outputFS       file system that receives the descriptor
     * @param segment        segment to serialize with {@code HadoopDruidIndexerConfig.JSON_MAPPER}
     * @param descriptorPath destination of the descriptor file
     * @param progressable   progress callback, invoked before each attempt and passed to the
     *                       file system
     * @throws IOException if the descriptor cannot be deleted or written
     */
    public static void writeSegmentDescriptor(final FileSystem outputFS, final DataSegment segment, final Path descriptorPath, final Progressable progressable) throws IOException {
        final DataPusher rawDescriptorPusher = new DataPusher() {
            @Override
            public long push() throws IOException {
                try {
                    progressable.progress();
                    if (outputFS.exists(descriptorPath)) {
                        final boolean deleted = outputFS.delete(descriptorPath, false);
                        if (!deleted) {
                            throw new IOException(String.format("Failed to delete descriptor at [%s]", descriptorPath));
                        }
                    }
                    try (FSDataOutputStream jsonOut = outputFS.create(descriptorPath, true, DEFAULT_FS_BUFFER_SIZE, progressable)) {
                        HadoopDruidIndexerConfig.JSON_MAPPER.writeValue((OutputStream) jsonOut, segment);
                        jsonOut.flush();
                    }
                }
                catch (IOException | RuntimeException ex) {
                    log.info(ex, "Exception in descriptor pusher retry loop");
                    throw ex;
                }
                // The return value carries no information for this pusher.
                return -1L;
            }
        };
        final RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS);
        final DataPusher descriptorPusher = (DataPusher) RetryProxy.create(DataPusher.class, rawDescriptorPusher, retryPolicy);
        descriptorPusher.push();
    }

    /**
     * Zips the regular files directly inside {@code baseDir} into {@code baseOutputStream}.
     * Entries that are not regular files are skipped with a warning. The zip stream, and with
     * it {@code baseOutputStream}, is closed before this method returns.
     *
     * @param baseDir          directory whose immediate children are zipped
     * @param baseOutputStream stream that receives the zip data
     * @param progressable     progress callback passed on to the per-file copy
     * @return total number of uncompressed bytes copied into the zip
     * @throws IOException if {@code baseDir} cannot be listed (for example because it is not a
     *                     directory) or if reading or writing fails
     */
    public static long zipAndCopyDir(File baseDir, OutputStream baseOutputStream, Progressable progressable) throws IOException {
        long size = 0L;
        try (ZipOutputStream outputStream = new ZipOutputStream(baseOutputStream)) {
            // File.list() returns null rather than throwing when the path is not a directory
            // or an I/O error occurs; report that explicitly instead of failing with an NPE.
            final String[] fileNames = baseDir.list();
            if (fileNames == null) {
                throw new IOException(String.format("Unable to list files in [%s]: not a directory or an I/O error occurred", baseDir));
            }
            for (String fileName : fileNames) {
                final File fileToCopy = new File(baseDir, fileName);
                if (Files.isRegularFile(fileToCopy.toPath())) {
                    size += JobHelper.copyFileToZipStream(fileToCopy, outputStream, progressable);
                } else {
                    log.warn("File at [%s] is not a regular file! skipping as part of zip", fileToCopy.getPath());
                }
            }
            outputStream.flush();
        }
        return size;
    }

    /**
     * Adds {@code file} to the zip stream as a new entry named after the file and copies its
     * contents, reporting progress along the way.
     *
     * @param file            file to copy
     * @param zipOutputStream zip stream that receives the entry; left open
     * @param progressable    progress callback, invoked around every read and write
     * @return number of bytes read from {@code file}
     * @throws IOException if reading the file or writing the entry fails
     */
    public static long copyFileToZipStream(File file, ZipOutputStream zipOutputStream, Progressable progressable) throws IOException {
        JobHelper.createNewZipEntry(zipOutputStream, file);
        long copied = 0L;
        try (FileInputStream source = new FileInputStream(file)) {
            final byte[] chunk = new byte[65536];
            for (int n = source.read(chunk); n >= 0; n = source.read(chunk)) {
                progressable.progress();
                if (n == 0) {
                    // Nothing was read this round; try again.
                    continue;
                }
                zipOutputStream.write(chunk, 0, n);
                progressable.progress();
                copied += n;
            }
        }
        zipOutputStream.closeEntry();
        progressable.progress();
        return copied;
    }

    /**
     * Starts a new zip entry named after {@code file} and logs it.
     *
     * @param out  zip stream to add the entry to
     * @param file file whose name becomes the entry name
     * @throws IOException if the entry cannot be started
     */
    private static void createNewZipEntry(ZipOutputStream out, File file) throws IOException {
        final String entryName = file.getName();
        log.info("Creating new ZipEntry[%s]", entryName);
        out.putNextEntry(new ZipEntry(entryName));
    }

    /**
     * Builds the output directory for a segment under {@code basePath}.
     *
     * <p>For the {@code hdfs} and {@code viewfs} schemes the directory name comes from
     * {@code DataSegmentPusherUtil.getHdfsStorageDir}; for every other scheme from
     * {@code DataSegmentPusherUtil.getStorageDir}.
     *
     * @param basePath   base output path; qualified against {@code fileSystem} when it has no scheme
     * @param fileSystem file system the segment is written to
     * @param segment    segment being written
     * @return the segment's output path
     */
    public static Path makeSegmentOutputPath(Path basePath, FileSystem fileSystem, DataSegment segment) {
        final boolean hdfsStyleLayout = "hdfs".equals(fileSystem.getScheme()) || "viewfs".equals(fileSystem.getScheme());
        final String segmentDir;
        if (hdfsStyleLayout) {
            segmentDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
        } else {
            segmentDir = DataSegmentPusherUtil.getStorageDir(segment);
        }
        final Path qualifiedBase = JobHelper.prependFSIfNullScheme(fileSystem, basePath);
        return new Path(qualifiedBase, String.format("./%s", segmentDir));
    }

    /**
     * Moves the freshly written index zip to its final location, retrying on failure.
     *
     * <p>If a file already exists at the final location it is kept only when it has a later
     * modification time than the new zip and the same length; otherwise it is deleted and
     * replaced by the new zip.
     *
     * @param outputFS              file system holding both paths
     * @param indexZipFilePath      temporary location of the new zip
     * @param finalIndexZipFilePath final location of the zip
     * @return the result of the rename, or {@code true} when the existing file was kept
     */
    private static boolean renameIndexFiles(final FileSystem outputFS, final Path indexZipFilePath, final Path finalIndexZipFilePath) {
        final Callable<Boolean> renameAttempt = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                if (outputFS.exists(finalIndexZipFilePath)) {
                    final FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath);
                    final FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath);
                    final boolean existingIsNewer = zipFile.getModificationTime() < finalIndexZipFile.getModificationTime();
                    final boolean sameLength = zipFile.getLen() == finalIndexZipFile.getLen();

                    if (existingIsNewer && sameLength) {
                        log.info("File[%s / %s / %sB] existed and will be kept", finalIndexZipFile.getPath(), new DateTime(finalIndexZipFile.getModificationTime()), finalIndexZipFile.getLen());
                        return true;
                    }

                    log.info("File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]", finalIndexZipFile.getPath(), new DateTime(finalIndexZipFile.getModificationTime()), finalIndexZipFile.getLen(), zipFile.getPath(), new DateTime(zipFile.getModificationTime()), zipFile.getLen());
                    outputFS.delete(finalIndexZipFilePath, false);
                }

                log.info("Attempting rename from [%s] to [%s]", indexZipFilePath, finalIndexZipFilePath);
                return outputFS.rename(indexZipFilePath, finalIndexZipFilePath);
            }
        };

        try {
            return RetryUtils.retry(renameAttempt, FileUtils.IS_EXCEPTION, NUM_RETRIES);
        }
        catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Qualifies {@code path} against {@code fs} when the path's URI has no scheme.
     *
     * @param fs   file system used to qualify the path
     * @param path path to inspect
     * @return {@code path} itself when it already has a scheme, otherwise
     *         {@code fs.makeQualified(path)}
     */
    public static Path prependFSIfNullScheme(FileSystem fs, Path path) {
        final boolean hasScheme = path.toUri().getScheme() != null;
        return hasScheme ? path : fs.makeQualified(path);
    }

    /**
     * Extracts every entry of the zip at {@code zip} into {@code outDir} through a retrying
     * proxy.
     *
     * <p>Each entry is written to a file named after the entry beneath {@code outDir}.
     * Entries whose name would resolve to a location outside {@code outDir} (for example via
     * {@code ..} segments) are rejected with an {@link IOException} instead of being written.
     * Like any other failure inside the push, that rejection goes through the retry policy.
     *
     * @param zip           location of the zip file
     * @param configuration Hadoop configuration used to obtain the zip's file system
     * @param outDir        local directory that receives the extracted files
     * @param progressable  progress callback, invoked while copying
     * @return total number of bytes written across all entries
     * @throws IOException if reading the zip or writing an entry fails, or if an entry would
     *                     escape {@code outDir}
     */
    public static long unzipNoGuava(final Path zip, final Configuration configuration, final File outDir, final Progressable progressable) throws IOException {
        final DataPusher rawZipPusher = new DataPusher() {
            @Override
            public long push() throws IOException {
                try {
                    final FileSystem fileSystem = zip.getFileSystem(configuration);
                    // Canonical prefix every extracted file must live under.
                    final String canonicalOutDir = outDir.getCanonicalPath();
                    final String requiredPrefix = canonicalOutDir.endsWith(File.separator)
                                                  ? canonicalOutDir
                                                  : canonicalOutDir + File.separator;
                    long size = 0L;
                    final byte[] buffer = new byte[8192];
                    progressable.progress();
                    try (ZipInputStream in = new ZipInputStream(fileSystem.open(zip, 8192))) {
                        ZipEntry entry = in.getNextEntry();
                        while (entry != null) {
                            final String fileName = entry.getName();
                            final File outFile = new File(outDir.getAbsolutePath() + File.separator + fileName);
                            // Guard against zip-slip: entry names are data from the archive and
                            // must not be able to address files outside of outDir.
                            if (!outFile.getCanonicalPath().startsWith(requiredPrefix)) {
                                throw new IOException(String.format("Zip entry [%s] in [%s] resolves outside of output directory [%s]", fileName, zip, outDir));
                            }
                            try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(outFile), 8192)) {
                                int len = in.read(buffer);
                                while (len >= 0) {
                                    progressable.progress();
                                    if (len != 0) {
                                        size += (long) len;
                                        out.write(buffer, 0, len);
                                    }
                                    len = in.read(buffer);
                                }
                                out.flush();
                            }
                            entry = in.getNextEntry();
                        }
                    }
                    progressable.progress();
                    return size;
                }
                catch (IOException | RuntimeException exception) {
                    log.error(exception, "Exception in unzip retry loop");
                    throw exception;
                }
            }
        };
        final RetryPolicy retryPolicy = RetryPolicies.exponentialBackoffRetry(NUM_RETRIES, SECONDS_BETWEEN_RETRIES, TimeUnit.SECONDS);
        final DataPusher zipPusher = (DataPusher) RetryProxy.create(DataPusher.class, rawZipPusher, retryPolicy);
        return zipPusher.push();
    }

    /**
     * Derives the location URI of a segment from its load spec.
     *
     * <ul>
     *   <li>{@code s3_zip}: {@code s3n://<bucket>/<key>}</li>
     *   <li>{@code hdfs}: the {@code path} value parsed as a URI</li>
     *   <li>{@code local}: a {@code file} URI built from the {@code path} value</li>
     * </ul>
     *
     * @param dataSegment segment whose load spec is inspected
     * @return the URI the segment can be loaded from
     * @throws IAE if the load spec type is missing or not one of the types above
     * @throws ISE if a {@code local} path cannot be turned into a URI, or if the load spec
     *             cannot be serialized for the error message
     */
    public static URI getURIFromSegment(DataSegment dataSegment) {
        final Map<?, ?> loadSpec = dataSegment.getLoadSpec();
        // A missing type falls through to the descriptive IAE below instead of an NPE.
        final Object typeValue = loadSpec.get("type");
        final String type = typeValue == null ? null : typeValue.toString();

        if ("s3_zip".equals(type)) {
            return URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
        }
        if ("hdfs".equals(type)) {
            return URI.create(loadSpec.get("path").toString());
        }
        if ("local".equals(type)) {
            try {
                return new URI("file", null, loadSpec.get("path").toString(), null, null);
            }
            catch (URISyntaxException e) {
                throw new ISE(e, "Unable to form simple file uri");
            }
        }

        final String renderedLoadSpec;
        try {
            renderedLoadSpec = HadoopDruidConverterConfig.jsonMapper.writeValueAsString(loadSpec);
        }
        catch (JsonProcessingException e) {
            // Keep the serialization failure as the cause so it is not lost.
            throw new ISE(e, "Cannot write Map with json mapper");
        }
        throw new IAE("Cannot figure out loadSpec %s", renderedLoadSpec);
    }

    /**
     * Adapts a task attempt context to a {@link ProgressIndicator}.
     *
     * <p>Every callback reports progress on the context; all but {@code progress()} also set
     * the context's status string.
     *
     * @param context task attempt context that receives progress and status updates
     * @return an indicator backed by {@code context}
     */
    public static ProgressIndicator progressIndicatorForContext(final TaskAttemptContext context) {
        return new ProgressIndicator() {
            /** Reports progress and then publishes {@code status} on the context. */
            private void touchAndSetStatus(String status) {
                context.progress();
                context.setStatus(status);
            }

            @Override
            public void progress() {
                context.progress();
            }

            @Override
            public void start() {
                touchAndSetStatus("STARTED");
            }

            @Override
            public void stop() {
                touchAndSetStatus("STOPPED");
            }

            @Override
            public void startSection(String section) {
                touchAndSetStatus(String.format("STARTED [%s]", section));
            }

            @Override
            public void progressSection(String section, String message) {
                log.info("Progress message for section [%s] : [%s]", section, message);
                touchAndSetStatus(String.format("PROGRESS [%s]", section));
            }

            @Override
            public void stopSection(String section) {
                touchAndSetStatus(String.format("STOPPED [%s]", section));
            }
        };
    }

    /**
     * Deletes {@code path} through {@code RetryUtils.retry}, using
     * {@link #shouldRetryPredicate()} to decide which failures are retried.
     *
     * @param fs        file system holding the path
     * @param path      path to delete
     * @param recursive passed through to {@code fs.delete}
     * @return the result of {@code fs.delete}
     */
    public static boolean deleteWithRetry(final FileSystem fs, final Path path, final boolean recursive) {
        final Callable<Boolean> deletion = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                return fs.delete(path, recursive);
            }
        };

        try {
            return RetryUtils.retry(deletion, JobHelper.shouldRetryPredicate(), NUM_RETRIES);
        }
        catch (Exception e) {
            log.error(e, "Failed to cleanup path[%s]", path);
            throw Throwables.propagate(e);
        }
    }

    /**
     * A unit of push work that can be wrapped in a retrying proxy (see the
     * {@code RetryProxy.create(DataPusher.class, ...)} calls in this class).
     */
    public interface DataPusher {
        /**
         * Performs the push.
         *
         * @return an implementation-defined count; several implementations in this class
         *         return {@code -1}
         * @throws IOException if the push fails
         */
        long push() throws IOException;
    }
}

