/*
 * Decompiled with CFR 0.152.
 */
package alluxio.job.plan.persist;

import alluxio.AlluxioURI;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.file.FileInStream;
import alluxio.client.file.URIStatus;
import alluxio.collections.Pair;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.AbstractVoidPlanDefinition;
import alluxio.job.plan.persist.PersistConfig;
import alluxio.job.util.JobUtils;
import alluxio.job.util.SerializableVoid;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.CloseableResource;
import alluxio.security.authorization.Mode;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.CreateOptions;
import alluxio.underfs.options.MkdirsOptions;
import alluxio.wire.WorkerInfo;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Plan definition that copies a completed Alluxio file into its under file system (UFS).
 *
 * <p>{@link #selectExecutors} picks a single job worker to run the task, and {@link #runTask}
 * performs the copy: it creates any missing ancestor directories in the UFS, creates the target
 * file, copies the ACL entries, streams the bytes and records a per-mount-point metric.
 */
@NotThreadSafe
public final class PersistDefinition
extends AbstractVoidPlanDefinition<PersistConfig, SerializableVoid> {
    private static final Logger LOG = LoggerFactory.getLogger(PersistDefinition.class);

    /** Size in bytes (8 MiB) of the buffer used when copying file contents to the UFS. */
    private static final int COPY_BUFFER_SIZE_BYTES = 0x800000;

    /**
     * Selects exactly one job worker to execute the persist task.
     *
     * <p>The job worker whose host equals the host of the Alluxio worker reported by
     * {@link JobUtils#getWorkerWithMostBlocks} is preferred. If no such Alluxio worker is
     * reported, or no job worker runs on that host, a job worker is picked at random.
     *
     * @param config the persist job configuration; supplies the Alluxio file path
     * @param jobWorkerInfoList the job workers that are candidates for running the task
     * @param context gives access to the file system client and its context
     * @return a set with a single (worker, null-argument) pair
     * @throws RuntimeException if {@code jobWorkerInfoList} is empty
     * @throws Exception if the file status cannot be obtained
     */
    @Override
    public Set<Pair<WorkerInfo, SerializableVoid>> selectExecutors(PersistConfig config, List<WorkerInfo> jobWorkerInfoList, SelectExecutorsContext context) throws Exception {
        if (jobWorkerInfoList.isEmpty()) {
            throw new RuntimeException("No worker is available");
        }
        AlluxioURI uri = new AlluxioURI(config.getFilePath());
        List<BlockWorkerInfo> alluxioWorkerInfoList = context.getFsContext().getCachedWorkers();
        BlockWorkerInfo workerWithMostBlocks = JobUtils.getWorkerWithMostBlocks(
                alluxioWorkerInfoList, context.getFileSystem().getStatus(uri).getFileBlockInfos());

        // Prefer the job worker that shares a host with the selected Alluxio worker.
        WorkerInfo executor = null;
        if (workerWithMostBlocks != null) {
            String preferredHost = workerWithMostBlocks.getNetAddress().getHost();
            for (WorkerInfo workerInfo : jobWorkerInfoList) {
                if (workerInfo.getAddress().getHost().equals(preferredHost)) {
                    executor = workerInfo;
                    break;
                }
            }
        }
        if (executor == null) {
            // No job worker on the preferred host: fall back to a random one.
            executor = jobWorkerInfoList.get(new Random().nextInt(jobWorkerInfoList.size()));
        }

        Set<Pair<WorkerInfo, SerializableVoid>> result = Sets.newHashSet();
        result.add(new Pair<>(executor, null));
        return result;
    }

    /**
     * Copies the Alluxio file named by {@code config} to {@code config.getUfsPath()}.
     *
     * <p>Steps, in order: remove an existing UFS file when overwrite is requested; verify that
     * the Alluxio file is completed; create missing ancestor directories in the UFS one level
     * at a time, using the owner, group, mode and ACL entries of the matching Alluxio
     * directory; create the UFS file with the Alluxio file's owner, group and mode; copy its
     * ACL entries; stream the contents; increment the bytes-persisted counter.
     *
     * @param config the persist job configuration (file path, UFS path, mount id, overwrite)
     * @param args unused
     * @param context gives access to the file system client and the UFS manager
     * @return always {@code null}
     * @throws IOException if the UFS instance is unavailable, the UFS file already exists and
     *         overwrite is not set, the Alluxio file is incomplete, or an ancestor path in the
     *         UFS exists but is not a directory
     * @throws Exception on any other failure reported by the file system client or the UFS
     */
    @Override
    public SerializableVoid runTask(PersistConfig config, SerializableVoid args, RunTaskContext context) throws Exception {
        AlluxioURI uri = new AlluxioURI(config.getFilePath());
        String ufsPath = config.getUfsPath();
        UfsManager.UfsClient ufsClient = context.getUfsManager().get(config.getMountId());
        try (CloseableResource<UnderFileSystem> ufsResource = ufsClient.acquireUfsResource()) {
            UnderFileSystem ufs = ufsResource.get();
            if (ufs == null) {
                throw new IOException("Failed to create UFS instance for " + ufsPath);
            }
            if (ufs.exists(ufsPath)) {
                if (!config.isOverwrite()) {
                    throw new IOException("File " + config.getFilePath() + " is already persisted in UFS, to overwrite the file, please set the overwrite flag in the config.");
                }
                LOG.info("File {} is already persisted in UFS. Removing it.", config.getFilePath());
                ufs.deleteExistingFile(ufsPath);
            }
            URIStatus uriStatus = context.getFileSystem().getStatus(uri);
            if (!uriStatus.isCompleted()) {
                throw new IOException("Cannot persist an incomplete Alluxio file: " + uri);
            }

            long bytesWritten;
            try (Closer closer = Closer.create()) {
                // Read with NO_CACHE and without touching the file's last access time.
                OpenFilePOptions options = OpenFilePOptions.newBuilder()
                        .setReadType(ReadPType.NO_CACHE)
                        .setUpdateLastAccessTime(false)
                        .build();
                FileInStream in = closer.register(context.getFileSystem().openFile(uri, options));
                AlluxioURI dstPath = new AlluxioURI(ufsPath);

                // Walk upwards collecting every ancestor that is not yet a directory in the
                // UFS, paired with its Alluxio counterpart. The stack is later popped so the
                // directories are created top-down, each with its own permissions.
                Stack<Pair<String, String>> ancestorUfsAndAlluxioPaths = new Stack<>();
                AlluxioURI curAlluxioPath = uri.getParent();
                AlluxioURI curUfsPath = dstPath.getParent();
                // The null checks come first: getParent() can return null once the root is
                // passed, and the UFS must not be queried with a null path.
                while (curAlluxioPath != null && curUfsPath != null
                        && !ufs.isDirectory(curUfsPath.toString())) {
                    ancestorUfsAndAlluxioPaths.push(
                            new Pair<>(curUfsPath.toString(), curAlluxioPath.toString()));
                    curAlluxioPath = curAlluxioPath.getParent();
                    curUfsPath = curUfsPath.getParent();
                }
                while (!ancestorUfsAndAlluxioPaths.empty()) {
                    Pair<String, String> ancestorUfsAndAlluxioPath = ancestorUfsAndAlluxioPaths.pop();
                    String ancestorUfsPath = ancestorUfsAndAlluxioPath.getFirst();
                    String ancestorAlluxioPath = ancestorUfsAndAlluxioPath.getSecond();
                    URIStatus ancestorStatus =
                            context.getFileSystem().getStatus(new AlluxioURI(ancestorAlluxioPath));
                    MkdirsOptions mkdirOptions = MkdirsOptions.defaults(Configuration.global())
                            .setCreateParent(false)
                            .setOwner(ancestorStatus.getOwner())
                            .setGroup(ancestorStatus.getGroup())
                            .setMode(new Mode((short) ancestorStatus.getMode()));
                    if (ufs.mkdirs(ancestorUfsPath, mkdirOptions)) {
                        copyAclEntries(ufs, ancestorUfsPath, ancestorStatus);
                    } else if (!ufs.isDirectory(ancestorUfsPath)) {
                        // mkdirs returning false is tolerated only when the path is a
                        // directory anyway; otherwise the persist cannot proceed.
                        throw new IOException("Failed to create " + ufsPath + " with permission " + mkdirOptions + " because its ancestor " + ancestorUfsPath + " is not a directory");
                    }
                }

                OutputStream out = closer.register(ufs.createNonexistingFile(dstPath.toString(),
                        CreateOptions.defaults(Configuration.global())
                                .setOwner(uriStatus.getOwner())
                                .setGroup(uriStatus.getGroup())
                                .setMode(new Mode((short) uriStatus.getMode()))));
                // The status is fetched again here and used as the source of the ACL entries.
                URIStatus status = context.getFileSystem().getStatus(uri);
                copyAclEntries(ufs, dstPath.toString(), status);
                bytesWritten = IOUtils.copyLarge(in, out, new byte[COPY_BUFFER_SIZE_BYTES]);
                incrementPersistedMetric(ufsClient.getUfsMountPointUri(), bytesWritten);
            }
            LOG.info("Persisted file {} with size {}", ufsPath, bytesWritten);
        }
        return null;
    }

    /**
     * Applies the default ACL entries followed by the access ACL entries of {@code status} to
     * {@code ufsPath} in the given UFS.
     *
     * @param ufs the under file system to update
     * @param ufsPath the UFS path that receives the entries
     * @param status the Alluxio status whose ACL entries are copied
     * @throws IOException if the UFS rejects the update
     */
    private void copyAclEntries(UnderFileSystem ufs, String ufsPath, URIStatus status) throws IOException {
        ufs.setAclEntries(ufsPath, Stream.concat(
                status.getDefaultAcl().getEntries().stream(),
                status.getAcl().getEntries().stream()).collect(Collectors.toList()));
    }

    /**
     * Adds {@code bytes} to the counter named {@code BytesPersisted-Ufs:<escaped mount point>}.
     *
     * @param ufsMountPointUri the UFS mount point the bytes were written to
     * @param bytes the number of bytes persisted
     */
    private void incrementPersistedMetric(AlluxioURI ufsMountPointUri, long bytes) {
        String mountPoint = MetricsSystem.escape(ufsMountPointUri);
        String metricName = String.format("BytesPersisted-Ufs:%s", mountPoint);
        MetricsSystem.counter(metricName).inc(bytes);
    }

    /**
     * @return the configuration class handled by this definition
     */
    @Override
    public Class<PersistConfig> getJobConfigClass() {
        return PersistConfig.class;
    }
}

