/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.manager.tableOps.bulkVer1;

import com.google.common.annotations.VisibleForTesting;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.Utils;
import org.apache.accumulo.manager.tableOps.bulkVer1.LoadFiles;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.tablets.UniqueNameAllocator;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manager repo step that begins a bulk import (bulkVer1) for a table.
 *
 * <p>{@link #isReady} takes the table read lock and reserves the source and error directories,
 * {@link #call} validates the error directory, moves the source files into a freshly created bulk
 * directory and hands off to {@link LoadFiles}, and {@link #undo} releases what was reserved.
 */
public class BulkImport extends ManagerRepo {
    public static final String FAILURES_TXT = "failures.txt";
    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(BulkImport.class);
    /** Arbitrator type handed to {@code ZooArbitrator} in both {@link #call} and {@link #undo}. */
    private static final String BULK_ARBITRATOR_TYPE = "bulkTx";
    private TableId tableId;
    private String sourceDir;
    private String errorDir;
    private boolean setTime;

    /**
     * @param tableId table receiving the files
     * @param sourceDir directory holding the files to import
     * @param errorDir directory that must exist and be empty when {@link #call} runs
     * @param setTime flag passed through unchanged to {@link LoadFiles}
     */
    public BulkImport(TableId tableId, String sourceDir, String errorDir, boolean setTime) {
        this.tableId = tableId;
        this.sourceDir = sourceDir;
        this.errorDir = errorDir;
        this.setTime = setTime;
    }

    /**
     * Attempts to take the table read lock and reserve both directories.
     *
     * @return {@code 100} when the read lock could not be taken; otherwise the value returned by the
     *         last directory reservation attempted (the error directory is only reserved when the
     *         source directory reservation returned {@code 0}). NOTE(review): presumably 0 means
     *         "ready" and any other value is a retry delay - confirm against the FATE contract.
     * @throws AcceptableThriftTableOperationException with type {@code OFFLINE} when the table is not
     *         online
     */
    @Override
    public long isReady(long tid, Manager manager) throws Exception {
        if (!Utils.getReadLock(manager, tableId, tid).tryLock()) {
            return 100L;
        }
        // Drop the cached table list before reading the table state.
        manager.getContext().clearTableListCache();
        if (manager.getContext().getTableState(tableId) != TableState.ONLINE) {
            throw new AcceptableThriftTableOperationException(tableId.canonical(), null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OFFLINE, null);
        }
        long reserved = Utils.reserveHdfsDirectory(manager, sourceDir, tid);
        if (reserved == 0L) {
            reserved = Utils.reserveHdfsDirectory(manager, errorDir, tid);
        }
        return reserved;
    }

    /**
     * Validates the error directory, starts the bulk arbitrator for this transaction, and moves the
     * source files into a new bulk directory.
     *
     * @return the {@link LoadFiles} step that continues the import
     * @throws AcceptableThriftTableOperationException {@code BULK_BAD_ERROR_DIRECTORY} when the error
     *         directory is missing, is not a directory, or is not empty;
     *         {@code BULK_BAD_INPUT_DIRECTORY} when preparing the bulk directory fails with an
     *         {@link IOException}
     */
    @Override
    public Repo<Manager> call(long tid, Manager manager) throws Exception {
        String fmtTid = FateTxId.formatTid(tid);
        log.debug(" {} sourceDir {}", fmtTid, sourceDir);
        Utils.getReadLock(manager, tableId, tid).lock();

        VolumeManager fs = manager.getVolumeManager();
        Path errorPath = new Path(errorDir);
        FileStatus errorStatus = null;
        try {
            errorStatus = fs.getFileStatus(errorPath);
        } catch (FileNotFoundException ignored) {
            // A missing error directory is reported through the null check below.
        }
        if (errorStatus == null) {
            throw badErrorDirectory(errorDir + " does not exist");
        }
        if (!errorStatus.isDirectory()) {
            throw badErrorDirectory(errorDir + " is not a directory");
        }
        if (fs.listStatus(errorPath).length != 0) {
            throw badErrorDirectory(errorDir + " is not empty");
        }

        TransactionWatcher.ZooArbitrator.start(manager.getContext(), BULK_ARBITRATOR_TYPE, tid);
        manager.updateBulkImportStatus(sourceDir, BulkImportState.MOVING);
        try {
            String bulkDir = prepareBulkImport(manager.getContext(), fs, sourceDir, tableId, tid);
            // Log the formatted transaction id so this line matches the sourceDir line above.
            log.debug(" {} bulkDir {}", fmtTid, bulkDir);
            return new LoadFiles(tableId, sourceDir, bulkDir, errorDir, setTime);
        } catch (IOException ex) {
            log.error("error preparing the bulk import directory", ex);
            throw new AcceptableThriftTableOperationException(tableId.canonical(), null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_INPUT_DIRECTORY, sourceDir + ": " + ex);
        }
    }

    /** Builds the exception used for every error-directory validation failure. */
    private AcceptableThriftTableOperationException badErrorDirectory(String description) {
        return new AcceptableThriftTableOperationException(tableId.canonical(), null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, description);
    }

    /**
     * Creates a new, uniquely named {@code b-*} directory under the table's directory on the volume
     * matching {@code sourceDir}, retrying every three seconds while {@code mkdirs} reports failure.
     *
     * @throws IOException when no configured tables directory matches the source directory's file
     *         system, or when the generated directory name already exists
     */
    private static Path createNewBulkDir(ServerContext context, VolumeManager fs, String sourceDir, TableId tableId) throws IOException {
        Path tableDir = fs.matchingFileSystem(new Path(sourceDir), context.getTablesDirs());
        if (tableDir == null) {
            throw new IOException(sourceDir + " is not in the same file system as any volume configured for Accumulo");
        }
        Path directory = new Path(tableDir, tableId.canonical());
        fs.mkdirs(directory);

        UniqueNameAllocator namer = context.getUniqueNameAllocator();
        while (true) {
            Path newBulkDir = new Path(directory, "b-" + namer.getNextName());
            if (fs.exists(newBulkDir)) {
                throw new IOException("Dir exist when it should not " + newBulkDir);
            }
            if (fs.mkdirs(newBulkDir)) {
                return newBulkDir;
            }
            log.warn("Failed to create {} for unknown reason", newBulkDir);
            UtilWaitThread.sleepUninterruptibly(3L, TimeUnit.SECONDS);
        }
    }

    /**
     * Creates a new bulk directory, records the bulk-load-in-progress flag for it, and renames each
     * eligible entry of {@code dir} into it using a worker pool.
     *
     * @param manager server context (the parameter name is kept for interface compatibility)
     * @param fs volume manager used for all file system access
     * @param dir source directory whose entries are moved
     * @param tableId table the bulk directory is created for
     * @param tid transaction id recorded with the in-progress flag
     * @return the new bulk directory as a string
     * @throws Exception the first non-null exception returned by a worker task, or any failure
     *         creating the directory, listing the source, or waiting for the workers
     */
    @VisibleForTesting
    public static String prepareBulkImport(ServerContext manager, VolumeManager fs, String dir, TableId tableId, long tid) throws Exception {
        Path bulkDir = createNewBulkDir(manager, fs, dir, tableId);
        manager.getAmple().addBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), tid);

        FileStatus[] mapFiles = fs.listStatus(new Path(dir));
        UniqueNameAllocator namer = manager.getUniqueNameAllocator();
        AccumuloConfiguration serverConfig = manager.getConfiguration();
        ThreadPoolExecutor workers = ThreadPools.getServerThreadPools().createExecutorService(serverConfig, serverConfig.resolve(Property.MANAGER_RENAME_THREADS, new Property[]{Property.MANAGER_BULK_RENAME_THREADS}), false);

        ArrayList<Future<Exception>> results = new ArrayList<>(mapFiles.length);
        try {
            for (FileStatus file : mapFiles) {
                results.add(workers.submit(() -> moveIntoBulkDir(fs, namer, bulkDir, file)));
            }
        } finally {
            // Always release the pool, even if a submission is rejected part-way through.
            workers.shutdown();
        }
        while (!workers.awaitTermination(1000L, TimeUnit.MILLISECONDS)) {
            // keep waiting until every submitted task has finished
        }

        for (Future<Exception> result : results) {
            Exception failure = result.get();
            if (failure != null) {
                throw failure;
            }
        }
        return bulkDir.toString();
    }

    /**
     * Moves one source entry into the bulk directory under a newly allocated {@code I*} name.
     *
     * <p>Entries are skipped (with a log message) when their extension is not a valid file
     * extension, or when they are treated as a "map" entry but are not a directory, are named
     * {@code _logs}, or lack a non-directory {@code data} child. A failed rename is logged and not
     * propagated.
     *
     * @return {@code null} when the entry was moved, skipped, or the rename failed; otherwise the
     *         unexpected exception that interrupted processing
     */
    private static Exception moveIntoBulkDir(VolumeManager fs, UniqueNameAllocator namer, Path bulkDir, FileStatus file) {
        try {
            Path source = file.getPath();
            String[] nameParts = source.getName().split("\\.");
            String extension;
            if (nameParts.length > 1) {
                extension = nameParts[nameParts.length - 1];
                if (!FileOperations.getValidExtensions().contains(extension)) {
                    log.warn("{} does not have a valid extension, ignoring", source);
                    return null;
                }
            } else {
                // Names without a dot are handled as "map" entries.
                extension = "map";
            }

            if (extension.equals("map")) {
                if (!file.isDirectory()) {
                    log.warn("{} is not a map file, ignoring", source);
                    return null;
                }
                if (source.getName().equals("_logs")) {
                    log.info("{} is probably a log directory from a map/reduce task, skipping", source);
                    return null;
                }
                try {
                    FileStatus dataStatus = fs.getFileStatus(new Path(source, "data"));
                    if (dataStatus.isDirectory()) {
                        log.warn("{} is not a map file, ignoring", source);
                        return null;
                    }
                } catch (FileNotFoundException fnfe) {
                    log.warn("{} is not a map file, ignoring", source);
                    return null;
                }
            }

            Path newPath = new Path(bulkDir, "I" + namer.getNextName() + "." + extension);
            try {
                // Only report success when the rename itself reports success.
                if (fs.rename(source, newPath)) {
                    log.debug("Moved {} to {}", source, newPath);
                } else {
                    log.error("Could not move: {} rename to {} returned false", source, newPath);
                }
            } catch (IOException e) {
                log.error("Could not move: {} {}", source, e.getMessage());
            }
            return null;
        } catch (Exception ex) {
            return ex;
        }
    }

    /**
     * Releases both directory reservations and the table read lock, then cleans up the bulk
     * arbitrator entry for this transaction.
     */
    @Override
    public void undo(long tid, Manager environment) throws Exception {
        Utils.unreserveHdfsDirectory(environment, sourceDir, tid);
        Utils.unreserveHdfsDirectory(environment, errorDir, tid);
        Utils.getReadLock(environment, tableId, tid).unlock();
        TransactionWatcher.ZooArbitrator.cleanup(environment.getContext(), BULK_ARBITRATOR_TYPE, tid);
    }
}

