/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.hive.HiveIcebergRecordWriter;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
import org.apache.iceberg.mr.hive.TezUtil;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveIcebergOutputCommitter
extends OutputCommitter {
    private static final String FOR_COMMIT_EXTENSION = ".forCommit";
    private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);

    public void setupJob(JobContext jobContext) {
    }

    public void setupTask(TaskAttemptContext taskAttemptContext) {
    }

    public boolean needsTaskCommit(TaskAttemptContext context) {
        return TaskType.REDUCE.equals((Object)context.getTaskAttemptID().getTaskID().getTaskType()) || context.getJobConf().getNumReduceTasks() == 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void commitTask(TaskAttemptContext originalContext) throws IOException {
        TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
        TaskAttemptID attemptID = context.getTaskAttemptID();
        JobConf jobConf = context.getJobConf();
        Collection<String> outputs = HiveIcebergStorageHandler.outputTables((Configuration)context.getJobConf());
        Map writers = Optional.ofNullable(HiveIcebergRecordWriter.getWriters(attemptID)).orElseGet(() -> {
            LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", (Object)outputs, (Object)attemptID);
            return ImmutableMap.of();
        });
        ExecutorService tableExecutor = HiveIcebergOutputCommitter.tableExecutor((Configuration)jobConf, outputs.size());
        try {
            Tasks.foreach(outputs).retry(3).stopOnFailure().throwFailureWhenFinished().executeWith(tableExecutor).run(output -> {
                Table table = HiveIcebergStorageHandler.table((Configuration)context.getJobConf(), output);
                if (table != null) {
                    DataFile[] closedFiles;
                    HiveIcebergRecordWriter writer = (HiveIcebergRecordWriter)writers.get(output);
                    if (writer != null) {
                        closedFiles = writer.dataFiles();
                    } else {
                        LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, (Object)attemptID);
                        closedFiles = new DataFile[]{};
                    }
                    String fileForCommitLocation = HiveIcebergOutputCommitter.generateFileForCommitLocation(table.location(), (Configuration)jobConf, (JobID)attemptID.getJobID(), attemptID.getTaskID().getId());
                    HiveIcebergOutputCommitter.createFileForCommit(closedFiles, fileForCommitLocation, table.io());
                } else {
                    LOG.info("CommitTask found no serialized table in config for table: {}.", output);
                }
            }, IOException.class);
        }
        finally {
            if (tableExecutor != null) {
                tableExecutor.shutdown();
            }
        }
        HiveIcebergRecordWriter.removeWriters(attemptID);
    }

    public void abortTask(TaskAttemptContext originalContext) throws IOException {
        TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
        Map<String, HiveIcebergRecordWriter> writers = HiveIcebergRecordWriter.removeWriters(context.getTaskAttemptID());
        if (writers != null) {
            for (HiveIcebergRecordWriter writer : writers.values()) {
                writer.close(true);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void commitJob(JobContext originalContext) throws IOException {
        JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
        JobConf jobConf = jobContext.getJobConf();
        long startTime = System.currentTimeMillis();
        LOG.info("Committing job {} has started", (Object)jobContext.getJobID());
        Collection<String> outputs = HiveIcebergStorageHandler.outputTables((Configuration)jobContext.getJobConf());
        ConcurrentLinkedQueue<String> jobLocations = new ConcurrentLinkedQueue<String>();
        ExecutorService fileExecutor = HiveIcebergOutputCommitter.fileExecutor((Configuration)jobConf);
        ExecutorService tableExecutor = HiveIcebergOutputCommitter.tableExecutor((Configuration)jobConf, outputs.size());
        try {
            Tasks.foreach(outputs).throwFailureWhenFinished().stopOnFailure().executeWith(tableExecutor).run(output -> {
                Table table = HiveIcebergStorageHandler.table((Configuration)jobConf, output);
                if (table != null) {
                    String catalogName = HiveIcebergStorageHandler.catalogName((Configuration)jobConf, output);
                    jobLocations.add(HiveIcebergOutputCommitter.generateJobLocation(table.location(), (Configuration)jobConf, jobContext.getJobID()));
                    this.commitTable(table.io(), fileExecutor, jobContext, (String)output, table.location(), catalogName);
                } else {
                    LOG.info("CommitJob found no serialized table in config for table: {}. Skipping job commit.", output);
                }
            });
        }
        finally {
            fileExecutor.shutdown();
            if (tableExecutor != null) {
                tableExecutor.shutdown();
            }
        }
        LOG.info("Commit took {} ms for job {}", (Object)(System.currentTimeMillis() - startTime), (Object)jobContext.getJobID());
        this.cleanup(jobContext, jobLocations);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void abortJob(JobContext originalContext, int status) throws IOException {
        JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
        JobConf jobConf = jobContext.getJobConf();
        LOG.info("Job {} is aborted. Data file cleaning started", (Object)jobContext.getJobID());
        Collection<String> outputs = HiveIcebergStorageHandler.outputTables((Configuration)jobContext.getJobConf());
        ConcurrentLinkedQueue<String> jobLocations = new ConcurrentLinkedQueue<String>();
        ExecutorService fileExecutor = HiveIcebergOutputCommitter.fileExecutor((Configuration)jobConf);
        ExecutorService tableExecutor = HiveIcebergOutputCommitter.tableExecutor((Configuration)jobConf, outputs.size());
        try {
            Tasks.foreach(outputs).suppressFailureWhenFinished().executeWith(tableExecutor).onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, (Object)exc)).run(output -> {
                LOG.info("Cleaning table {} with job id {}", output, (Object)jobContext.getJobID());
                Table table = HiveIcebergStorageHandler.table((Configuration)jobConf, output);
                jobLocations.add(HiveIcebergOutputCommitter.generateJobLocation(table.location(), (Configuration)jobConf, jobContext.getJobID()));
                Collection<DataFile> dataFiles = HiveIcebergOutputCommitter.dataFiles(fileExecutor, table.location(), jobContext, table.io(), false);
                if (!dataFiles.isEmpty()) {
                    Tasks.foreach(dataFiles).retry(3).suppressFailureWhenFinished().executeWith(fileExecutor).onFailure((file, exc) -> LOG.warn("Failed to remove data file {} on abort job", (Object)file.path(), (Object)exc)).run(file -> table.io().deleteFile(file.path().toString()));
                }
            });
        }
        finally {
            fileExecutor.shutdown();
            if (tableExecutor != null) {
                tableExecutor.shutdown();
            }
        }
        LOG.info("Job {} is aborted. Data file cleaning finished", (Object)jobContext.getJobID());
        this.cleanup(jobContext, jobLocations);
    }

    private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location, String catalogName) {
        JobConf conf = jobContext.getJobConf();
        Properties catalogProperties = new Properties();
        catalogProperties.put("name", name);
        catalogProperties.put("location", location);
        if (catalogName != null) {
            catalogProperties.put("iceberg.catalog", catalogName);
        }
        Table table = Catalogs.loadTable((Configuration)conf, catalogProperties);
        long startTime = System.currentTimeMillis();
        LOG.info("Committing job has started for table: {}, using location: {}", (Object)table, (Object)HiveIcebergOutputCommitter.generateJobLocation(location, (Configuration)conf, jobContext.getJobID()));
        Collection<DataFile> dataFiles = HiveIcebergOutputCommitter.dataFiles(executor, location, jobContext, io, true);
        if (!dataFiles.isEmpty()) {
            AppendFiles append = table.newAppend();
            dataFiles.forEach(append::appendFile);
            append.commit();
            LOG.info("Commit took {} ms for table: {} with {} file(s)", new Object[]{System.currentTimeMillis() - startTime, table, dataFiles.size()});
            LOG.debug("Added files {}", dataFiles);
        } else {
            LOG.info("Commit took {} ms for table: {} with no new files", (Object)(System.currentTimeMillis() - startTime), (Object)table);
        }
    }

    private void cleanup(JobContext jobContext, Collection<String> jobLocations) throws IOException {
        JobConf jobConf = jobContext.getJobConf();
        LOG.info("Cleaning for job {} started", (Object)jobContext.getJobID());
        Tasks.foreach(jobLocations).retry(3).suppressFailureWhenFinished().onFailure((jobLocation, exc) -> LOG.debug("Failed to remove directory {} on job cleanup", jobLocation, (Object)exc)).run(jobLocation -> {
            LOG.info("Cleaning location: {}", jobLocation);
            Path toDelete = new Path(jobLocation);
            FileSystem fs = Util.getFs(toDelete, (Configuration)jobConf);
            fs.delete(toDelete, true);
        }, IOException.class);
        LOG.info("Cleaning for job {} finished", (Object)jobContext.getJobID());
    }

    private static ExecutorService fileExecutor(Configuration conf) {
        int size = conf.getInt("iceberg.mr.commit.file.thread.pool.size", 10);
        return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder().setDaemon(true).setPriority(5).setNameFormat("iceberg-commit-file-pool-%d").build());
    }

    private static ExecutorService tableExecutor(Configuration conf, int maxThreadNum) {
        int size = conf.getInt("iceberg.mr.commit.table.thread.pool.size", 10);
        if ((size = Math.min(maxThreadNum, size)) > 1) {
            return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder().setDaemon(true).setPriority(5).setNameFormat("iceberg-commit-table-pool-%d").build());
        }
        return null;
    }

    private static Collection<DataFile> dataFiles(ExecutorService executor, String location, JobContext jobContext, FileIO io, boolean throwOnFailure) {
        JobConf conf = jobContext.getJobConf();
        int expectedFiles = conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
        ConcurrentLinkedQueue<DataFile> dataFiles = new ConcurrentLinkedQueue<DataFile>();
        Tasks.range(expectedFiles).throwFailureWhenFinished(throwOnFailure).executeWith(executor).retry(3).run(taskId -> {
            String taskFileName = HiveIcebergOutputCommitter.generateFileForCommitLocation(location, (Configuration)conf, jobContext.getJobID(), taskId);
            dataFiles.addAll(Arrays.asList(HiveIcebergOutputCommitter.readFileForCommit(taskFileName, io)));
        });
        return dataFiles;
    }

    @VisibleForTesting
    static String generateJobLocation(String location, Configuration conf, JobID jobId) {
        String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
        return location + "/temp/" + queryId + "-" + jobId;
    }

    private static String generateFileForCommitLocation(String location, Configuration conf, JobID jobId, int taskId) {
        return HiveIcebergOutputCommitter.generateJobLocation(location, conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
    }

    private static void createFileForCommit(DataFile[] closedFiles, String location, FileIO io) throws IOException {
        OutputFile fileForCommit = io.newOutputFile(location);
        try (ObjectOutputStream oos = new ObjectOutputStream(fileForCommit.createOrOverwrite());){
            oos.writeObject(closedFiles);
        }
        LOG.debug("Iceberg committed file is created {}", (Object)fileForCommit);
    }

    private static DataFile[] readFileForCommit(String fileForCommitLocation, FileIO io) {
        DataFile[] dataFileArray;
        ObjectInputStream ois = new ObjectInputStream(io.newInputFile(fileForCommitLocation).newStream());
        try {
            dataFileArray = (DataFile[])ois.readObject();
        }
        catch (Throwable throwable) {
            try {
                try {
                    ois.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
                throw throwable;
            }
            catch (IOException | ClassNotFoundException e) {
                throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation);
            }
        }
        ois.close();
        return dataFileArray;
    }
}

