/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.manager.tableOps.bulkVer2;

import com.google.common.base.Preconditions;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.bulk.Bulk;
import org.apache.accumulo.core.clientImpl.bulk.BulkSerialize;
import org.apache.accumulo.core.clientImpl.bulk.LoadMappingIterator;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.master.thrift.BulkImportState;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.PeekingIterator;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.bulkVer2.BulkInfo;
import org.apache.accumulo.manager.tableOps.bulkVer2.CleanUpBulkImport;
import org.apache.accumulo.manager.tableOps.bulkVer2.CompleteBulkImport;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * FATE step of the bulk import (version 2) operation that hands the files listed in the load
 * mapping to the tablets they overlap.
 *
 * <p>All of the work happens in {@link #isReady(long, Manager)}: each pass reads the load mapping
 * from the bulk directory, walks the table's tablet metadata in step with it and, depending on the
 * table state recorded in {@link BulkInfo}, either asks tablet servers to load the files
 * ({@link OnlineLoader}) or writes file entries straight into the metadata table
 * ({@link OfflineLoader}). The value returned by the loader is returned from {@code isReady}.
 *
 * <p>NOTE(review): presumably the framework re-invokes {@code isReady} after the returned number of
 * milliseconds until it returns 0 - confirm against the {@code Repo} contract.
 */
class LoadFiles
extends ManagerRepo {
    private static final long serialVersionUID = 1L;
    private static final Logger log = LoggerFactory.getLogger(LoadFiles.class);
    private final BulkInfo bulkInfo;

    /** Orders tablet "previous end row" values; {@code null} sorts first. */
    private static final Comparator<Text> PREV_COMP = Comparator.nullsFirst(BinaryComparable::compareTo);
    /** Orders tablet "end row" values; {@code null} sorts last. */
    private static final Comparator<Text> END_COMP = Comparator.nullsLast(BinaryComparable::compareTo);

    /** Value returned from {@link #isReady} while no tablet server is online. */
    private static final long NO_TSERVER_WAIT_MILLIS = 100L;
    /** Upper bound on the measured metadata scan time used when stretching the sleep time. */
    private static final long MAX_SCAN_TIME_MILLIS = 30_000L;

    public LoadFiles(BulkInfo bulkInfo) {
        this.bulkInfo = bulkInfo;
    }

    /**
     * Runs one load pass over the bulk directory's load mapping.
     *
     * @param tid the FATE transaction id
     * @param manager the manager running this operation
     * @return {@code 100} when no tablet server is online; otherwise the value computed by
     *         {@link #loadFiles}
     * @throws Exception if reading the load mapping or loading fails
     */
    @Override
    public long isReady(long tid, Manager manager) throws Exception {
        if (manager.onlineTabletServers().isEmpty()) {
            log.warn("There are no tablet server to process bulkDir import, waiting (tid = {})", FateTxId.formatTid(tid));
            return NO_TSERVER_WAIT_MILLIS;
        }
        VolumeManager fs = manager.getVolumeManager();
        Path bulkDir = new Path(this.bulkInfo.bulkDir);
        manager.updateBulkImportStatus(this.bulkInfo.sourceDir, BulkImportState.LOADING);
        // The mapping iterator is backed by a stream opened through the volume manager, so it
        // must be closed when the pass is over.
        try (LoadMappingIterator lmi = BulkSerialize.getUpdatedLoadMapping(bulkDir.toString(), this.bulkInfo.tableId, path -> fs.open(path))) {
            return this.loadFiles(this.bulkInfo.tableId, bulkDir, lmi, manager, tid);
        }
    }

    /**
     * Chooses the next step of the operation.
     *
     * @return a {@link CompleteBulkImport} when the recorded table state is {@code ONLINE},
     *         otherwise a {@link CleanUpBulkImport}
     */
    @Override
    public Repo<Manager> call(long tid, Manager manager) {
        if (this.bulkInfo.tableState == TableState.ONLINE) {
            return new CompleteBulkImport(this.bulkInfo);
        }
        return new CleanUpBulkImport(this.bulkInfo);
    }

    /**
     * Feeds every load-mapping entry, together with the tablets overlapping it, to the loader that
     * matches the table state.
     *
     * <p>The tablet scan starts at the previous end row of the first mapping entry and the same
     * tablet iterator is shared by all entries, so the mapping is consumed in a single forward pass
     * over the tablets.
     *
     * @return the loader's sleep time; when that is positive it is raised to at least twice the
     *         time this pass took (the pass time being capped at 30 seconds)
     */
    private long loadFiles(TableId tableId, Path bulkDir, LoadMappingIterator loadMapIter, Manager manager, long tid) throws Exception {
        PeekingIterator<Map.Entry<KeyExtent, Bulk.Files>> lmi = new PeekingIterator<>(loadMapIter);
        // NOTE(review): the first entry is dereferenced without checking that the mapping is
        // non-empty - confirm an empty mapping cannot reach this point.
        Map.Entry<KeyExtent, Bulk.Files> loadMapEntry = lmi.peek();
        Text startRow = loadMapEntry.getKey().prevEndRow();

        Iterator<TabletMetadata> tabletIter = TabletsMetadata.builder(manager.getContext())
                .forTable(tableId)
                .overlapping(startRow, null)
                .checkConsistency()
                .fetch(new TabletMetadata.ColumnType[]{TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.LOCATION, TabletMetadata.ColumnType.LOADED})
                .build()
                .iterator();

        Loader loader = this.bulkInfo.tableState == TableState.ONLINE ? new OnlineLoader() : new OfflineLoader();
        loader.start(bulkDir, manager, tid, this.bulkInfo.setTime);

        long t1 = System.currentTimeMillis();
        while (lmi.hasNext()) {
            loadMapEntry = lmi.next();
            List<TabletMetadata> tablets = this.findOverlappingTablets(loadMapEntry.getKey(), tabletIter);
            loader.load(tablets, loadMapEntry.getValue());
        }

        long sleepTime = loader.finish();
        if (sleepTime > 0L) {
            // Never come back sooner than twice the time the scan itself needed.
            long scanTime = Math.min(System.currentTimeMillis() - t1, MAX_SCAN_TIME_MILLIS);
            sleepTime = Math.max(sleepTime, scanTime * 2L);
        }
        return sleepTime;
    }

    /**
     * Advances {@code tabletIter} to the tablets that exactly cover {@code loadRange} and returns
     * them in iteration order.
     *
     * <p>Tablets whose previous end row sorts before the range's previous end row are skipped. The
     * first returned tablet must start exactly where the range starts and the last one must end
     * exactly where the range ends.
     *
     * @throws IllegalStateException if no tablet boundary lines up with the start or the end of
     *         {@code loadRange}
     * @throws NoSuchElementException if the iterator runs out before the range is covered; the
     *         iterator's own exception is attached as the cause
     */
    private List<TabletMetadata> findOverlappingTablets(KeyExtent loadRange, Iterator<TabletMetadata> tabletIter) {
        TabletMetadata currTablet = null;
        try {
            List<TabletMetadata> tablets = new ArrayList<>();
            currTablet = tabletIter.next();

            int cmp;
            // Skip tablets that lie entirely before the load range.
            while ((cmp = PREV_COMP.compare(currTablet.getPrevEndRow(), loadRange.prevEndRow())) < 0) {
                currTablet = tabletIter.next();
            }
            if (cmp != 0) {
                throw new IllegalStateException("Unexpected prev end row " + currTablet.getExtent() + " " + loadRange);
            }

            tablets.add(currTablet);
            // Collect tablets until one ends at (or past) the end of the load range.
            while ((cmp = END_COMP.compare(currTablet.getEndRow(), loadRange.endRow())) < 0) {
                currTablet = tabletIter.next();
                tablets.add(currTablet);
            }
            if (cmp != 0) {
                throw new IllegalStateException("Unexpected end row " + currTablet.getExtent() + " " + loadRange);
            }
            return tablets;
        }
        catch (NoSuchElementException e) {
            // currTablet is still null when the very first next() call failed.
            KeyExtent lastSeen = currTablet == null ? null : currTablet.getExtent();
            NoSuchElementException ne2 = new NoSuchElementException("Failed to find overlapping tablets " + lastSeen + " " + loadRange);
            ne2.initCause(e);
            throw ne2;
        }
    }

    /**
     * Loader used when the table is not online: file entries are written directly to the metadata
     * table for tablets without a location, while tablets that still have a location are only
     * counted (per server) so that a positive sleep time is reported.
     *
     * <p>NOTE(review): the batch writer is closed only in {@link #finish()}; if {@code load}
     * throws, nothing in this class closes it.
     */
    private static class OfflineLoader
    extends Loader {
        BatchWriter bw;
        /** Number of tablets seen with a location, keyed by the server in that location. */
        MapCounter<HostAndPort> unloadingTablets;

        /**
         * @throws IllegalArgumentException if {@code setTime} is {@code true}
         */
        @Override
        void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
            Preconditions.checkArgument(!setTime);
            super.start(bulkDir, manager, tid, setTime);
            this.bw = manager.getContext().createBatchWriter(MetadataTable.NAME);
            this.unloadingTablets = new MapCounter<>();
        }

        @Override
        void load(List<TabletMetadata> tablets, Bulk.Files files) throws MutationsRejectedException {
            byte[] fam = TextUtil.getBytes(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
            for (TabletMetadata tablet : tablets) {
                if (tablet.getLocation() != null) {
                    // Tablet still has a location: do not touch its metadata, just remember the server.
                    this.unloadingTablets.increment(tablet.getLocation().getHostAndPort(), 1L);
                    continue;
                }
                Mutation mutation = new Mutation(tablet.getExtent().toMetaRow());
                for (Bulk.FileInfo fileInfo : files) {
                    String fullPath = new Path(this.bulkDir, fileInfo.getFileName()).toString();
                    byte[] val = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries()).encode();
                    mutation.put(fam, fullPath.getBytes(StandardCharsets.UTF_8), val);
                }
                this.bw.addMutation(mutation);
            }
        }

        /**
         * Closes the batch writer.
         *
         * @return 0 when no tablet had a location, otherwise 13 ms for each such tablet on the
         *         server that had the most of them
         */
        @Override
        long finish() throws Exception {
            this.bw.close();
            long sleepTime = 0L;
            if (this.unloadingTablets.size() > 0) {
                sleepTime = this.unloadingTablets.max() * WAIT_MILLIS_PER_ITEM;
            }
            return sleepTime;
        }
    }

    /**
     * Loader used when the table is online: load requests are queued per tablet server and sent
     * with the {@code loadFiles} RPC, either once the queue's estimated size exceeds
     * {@link #MAX_QUEUED_DATA_SIZE} or when {@link #finish()} is called.
     */
    private static class OnlineLoader
    extends Loader {
        /** Estimated queue size above which {@link #load} flushes the queue (4 MiB). */
        private static final int MAX_QUEUED_DATA_SIZE = 4 * 1024 * 1024;

        /** RPC timeout, read from {@code Property.MANAGER_BULK_TIMEOUT}. */
        long timeInMillis;
        String fmtTid;
        /** Number of tablets seen without a location during this pass. */
        int locationLess = 0;
        /** Number of tablet load requests queued for each server during this pass. */
        MapCounter<HostAndPort> loadMsgs;
        /** Pending requests: server -> tablet -> file name -> file info. */
        Map<HostAndPort, Map<TKeyExtent, Map<String, MapFileInfo>>> loadQueue;
        /** Rough size estimate of what is currently held in {@link #loadQueue}. */
        private int queuedDataSize = 0;

        @Override
        void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
            super.start(bulkDir, manager, tid, setTime);
            this.timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
            this.fmtTid = FateTxId.formatTid(tid);
            this.loadMsgs = new MapCounter<>();
            this.loadQueue = new HashMap<>();
        }

        /**
         * Sends everything in the queue when its estimated size exceeds {@code threshold}.
         *
         * @param threshold size limit; {@code 0} means "send unconditionally"
         */
        private void sendQueued(int threshold) {
            if (threshold != 0 && this.queuedDataSize <= threshold) {
                return;
            }
            this.loadQueue.forEach(this::sendToServer);
            this.loadQueue.clear();
            this.queuedDataSize = 0;
        }

        /**
         * Issues one {@code loadFiles} RPC to {@code server}.
         *
         * <p>A {@link TException} is logged at debug level and otherwise ignored. NOTE(review):
         * presumably a later pass re-sends the request because the files will not show up as
         * loaded - confirm.
         *
         * <p>The client is handed back in a {@code finally} block so that it is returned exactly
         * once on every path, including non-Thrift failures.
         */
        private void sendToServer(HostAndPort server, Map<TKeyExtent, Map<String, MapFileInfo>> tabletFiles) {
            if (log.isTraceEnabled()) {
                int fileCount = tabletFiles.values().stream().mapToInt(Map::size).sum();
                log.trace("{} asking {} to bulk import {} files for {} tablets", this.fmtTid, server, fileCount, tabletFiles.size());
            }
            TabletClientService.Client client = null;
            try {
                client = (TabletClientService.Client) ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server, this.manager.getContext(), this.timeInMillis);
                client.loadFiles(TraceUtil.traceInfo(), this.manager.getContext().rpcCreds(), this.tid, this.bulkDir.toString(), tabletFiles, this.setTime);
            }
            catch (TException ex) {
                log.debug("rpc failed server: {}, {} {}", server, this.fmtTid, ex.getMessage(), ex);
            }
            finally {
                // client is still null here when getClient itself failed.
                ThriftUtil.returnClient(client, this.manager.getContext());
            }
        }

        /**
         * Queues a load request for one tablet; a tablet with nothing left to import is skipped.
         *
         * @throws IllegalStateException if the same extent is already queued for {@code server}
         */
        private void addToQueue(HostAndPort server, KeyExtent extent, Map<String, MapFileInfo> thriftImports) {
            if (thriftImports.isEmpty()) {
                return;
            }
            this.loadMsgs.increment(server, 1L);
            Map<String, MapFileInfo> prev = this.loadQueue.computeIfAbsent(server, k -> new HashMap<>()).putIfAbsent(extent.toThrift(), thriftImports);
            Preconditions.checkState(prev == null, "Unexpectedly saw extent %s twice", extent);
            // Rough estimate only: file name lengths, host name, plus fixed per-entry overheads.
            this.queuedDataSize += thriftImports.keySet().stream().mapToInt(String::length).sum() + server.getHost().length() + 4 + thriftImports.size() * 32;
        }

        @Override
        void load(List<TabletMetadata> tablets, Bulk.Files files) {
            for (TabletMetadata tablet : tablets) {
                TabletMetadata.Location location = tablet.getLocation();
                if (location == null) {
                    // Nobody to send the request to; finish() reports a sleep time for these.
                    ++this.locationLess;
                    continue;
                }
                HostAndPort server = location.getHostAndPort();
                Set<?> loadedFiles = tablet.getLoaded().keySet();
                Map<String, MapFileInfo> thriftImports = new HashMap<>();
                for (Bulk.FileInfo fileInfo : files) {
                    Path fullPath = new Path(this.bulkDir, fileInfo.getFileName());
                    TabletFile bulkFile = new TabletFile(fullPath);
                    // Files already recorded as loaded for this tablet are not requested again.
                    if (loadedFiles.contains(bulkFile)) {
                        continue;
                    }
                    thriftImports.put(fileInfo.getFileName(), new MapFileInfo(fileInfo.getEstFileSize()));
                }
                this.addToQueue(server, tablet.getExtent(), thriftImports);
            }
            this.sendQueued(MAX_QUEUED_DATA_SIZE);
        }

        /**
         * Flushes the queue unconditionally.
         *
         * @return 0 when nothing was queued and every tablet had a location; otherwise the larger
         *         of (a) 13 ms per request queued for the busiest server and (b) when some tablet
         *         had no location, {@code max(100, number of such tablets)}
         */
        @Override
        long finish() {
            this.sendQueued(0);
            long sleepTime = 0L;
            if (this.loadMsgs.size() > 0) {
                sleepTime = this.loadMsgs.max() * WAIT_MILLIS_PER_ITEM;
            }
            if (this.locationLess > 0) {
                sleepTime = Math.max(Math.max(100L, (long) this.locationLess), sleepTime);
            }
            return sleepTime;
        }
    }

    /**
     * Strategy for applying load-mapping entries to tablets. {@code start} is called once, then
     * {@code load} once per load-mapping entry, then {@code finish} once.
     */
    private static abstract class Loader {
        /** Milliseconds of sleep time reported per counted item on the busiest server. */
        protected static final long WAIT_MILLIS_PER_ITEM = 13L;

        protected Path bulkDir;
        protected Manager manager;
        protected long tid;
        protected boolean setTime;

        /** Stores the arguments for use by {@code load} and {@code finish}. */
        void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
            this.bulkDir = bulkDir;
            this.manager = manager;
            this.tid = tid;
            this.setTime = setTime;
        }

        /**
         * Handles one load-mapping entry.
         *
         * @param tablets the tablets overlapping the entry's range
         * @param files the files of the entry
         */
        abstract void load(List<TabletMetadata> tablets, Bulk.Files files) throws Exception;

        /**
         * Completes the pass.
         *
         * @return the sleep time in milliseconds to report from this pass; 0 when nothing is
         *         pending
         */
        abstract long finish() throws Exception;
    }
}

