/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractFileInputOperator<T>
implements InputOperator,
Partitioner<AbstractFileInputOperator<T>>,
StatsListener,
Operator.CheckpointListener {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class);
    // Directory to read from; converted to {@link #filePath} in setup().
    @NotNull
    protected String directory;
    // Discovers new files and decides which paths this instance accepts.
    @NotNull
    protected DirectoryScanner scanner = new DirectoryScanner();
    // Minimum time between two directory scans (see scanDirectory()).
    protected int scanIntervalMillis = 5000;
    // Number of entities already emitted from currentFile.
    protected int offset;
    // Path string of the file currently being read; null when no file is open.
    protected String currentFile;
    // Paths that scanDirectory() has already discovered; passed to the scanner as the consumed set.
    protected Set<String> processedFiles = new HashSet<String>();
    // Upper bound on readEntity() calls per emitTuples() invocation.
    protected int emitBatchSize = 1000;
    // Partition count last reported through partitioned().
    protected int currentPartitions = 1;
    // Desired partition count; compared with currentPartitions in processStats().
    protected int partitionCount = 1;
    // Retry count of the file currently being read.
    private int retryCount = 0;
    // A failed file is re-queued only while its retry count does not exceed this; <= 0 makes failures fatal.
    private int maxRetryCount = 5;
    // Number of entities still to be read and discarded after reopening a file at a non-zero offset.
    protected transient int skipCount = 0;
    // Operator context captured in setup(); used in endWindow() to publish counters.
    private transient Context.OperatorContext context;
    // Counter container published to the context at the end of every window.
    private final BasicCounters<MutableLong> fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
    protected MutableLong globalNumberOfFailures = new MutableLong();
    protected MutableLong localNumberOfFailures = new MutableLong();
    protected MutableLong globalNumberOfRetries = new MutableLong();
    protected MutableLong localNumberOfRetries = new MutableLong();
    protected transient MutableLong globalProcessedFileCount = new MutableLong();
    protected transient MutableLong localProcessedFileCount = new MutableLong();
    protected transient MutableLong pendingFileCount = new MutableLong();
    // Stores and loads the per-window recovery state used by replay().
    @NotNull
    private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager();
    // Id of the window currently being processed (set in beginWindow()).
    protected transient long currentWindowId;
    // File ranges emitted during the current window; saved and cleared in endWindow().
    protected final transient LinkedList<RecoveryEntry> currentWindowRecoveryState = Lists.newLinkedList();
    protected int operatorId;
    // Wall-clock time of the last definePartitions() call.
    protected long lastRepartition = 0L;
    // Files that were in progress when the operator was repartitioned.
    protected Queue<FailedFile> unfinishedFiles = new LinkedList<FailedFile>();
    // Files whose processing failed and that are waiting for a retry.
    protected Queue<FailedFile> failedFiles = new LinkedList<FailedFile>();
    protected transient FileSystem fs;
    protected transient Configuration configuration;
    // Wall-clock time of the last completed directory scan.
    protected transient long lastScanMillis;
    protected transient Path filePath;
    // Stream of the file currently being read; null when no file is open.
    protected transient InputStream inputStream;
    // Discovered files that have not been opened yet, in discovery order.
    protected Set<String> pendingFiles = new LinkedHashSet<String>();

    /**
     * @return the directory path string this operator reads from
     */
    public String getDirectory() {
        return directory;
    }

    /**
     * Sets the directory to read from; it is turned into a {@code Path} in {@code setup}.
     *
     * @param directory directory path string
     */
    public void setDirectory(String directory) {
        this.directory = directory;
    }

    /**
     * @return the scanner used to discover and filter files
     */
    public DirectoryScanner getScanner() {
        return scanner;
    }

    /**
     * Replaces the scanner used to discover and filter files.
     *
     * @param scanner the scanner to use
     */
    public void setScanner(DirectoryScanner scanner) {
        this.scanner = scanner;
    }

    /**
     * @return the minimum number of milliseconds between two directory scans
     */
    public int getScanIntervalMillis() {
        return scanIntervalMillis;
    }

    /**
     * Sets the minimum time that must pass between two directory scans.
     *
     * @param scanIntervalMillis interval in milliseconds
     */
    public void setScanIntervalMillis(int scanIntervalMillis) {
        this.scanIntervalMillis = scanIntervalMillis;
    }

    /**
     * @return the maximum number of entities read per {@code emitTuples} call
     */
    public int getEmitBatchSize() {
        return emitBatchSize;
    }

    /**
     * Sets the maximum number of entities read per {@code emitTuples} call.
     *
     * @param emitBatchSize batch size
     */
    public void setEmitBatchSize(int emitBatchSize) {
        this.emitBatchSize = emitBatchSize;
    }

    /**
     * Sets the manager that stores the per-window recovery state.
     *
     * @param windowDataManager the manager to use
     */
    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    /**
     * @return the manager that stores the per-window recovery state
     */
    public WindowDataManager getWindowDataManager() {
        return windowDataManager;
    }

    /**
     * @return the desired number of partitions
     */
    public int getPartitionCount() {
        return partitionCount;
    }

    /**
     * Sets the desired number of partitions; {@code processStats} requests a
     * repartition while it differs from the current partition count.
     *
     * @param requiredPartitions desired partition count
     */
    public void setPartitionCount(int requiredPartitions) {
        partitionCount = requiredPartitions;
    }

    /**
     * @return the partition count last reported through {@code partitioned}
     */
    public int getCurrentPartitions() {
        return currentPartitions;
    }

    /**
     * Prepares the operator for processing: records the operator id and context,
     * creates the filesystem handle, registers all counters and sets up the
     * window data manager.
     *
     * <p>An {@link IOException} while creating the filesystem is routed through
     * {@code failureHandling}. When the activation window is older than the largest
     * recovery window, the current file and offset are reset.
     *
     * @param context the operator context
     */
    public void setup(Context.OperatorContext context) {
        operatorId = context.getId();
        globalProcessedFileCount.setValue((long)processedFiles.size());
        LOG.debug("Setup processed file count: {}", globalProcessedFileCount);
        this.context = context;

        try {
            filePath = new Path(directory);
            configuration = new Configuration();
            fs = getFSInstance();
        } catch (IOException ioe) {
            failureHandling(ioe);
        }

        fileCounters.setCounter(FileCounters.GLOBAL_PROCESSED_FILES, globalProcessedFileCount);
        fileCounters.setCounter(FileCounters.LOCAL_PROCESSED_FILES, localProcessedFileCount);
        fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES, globalNumberOfFailures);
        fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES, localNumberOfFailures);
        fileCounters.setCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES, globalNumberOfRetries);
        fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES, localNumberOfRetries);
        fileCounters.setCounter(FileCounters.PENDING_FILES, pendingFileCount);

        // The manager must be set up before its largest recovery window is queried.
        windowDataManager.setup(context);
        long activationWindowId = context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID);
        if (activationWindowId < windowDataManager.getLargestRecoveryWindow()) {
            // Windows will be replayed, so the restored read position is dropped.
            currentFile = null;
            offset = 0;
        }
    }

    /**
     * Creates a new filesystem instance for {@code filePath} using {@code configuration}.
     *
     * @return a new {@code FileSystem} instance
     * @throws IOException if the filesystem cannot be created
     */
    protected FileSystem getFSInstance() throws IOException {
        URI location = filePath.toUri();
        return FileSystem.newInstance(location, configuration);
    }

    /**
     * Releases the resources held by the operator: the open input stream (if any),
     * the filesystem handle (if one was created) and the window data manager.
     *
     * <p>Both close operations are always attempted, and the window data manager is
     * torn down even when a close fails.
     *
     * @throws RuntimeException if closing the input stream or the filesystem fails;
     *         the cause is the last {@link IOException} that was caught
     */
    public void teardown() {
        IOException savedException = null;
        boolean fileFailed = false;
        try {
            if (this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (IOException ex) {
            savedException = ex;
            fileFailed = true;
        }
        boolean fsFailed = false;
        try {
            // fs is still null when setup() failed before the filesystem was created.
            if (this.fs != null) {
                this.fs.close();
            }
        } catch (IOException ex) {
            savedException = ex;
            fsFailed = true;
        }
        try {
            if (savedException != null) {
                String errorMessage = "";
                if (fileFailed) {
                    errorMessage = errorMessage + "Failed to close " + this.currentFile + ". ";
                }
                if (fsFailed) {
                    errorMessage = errorMessage + "Failed to close filesystem.";
                }
                throw new RuntimeException(errorMessage, savedException);
            }
        } finally {
            // Always release the window data manager, even when a close failed above.
            this.windowDataManager.teardown();
        }
    }

    /**
     * Records the id of the window that starts and, when that window is not newer
     * than the largest recovery window, replays it.
     *
     * @param windowId id of the starting window
     */
    public void beginWindow(long windowId) {
        currentWindowId = windowId;
        boolean needsReplay = windowId <= windowDataManager.getLargestRecoveryWindow();
        if (needsReplay) {
            replay(windowId);
        }
    }

    /**
     * Finishes the current window: saves the window's recovery state (only for
     * windows newer than the largest recovery window), clears it, and publishes the
     * counters when a context is available.
     *
     * @throws RuntimeException if the recovery state cannot be saved
     */
    public void endWindow() {
        boolean pastRecovery = currentWindowId > windowDataManager.getLargestRecoveryWindow();
        if (pastRecovery) {
            try {
                windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
            } catch (IOException ioe) {
                throw new RuntimeException("saving recovery", ioe);
            }
        }
        currentWindowRecoveryState.clear();

        if (context == null) {
            return;
        }
        int waiting = pendingFiles.size() + failedFiles.size() + unfinishedFiles.size();
        pendingFileCount.setValue((long)waiting);
        if (currentFile != null) {
            // The file being read still counts as pending.
            pendingFileCount.increment();
        }
        context.setCounters(fileCounters);
    }

    /**
     * Re-emits the tuples recorded for {@code windowId} by loading the saved
     * recovery entries of all operators and reading the accepted file ranges again.
     *
     * <p>For each entry accepted by the scanner, the operator either continues from
     * its current position (same file, same offset) or closes the current stream
     * and reopens the entry's file. A freshly opened stream starts at the beginning
     * of the file, so the first {@code startOffset} entities are read and discarded
     * before emitting; this also clears {@code skipCount}, so a later
     * {@code emitTuples} does not skip again.
     *
     * @param windowId id of the window to replay
     * @throws RuntimeException wrapping any {@link IOException} raised while
     *         loading the recovery data or reading a file
     */
    protected void replay(long windowId) {
        try {
            Map<Integer, Object> recoveryDataPerOperator = this.windowDataManager.load(windowId);
            for (Object recovery : recoveryDataPerOperator.values()) {
                // endWindow() saves currentWindowRecoveryState, a LinkedList<RecoveryEntry>.
                @SuppressWarnings("unchecked")
                LinkedList<RecoveryEntry> recoveryData = (LinkedList<RecoveryEntry>)recovery;
                for (RecoveryEntry recoveryEntry : recoveryData) {
                    if (!this.scanner.acceptFile(recoveryEntry.file)) {
                        continue;
                    }
                    boolean alreadyPositioned = this.currentFile != null
                        && this.currentFile.equals(recoveryEntry.file)
                        && this.offset == recoveryEntry.startOffset;
                    if (!alreadyPositioned) {
                        if (this.inputStream != null) {
                            this.closeFile(this.inputStream);
                        }
                        this.processedFiles.add(recoveryEntry.file);
                        // The range is handled here, so it must not be picked up again later.
                        removeFirstMatch(this.failedFiles, recoveryEntry);
                        removeFirstMatch(this.unfinishedFiles, recoveryEntry);
                        this.pendingFiles.remove(recoveryEntry.file);
                        this.inputStream = this.retryFailedFile(new FailedFile(recoveryEntry.file, recoveryEntry.startOffset));
                        // NOTE(review): retryFailedFile returns null when the file no longer exists;
                        // the read loop below then runs without a fresh stream, as before.
                        if (this.inputStream != null) {
                            // Discard the entities before startOffset that were emitted in earlier windows.
                            while (this.skipCount > 0) {
                                this.readEntity();
                                --this.skipCount;
                            }
                        }
                    }
                    while (this.offset < recoveryEntry.endOffset) {
                        T line = this.readEntity();
                        ++this.offset;
                        this.emit(line);
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException("replay", e);
        }
    }

    /** Removes the first queued file whose path and offset match the start of {@code entry}. */
    private static void removeFirstMatch(Queue<FailedFile> files, RecoveryEntry entry) {
        Iterator<FailedFile> it = files.iterator();
        while (it.hasNext()) {
            FailedFile ff = it.next();
            if (ff.path.equals(entry.file) && ff.offset == entry.startOffset) {
                it.remove();
                return;
            }
        }
    }

    /**
     * Emits up to {@code emitBatchSize} entities from the current file.
     *
     * <p>Nothing is emitted while the current window is still within the recovery
     * range (those windows are handled by {@code replay}). When no stream is open,
     * exactly one source of work is tried, in this order: resume the checkpointed
     * current file, an unfinished file, a pending file, a failed file, or a
     * directory scan. Any {@link IOException} is passed to {@code failureHandling}.
     * The range emitted from the file is recorded in the current window's recovery
     * state.
     */
    public void emitTuples() {
        if (this.currentWindowId <= this.windowDataManager.getLargestRecoveryWindow()) {
            return;
        }
        if (this.inputStream == null) {
            try {
                if (this.currentFile != null && this.offset > 0) {
                    // Resume a file that was partially read; openFile() resets offset, so it is restored afterwards.
                    int tmpOffset = this.offset;
                    if (this.fs.exists(new Path(this.currentFile))) {
                        this.inputStream = this.openFile(new Path(this.currentFile));
                        this.offset = tmpOffset;
                        // The stream starts at the beginning, so the already emitted entities are skipped below.
                        this.skipCount = tmpOffset;
                    } else {
                        // The file disappeared; forget the read position.
                        this.currentFile = null;
                        this.offset = 0;
                        this.skipCount = 0;
                    }
                } else if (!this.unfinishedFiles.isEmpty()) {
                    // retryFailedFile() assigns inputStream itself.
                    this.retryFailedFile(this.unfinishedFiles.poll());
                } else if (!this.pendingFiles.isEmpty()) {
                    String newPathString = this.pendingFiles.iterator().next();
                    this.pendingFiles.remove(newPathString);
                    // A pending file that no longer exists is dropped without further handling.
                    if (this.fs.exists(new Path(newPathString))) {
                        this.inputStream = this.openFile(new Path(newPathString));
                    }
                } else if (!this.failedFiles.isEmpty()) {
                    this.retryFailedFile(this.failedFiles.poll());
                } else {
                    this.scanDirectory();
                }
            }
            catch (IOException ex) {
                this.failureHandling(ex);
            }
        }
        if (this.inputStream != null) {
            // Captured up front because closeFile()/failureHandling() clear currentFile.
            int startOffset = this.offset;
            String file = this.currentFile;
            try {
                int counterForTuple = 0;
                // Skipped entities count against the batch size as well.
                while (counterForTuple++ < this.emitBatchSize) {
                    T line = this.readEntity();
                    if (line == null) {
                        // A null entity marks the end of the file.
                        LOG.info("done reading file ({} entries).", (Object)this.offset);
                        this.closeFile(this.inputStream);
                        break;
                    }
                    if (this.skipCount == 0) {
                        ++this.offset;
                        this.emit(line);
                        continue;
                    }
                    --this.skipCount;
                }
            }
            catch (IOException e) {
                this.failureHandling(e);
            }
            if (this.offset > startOffset) {
                // Remember the emitted range so that the window can be replayed.
                this.currentWindowRecoveryState.add(new RecoveryEntry(file, startOffset, this.offset));
            }
        }
    }

    /**
     * Scans the directory for new files unless the last scan finished less than
     * {@code scanIntervalMillis} ago. Every newly found path is queued as pending,
     * remembered as processed and counted in the local processed-file counter.
     */
    protected void scanDirectory() {
        long now = System.currentTimeMillis();
        if (now - (long)scanIntervalMillis < lastScanMillis) {
            return;
        }
        LinkedHashSet<Path> discovered = scanner.scan(fs, filePath, processedFiles);
        for (Path found : discovered) {
            String foundPath = found.toString();
            pendingFiles.add(foundPath);
            processedFiles.add(foundPath);
            localProcessedFileCount.increment();
        }
        lastScanMillis = System.currentTimeMillis();
    }

    /**
     * Counts a failure and either rethrows it (when retries are disabled, i.e.
     * {@code maxRetryCount <= 0}) or logs it and queues the current file for retry.
     *
     * @param e the failure that occurred
     * @throws RuntimeException wrapping {@code e} when retries are disabled
     */
    private void failureHandling(Exception e) {
        localNumberOfFailures.increment();
        boolean retriesEnabled = maxRetryCount > 0;
        if (!retriesEnabled) {
            throw new RuntimeException(e);
        }
        LOG.error("FS reader error", e);
        addToFailedList();
    }

    /**
     * Closes the current stream, clears the current file and queues that file for a
     * retry from its current offset, unless its retry budget is used up.
     *
     * <p>When no file is current (for example a failure while checking whether a
     * pending file exists, or during setup), nothing is queued: an entry without a
     * path could never be reopened and would fail when it is retried.
     */
    protected void addToFailedList() {
        final String failedPath = this.currentFile;
        final int failedOffset = this.offset;
        try {
            if (this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (IOException e) {
            this.localNumberOfFailures.increment();
            LOG.error("Could not close input stream on: " + failedPath, e);
        }
        this.currentFile = null;
        this.inputStream = null;
        if (failedPath == null) {
            LOG.warn("failure without a current file, nothing to retry");
            return;
        }
        FailedFile ff = new FailedFile(failedPath, failedOffset, this.retryCount + 1);
        ff.lastFailedTime = System.currentTimeMillis();
        if (ff.retryCount > this.maxRetryCount) {
            // Retry budget exhausted: the file is dropped.
            return;
        }
        this.localNumberOfRetries.increment();
        LOG.info("adding to failed list path {} offset {} retry {}", new Object[]{ff.path, ff.offset, ff.retryCount});
        this.failedFiles.add(ff);
    }

    /**
     * Reopens a previously failed or unfinished file and restores its read state.
     * The stream starts at the beginning of the file, so {@code skipCount} is set
     * to the file's offset.
     *
     * @param ff the file to reopen
     * @return the opened stream (also stored in {@code inputStream}), or
     *         {@code null} if the file no longer exists
     * @throws IOException if the existence check or opening the file fails
     */
    protected InputStream retryFailedFile(FailedFile ff) throws IOException {
        LOG.info("retrying failed file {} offset {} retry {}", new Object[]{ff.path, ff.offset, ff.retryCount});
        String target = ff.path;
        boolean present = fs.exists(new Path(target));
        if (!present) {
            return null;
        }
        inputStream = openFile(new Path(target));
        // openFile() resets these three values, so they are restored afterwards.
        offset = ff.offset;
        retryCount = ff.retryCount;
        skipCount = ff.offset;
        return inputStream;
    }

    /**
     * Opens {@code path} for reading and makes it the current file, resetting the
     * offset, retry count and skip count to zero.
     *
     * @param path the file to open
     * @return the opened stream
     * @throws IOException if the file cannot be opened
     */
    protected InputStream openFile(Path path) throws IOException {
        currentFile = path.toString();
        offset = 0;
        retryCount = 0;
        skipCount = 0;
        LOG.info("opening file {}", path);
        return fs.open(path);
    }

    /**
     * Closes the given stream (when not null) and clears the current file and
     * stream references.
     *
     * @param is the stream to close, may be {@code null}
     * @throws IOException if closing the stream fails; the references are then left untouched
     */
    protected void closeFile(InputStream is) throws IOException {
        LOG.info("closing file {} offset {}", currentFile, offset);
        if (is != null) {
            is.close();
        }
        inputStream = null;
        currentFile = null;
    }

    /**
     * Redistributes the operator state over a new set of partitions.
     *
     * <p>When the computed partition count equals the current one, the partitions
     * are returned unchanged. Otherwise the state of all existing partitions
     * (processed, failed, pending, unfinished and in-progress files, plus the
     * failure and retry counters) is collected, the scanner is re-partitioned, and
     * one clone of this operator is created per new scanner. Each file is handed to
     * the first new partition whose scanner accepts it.
     *
     * @param partitions the existing partitions
     * @param context the partitioning context
     * @return the unchanged {@code partitions} or the newly created partitions
     */
    public Collection<Partitioner.Partition<AbstractFileInputOperator<T>>> definePartitions(Collection<Partitioner.Partition<AbstractFileInputOperator<T>>> partitions, Partitioner.PartitioningContext context) {
        this.lastRepartition = System.currentTimeMillis();
        int totalCount = this.getNewPartitionCount(partitions, context);
        LOG.debug("Computed new partitions: {}", (Object)totalCount);
        if (totalCount == partitions.size()) {
            return partitions;
        }

        // Any partition will do as the carrier of the global counters.
        AbstractFileInputOperator<T> tempOperator = partitions.iterator().next().getPartitionedInstance();
        MutableLong tempGlobalNumberOfRetries = tempOperator.globalNumberOfRetries;
        // Failures are accumulated in their own counter, not in the retries counter.
        MutableLong tempGlobalNumberOfFailures = tempOperator.globalNumberOfFailures;

        Set<String> totalProcessedFiles = Sets.newHashSet();
        Set<FailedFile> currentFiles = Sets.newHashSet();
        List<DirectoryScanner> oldscanners = Lists.newLinkedList();
        List<FailedFile> totalFailedFiles = Lists.newLinkedList();
        List<String> totalPendingFiles = Lists.newLinkedList();
        Set<Integer> deletedOperators = Sets.newHashSet();
        for (Partitioner.Partition<AbstractFileInputOperator<T>> partition : partitions) {
            AbstractFileInputOperator<T> oper = partition.getPartitionedInstance();
            totalProcessedFiles.addAll(oper.processedFiles);
            totalFailedFiles.addAll(oper.failedFiles);
            totalPendingFiles.addAll(oper.pendingFiles);
            // Collect the unfinished files of every partition, not only of this instance.
            currentFiles.addAll(oper.unfinishedFiles);
            tempGlobalNumberOfRetries.add((Number)oper.localNumberOfRetries);
            tempGlobalNumberOfFailures.add((Number)oper.localNumberOfFailures);
            if (oper.currentFile != null) {
                // The file in progress continues from its offset on whichever partition accepts it.
                currentFiles.add(new FailedFile(oper.currentFile, oper.offset));
            }
            oldscanners.add(oper.getScanner());
            deletedOperators.add(oper.operatorId);
        }

        List<DirectoryScanner> scanners = this.scanner.partition(totalCount, oldscanners);
        List<Partitioner.Partition<AbstractFileInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount);
        List<WindowDataManager> newManagers = Lists.newArrayListWithExpectedSize(totalCount);
        KryoCloneUtils<AbstractFileInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
        for (int i = 0; i < scanners.size(); ++i) {
            AbstractFileInputOperator<T> oper = cloneUtils.getClone();
            DirectoryScanner scn = scanners.get(i);
            oper.setScanner(scn);
            oper.processedFiles.addAll(totalProcessedFiles);
            oper.globalNumberOfFailures = tempGlobalNumberOfFailures;
            oper.localNumberOfFailures.setValue(0L);
            oper.globalNumberOfRetries = tempGlobalNumberOfRetries;
            oper.localNumberOfRetries.setValue(0L);

            // The clone starts without a file in progress; work is re-assigned below.
            oper.unfinishedFiles.clear();
            oper.currentFile = null;
            oper.offset = 0;
            Iterator<FailedFile> unfinishedIter = currentFiles.iterator();
            while (unfinishedIter.hasNext()) {
                FailedFile unfinishedFile = unfinishedIter.next();
                if (!scn.acceptFile(unfinishedFile.path)) {
                    continue;
                }
                oper.unfinishedFiles.add(unfinishedFile);
                unfinishedIter.remove();
            }

            oper.failedFiles.clear();
            Iterator<FailedFile> failedIter = totalFailedFiles.iterator();
            while (failedIter.hasNext()) {
                FailedFile ff = failedIter.next();
                if (!scn.acceptFile(ff.path)) {
                    continue;
                }
                oper.failedFiles.add(ff);
                failedIter.remove();
            }

            oper.pendingFiles.clear();
            Iterator<String> pendingFilesIterator = totalPendingFiles.iterator();
            while (pendingFilesIterator.hasNext()) {
                String pathString = pendingFilesIterator.next();
                if (!scn.acceptFile(pathString)) {
                    continue;
                }
                oper.pendingFiles.add(pathString);
                pendingFilesIterator.remove();
            }

            newPartitions.add(new DefaultPartition<AbstractFileInputOperator<T>>(oper));
            newManagers.add(oper.windowDataManager);
        }
        this.windowDataManager.partitioned(newManagers, deletedOperators);
        LOG.info("definePartitions called returning {} partitions", (Object)newPartitions.size());
        return newPartitions;
    }

    /**
     * Computes the number of partitions to create, delegating to
     * {@code DefaultPartition.getRequiredPartitionCount} with the configured
     * {@code partitionCount}.
     *
     * @param partitions the existing partitions (not used by this implementation)
     * @param context the partitioning context
     * @return the new partition count
     */
    protected int getNewPartitionCount(Collection<Partitioner.Partition<AbstractFileInputOperator<T>>> partitions, Partitioner.PartitioningContext context) {
        int configured = partitionCount;
        return DefaultPartition.getRequiredPartitionCount(context, configured);
    }

    /**
     * Records how many partitions are now in effect.
     *
     * @param partitions the partitions keyed by an integer id
     */
    public void partitioned(Map<Integer, Partitioner.Partition<AbstractFileInputOperator<T>>> partitions) {
        int active = partitions.size();
        currentPartitions = active;
    }

    /**
     * Checkpoint callback; this operator takes no action.
     *
     * @param windowId id of the checkpointed window
     */
    public void checkpointed(long windowId) {
        // Intentionally empty.
    }

    /**
     * Commit callback: asks the window data manager to delete this operator's
     * saved state up to {@code windowId}.
     *
     * @param windowId id of the committed window
     * @throws RuntimeException wrapping the {@link IOException} if deletion fails
     */
    public void committed(long windowId) {
        try {
            windowDataManager.deleteUpTo(operatorId, windowId);
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /**
     * Reads the next entity from the currently open file.
     *
     * @return the next entity; {@code emitTuples} treats {@code null} as the end of the file
     * @throws IOException if reading fails
     */
    protected abstract T readEntity() throws IOException;

    /**
     * Emits one entity that was read from a file.
     *
     * @param var1 the entity to emit
     */
    protected abstract void emit(T var1);

    /**
     * Requests a repartition whenever the current partition count differs from the
     * desired one.
     *
     * @param batchedOperatorStats the operator statistics (not inspected)
     * @return a response whose {@code repartitionRequired} flag reflects the comparison
     */
    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
        StatsListener.Response response = new StatsListener.Response();
        boolean mismatch = currentPartitions != partitionCount;
        response.repartitionRequired = mismatch;
        if (mismatch) {
            LOG.info("processStats: trying repartition of input operator current {} required {}", currentPartitions, partitionCount);
        }
        return response;
    }

    /**
     * @return the maximum number of retries for a failed file
     */
    public int getMaxRetryCount() {
        return maxRetryCount;
    }

    /**
     * Sets the maximum number of retries for a failed file; a value of zero or
     * less makes every failure fatal.
     *
     * @param maxRetryCount maximum retry count
     */
    public void setMaxRetryCount(int maxRetryCount) {
        this.maxRetryCount = maxRetryCount;
    }

    /**
     * Concrete file input operator that reads files line by line and emits every
     * line as a {@code String} on {@link #output}.
     */
    public static class FileLineInputOperator
    extends AbstractFileInputOperator<String> {
        /** Port on which the lines are emitted. */
        public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
        /** Reader wrapped around the stream of the current file. */
        protected transient BufferedReader br;

        /**
         * Opens the file and wraps its stream in a buffered reader.
         * NOTE(review): the reader uses the platform default charset.
         */
        @Override
        protected InputStream openFile(Path path) throws IOException {
            InputStream stream = super.openFile(path);
            br = new BufferedReader(new InputStreamReader(stream));
            return stream;
        }

        /** Closes the stream through the base class, then closes and drops the reader. */
        @Override
        protected void closeFile(InputStream is) throws IOException {
            super.closeFile(is);
            br.close();
            br = null;
        }

        /** Returns the next line, or {@code null} at the end of the file. */
        @Override
        protected String readEntity() throws IOException {
            String nextLine = br.readLine();
            return nextLine;
        }

        /** Emits the line on the output port. */
        @Override
        protected void emit(String tuple) {
            output.emit(tuple);
        }
    }

    /**
     * Immutable record of a range of entities, {@code startOffset} up to (not
     * including) {@code endOffset}, emitted from one file during a window.
     */
    protected static class RecoveryEntry {
        final String file;
        final int startOffset;
        final int endOffset;

        /** Creates an entry with a null file and both offsets set to -1. */
        private RecoveryEntry() {
            file = null;
            startOffset = -1;
            endOffset = -1;
        }

        /**
         * @param file path of the file, must not be null
         * @param startOffset offset at which emission started
         * @param endOffset offset reached after emission
         */
        RecoveryEntry(String file, int startOffset, int endOffset) {
            this.file = Preconditions.checkNotNull(file, "file");
            this.startOffset = startOffset;
            this.endOffset = endOffset;
        }

        /** Two entries are equal when file and both offsets match. */
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof RecoveryEntry)) {
                return false;
            }
            RecoveryEntry other = (RecoveryEntry)o;
            // Offsets are compared first; the file is only compared when both offsets match.
            return endOffset == other.endOffset
                && startOffset == other.startOffset
                && file.equals(other.file);
        }

        public int hashCode() {
            int hash = file.hashCode();
            hash = 31 * hash + startOffset;
            return 31 * hash + endOffset;
        }
    }

    /**
     * Lists a directory and selects the files an operator instance should read.
     * A file is selected when the hash of its path maps to this scanner's partition
     * (only checked when there is more than one partition) and its path matches the
     * optional regular expression.
     */
    public static class DirectoryScanner
    implements Serializable {
        private static final long serialVersionUID = 4535844463258899929L;
        /** Optional regular expression a full path string must match. */
        private String filePatternRegexp;
        /** Lazily compiled form of {@link #filePatternRegexp}. */
        private transient Pattern regex = null;
        private int partitionIndex;
        private int partitionCount;
        /** Paths that were already rejected by {@link #acceptFile}; they are not checked again. */
        protected final transient HashSet<String> ignoredFiles = new HashSet<String>();

        public String getFilePatternRegexp() {
            return filePatternRegexp;
        }

        /** Sets the pattern and drops the cached compiled pattern. */
        public void setFilePatternRegexp(String filePatternRegexp) {
            this.filePatternRegexp = filePatternRegexp;
            regex = null;
        }

        public int getPartitionCount() {
            return partitionCount;
        }

        public int getPartitionIndex() {
            return partitionIndex;
        }

        /**
         * @return the compiled pattern, or {@code null} when no pattern is configured
         */
        protected Pattern getRegex() {
            boolean needsCompile = regex == null && filePatternRegexp != null;
            if (needsCompile) {
                regex = Pattern.compile(filePatternRegexp);
            }
            return regex;
        }

        /**
         * Lists {@code filePath} and returns the accepted paths that are neither
         * consumed nor previously ignored, in listing order.
         *
         * @param fs filesystem to list
         * @param filePath directory to list
         * @param consumedFiles path strings that must not be returned again
         * @return the newly found paths; empty when the directory does not exist
         * @throws RuntimeException wrapping any other {@link IOException}
         */
        public LinkedHashSet<Path> scan(FileSystem fs, Path filePath, Set<String> consumedFiles) {
            LinkedHashSet<Path> found = Sets.newLinkedHashSet();
            try {
                LOG.debug("Scanning {} with pattern {}", filePath, filePatternRegexp);
                FileStatus[] listing = fs.listStatus(filePath);
                for (FileStatus entry : listing) {
                    Path candidate = entry.getPath();
                    String candidateStr = candidate.toString();
                    boolean known = consumedFiles.contains(candidateStr) || ignoredFiles.contains(candidateStr);
                    if (known) {
                        continue;
                    }
                    if (!acceptFile(candidateStr)) {
                        // Remember the rejection so the path is not evaluated on the next scan.
                        ignoredFiles.add(candidateStr);
                        continue;
                    }
                    LOG.debug("Found {}", candidateStr);
                    found.add(candidate);
                }
            } catch (FileNotFoundException e) {
                LOG.warn("Failed to list directory {}", filePath, e);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return found;
        }

        /**
         * Decides whether this scanner handles the given path.
         *
         * @param filePathStr path string of the file
         * @return {@code true} when the path belongs to this partition and matches the pattern (if any)
         */
        protected boolean acceptFile(String filePathStr) {
            if (partitionCount > 1) {
                int hash = filePathStr.hashCode();
                int bucket = hash % partitionCount;
                if (bucket < 0) {
                    // The remainder of a negative hash is negative; shift it into range.
                    bucket += partitionCount;
                }
                LOG.debug("partition {} {} {} {}", new Object[]{partitionIndex, filePathStr, hash, bucket});
                if (bucket != partitionIndex) {
                    return false;
                }
            }
            Pattern pattern = getRegex();
            if (pattern == null) {
                return true;
            }
            Matcher matcher = pattern.matcher(filePathStr);
            return matcher.matches();
        }

        /**
         * Creates {@code count} scanners, one per partition index.
         */
        public List<DirectoryScanner> partition(int count) {
            ArrayList<DirectoryScanner> result = Lists.newArrayListWithExpectedSize(count);
            int index = 0;
            while (index < count) {
                result.add(createPartition(index, count));
                ++index;
            }
            return result;
        }

        /**
         * Creates {@code count} scanners; the existing scanners are not used by this implementation.
         */
        public List<DirectoryScanner> partition(int count, Collection<DirectoryScanner> scanners) {
            return partition(count);
        }

        /**
         * Creates a scanner with the same pattern for the given partition.
         */
        protected DirectoryScanner createPartition(int partitionIndex, int partitionCount) {
            DirectoryScanner copy = new DirectoryScanner();
            copy.filePatternRegexp = filePatternRegexp;
            copy.regex = regex;
            copy.partitionIndex = partitionIndex;
            copy.partitionCount = partitionCount;
            return copy;
        }

        public String toString() {
            StringBuilder text = new StringBuilder("DirectoryScanner [filePatternRegexp=");
            text.append(filePatternRegexp);
            text.append(" partitionIndex=").append(partitionIndex);
            text.append(" partitionCount=").append(partitionCount);
            text.append("]");
            return text.toString();
        }
    }

    /**
     * Aggregates the file counters of all partitions into one set of
     * {@link AggregatedFileCounters}.
     */
    public static final class FileCountersAggregator
    implements Context.CountersAggregator,
    Serializable {
        private static final long serialVersionUID = 201409041428L;
        MutableLong totalLocalProcessedFiles = new MutableLong();
        MutableLong pendingFiles = new MutableLong();
        MutableLong totalLocalNumberOfFailures = new MutableLong();
        MutableLong totalLocalNumberOfRetries = new MutableLong();

        /**
         * Sums the local counters of every partition and combines them with the
         * global counters taken from the first element.
         *
         * @param countersList one {@code BasicCounters} object per partition
         * @return the aggregated counters, or {@code null} for an empty collection
         */
        @SuppressWarnings("unchecked")
        public Object aggregate(Collection<?> countersList) {
            if (countersList.isEmpty()) {
                return null;
            }
            // The global counters are read from the first element only.
            BasicCounters<MutableLong> first = (BasicCounters<MutableLong>)countersList.iterator().next();
            MutableLong globalProcessedFiles = first.getCounter(FileCounters.GLOBAL_PROCESSED_FILES);
            MutableLong globalNumberOfFailures = first.getCounter(FileCounters.GLOBAL_NUMBER_OF_FAILURES);
            MutableLong globalNumberOfRetries = first.getCounter(FileCounters.GLOBAL_NUMBER_OF_RETRIES);

            totalLocalProcessedFiles.setValue(0L);
            pendingFiles.setValue(0L);
            totalLocalNumberOfFailures.setValue(0L);
            totalLocalNumberOfRetries.setValue(0L);
            for (Object element : countersList) {
                BasicCounters<MutableLong> partitionCounters = (BasicCounters<MutableLong>)element;
                totalLocalProcessedFiles.add(partitionCounters.getCounter(FileCounters.LOCAL_PROCESSED_FILES));
                pendingFiles.add(partitionCounters.getCounter(FileCounters.PENDING_FILES));
                totalLocalNumberOfFailures.add(partitionCounters.getCounter(FileCounters.LOCAL_NUMBER_OF_FAILURES));
                totalLocalNumberOfRetries.add(partitionCounters.getCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES));
            }

            // Processed = global + local discoveries - files still pending.
            globalProcessedFiles.add((Number)totalLocalProcessedFiles);
            globalProcessedFiles.subtract((Number)pendingFiles);
            // NOTE(review): the two global sums below are updated but the local totals are what
            // gets published for errors and retries; confirm which one is intended.
            globalNumberOfFailures.add((Number)totalLocalNumberOfFailures);
            globalNumberOfRetries.add((Number)totalLocalNumberOfRetries);

            BasicCounters<MutableLong> result = new BasicCounters<MutableLong>(MutableLong.class);
            result.setCounter(AggregatedFileCounters.PROCESSED_FILES, globalProcessedFiles);
            result.setCounter(AggregatedFileCounters.PENDING_FILES, pendingFiles);
            result.setCounter(AggregatedFileCounters.NUMBER_OF_ERRORS, totalLocalNumberOfFailures);
            result.setCounter(AggregatedFileCounters.NUMBER_OF_RETRIES, totalLocalNumberOfRetries);
            return result;
        }
    }

    /**
     * Keys of the per-partition counters registered in {@code setup}.
     */
    protected static enum FileCounters {
        GLOBAL_PROCESSED_FILES,
        LOCAL_PROCESSED_FILES,
        GLOBAL_NUMBER_OF_FAILURES,
        LOCAL_NUMBER_OF_FAILURES,
        GLOBAL_NUMBER_OF_RETRIES,
        LOCAL_NUMBER_OF_RETRIES,
        PENDING_FILES
    }

    /**
     * Keys of the counters produced by {@code FileCountersAggregator}.
     */
    public static enum AggregatedFileCounters {
        PROCESSED_FILES,
        PENDING_FILES,
        NUMBER_OF_ERRORS,
        NUMBER_OF_RETRIES
    }

    /**
     * A file to be (re)opened at a given offset, together with its retry bookkeeping.
     */
    protected static class FailedFile {
        /** Path string of the file. */
        String path;
        /** Number of entities already emitted from the file. */
        int offset;
        /** Number of retries recorded for the file. */
        int retryCount;
        /** Wall-clock time of the last failure, in milliseconds. */
        long lastFailedTime;

        /** Creates an empty instance with all fields at their defaults. */
        protected FailedFile() {
        }

        /** Creates an entry with a retry count of zero. */
        protected FailedFile(String path, int offset) {
            this(path, offset, 0);
        }

        protected FailedFile(String path, int offset, int retryCount) {
            this.path = path;
            this.offset = offset;
            this.retryCount = retryCount;
        }

        public String toString() {
            StringBuilder text = new StringBuilder("FailedFile[path='");
            text.append(path).append('\'');
            text.append(", offset=").append(offset);
            text.append(", retryCount=").append(retryCount);
            text.append(", lastFailedTime=").append(lastFailedTime);
            text.append(']');
            return text.toString();
        }
    }
}

