/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.fs.AbstractFileSplitter;
import com.datatorrent.lib.io.fs.FileStitcher;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Synchronizer
extends BaseOperator {
    /** File metadata that has been received, keyed by file path. */
    private Map<String, AbstractFileSplitter.FileMetadata> fileMetadataMap = new HashMap<String, AbstractFileSplitter.FileMetadata>();
    /** Block metadata received so far: file path -> (block id -> block metadata). */
    private Map<String, Map<Long, BlockMetadata.FileBlockMetadata>> fileToReceivedBlocksMetadataMap = new HashMap<String, Map<Long, BlockMetadata.FileBlockMetadata>>();
    /**
     * Input port for file metadata. Each tuple is recorded under its file path and a trigger is
     * emitted right away when all of the file's blocks have already been received.
     */
    public final transient DefaultInputPort<AbstractFileSplitter.FileMetadata> filesMetadataInput = new DefaultInputPort<AbstractFileSplitter.FileMetadata>(){

        @Override
        public void process(AbstractFileSplitter.FileMetadata fileMetadata) {
            String path = fileMetadata.getFilePath();
            // Looks up (or lazily creates) the per-file collection of blocks received so far.
            Map<Long, BlockMetadata.FileBlockMetadata> blocksSoFar = getReceivedBlocksMetadata(path);
            fileMetadataMap.put(path, fileMetadata);
            emitTriggerIfAllBlocksReceived(fileMetadata, blocksSoFar);
        }
    };
    /**
     * Input port for block metadata. Each tuple is stored under its file path and block id; when
     * the metadata of the owning file is already known, the completeness check is run.
     */
    public final transient DefaultInputPort<BlockMetadata.FileBlockMetadata> blocksMetadataInput = new DefaultInputPort<BlockMetadata.FileBlockMetadata>(){

        @Override
        public void process(BlockMetadata.FileBlockMetadata blockMetadata) {
            String path = blockMetadata.getFilePath();
            LOG.debug("received blockId {} for file {}", blockMetadata.getBlockId(), path);
            Map<Long, BlockMetadata.FileBlockMetadata> blocksSoFar = getReceivedBlocksMetadata(path);
            blocksSoFar.put(blockMetadata.getBlockId(), blockMetadata);
            AbstractFileSplitter.FileMetadata owningFile = fileMetadataMap.get(path);
            if (owningFile == null) {
                // File metadata has not arrived yet; the check runs when it does.
                return;
            }
            emitTriggerIfAllBlocksReceived(owningFile, blocksSoFar);
        }
    };
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(Synchronizer.class);
    /** Output port on which an {@link OutputFileMetadata} is emitted once all blocks of a file have been received. */
    public final transient DefaultOutputPort<OutputFileMetadata> trigger = new DefaultOutputPort<OutputFileMetadata>();

    /**
     * Delegates to the superclass setup; this operator performs no additional setup work.
     *
     * @param context the operator context passed through to the superclass
     */
    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);
    }

    /**
     * Delegates to the superclass; no per-window state is initialised here.
     *
     * @param windowId identifier of the window that is starting
     */
    @Override
    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
    }

    /**
     * Intentionally empty: nothing is done at the end of a window.
     */
    @Override
    public void endWindow() {
        // no-op
    }

    /**
     * Emits an {@link OutputFileMetadata} on {@link #trigger} when every block of the given file
     * has been received; otherwise keeps the file metadata so the check can be repeated later.
     *
     * <p>Once the trigger has been emitted, all bookkeeping for the file is released: both the
     * file metadata entry and the received-blocks entry are removed. Dropping the received-blocks
     * entry keeps operator state from growing with every processed file, and prevents stale block
     * ids from skewing the block count if the same file path is seen again.
     *
     * @param fileMetadata           metadata of the file being checked
     * @param receivedBlocksMetadata blocks received so far for that file, keyed by block id
     */
    private void emitTriggerIfAllBlocksReceived(AbstractFileSplitter.FileMetadata fileMetadata, Map<Long, BlockMetadata.FileBlockMetadata> receivedBlocksMetadata) {
        String filePath = fileMetadata.getFilePath();
        if (receivedBlocksMetadata.size() != fileMetadata.getNumberOfBlocks()) {
            // Some blocks are still outstanding; remember the file until they arrive.
            this.fileMetadataMap.put(filePath, fileMetadata);
            return;
        }
        // The counts match; for regular files also make sure the ids match.
        if (!fileMetadata.isDirectory()) {
            Set<Long> receivedBlocks = receivedBlocksMetadata.keySet();
            for (long blockId : fileMetadata.getBlockIds()) {
                if (!receivedBlocks.contains(blockId)) {
                    // An expected block is missing; nothing to emit yet.
                    return;
                }
            }
        }
        long fileProcessingTime = System.currentTimeMillis() - fileMetadata.getDiscoverTime();
        List<StitchBlock> outputBlocks = this.constructOutputBlockMetadataList(fileMetadata);
        OutputFileMetadata outputFileMetadata = new OutputFileMetadata(fileMetadata, outputBlocks);
        this.trigger.emit(outputFileMetadata);
        LOG.debug("Total time taken to process the file {} is {} ms", fileMetadata.getFilePath(), fileProcessingTime);
        // The file is complete: release everything tracked for it. The received-blocks entry is
        // removed only after the output block list has been built from it above.
        this.fileMetadataMap.remove(filePath);
        this.fileToReceivedBlocksMetadataMap.remove(filePath);
    }

    /**
     * Builds the ordered list of stitch blocks for a file from the block metadata received for it.
     * Directories yield an empty list. The entry created for the final block id is flagged as the
     * last block of the source.
     *
     * @param fileMetadata metadata of the file whose blocks are to be listed
     * @return one {@link StitchBlockMetaData} per block id, in the order given by the file metadata
     */
    private List<StitchBlock> constructOutputBlockMetadataList(AbstractFileSplitter.FileMetadata fileMetadata) {
        List<StitchBlock> stitchBlocks = new ArrayList<StitchBlock>();
        if (fileMetadata.isDirectory()) {
            return stitchBlocks;
        }
        Map<Long, BlockMetadata.FileBlockMetadata> blocksById = this.fileToReceivedBlocksMetadataMap.get(fileMetadata.getFilePath());
        long[] ids = fileMetadata.getBlockIds();
        int lastIndex = ids.length - 1;
        for (int idx = 0; idx <= lastIndex; idx++) {
            BlockMetadata.FileBlockMetadata received = blocksById.get(ids[idx]);
            boolean isFinalBlock = idx == lastIndex;
            stitchBlocks.add(new StitchBlockMetaData(received, fileMetadata.getRelativePath(), isFinalBlock));
        }
        return stitchBlocks;
    }

    /**
     * Returns the map of blocks received so far for the given file, creating and registering an
     * empty one on first access.
     *
     * @param filePath path of the file whose received blocks are wanted
     * @return the (never null) block-id to block-metadata map tracked for that file
     */
    private Map<Long, BlockMetadata.FileBlockMetadata> getReceivedBlocksMetadata(String filePath) {
        Map<Long, BlockMetadata.FileBlockMetadata> existing = this.fileToReceivedBlocksMetadataMap.get(filePath);
        if (existing != null) {
            return existing;
        }
        Map<Long, BlockMetadata.FileBlockMetadata> created = new HashMap<Long, BlockMetadata.FileBlockMetadata>();
        this.fileToReceivedBlocksMetadataMap.put(filePath, created);
        return created;
    }

    /**
     * File block metadata extended with the relative path of the source file and a flag marking
     * the last block of that source. It can copy the block's data, stored in a file named after
     * the block id, into an output stream.
     */
    public static class StitchBlockMetaData
    extends BlockMetadata.FileBlockMetadata
    implements StitchBlock {
        /** Size, in bytes, of the buffer used while copying block data. */
        public static final int BUFFER_SIZE = 65536;
        String sourceRelativePath;
        boolean isLastBlockSource;

        /** Creates an instance with no fields populated. */
        public StitchBlockMetaData() {
        }

        /**
         * Copies the given block metadata and attaches the source information.
         *
         * @param fmd                block metadata to copy; must not be null
         * @param sourceRelativePath relative path of the source file
         * @param isLastBlockSource  whether this is the last block of the source file
         */
        public StitchBlockMetaData(BlockMetadata.FileBlockMetadata fmd, String sourceRelativePath, boolean isLastBlockSource) {
            super(fmd.getFilePath(), fmd.getBlockId(), fmd.getOffset(), fmd.getLength(), fmd.isLastBlock(), fmd.getPreviousBlockId());
            this.sourceRelativePath = sourceRelativePath;
            this.isLastBlockSource = isLastBlockSource;
        }

        /** @return relative path of the source file */
        public String getSourceRelativePath() {
            return this.sourceRelativePath;
        }

        /** @param sourceRelativePath relative path of the source file */
        public void setSourceRelativePath(String sourceRelativePath) {
            this.sourceRelativePath = sourceRelativePath;
        }

        /** @return whether this is the last block of the source file */
        public boolean isLastBlockSource() {
            return this.isLastBlockSource;
        }

        /** @param isLastBlockSource whether this is the last block of the source file */
        public void setLastBlockSource(boolean isLastBlockSource) {
            this.isLastBlockSource = isLastBlockSource;
        }

        /**
         * Copies the whole block file into the output stream.
         *
         * @param appFS        file system holding the block files
         * @param blocksDir    directory containing the block files
         * @param outputStream destination of the block data; it is not closed by this method
         * @throws IOException if reading the block or writing to the stream fails
         * @throws FileStitcher.BlockNotFoundException if the block file does not exist
         */
        @Override
        public void writeTo(FileSystem appFS, String blocksDir, OutputStream outputStream) throws IOException, FileStitcher.BlockNotFoundException {
            Path blockPath = new Path(blocksDir, Long.toString(this.getBlockId()));
            if (!appFS.exists(blockPath)) {
                throw new FileStitcher.BlockNotFoundException(blockPath);
            }
            this.writeTo(appFS, blocksDir, outputStream, 0L, appFS.getFileStatus(blockPath).getLen());
        }

        /**
         * Copies up to {@code length} bytes of the block file, starting at {@code offset}, into the
         * output stream. Copying stops early when the end of the block file is reached; a
         * non-positive {@code length} copies nothing.
         *
         * @param appFS        file system holding the block files
         * @param blocksDir    directory containing the block files
         * @param outputStream destination of the block data; it is not closed by this method
         * @param offset       number of bytes to skip at the start of the block file
         * @param length       maximum number of bytes to copy
         * @throws IOException if reading the block or writing to the stream fails
         * @throws FileStitcher.BlockNotFoundException if the block file does not exist
         */
        public void writeTo(FileSystem appFS, String blocksDir, OutputStream outputStream, long offset, long length) throws IOException, FileStitcher.BlockNotFoundException {
            Path blockPath = new Path(blocksDir, Long.toString(this.getBlockId()));
            if (!appFS.exists(blockPath)) {
                throw new FileStitcher.BlockNotFoundException(blockPath);
            }
            byte[] buffer = new byte[BUFFER_SIZE];
            FSDataInputStream inStream = appFS.open(blockPath);
            try {
                // skip() may skip fewer bytes than requested, so repeat until the offset is consumed.
                long bytesToSkip = offset;
                while (bytesToSkip > 0L) {
                    long skipped = inStream.skip(bytesToSkip);
                    if (skipped <= 0L) {
                        // A zero return does not prove end of stream; probe with a one-byte read.
                        if (inStream.read() == -1) {
                            break;
                        }
                        skipped = 1L;
                    }
                    bytesToSkip -= skipped;
                }
                long bytesRemainingToRead = length;
                while (bytesRemainingToRead > 0L) {
                    // Take the minimum as a long BEFORE narrowing: casting the remaining count to
                    // int first overflows for lengths above Integer.MAX_VALUE.
                    int bytesToRead = (int)Math.min((long)BUFFER_SIZE, bytesRemainingToRead);
                    int inputBytesRead = inStream.read(buffer, 0, bytesToRead);
                    if (inputBytesRead == -1) {
                        break;
                    }
                    outputStream.write(buffer, 0, inputBytesRead);
                    bytesRemainingToRead -= (long)inputBytesRead;
                }
            }
            finally {
                inStream.close();
            }
        }
    }

    /**
     * A block that can write its data to an output stream and is identified by a block id.
     */
    public static interface StitchBlock {
        /**
         * Writes this block's data to the given output stream.
         *
         * @param appFS        file system to read the block from
         * @param blocksDir    directory in which block files are looked up
         * @param outputStream stream receiving the block data
         * @throws IOException if reading or writing fails
         * @throws FileStitcher.BlockNotFoundException if the block cannot be found
         */
        void writeTo(FileSystem appFS, String blocksDir, OutputStream outputStream) throws IOException, FileStitcher.BlockNotFoundException;

        /** @return the id of this block */
        long getBlockId();
    }

    /**
     * Describes a stitched file: its relative path and the blocks it is made of.
     */
    public static interface StitchedFileMetaData {
        /** @return relative path of the stitched file */
        String getStitchedFileRelativePath();

        /** @return the blocks making up the stitched file */
        List<StitchBlock> getStitchBlocksList();
    }

    /**
     * File metadata emitted once all blocks of a file are available, carrying the list of blocks
     * to stitch together.
     */
    public static class OutputFileMetadata
    extends AbstractFileSplitter.FileMetadata
    implements StitchedFileMetaData {
        private List<StitchBlock> stitchBlocksList;

        /** Creates an instance with an empty block list. */
        protected OutputFileMetadata() {
            this.stitchBlocksList = Lists.newArrayList();
        }

        /**
         * Creates an instance from existing file metadata and the blocks to stitch.
         *
         * @param fileMetaData     file metadata passed to the superclass constructor
         * @param stitchBlocksList blocks making up the file; stored as given, not copied
         */
        protected OutputFileMetadata(AbstractFileSplitter.FileMetadata fileMetaData, List<StitchBlock> stitchBlocksList) {
            super(fileMetaData);
            this.stitchBlocksList = stitchBlocksList;
        }

        /**
         * Creates an instance for the given file path with an empty block list.
         *
         * @param filePath path of the file; must not be null
         */
        public OutputFileMetadata(@NotNull String filePath) {
            super(filePath);
            // Match the no-arg constructor so getStitchBlocksList() does not return null
            // for instances created through this constructor.
            this.stitchBlocksList = Lists.newArrayList();
        }

        /** @return the relative path of this file */
        @Override
        public String getStitchedFileRelativePath() {
            return this.getRelativePath();
        }

        /** @return the blocks making up this file */
        @Override
        public List<StitchBlock> getStitchBlocksList() {
            return this.stitchBlocksList;
        }

        /** @param outputBlockMetaDataList replacement block list; stored as given, not copied */
        public void setOutputBlockMetaDataList(List<StitchBlock> outputBlockMetaDataList) {
            this.stitchBlocksList = outputBlockMetaDataList;
        }
    }
}

