/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.master.tableOps.bulkVer2;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.accumulo.core.client.impl.AbstractId;
import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.client.impl.BulkSerialize;
import org.apache.accumulo.core.client.impl.ClientContext;
import org.apache.accumulo.core.client.impl.Table;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.thrift.TableOperation;
import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.metadata.schema.MetadataScanner;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.MasterRepo;
import org.apache.accumulo.master.tableOps.Utils;
import org.apache.accumulo.master.tableOps.bulkVer2.BulkImportMove;
import org.apache.accumulo.master.tableOps.bulkVer2.BulkInfo;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.tablets.UniqueNameAllocator;
import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrepBulkImport
extends MasterRepo {
    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(PrepBulkImport.class);
    private final BulkInfo bulkInfo;

    public PrepBulkImport(Table.ID tableId, String sourceDir, boolean setTime) {
        BulkInfo info = new BulkInfo();
        info.tableId = tableId;
        info.sourceDir = sourceDir;
        info.setTime = setTime;
        this.bulkInfo = info;
    }

    @Override
    public long isReady(long tid, Master master) throws Exception {
        if (!Utils.getReadLock(master, (AbstractId)this.bulkInfo.tableId, tid).tryLock()) {
            return 100L;
        }
        if (master.onlineTabletServers().size() == 0) {
            return 500L;
        }
        Tables.clearCache((ClientContext)master.getContext());
        return Utils.reserveHdfsDirectory(master, this.bulkInfo.sourceDir, tid);
    }

    private static boolean equals(Function<KeyExtent, Text> extractor, KeyExtent ke1, KeyExtent ke2) {
        return Objects.equals(extractor.apply(ke1), extractor.apply(ke2));
    }

    @VisibleForTesting
    static void checkForMerge(String tableId, Iterator<KeyExtent> lmi, TabletIterFactory tabletIterFactory) throws Exception {
        KeyExtent currRange = lmi.next();
        Text startRow = currRange.getPrevEndRow();
        Iterator<KeyExtent> tabletIter = tabletIterFactory.newTabletIter(startRow);
        KeyExtent currTablet = tabletIter.next();
        if (!tabletIter.hasNext() && PrepBulkImport.equals(KeyExtent::getPrevEndRow, currTablet, currRange) && PrepBulkImport.equals(KeyExtent::getEndRow, currTablet, currRange)) {
            currRange = null;
        }
        while (tabletIter.hasNext()) {
            if (currRange == null) {
                if (!lmi.hasNext()) break;
                currRange = lmi.next();
            }
            while (!PrepBulkImport.equals(KeyExtent::getPrevEndRow, currTablet, currRange) && tabletIter.hasNext()) {
                currTablet = tabletIter.next();
            }
            boolean matchedPrevRow = PrepBulkImport.equals(KeyExtent::getPrevEndRow, currTablet, currRange);
            while (!PrepBulkImport.equals(KeyExtent::getEndRow, currTablet, currRange) && tabletIter.hasNext()) {
                currTablet = tabletIter.next();
            }
            if (!matchedPrevRow || !PrepBulkImport.equals(KeyExtent::getEndRow, currTablet, currRange)) break;
            currRange = null;
        }
        if (currRange != null || lmi.hasNext()) {
            throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.OTHER, "Concurrent merge happened");
        }
    }

    private void checkForMerge(Master master) throws Exception {
        VolumeManager fs = master.getFileSystem();
        Path bulkDir = new Path(this.bulkInfo.sourceDir);
        try (BulkSerialize.LoadMappingIterator lmi = BulkSerialize.readLoadMapping((String)bulkDir.toString(), (Table.ID)this.bulkInfo.tableId, p -> fs.open(p));){
            Iterators.transform((Iterator)lmi, entry -> (KeyExtent)entry.getKey());
            TabletIterFactory tabletIterFactory = startRow -> MetadataScanner.builder().from((ClientContext)master.getContext()).scanMetadataTable().overRange(this.bulkInfo.tableId, startRow, null).checkConsistency().fetchPrev().build().stream().map(TabletMetadata::getExtent).iterator();
            PrepBulkImport.checkForMerge(this.bulkInfo.tableId.canonicalID(), Iterators.transform((Iterator)lmi, entry -> (KeyExtent)entry.getKey()), tabletIterFactory);
        }
    }

    @Override
    public Repo<Master> call(long tid, Master master) throws Exception {
        this.checkForMerge(master);
        this.bulkInfo.tableState = Tables.getTableState((ClientContext)master.getContext(), (Table.ID)this.bulkInfo.tableId);
        VolumeManager fs = master.getFileSystem();
        UniqueNameAllocator namer = master.getContext().getUniqueNameAllocator();
        Path sourceDir = new Path(this.bulkInfo.sourceDir);
        FileStatus[] files = fs.listStatus(sourceDir);
        Path bulkDir = this.createNewBulkDir(master.getContext(), fs, this.bulkInfo.tableId);
        Path mappingFile = new Path(sourceDir, "loadmap.json");
        HashMap<String, String> oldToNewNameMap = new HashMap<String, String>();
        FileStatus[] fileStatusArray = files;
        int n = fileStatusArray.length;
        for (int i = 0; i < n; ++i) {
            boolean invalidFileName;
            FileStatus file;
            FileStatus fileStatus = file = fileStatusArray[i];
            Path originalPath = fileStatus.getPath();
            String[] fileNameParts = originalPath.getName().split("\\.");
            String extension = "";
            if (fileNameParts.length > 1) {
                extension = fileNameParts[fileNameParts.length - 1];
                invalidFileName = !FileOperations.getValidExtensions().contains(extension);
            } else {
                invalidFileName = true;
            }
            if (invalidFileName) {
                log.warn("{} does not have a valid extension, ignoring", (Object)fileStatus.getPath());
                continue;
            }
            String newName = "I" + namer.getNextName() + "." + extension;
            Path newPath = new Path(bulkDir, newName);
            oldToNewNameMap.put(originalPath.getName(), newPath.getName());
        }
        Path newMappingFile = new Path(bulkDir, mappingFile.getName());
        oldToNewNameMap.put(mappingFile.getName(), newMappingFile.getName());
        BulkSerialize.writeRenameMap(oldToNewNameMap, (String)bulkDir.toString(), p -> fs.create(p));
        this.bulkInfo.bulkDir = bulkDir.toString();
        return new BulkImportMove(this.bulkInfo);
    }

    private Path createNewBulkDir(ServerContext context, VolumeManager fs, Table.ID tableId) throws IOException {
        Path tempPath = fs.matchingFileSystem(new Path(this.bulkInfo.sourceDir), ServerConstants.getTablesDirs((AccumuloConfiguration)context.getConfiguration()));
        if (tempPath == null) {
            throw new IOException(this.bulkInfo.sourceDir + " is not in a volume configured for Accumulo");
        }
        String tableDir = tempPath.toString();
        if (tableDir == null) {
            throw new IOException(this.bulkInfo.sourceDir + " is not in a volume configured for Accumulo");
        }
        Path directory = new Path(tableDir + "/" + tableId);
        fs.mkdirs(directory);
        UniqueNameAllocator namer = context.getUniqueNameAllocator();
        Path newBulkDir;
        while (!fs.mkdirs(newBulkDir = new Path(directory, "b-" + namer.getNextName()))) {
            log.warn("Failed to create {} for unknown reason", (Object)newBulkDir);
            UtilWaitThread.sleepUninterruptibly((long)3L, (TimeUnit)TimeUnit.SECONDS);
        }
        return newBulkDir;
    }

    @Override
    public void undo(long tid, Master environment) throws Exception {
        Utils.unreserveHdfsDirectory(environment, this.bulkInfo.sourceDir, tid);
        Utils.getReadLock(environment, (AbstractId)this.bulkInfo.tableId, tid).unlock();
        TransactionWatcher.ZooArbitrator.cleanup((ServerContext)environment.getContext(), (String)"bulkTx", (long)tid);
    }

    @VisibleForTesting
    static interface TabletIterFactory {
        public Iterator<KeyExtent> newTabletIter(Text var1) throws Exception;
    }
}

