/*
 * Decompiled with CFR 0.152.
 */
package com.ksyun.kmr.hadoop.fs.ks3.committer;

import com.ksyun.kmr.hadoop.fs.ks3.Ks3FileStatus;
import com.ksyun.kmr.hadoop.fs.ks3.Ks3FileSystem;
import com.ksyun.kmr.hadoop.fs.ks3.Ks3FileSystemStore;
import com.ksyun.kmr.hadoop.fs.ks3.ListObjectsResult;
import com.ksyun.kmr.hadoop.fs.ks3.committer.PendingCommit;
import com.ksyun.kmr.hadoop.fs.ks3.committer.PendingCommitList;
import com.ksyun.kmr.hadoop.fs.ks3.parallel.EngineShutter;
import com.ksyun.kmr.hadoop.fs.ks3.parallel.conveyor.CommitAction;
import com.ksyun.kmr.hadoop.fs.ks3.parallel.conveyor.DestroyAction;
import com.ksyun.kmr.hadoop.fs.ks3.requestbuilder.ListFileStatus;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

/**
 * Output committer for KS3 that tracks task output through small
 * "pending commit" descriptor objects stored under the job output directory.
 *
 * <p>Directory layout used by this class (all relative to the output path):
 * <pre>
 *   _direct_output_committer_temporary/                      pending job attempts
 *     &lt;appAttemptId&gt;/                                         job attempt path
 *       _direct_output_committer_tasks/&lt;taskAttemptId&gt;/       task attempt path
 *       &lt;taskId&gt;.pending_commit_list                          written by commitTask
 * </pre>
 *
 * <p>{@link #commitTask(TaskAttemptContext)} gathers every {@code *.pending_commit}
 * object below the task attempt path into a single {@code .pending_commit_list}
 * object; {@link #commitJob(JobContext)} feeds every {@code *.pending_commit_list}
 * object of the job attempt into a {@link CommitAction}, removes the temporary
 * tree and optionally writes the {@code _SUCCESS} marker.
 *
 * <p>NOTE(review): when the committer is built with a {@code null} output path,
 * {@code fs} and {@code store} stay {@code null}, so every method that touches
 * them will fail with a {@link NullPointerException}.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class CommitInfoFileCommitter
extends FileOutputCommitter {
    private static final Log LOG = LogFactory.getLog(CommitInfoFileCommitter.class);
    public static final String PENDING_DIR_NAME = "_direct_output_committer_temporary";
    public static final String TASK_DIR_NAME = "_direct_output_committer_tasks";
    public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
    public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs";
    /** Name suffix of a single pending-commit descriptor below a task attempt path. */
    private static final String PENDING_COMMIT_SUFFIX = ".pending_commit";
    /** Name suffix of the per-task descriptor list written by {@code commitTask}. */
    private static final String PENDING_COMMIT_LIST_SUFFIX = ".pending_commit_list";
    private Path outputPath = null;
    private Path workPath = null;
    private Ks3FileSystem fs;
    private Ks3FileSystemStore store;

    /**
     * Creates the committer.
     *
     * @param outputPath job output directory; when {@code null} no file system
     *                   is resolved and no paths are initialised
     * @param context    job or task attempt context; a work path is only
     *                   computed when it is a {@link TaskAttemptContext}
     * @throws IOException if the file system for {@code outputPath} cannot be obtained
     */
    public CommitInfoFileCommitter(Path outputPath, JobContext context) throws IOException {
        super(outputPath, context);
        if (outputPath != null) {
            // The cast fails with ClassCastException if the path's scheme is not served by Ks3FileSystem.
            this.fs = (Ks3FileSystem)FileSystem.get(outputPath.toUri(), context.getConfiguration());
            this.store = this.fs.getStore();
            this.outputPath = this.fs.makeQualified(outputPath);
            // Use a real type test: matching on the class name misses wrapper contexts
            // and could trigger a ClassCastException on unrelated, similarly named classes.
            if (context instanceof TaskAttemptContext) {
                this.workPath = CommitInfoFileCommitter.getTaskAttemptPath((TaskAttemptContext)context, outputPath);
            }
        }
    }

    /** @return the directory holding all job attempts of this committer's output path */
    private Path getPendingJobAttemptsPath() {
        return CommitInfoFileCommitter.getPendingJobAttemptsPath(this.getOutputPath());
    }

    /** @return {@code out/_direct_output_committer_temporary} */
    private static Path getPendingJobAttemptsPath(Path out) {
        return new Path(out, PENDING_DIR_NAME);
    }

    /** @return the value of {@code mapreduce.job.application.attempt.id}, or 0 when unset */
    private static int getAppAttemptId(JobContext context) {
        return context.getConfiguration().getInt("mapreduce.job.application.attempt.id", 0);
    }

    /**
     * @param context job context supplying the application attempt id
     * @return the job attempt directory below this committer's output path
     */
    public Path getJobAttemptPath(JobContext context) {
        return CommitInfoFileCommitter.getJobAttemptPath(context, this.getOutputPath());
    }

    /**
     * @param context job context supplying the application attempt id
     * @param out     job output directory
     * @return the job attempt directory below {@code out}
     */
    public static Path getJobAttemptPath(JobContext context, Path out) {
        return CommitInfoFileCommitter.getJobAttemptPath(CommitInfoFileCommitter.getAppAttemptId(context), out);
    }

    /**
     * @param appAttemptId application attempt id
     * @return the job attempt directory for that id below this committer's output path
     */
    protected Path getJobAttemptPath(int appAttemptId) {
        return CommitInfoFileCommitter.getJobAttemptPath(appAttemptId, this.getOutputPath());
    }

    /** @return {@code out/_direct_output_committer_temporary/<appAttemptId>} */
    private static Path getJobAttemptPath(int appAttemptId, Path out) {
        return new Path(CommitInfoFileCommitter.getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
    }

    /** @return the directory holding all task attempts of the current job attempt */
    private Path getPendingTaskAttemptsPath(JobContext context) {
        return CommitInfoFileCommitter.getPendingTaskAttemptsPath(context, this.getOutputPath());
    }

    /** @return {@code <jobAttemptPath>/_direct_output_committer_tasks} below {@code out} */
    private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
        return new Path(CommitInfoFileCommitter.getJobAttemptPath(context, out), TASK_DIR_NAME);
    }

    /**
     * @param context task attempt context
     * @return the directory named after the task attempt id below the pending task attempts path
     */
    public Path getTaskAttemptPath(TaskAttemptContext context) {
        return new Path(this.getPendingTaskAttemptsPath(context), String.valueOf(context.getTaskAttemptID()));
    }

    /**
     * @param context task attempt context
     * @param out     job output directory
     * @return the directory named after the task attempt id below the pending task attempts path of {@code out}
     */
    public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
        return new Path(CommitInfoFileCommitter.getPendingTaskAttemptsPath(context, out), String.valueOf(context.getTaskAttemptID()));
    }

    /**
     * @param context task attempt context
     * @return the committed-task path for the current application attempt
     */
    public Path getCommittedTaskPath(TaskAttemptContext context) {
        return this.getCommittedTaskPath(CommitInfoFileCommitter.getAppAttemptId(context), context);
    }

    /**
     * @param appAttemptId application attempt id
     * @param context      task attempt context supplying the task id
     * @return {@code <jobAttemptPath(appAttemptId)>/<taskId>}
     */
    protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
        return new Path(this.getJobAttemptPath(appAttemptId), String.valueOf(context.getTaskAttemptID().getTaskID()));
    }

    /**
     * @return the task attempt path computed in the constructor, or {@code null}
     *         when the committer was not built from a task attempt context
     */
    public Path getWorkPath() {
        return this.workPath;
    }

    /**
     * Prepares the job by removing any left-over temporary tree.
     *
     * @param context job context
     * @throws IOException if the cleanup fails
     */
    public void setupJob(JobContext context) throws IOException {
        this.cleanupJob(context);
    }

    /**
     * Commits the job: processes the pending commit lists of the job attempt
     * (if the job attempt directory exists), removes the temporary tree and,
     * unless disabled through {@link #SUCCESSFUL_JOB_OUTPUT_DIR_MARKER}, creates
     * an empty {@code _SUCCESS} object in the output directory.
     *
     * @param context job context
     * @throws IOException if listing, committing, cleanup or marker creation fails
     */
    public void commitJob(JobContext context) throws IOException {
        Path jobAttemptPath = this.getJobAttemptPath(context);
        Ks3FileStatus jobAttemptDirStatus = null;
        try {
            jobAttemptDirStatus = this.fs.getFileStatus(jobAttemptPath);
        }
        catch (FileNotFoundException e) {
            // A missing job attempt directory just means no task produced output.
            LOG.warn("No Output found for commitJob");
        }
        if (jobAttemptDirStatus != null) {
            this.processPendingCommitLists(jobAttemptPath, jobAttemptDirStatus);
        }
        this.cleanupJob(context);
        if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
            Path markerPath = new Path(this.outputPath, SUCCEEDED_FILE_NAME);
            this.fs.create(markerPath).close();
        }
    }

    /**
     * Streams the listing of {@code jobAttemptPath}, keeps the
     * {@code *.pending_commit_list} entries and sends each of their paths to a
     * {@link CommitAction}; whatever the commit action emits is forwarded to a
     * {@link DestroyAction}. Both actions are always shut down afterwards.
     *
     * @param jobAttemptPath      job attempt directory to scan
     * @param jobAttemptDirStatus status of that directory
     * @throws IOException if one of the actions recorded a failure
     */
    private void processPendingCommitLists(Path jobAttemptPath, Ks3FileStatus jobAttemptDirStatus) throws IOException {
        ListFileStatus listFileStatus = new ListFileStatus(this.fs, jobAttemptPath, false, jobAttemptDirStatus);
        // Shared between both actions so a failure in either one is visible to the other.
        AtomicReference<Exception> failureHolder = new AtomicReference<Exception>();
        DestroyAction destroyAction = new DestroyAction(this.store, failureHolder);
        CommitAction commitAction = new CommitAction(this.store, failureHolder);
        try {
            commitAction.startEngines();
            destroyAction.startEngines();
            commitAction.sink = dataLoc -> destroyAction.sendData(dataLoc);
            listFileStatus.genStream(commitAction.getExceptionAtomicReference()).map(batch -> {
                List<Ks3FileStatus> fss = listFileStatus.wrapToFileStatus((ListObjectsResult)batch, new PendingCommitListFilter());
                LOG.info("wrap batch finished");
                return fss;
            }).forEach(stats -> {
                for (Ks3FileStatus stat : stats) {
                    // A false return stops feeding the remainder of this batch.
                    if (!commitAction.source().sendData(Collections.singletonMap("data", stat.getPath()))) {
                        break;
                    }
                }
                LOG.info("message batch finished");
            });
        }
        finally {
            // Runs on success and on failure alike.
            EngineShutter.shutdownAll(commitAction, destroyAction);
        }
        // NOTE(review): EngineShutter may already rethrow recorded failures; that is
        // not visible here, so guard explicitly. Without this check a failed commit
        // would be followed by cleanup of the pending data and a _SUCCESS marker.
        Exception failure = failureHolder.get();
        if (failure != null) {
            throw new IOException("Failed to process pending commit lists under " + jobAttemptPath, failure);
        }
    }

    /**
     * Deletes the whole {@code _direct_output_committer_temporary} tree of the
     * output directory.
     *
     * @param context job context (unused)
     * @throws IOException if the delete fails
     */
    @Deprecated
    public void cleanupJob(JobContext context) throws IOException {
        Path pendingJobAttemptsPath = this.getPendingJobAttemptsPath();
        this.store.deleteDir(this.fs.pathToKey(pendingJobAttemptsPath), true);
    }

    /**
     * Aborts the job by removing the temporary tree.
     *
     * @param context job context
     * @param state   final job state (unused)
     * @throws IOException if the cleanup fails
     */
    public void abortJob(JobContext context, JobStatus.State state) throws IOException {
        this.cleanupJob(context);
    }

    /**
     * No per-task setup is performed.
     *
     * @param context task attempt context (unused)
     */
    public void setupTask(TaskAttemptContext context) throws IOException {
    }

    /**
     * Commits the task attempt; see {@link #commitTask(TaskAttemptContext, Path)}.
     *
     * @param context task attempt context
     * @throws IOException if the commit fails
     */
    public void commitTask(TaskAttemptContext context) throws IOException {
        this.commitTask(context, null);
    }

    /**
     * Collects every {@code *.pending_commit} object below the task attempt
     * path, stores them as one {@code <committedTaskPath>.pending_commit_list}
     * object and then deletes the individual descriptors together with the
     * task attempt directory key.
     *
     * @param context         task attempt context
     * @param taskAttemptPath ignored: the path is always recomputed from
     *                        {@code context}
     * @throws IOException if listing, loading, storing or deleting fails
     */
    @InterfaceAudience.Private
    public void commitTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
        context.progress();
        TaskAttemptID attemptId = context.getTaskAttemptID();
        taskAttemptPath = this.getTaskAttemptPath(context);
        Ks3FileStatus taskAttemptDirStatus;
        try {
            taskAttemptDirStatus = this.fs.getFileStatus(taskAttemptPath);
        }
        catch (FileNotFoundException e) {
            taskAttemptDirStatus = null;
        }
        if (taskAttemptDirStatus == null) {
            LOG.warn("No Output found for " + attemptId);
            return;
        }
        Path committedTaskPath = this.getCommittedTaskPath(context);
        PendingCommitList pendingCommitList = new PendingCommitList();
        LinkedList<String> pendingCommitPaths = new LinkedList<String>();
        Ks3FileStatus[] fss = new ListFileStatus(this.fs, taskAttemptPath, true, taskAttemptDirStatus).listStatus(new PendingCommitFilter());
        for (Ks3FileStatus status : fss) {
            Path itemPath = status.getPath();
            pendingCommitPaths.add(this.fs.pathToKey(itemPath));
            PendingCommit itemCommit = PendingCommit.load(this.fs, itemPath);
            pendingCommitList.data.add(itemCommit);
        }
        // Also remove the key of the task attempt directory itself.
        pendingCommitPaths.add(this.fs.pathToKey(taskAttemptPath) + "/");
        // Write the aggregated list first, so the descriptors are only deleted once it is stored.
        this.store.putObject(this.fs.pathToKey(committedTaskPath) + PENDING_COMMIT_LIST_SUFFIX, pendingCommitList.toBytes());
        this.store.deleteObjects(pendingCommitPaths);
    }

    /**
     * Aborts the task attempt; see {@link #abortTask(TaskAttemptContext, Path)}.
     *
     * @param context task attempt context
     * @throws IOException if the delete fails
     */
    public void abortTask(TaskAttemptContext context) throws IOException {
        this.abortTask(context, null);
    }

    /**
     * Deletes the task attempt directory.
     *
     * @param context         task attempt context
     * @param taskAttemptPath ignored: the path is always recomputed from
     *                        {@code context}
     * @throws IOException if the delete fails
     */
    @InterfaceAudience.Private
    public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
        context.progress();
        taskAttemptPath = this.getTaskAttemptPath(context);
        this.store.deleteDir(this.fs.pathToKey(taskAttemptPath), true);
    }

    /**
     * @param context task attempt context (unused)
     * @return always {@code true}
     */
    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        return true;
    }

    /**
     * @param context         task attempt context (unused)
     * @param taskAttemptPath task attempt path (unused)
     * @return always {@code true}
     */
    @InterfaceAudience.Private
    public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
        return true;
    }

    /** @return always {@code false}; task recovery is not supported */
    @Deprecated
    public boolean isRecoverySupported() {
        return false;
    }

    /**
     * @param context job context (unused)
     * @return always {@code false}
     */
    public boolean isCommitJobRepeatable(JobContext context) throws IOException {
        return false;
    }

    /**
     * Always fails: task recovery is not supported by this committer.
     *
     * @param context task attempt context, used for the log message only
     * @throws IOException always
     */
    public void recoverTask(TaskAttemptContext context) throws IOException {
        LOG.warn("Cannot recover task {" + context.getTaskAttemptID() + "}");
        throw new IOException("unsupport recover task");
    }

    /** Accepts paths whose final component ends with {@code .pending_commit_list}. */
    public static class PendingCommitListFilter
    implements PathFilter {
        public boolean accept(Path path) {
            return path.getName().endsWith(PENDING_COMMIT_LIST_SUFFIX);
        }
    }

    /** Accepts paths whose final component ends with {@code .pending_commit}. */
    public static class PendingCommitFilter
    implements PathFilter {
        public boolean accept(Path path) {
            return path.getName().endsWith(PENDING_COMMIT_SUFFIX);
        }
    }
}

