/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
import java.util.Collection;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public abstract class AbstractThroughputFileInputOperator<T>
extends AbstractFileInputOperator<T> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractThroughputFileInputOperator.class);
    private long repartitionInterval = 300000L;
    private int preferredMaxPendingFilesPerOperator = 10;

    public void setRepartitionInterval(long repartitionInterval) {
        this.repartitionInterval = repartitionInterval;
    }

    public long getRepartitionInterval() {
        return this.repartitionInterval;
    }

    public void setPreferredMaxPendingFilesPerOperator(int pendingFilesPerOperator) {
        this.preferredMaxPendingFilesPerOperator = pendingFilesPerOperator;
    }

    public int getPreferredMaxPendingFilesPerOperator() {
        return this.preferredMaxPendingFilesPerOperator;
    }

    @Override
    public int getPartitionCount() {
        return super.getPartitionCount();
    }

    @Override
    public void setPartitionCount(int requiredPartitions) {
        super.setPartitionCount(requiredPartitions);
    }

    @Override
    public void emitTuples() {
        this.scanDirectory();
        super.emitTuples();
    }

    @Override
    protected int getNewPartitionCount(Collection<Partitioner.Partition<AbstractFileInputOperator<T>>> partitions, Partitioner.PartitioningContext context) {
        int newOperatorCount;
        LOG.debug("Called throughput.");
        boolean isInitialParitition = partitions.iterator().next().getStats() == null;
        int totalFileCount = 0;
        for (Partitioner.Partition<AbstractFileInputOperator<T>> partition : partitions) {
            AbstractFileInputOperator oper = (AbstractFileInputOperator)partition.getPartitionedInstance();
            totalFileCount += oper.failedFiles.size();
            totalFileCount += oper.pendingFiles.size();
            totalFileCount += oper.unfinishedFiles.size();
            if (oper.currentFile == null) continue;
            ++totalFileCount;
        }
        if (!isInitialParitition) {
            LOG.debug("definePartitions: Total File Count: {}", (Object)totalFileCount);
            newOperatorCount = this.computeOperatorCount(totalFileCount);
        } else {
            newOperatorCount = this.partitionCount;
        }
        return newOperatorCount;
    }

    private int computeOperatorCount(int totalFileCount) {
        int newOperatorCount = totalFileCount / this.preferredMaxPendingFilesPerOperator;
        if (totalFileCount % this.preferredMaxPendingFilesPerOperator > 0) {
            ++newOperatorCount;
        }
        if (newOperatorCount > this.partitionCount) {
            newOperatorCount = this.partitionCount;
        }
        if (newOperatorCount == 0) {
            newOperatorCount = 1;
        }
        return newOperatorCount;
    }

    @Override
    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
        BasicCounters fileCounters = null;
        for (Stats.OperatorStats operatorStats : batchedOperatorStats.getLastWindowedStats()) {
            if (operatorStats.counters == null) continue;
            fileCounters = (BasicCounters)operatorStats.counters;
        }
        StatsListener.Response response = new StatsListener.Response();
        if (fileCounters != null && ((MutableLong)fileCounters.getCounter(AbstractFileInputOperator.FileCounters.PENDING_FILES)).longValue() > 0L || System.currentTimeMillis() - this.repartitionInterval <= this.lastRepartition) {
            response.repartitionRequired = false;
            return response;
        }
        response.repartitionRequired = true;
        return response;
    }
}

