/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.block;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.block.ReaderContext;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@StatsListener.DataQueueSize
public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM extends InputStream>
extends BaseOperator
implements Partitioner<AbstractBlockReader<R, B, STREAM>>,
StatsListener,
Operator.IdleTimeHandler {
    protected int operatorId;
    protected transient long windowId;
    @NotNull
    protected ReaderContext<STREAM> readerContext;
    protected transient STREAM stream;
    protected transient int blocksPerWindow;
    protected final BasicCounters<MutableLong> counters;
    protected transient Context.OperatorContext context;
    protected transient long sleepTimeMillis;
    protected Set<Integer> partitionKeys;
    protected int partitionMask;
    private boolean collectStats = true;
    protected int maxReaders = 16;
    protected int minReaders = 1;
    protected long intervalMillis = 60000L;
    protected final transient StatsListener.Response response;
    protected transient int partitionCount = 1;
    protected final transient Map<Integer, Integer> backlogPerOperator;
    private transient long nextMillis;
    protected transient B lastProcessedBlock;
    protected transient long lastBlockOpenTime = -1L;
    protected transient boolean consecutiveBlock;
    @AutoMetric
    private long bytesRead;
    public final transient DefaultOutputPort<B> blocksMetadataOutput = new DefaultOutputPort();
    public final transient DefaultOutputPort<ReaderRecord<R>> messages = new DefaultOutputPort();
    public final transient DefaultInputPort<B> blocksMetadataInput = new DefaultInputPort<B>(){

        public void process(B block) {
            AbstractBlockReader.this.processBlockMetadata(block);
        }
    };
    private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockReader.class);

    public AbstractBlockReader() {
        this.response = new StatsListener.Response();
        this.backlogPerOperator = Maps.newHashMap();
        this.counters = new BasicCounters<MutableLong>(MutableLong.class);
    }

    public void setup(Context.OperatorContext context) {
        this.operatorId = context.getId();
        LOG.debug("{}: partition keys {} mask {}", new Object[]{this.operatorId, this.partitionKeys, this.partitionMask});
        this.context = context;
        this.counters.setCounter(ReaderCounterKeys.BLOCKS, new MutableLong());
        this.counters.setCounter(ReaderCounterKeys.RECORDS, new MutableLong());
        this.counters.setCounter(ReaderCounterKeys.BYTES, new MutableLong());
        this.counters.setCounter(ReaderCounterKeys.TIME, new MutableLong());
        this.sleepTimeMillis = ((Integer)context.getValue(Context.OperatorContext.SPIN_MILLIS)).intValue();
    }

    public void beginWindow(long windowId) {
        this.windowId = windowId;
        this.blocksPerWindow = 0;
        this.bytesRead = 0L;
    }

    public void handleIdleTime() {
        if (this.lastProcessedBlock != null && System.currentTimeMillis() - this.lastBlockOpenTime > this.intervalMillis) {
            try {
                this.teardownStream(this.lastProcessedBlock);
                this.lastProcessedBlock = null;
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        try {
            Thread.sleep(this.sleepTimeMillis);
        }
        catch (InterruptedException ie) {
            throw new RuntimeException(ie);
        }
    }

    public void endWindow() {
        this.counters.getCounter(ReaderCounterKeys.BLOCKS).add((long)this.blocksPerWindow);
        this.context.setCounters(this.counters);
    }

    protected void processBlockMetadata(B block) {
        try {
            long blockStartTime = System.currentTimeMillis();
            if (block.getPreviousBlockId() == -1L || this.lastProcessedBlock == null || block.getPreviousBlockId() != this.lastProcessedBlock.getBlockId()) {
                this.teardownStream(this.lastProcessedBlock);
                this.consecutiveBlock = false;
                this.lastBlockOpenTime = System.currentTimeMillis();
                this.stream = this.setupStream(block);
            } else {
                this.consecutiveBlock = true;
            }
            this.readBlock((BlockMetadata)block);
            this.lastProcessedBlock = block;
            this.counters.getCounter(ReaderCounterKeys.TIME).add(System.currentTimeMillis() - blockStartTime);
            if (this.blocksMetadataOutput.isConnected()) {
                this.blocksMetadataOutput.emit(block);
            }
            ++this.blocksPerWindow;
        }
        catch (IOException ie) {
            try {
                if (this.lastProcessedBlock != null) {
                    this.teardownStream(this.lastProcessedBlock);
                    this.lastProcessedBlock = null;
                }
            }
            catch (IOException ioe) {
                throw new RuntimeException("closing last", ie);
            }
            throw new RuntimeException(ie);
        }
    }

    protected void readBlock(BlockMetadata blockMetadata) throws IOException {
        ReaderContext.Entity entity;
        this.readerContext.initialize(this.stream, blockMetadata, this.consecutiveBlock);
        while ((entity = this.readerContext.next()) != null) {
            this.counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
            this.bytesRead += entity.getUsedBytes();
            R record = this.convertToRecord(entity.getRecord());
            if (record == null) continue;
            this.counters.getCounter(ReaderCounterKeys.RECORDS).increment();
            this.messages.emit(new ReaderRecord<R>(blockMetadata.getBlockId(), record));
        }
    }

    public Collection<Partitioner.Partition<AbstractBlockReader<R, B, STREAM>>> definePartitions(Collection<Partitioner.Partition<AbstractBlockReader<R, B, STREAM>>> partitions, Partitioner.PartitioningContext context) {
        if (partitions.iterator().next().getStats() == null) {
            return partitions;
        }
        ArrayList newPartitions = Lists.newArrayList();
        for (Partitioner.Partition<AbstractBlockReader<R, B, STREAM>> partition : partitions) {
            newPartitions.add(new DefaultPartition(partition.getPartitionedInstance()));
        }
        partitions.clear();
        int morePartitionsToCreate = this.partitionCount - newPartitions.size();
        ArrayList deletedCounters = Lists.newArrayList();
        if (morePartitionsToCreate < 0) {
            Iterator partitionIterator = newPartitions.iterator();
            while (morePartitionsToCreate++ < 0) {
                Partitioner.Partition toRemove = (Partitioner.Partition)partitionIterator.next();
                deletedCounters.add(((AbstractBlockReader)((Object)toRemove.getPartitionedInstance())).counters);
                LOG.debug("partition removed {}", (Object)((AbstractBlockReader)((Object)toRemove.getPartitionedInstance())).operatorId);
                partitionIterator.remove();
            }
        } else {
            KryoCloneUtils<AbstractBlockReader> cloneUtils = KryoCloneUtils.createCloneUtils(this);
            while (morePartitionsToCreate-- > 0) {
                DefaultPartition partition = new DefaultPartition((Object)cloneUtils.getClone());
                newPartitions.add(partition);
            }
        }
        DefaultPartition.assignPartitionKeys(Collections.unmodifiableCollection(newPartitions), this.blocksMetadataInput);
        int lPartitionMask = ((Partitioner.PartitionKeys)((Partitioner.Partition)newPartitions.iterator().next()).getPartitionKeys().get(this.blocksMetadataInput)).mask;
        for (Partitioner.Partition newPartition : newPartitions) {
            AbstractBlockReader reader = (AbstractBlockReader)((Object)newPartition.getPartitionedInstance());
            reader.partitionKeys = ((Partitioner.PartitionKeys)newPartition.getPartitionKeys().get(this.blocksMetadataInput)).partitions;
            reader.partitionMask = lPartitionMask;
            LOG.debug("partitions {},{}", reader.partitionKeys, (Object)reader.partitionMask);
        }
        AbstractBlockReader targetReader = (AbstractBlockReader)((Object)((Partitioner.Partition)newPartitions.iterator().next()).getPartitionedInstance());
        for (BasicCounters removedCounter : deletedCounters) {
            this.addCounters(targetReader.counters, removedCounter);
        }
        return newPartitions;
    }

    protected void addCounters(BasicCounters<MutableLong> target, BasicCounters<MutableLong> source) {
        for (ReaderCounterKeys key : ReaderCounterKeys.values()) {
            MutableLong scounter;
            MutableLong tcounter = target.getCounter(key);
            if (tcounter == null) {
                tcounter = new MutableLong();
                target.setCounter(key, tcounter);
            }
            if ((scounter = source.getCounter(key)) == null) continue;
            tcounter.add(scounter.longValue());
        }
    }

    public void partitioned(Map<Integer, Partitioner.Partition<AbstractBlockReader<R, B, STREAM>>> integerPartitionMap) {
    }

    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats) {
        int newPartitionCount;
        this.response.repartitionRequired = false;
        if (!this.collectStats) {
            return this.response;
        }
        List lastWindowedStats = stats.getLastWindowedStats();
        if (lastWindowedStats != null && lastWindowedStats.size() > 0) {
            Stats.OperatorStats lastStats = (Stats.OperatorStats)lastWindowedStats.get(lastWindowedStats.size() - 1);
            if (lastStats.inputPorts.size() > 0) {
                this.backlogPerOperator.put(stats.getOperatorId(), ((Stats.OperatorStats.PortStats)lastStats.inputPorts.get((int)0)).queueSize);
            }
        }
        if (System.currentTimeMillis() < this.nextMillis) {
            return this.response;
        }
        this.nextMillis = System.currentTimeMillis() + this.intervalMillis;
        LOG.debug("Proposed NextMillis = {}", (Object)this.nextMillis);
        long totalBacklog = 0L;
        for (Map.Entry<Integer, Integer> backlog : this.backlogPerOperator.entrySet()) {
            totalBacklog += (long)backlog.getValue().intValue();
        }
        LOG.debug("backlog {} partitionCount {}", (Object)totalBacklog, (Object)this.partitionCount);
        this.backlogPerOperator.clear();
        if (totalBacklog == (long)this.partitionCount) {
            return this.response;
        }
        if (totalBacklog > (long)this.maxReaders) {
            LOG.debug("large backlog {}", (Object)totalBacklog);
            newPartitionCount = this.maxReaders;
        } else if (totalBacklog < (long)this.minReaders) {
            LOG.debug("small backlog {}", (Object)totalBacklog);
            newPartitionCount = this.minReaders;
        } else {
            newPartitionCount = this.getAdjustedCount(totalBacklog);
            LOG.debug("moderate backlog {}", (Object)totalBacklog);
        }
        LOG.debug("backlog {} newPartitionCount {} partitionCount {}", new Object[]{totalBacklog, newPartitionCount, this.partitionCount});
        if (newPartitionCount == this.partitionCount) {
            return this.response;
        }
        this.partitionCount = newPartitionCount;
        this.response.repartitionRequired = true;
        LOG.debug("partition required", (Object)totalBacklog, (Object)this.partitionCount);
        return this.response;
    }

    protected int getAdjustedCount(long newCount) {
        int adjustCount = 1;
        while ((long)adjustCount < newCount) {
            adjustCount <<= 1;
        }
        if ((long)adjustCount > newCount) {
            adjustCount >>>= 1;
        }
        LOG.debug("adjust {} => {}", (Object)newCount, (Object)adjustCount);
        return adjustCount;
    }

    protected abstract STREAM setupStream(B var1) throws IOException;

    protected void teardownStream(B block) throws IOException {
        if (this.stream != null) {
            ((InputStream)this.stream).close();
            this.stream = null;
        }
    }

    protected abstract R convertToRecord(byte[] var1);

    public void setMaxReaders(int maxReaders) {
        this.maxReaders = maxReaders;
    }

    public void setMinReaders(int minReaders) {
        this.minReaders = minReaders;
    }

    public int getMaxReaders() {
        return this.maxReaders;
    }

    public int getMinReaders() {
        return this.minReaders;
    }

    public void setCollectStats(boolean collectStats) {
        this.collectStats = collectStats;
    }

    public boolean isCollectStats() {
        return this.collectStats;
    }

    public void setIntervalMillis(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    public long getIntervalMillis() {
        return this.intervalMillis;
    }

    public void setReaderContext(ReaderContext<STREAM> readerContext) {
        this.readerContext = readerContext;
    }

    public ReaderContext<STREAM> getReaderContext() {
        return this.readerContext;
    }

    public String toString() {
        return "Reader{nextMillis=" + this.nextMillis + ", intervalMillis=" + this.intervalMillis + '}';
    }

    public static enum ReaderCounterKeys {
        RECORDS,
        BLOCKS,
        BYTES,
        TIME;

    }

    public static class ReaderRecord<R> {
        private final long blockId;
        private final R record;

        private ReaderRecord() {
            this.blockId = -1L;
            this.record = null;
        }

        public ReaderRecord(long blockId, R record) {
            this.blockId = blockId;
            this.record = record;
        }

        public long getBlockId() {
            return this.blockId;
        }

        public R getRecord() {
            return this.record;
        }
    }
}

