/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.job.tracker;

import alluxio.AlluxioURI;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.URIStatus;
import alluxio.exception.AlluxioException;
import alluxio.grpc.ListStatusPOptions;
import alluxio.grpc.OperationType;
import alluxio.job.JobConfig;
import alluxio.job.plan.BatchedJobConfig;
import alluxio.job.plan.load.LoadConfig;
import alluxio.job.wire.JobSource;
import alluxio.master.job.JobMaster;
import alluxio.master.job.common.CmdInfo;
import alluxio.master.job.metrics.DistributedCmdMetrics;
import alluxio.master.job.tracker.AbstractCmdRunner;
import alluxio.master.job.tracker.CmdRunAttempt;
import alluxio.retry.CountingRetry;
import alluxio.retry.RetryPolicy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistLoadCliRunner
extends AbstractCmdRunner {
    private static final Logger LOG = LoggerFactory.getLogger(DistLoadCliRunner.class);

    public DistLoadCliRunner(FileSystemContext fsContext, JobMaster jobMaster) {
        super(fsContext, jobMaster);
    }

    public CmdInfo runDistLoad(int batchSize, AlluxioURI filePath, int replication, Set<String> workerSet, Set<String> excludedWorkerSet, Set<String> localityIds, Set<String> excludedLocalityIds, boolean directCache, long jobControlId) throws IOException {
        long submissionTime = System.currentTimeMillis();
        ArrayList<URIStatus> filePool = new ArrayList<URIStatus>(batchSize);
        ArrayList path = Lists.newArrayList((Object[])new String[]{filePath.getPath()});
        CmdInfo cmdInfo = new CmdInfo(jobControlId, OperationType.DIST_LOAD, JobSource.CLI, submissionTime, path);
        try {
            this.load(filePath, batchSize, replication, workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache, filePool, cmdInfo);
        }
        catch (AlluxioException | IOException e) {
            LOG.warn(String.format("DistributedLoad job is failing for path = %s!", filePath.getPath()));
            LOG.error(e.getMessage());
            throw new IOException(e.getMessage());
        }
        if (filePool.size() > 0) {
            this.submitDistLoad(filePool, replication, workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache, cmdInfo);
            filePool.clear();
        }
        return cmdInfo;
    }

    private void load(AlluxioURI filePath, int batchSize, int replication, Set<String> workerSet, Set<String> excludedWorkerSet, Set<String> localityIds, Set<String> excludedLocalityIds, boolean directCache, List<URIStatus> pool, CmdInfo cmdInfo) throws IOException, AlluxioException {
        ListStatusPOptions options = ListStatusPOptions.newBuilder().setRecursive(true).build();
        LongAdder incompleteCount = new LongAdder();
        this.mFileSystem.iterateStatus(filePath, options, uriStatus -> {
            if (!uriStatus.isFolder()) {
                if (!uriStatus.isCompleted()) {
                    incompleteCount.increment();
                    System.out.printf("Ignored load because: %s is in incomplete status", uriStatus.getPath());
                    return;
                }
                AlluxioURI fileURI = new AlluxioURI(uriStatus.getPath());
                if (uriStatus.getInAlluxioPercentage() == 100 && replication == 1) {
                    System.out.println(fileURI + " is already fully loaded in Alluxio");
                    return;
                }
                pool.add((URIStatus)uriStatus);
                if (pool.size() == batchSize) {
                    this.submitDistLoad(pool, replication, workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache, cmdInfo);
                    pool.clear();
                }
            }
        });
        if (incompleteCount.longValue() > 0L) {
            System.out.printf("Ignore load %d paths because they are in incomplete status", incompleteCount.longValue());
        }
    }

    private void submitDistLoad(List<URIStatus> pool, int replication, Set<String> workerSet, Set<String> excludedWorkerSet, Set<String> localityIds, Set<String> excludedLocalityIds, boolean directCache, CmdInfo cmdInfo) {
        if (this.mSubmitted.size() >= 3000) {
            this.waitForCmdJob();
        }
        CmdRunAttempt attempt = new CmdRunAttempt((RetryPolicy)new CountingRetry(3), this.mJobMaster);
        this.setJobConfigAndFileMetrics(pool, replication, workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache, attempt);
        this.mSubmitted.add(attempt);
        cmdInfo.addCmdRunAttempt(attempt);
        attempt.run();
    }

    protected void setJobConfigAndFileMetrics(List<URIStatus> filePath, int replication, Set<String> workerSet, Set<String> excludedWorkerSet, Set<String> localityIds, Set<String> excludedLocalityIds, boolean directCache, CmdRunAttempt attempt) {
        LoadConfig jobConfig;
        long fileCount = 0L;
        long fileSize = 0L;
        String filePathString = filePath.stream().map(URIStatus::getPath).collect(Collectors.joining(","));
        if (filePath.size() == 1) {
            URIStatus status = filePath.iterator().next();
            String source = status.getPath();
            jobConfig = new LoadConfig(source, Integer.valueOf(replication), workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, Boolean.valueOf(directCache));
            fileCount = 1L;
            fileSize = DistributedCmdMetrics.getFileSize(source, this.mFileSystem, (RetryPolicy)new CountingRetry(3));
        } else {
            HashSet configs = Sets.newHashSet();
            ObjectMapper oMapper = new ObjectMapper();
            for (URIStatus status : filePath) {
                LoadConfig loadConfig = new LoadConfig(status.getPath(), Integer.valueOf(replication), workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, Boolean.valueOf(directCache));
                Map map = (Map)oMapper.convertValue((Object)loadConfig, Map.class);
                configs.add(map);
                fileSize += DistributedCmdMetrics.getFileSize(status.getPath(), this.mFileSystem, (RetryPolicy)new CountingRetry(3));
            }
            fileCount = filePath.size();
            jobConfig = new BatchedJobConfig("Load", (Set)configs);
        }
        attempt.setFileCount(fileCount);
        attempt.setFileSize(fileSize);
        attempt.setConfig((JobConfig)jobConfig);
        attempt.setFilePath(filePathString);
    }
}

