/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.master.tableOps.bulkVer1;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.impl.Table;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.fate.Repo;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.master.tableOps.MasterRepo;
import org.apache.accumulo.master.tableOps.bulkVer1.CleanUpBulkImport;
import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.LiveTServerSet;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.hadoop.fs.Path;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class CopyFailed
extends MasterRepo {
    private static final Logger log = LoggerFactory.getLogger(CopyFailed.class);
    private static final long serialVersionUID = 1L;
    private Table.ID tableId;
    private String source;
    private String bulk;
    private String error;

    public CopyFailed(Table.ID tableId, String source, String bulk, String error) {
        this.tableId = tableId;
        this.source = source;
        this.bulk = bulk;
        this.error = error;
    }

    @Override
    public long isReady(long tid, Master master) throws Exception {
        HashSet<TServerInstance> finished = new HashSet<TServerInstance>();
        Set<TServerInstance> running = master.onlineTabletServers();
        for (TServerInstance server : running) {
            try {
                LiveTServerSet.TServerConnection client = master.getConnection(server);
                if (client == null || client.isActive(tid)) continue;
                finished.add(server);
            }
            catch (TException ex) {
                log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + (Object)((Object)ex));
            }
        }
        if (finished.containsAll(running)) {
            return 0L;
        }
        return 500L;
    }

    @Override
    public Repo<Master> call(long tid, Master master) throws Exception {
        Object path2;
        Object line;
        master.updateBulkImportStatus(this.source, BulkImportState.COPY_FILES);
        VolumeManager fs = master.getFileSystem();
        if (!fs.exists(new Path(this.error, "failures.txt"))) {
            return new CleanUpBulkImport(this.tableId, this.source, this.bulk, this.error);
        }
        HashMap<FileRef, Object> failures = new HashMap<FileRef, Object>();
        HashMap<FileRef, String> loadedFailures = new HashMap<FileRef, String>();
        try (BufferedReader in = new BufferedReader(new InputStreamReader((InputStream)fs.open(new Path(this.error, "failures.txt")), StandardCharsets.UTF_8));){
            line = null;
            while ((line = in.readLine()) != null) {
                path2 = new Path((String)line);
                if (fs.exists(new Path(this.error, path2.getName()))) continue;
                failures.put(new FileRef((String)line, (Path)path2), line);
            }
        }
        AccumuloClient client = master.getClient();
        IsolatedScanner mscanner = new IsolatedScanner(client.createScanner("accumulo.metadata", Authorizations.EMPTY));
        line = null;
        try {
            mscanner.setRange(new KeyExtent(this.tableId, null, null).toMetadataRange());
            mscanner.fetchColumnFamily(MetadataSchema.TabletsSection.BulkFileColumnFamily.NAME);
            path2 = mscanner.iterator();
            while (path2.hasNext()) {
                FileRef loadedFile;
                String absPath;
                Map.Entry entry = (Map.Entry)path2.next();
                if (Long.parseLong(((Value)entry.getValue()).toString()) != tid || (absPath = (String)failures.remove(loadedFile = new FileRef(fs, (Key)entry.getKey()))) == null) continue;
                loadedFailures.put(loadedFile, absPath);
            }
        }
        catch (Throwable path2) {
            line = path2;
            throw path2;
        }
        finally {
            if (mscanner != null) {
                if (line != null) {
                    try {
                        mscanner.close();
                    }
                    catch (Throwable path2) {
                        ((Throwable)line).addSuppressed(path2);
                    }
                } else {
                    mscanner.close();
                }
            }
        }
        for (String failure : failures.values()) {
            Path orig = new Path(failure);
            Path dest = new Path(this.error, orig.getName());
            fs.rename(orig, dest);
            log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
        }
        if (loadedFailures.size() > 0) {
            DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue("/accumulo/" + master.getInstanceID() + "/bulk_failed_copyq", master.getConfiguration());
            HashSet<String> workIds = new HashSet<String>();
            for (String failure : loadedFailures.values()) {
                Path orig = new Path(failure);
                Path dest = new Path(this.error, orig.getName());
                if (fs.exists(dest)) continue;
                bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(StandardCharsets.UTF_8));
                workIds.add(orig.getName());
                log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
            }
            bifCopyQueue.waitUntilDone(workIds);
        }
        fs.deleteRecursively(new Path(this.error, "failures.txt"));
        return new CleanUpBulkImport(this.tableId, this.source, this.bulk, this.error);
    }
}

