/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.tserver.log;

import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.master.thrift.RecoveryStatus;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;

public class LogSorter {
    private static final Logger log = Logger.getLogger(LogSorter.class);
    VolumeManager fs;
    AccumuloConfiguration conf;
    private final Map<String, LogProcessor> currentWork = Collections.synchronizedMap(new HashMap());
    ThreadPoolExecutor threadPool;
    private final Instance instance;

    public LogSorter(Instance instance, VolumeManager fs, AccumuloConfiguration conf) {
        this.instance = instance;
        this.fs = fs;
        this.conf = conf;
        int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
        this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
    }

    public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws KeeperException, InterruptedException {
        this.threadPool = distWorkQThreadPool;
        new DistributedWorkQueue(ZooUtil.getRoot((Instance)this.instance) + "/recovery").startProcessing((DistributedWorkQueue.Processor)new LogProcessor(), this.threadPool);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<RecoveryStatus> getLogSorts() {
        ArrayList<RecoveryStatus> result = new ArrayList<RecoveryStatus>();
        Map<String, LogProcessor> map = this.currentWork;
        synchronized (map) {
            for (Map.Entry<String, LogProcessor> entries : this.currentWork.entrySet()) {
                RecoveryStatus status = new RecoveryStatus();
                status.name = entries.getKey();
                try {
                    status.progress = (double)entries.getValue().getBytesCopied() / (0.0 + (double)this.conf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE));
                }
                catch (IOException ex) {
                    log.warn((Object)"Error getting bytes read");
                }
                status.runtime = (int)entries.getValue().getSortTime();
                result.add(status);
            }
            return result;
        }
    }

    class LogProcessor
    implements DistributedWorkQueue.Processor {
        private FSDataInputStream input;
        private DataInputStream decryptingInput;
        private long bytesCopied = -1L;
        private long sortStart = 0L;
        private long sortStop = -1L;

        LogProcessor() {
        }

        public DistributedWorkQueue.Processor newProcessor() {
            return new LogProcessor();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void process(String child, byte[] data) {
            String work = new String(data);
            String[] parts = work.split("\\|");
            String src = parts[0];
            String dest = parts[1];
            String sortId = new Path(src).getName();
            log.debug((Object)("Sorting " + src + " to " + dest + " using sortId " + sortId));
            Map map = LogSorter.this.currentWork;
            synchronized (map) {
                if (LogSorter.this.currentWork.containsKey(sortId)) {
                    return;
                }
                LogSorter.this.currentWork.put(sortId, this);
            }
            try {
                log.info((Object)("Copying " + src + " to " + dest));
                this.sort(sortId, new Path(src), dest);
            }
            finally {
                LogSorter.this.currentWork.remove(sortId);
            }
        }

        /*
         * Exception decompiling
         */
        public void sort(String name, Path srcPath, String destPath) {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [1[TRYBLOCK]], but top level block is 27[UNCONDITIONALDOLOOP]
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void writeBuffer(String destPath, ArrayList<Pair<LogFileKey, LogFileValue>> buffer, int part) throws IOException {
            Path path = new Path(destPath, String.format("part-r-%05d", part++));
            FileSystem ns = LogSorter.this.fs.getVolumeByPath(path).getFileSystem();
            MapFile.Writer output = new MapFile.Writer(ns.getConf(), ns, path.toString(), LogFileKey.class, LogFileValue.class);
            try {
                Collections.sort(buffer, new Comparator<Pair<LogFileKey, LogFileValue>>(){

                    @Override
                    public int compare(Pair<LogFileKey, LogFileValue> o1, Pair<LogFileKey, LogFileValue> o2) {
                        return ((LogFileKey)o1.getFirst()).compareTo((LogFileKey)o2.getFirst());
                    }
                });
                for (Pair<LogFileKey, LogFileValue> entry : buffer) {
                    output.append((WritableComparable)entry.getFirst(), (Writable)entry.getSecond());
                }
            }
            finally {
                output.close();
            }
        }

        synchronized void close() throws IOException {
            this.bytesCopied = this.input.getPos();
            this.input.close();
            this.decryptingInput.close();
            this.input = null;
        }

        public synchronized long getSortTime() {
            if (this.sortStart > 0L) {
                if (this.sortStop > 0L) {
                    return this.sortStop - this.sortStart;
                }
                return System.currentTimeMillis() - this.sortStart;
            }
            return 0L;
        }

        synchronized long getBytesCopied() throws IOException {
            return this.input == null ? this.bytesCopied : this.input.getPos();
        }
    }
}

