/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.block;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.block.AbstractBlockReader;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.datatorrent.netlet.util.Slice;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * File output operator for block records.
 *
 * <p>Each incoming {@link AbstractBlockReader.ReaderRecord} is mapped to a file whose name is the
 * decimal block id (see {@link #getFileName}). The output directory handed to the superclass is
 * {@code <application path>/<blocksDirectory>} and is computed in {@link #setup}.
 *
 * <p>Block metadata received on {@link #blockMetadataInput} is buffered until {@link #endWindow()},
 * at which point the file for each buffered block is finalized and the metadata is forwarded on
 * {@link #blockMetadataOutput}.
 *
 * <p>The operator is its own {@link Partitioner}: it grows or shrinks the partition collection to
 * the parallel partition count reported by the partitioning context.
 */
public class BlockWriter
extends AbstractFileOutputOperator<AbstractBlockReader.ReaderRecord<Slice>>
implements Partitioner<BlockWriter> {
    /** Default name of the directory (relative to the application path) that holds block files. */
    public static final String DEFAULT_BLOCKS_DIR = "blocks";

    private static final Logger LOG = LoggerFactory.getLogger(BlockWriter.class);

    /** Directory name appended to the application path in {@link #setup}. */
    private String blocksDirectory = DEFAULT_BLOCKS_DIR;

    /** Metadata received since the last {@link #endWindow()}; transient, created in the constructor. */
    private transient List<BlockMetadata.FileBlockMetadata> blockMetadatas;

    /** Input port for block metadata; tuples are buffered until the end of the window. */
    public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blockMetadataInput = new DefaultInputPort<BlockMetadata.FileBlockMetadata>(){

        @Override
        public void process(BlockMetadata.FileBlockMetadata blockMetadata) {
            BlockWriter.this.blockMetadatas.add(blockMetadata);
            LOG.debug("received blockId {} for file {} ", (Object)blockMetadata.getBlockId(), (Object)blockMetadata.getFilePath());
        }
    };

    /** Output port on which buffered block metadata is emitted after its file has been finalized. */
    public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blockMetadataOutput = new DefaultOutputPort<BlockMetadata.FileBlockMetadata>();

    /** Creates a writer with an empty metadata buffer and an empty file path (set later in setup). */
    public BlockWriter() {
        this.blockMetadatas = Lists.newArrayList();
        this.filePath = "";
    }

    /**
     * Points {@code filePath} at {@code <APPLICATION_PATH>/<blocksDirectory>} and then delegates to
     * the superclass setup.
     *
     * @param context operator context from which the application path is read
     */
    @Override
    public void setup(Context.OperatorContext context) {
        // filePath must be assigned before super.setup(context) runs.
        this.filePath = (String)context.getValue(Context.DAGContext.APPLICATION_PATH) + "/" + this.blocksDirectory;
        super.setup(context);
    }

    /**
     * Runs the superclass end-of-window logic, clears the stream cache and the end offsets, then
     * finalizes the file of every buffered block and emits its metadata downstream.
     *
     * @throws RuntimeException wrapping the {@link IOException} if finalizing a file fails
     */
    @Override
    public void endWindow() {
        super.endWindow();
        this.streamsCache.asMap().clear();
        this.endOffsets.clear();
        for (BlockMetadata.FileBlockMetadata blockMetadata : this.blockMetadatas) {
            try {
                this.finalizeFile(Long.toString(blockMetadata.getBlockId()));
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            // Metadata is forwarded only after its file has been finalized.
            this.blockMetadataOutput.emit(blockMetadata);
        }
        this.blockMetadatas.clear();
    }

    /**
     * @param tuple record read from a block
     * @return the block id of the record rendered as a decimal string
     */
    @Override
    protected String getFileName(AbstractBlockReader.ReaderRecord<Slice> tuple) {
        return Long.toString(tuple.getBlockId());
    }

    /**
     * Returns exactly the bytes covered by the record's slice.
     *
     * <p>When the slice spans its whole backing array the array itself is returned (no copy);
     * otherwise only the {@code [offset, offset + length)} window is copied out, so bytes outside
     * the slice are never written.
     *
     * @param tuple record read from a block
     * @return the bytes to write for this record
     */
    @Override
    protected byte[] getBytesForTuple(AbstractBlockReader.ReaderRecord<Slice> tuple) {
        Slice record = tuple.getRecord();
        if (record.offset == 0 && record.length == record.buffer.length) {
            return record.buffer;
        }
        byte[] bytes = new byte[record.length];
        System.arraycopy(record.buffer, record.offset, bytes, 0, record.length);
        return bytes;
    }

    /**
     * Resizes {@code partitions} to the parallel partition count of {@code context}.
     *
     * <p>The collection is returned untouched when the parallel partition count is 0 or already
     * equals the current size. When shrinking, partitions are removed from the front of the
     * iteration order and their file counters are merged into the first remaining partition.
     * When growing, new partitions are created around the instance of the first existing
     * partition.
     *
     * @param partitions current partitions; modified in place
     * @param context partitioning context supplying the parallel partition count
     * @return the same {@code partitions} collection
     */
    @Override
    public Collection<Partitioner.Partition<BlockWriter>> definePartitions(Collection<Partitioner.Partition<BlockWriter>> partitions, Partitioner.PartitioningContext context) {
        int parallelPartitionCount = context.getParallelPartitionCount();
        if (parallelPartitionCount == 0) {
            return partitions;
        }
        if (parallelPartitionCount == partitions.size()) {
            LOG.debug("no change is partition count: {}", (Object)partitions.size());
            return partitions;
        }
        List<BasicCounters<MutableLong>> deletedCounters = Lists.newArrayList();
        LOG.debug("block writer parallel partition count {}", (Object)parallelPartitionCount);
        int morePartitionsToCreate = parallelPartitionCount - partitions.size();
        if (morePartitionsToCreate < 0) {
            // Shrink: drop surplus partitions, remembering their counters so totals are not lost.
            Iterator<Partitioner.Partition<BlockWriter>> partitionIterator = partitions.iterator();
            while (morePartitionsToCreate++ < 0) {
                Partitioner.Partition<BlockWriter> toRemove = partitionIterator.next();
                deletedCounters.add(toRemove.getPartitionedInstance().fileCounters);
                partitionIterator.remove();
            }
        } else {
            // Grow: every new partition wraps the instance of the first existing partition.
            BlockWriter anOperator = partitions.iterator().next().getPartitionedInstance();
            while (morePartitionsToCreate-- > 0) {
                partitions.add(new DefaultPartition<BlockWriter>(anOperator));
            }
        }
        BlockWriter targetWriter = partitions.iterator().next().getPartitionedInstance();
        for (BasicCounters<MutableLong> removedCounter : deletedCounters) {
            this.addCounters(targetWriter.fileCounters, removedCounter);
        }
        LOG.debug("Block writers {}", (Object)partitions.size());
        return partitions;
    }

    /**
     * Adds every counter of {@code source} to the matching counter of {@code target}.
     *
     * <p>For each {@link AbstractFileOutputOperator.Counters} key, a zero-valued counter is created
     * in {@code target} if it is missing; keys absent from {@code source} contribute nothing.
     *
     * @param target counters that receive the sums
     * @param source counters whose values are added; not modified
     */
    protected void addCounters(BasicCounters<MutableLong> target, BasicCounters<MutableLong> source) {
        for (AbstractFileOutputOperator.Counters key : AbstractFileOutputOperator.Counters.values()) {
            MutableLong tcounter = target.getCounter(key);
            if (tcounter == null) {
                tcounter = new MutableLong();
                target.setCounter(key, tcounter);
            }
            MutableLong scounter = source.getCounter(key);
            if (scounter != null) {
                tcounter.add(scounter.longValue());
            }
        }
    }

    /**
     * @return the directory name appended to the application path for block files
     */
    public String getBlocksDirectory() {
        return this.blocksDirectory;
    }

    /**
     * @param blocksDirectory directory name appended to the application path for block files
     */
    public void setBlocksDirectory(String blocksDirectory) {
        this.blocksDirectory = blocksDirectory;
    }

    /**
     * Partitioning callback; intentionally does nothing.
     *
     * @param partitions the partitions, keyed by an integer id
     */
    @Override
    public void partitioned(Map<Integer, Partitioner.Partition<BlockWriter>> partitions) {
    }
}

