/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@OperatorAnnotation(checkpointableWithinAppWindow=false)
@Deprecated
public class FileSplitter
implements InputOperator,
Operator.CheckpointListener {
    protected Long blockSize;
    private int sequenceNo;
    @Min(value=1L)
    protected int blocksThreshold;
    protected transient long blockCount;
    protected Iterator<BlockMetadata.FileBlockMetadata> blockMetadataIterator;
    @NotNull
    protected TimeBasedDirectoryScanner scanner;
    @NotNull
    protected WindowDataManager windowDataManager;
    @NotNull
    protected final transient LinkedList<FileInfo> currentWindowRecoveryState;
    protected transient FileSystem fs;
    protected transient int operatorId;
    protected transient Context.OperatorContext context;
    protected transient long currentWindowId;
    protected final BasicCounters<MutableLong> fileCounters;
    public final transient DefaultOutputPort<FileMetadata> filesMetadataOutput = new DefaultOutputPort();
    public final transient DefaultOutputPort<BlockMetadata.FileBlockMetadata> blocksMetadataOutput = new DefaultOutputPort();
    private static final Logger LOG = LoggerFactory.getLogger(FileSplitter.class);

    public FileSplitter() {
        this.currentWindowRecoveryState = Lists.newLinkedList();
        this.fileCounters = new BasicCounters<MutableLong>(MutableLong.class);
        this.windowDataManager = new WindowDataManager.NoopWindowDataManager();
        this.scanner = new TimeBasedDirectoryScanner();
        this.blocksThreshold = Integer.MAX_VALUE;
    }

    public void setup(Context.OperatorContext context) {
        Preconditions.checkArgument((!this.scanner.files.isEmpty() ? 1 : 0) != 0, (Object)"empty files");
        Preconditions.checkArgument((this.blockSize == null || this.blockSize > 0L ? 1 : 0) != 0, (Object)"invalid block size");
        this.operatorId = context.getId();
        this.context = context;
        this.fileCounters.setCounter(Counters.PROCESSED_FILES, new MutableLong());
        this.windowDataManager.setup((Context)context);
        try {
            this.fs = this.scanner.getFSInstance();
        }
        catch (IOException e) {
            throw new RuntimeException("creating fs", e);
        }
        if (this.blockSize == null) {
            this.blockSize = this.fs.getDefaultBlockSize(new Path(this.scanner.files.iterator().next()));
        }
        if ((Long)context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < this.windowDataManager.getLargestRecoveryWindow()) {
            this.blockMetadataIterator = null;
        } else {
            this.scanner.setup(context);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void teardown() {
        try {
            this.scanner.teardown();
        }
        catch (Throwable t) {
            DTThrowable.rethrow((Throwable)t);
        }
        finally {
            try {
                this.fs.close();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public void beginWindow(long windowId) {
        this.blockCount = 0L;
        this.currentWindowId = windowId;
        if (windowId <= this.windowDataManager.getLargestRecoveryWindow()) {
            this.replay(windowId);
        }
    }

    protected void replay(long windowId) {
        try {
            LinkedList recoveredData = (LinkedList)this.windowDataManager.load(this.operatorId, windowId);
            if (recoveredData == null) {
                return;
            }
            if (this.blockMetadataIterator != null) {
                this.emitBlockMetadata();
            }
            for (FileInfo info : recoveredData) {
                if (info.directoryPath != null) {
                    this.scanner.lastModifiedTimes.put(info.directoryPath, info.modifiedTime);
                } else {
                    this.scanner.lastModifiedTimes.put(info.relativeFilePath, info.modifiedTime);
                }
                FileMetadata fileMetadata = this.buildFileMetadata(info);
                this.fileCounters.getCounter(Counters.PROCESSED_FILES).increment();
                this.filesMetadataOutput.emit((Object)fileMetadata);
                this.blockMetadataIterator = new BlockMetadataIterator(this, fileMetadata, this.blockSize);
                if (this.emitBlockMetadata()) continue;
                break;
            }
            if (windowId == this.windowDataManager.getLargestRecoveryWindow()) {
                this.scanner.setup(this.context);
            }
        }
        catch (IOException e) {
            throw new RuntimeException("replay", e);
        }
    }

    public void emitTuples() {
        FileInfo fileInfo;
        if (this.currentWindowId <= this.windowDataManager.getLargestRecoveryWindow()) {
            return;
        }
        Throwable throwable = this.scanner.atomicThrowable.get();
        if (throwable != null) {
            DTThrowable.rethrow((Throwable)throwable);
        }
        if (this.blockMetadataIterator != null && this.blockCount < (long)this.blocksThreshold) {
            this.emitBlockMetadata();
        }
        while (this.blockCount < (long)this.blocksThreshold && (fileInfo = this.scanner.pollFile()) != null) {
            this.currentWindowRecoveryState.add(fileInfo);
            try {
                FileMetadata fileMetadata = this.buildFileMetadata(fileInfo);
                this.filesMetadataOutput.emit((Object)fileMetadata);
                this.fileCounters.getCounter(Counters.PROCESSED_FILES).increment();
                if (!fileMetadata.isDirectory()) {
                    this.blockMetadataIterator = new BlockMetadataIterator(this, fileMetadata, this.blockSize);
                    if (!this.emitBlockMetadata()) break;
                }
                if (!fileInfo.lastFileOfScan) continue;
                break;
            }
            catch (IOException e) {
                throw new RuntimeException("creating metadata", e);
            }
        }
    }

    public void endWindow() {
        if (this.currentWindowId > this.windowDataManager.getLargestRecoveryWindow()) {
            try {
                this.windowDataManager.save(this.currentWindowRecoveryState, this.operatorId, this.currentWindowId);
            }
            catch (IOException e) {
                throw new RuntimeException("saving recovery", e);
            }
        }
        this.currentWindowRecoveryState.clear();
        this.context.setCounters(this.fileCounters);
    }

    protected boolean emitBlockMetadata() {
        while (this.blockMetadataIterator.hasNext()) {
            if (this.blockCount++ < (long)this.blocksThreshold) {
                this.blocksMetadataOutput.emit((Object)this.blockMetadataIterator.next());
                continue;
            }
            return false;
        }
        this.blockMetadataIterator = null;
        return true;
    }

    protected BlockMetadata.FileBlockMetadata createBlockMetadata(long pos, long lengthOfFileInBlock, int blockNumber, FileMetadata fileMetadata, boolean isLast) {
        return new BlockMetadata.FileBlockMetadata(fileMetadata.getFilePath(), fileMetadata.getBlockIds()[blockNumber - 1], pos, lengthOfFileInBlock, isLast, blockNumber == 1 ? -1L : fileMetadata.getBlockIds()[blockNumber - 2]);
    }

    protected FileMetadata buildFileMetadata(FileInfo fileInfo) throws IOException {
        String filePathStr = fileInfo.getFilePath();
        LOG.debug("file {}", (Object)filePathStr);
        FileMetadata fileMetadata = new FileMetadata(filePathStr);
        Path path = new Path(filePathStr);
        fileMetadata.setFileName(path.getName());
        FileStatus status = this.fs.getFileStatus(path);
        fileMetadata.setDirectory(status.isDirectory());
        fileMetadata.setFileLength(status.getLen());
        if (!status.isDirectory()) {
            int noOfBlocks = (int)(status.getLen() / this.blockSize + (long)(status.getLen() % this.blockSize == 0L ? 0 : 1));
            if (fileMetadata.getDataOffset() >= status.getLen()) {
                noOfBlocks = 0;
            }
            fileMetadata.setNumberOfBlocks(noOfBlocks);
            this.populateBlockIds(fileMetadata);
        }
        return fileMetadata;
    }

    protected void populateBlockIds(FileMetadata fileMetadata) {
        long[] blockIds = new long[fileMetadata.getNumberOfBlocks()];
        long longLeftSide = (long)this.operatorId << 32;
        for (int i = 0; i < fileMetadata.getNumberOfBlocks(); ++i) {
            blockIds[i] = longLeftSide | (long)this.sequenceNo++ & 0xFFFFFFFFL;
        }
        fileMetadata.setBlockIds(blockIds);
    }

    public void setBlockSize(Long blockSize) {
        this.blockSize = blockSize;
    }

    public Long getBlockSize() {
        return this.blockSize;
    }

    public void setBlocksThreshold(int threshold) {
        this.blocksThreshold = threshold;
    }

    public int getBlocksThreshold() {
        return this.blocksThreshold;
    }

    public void setScanner(TimeBasedDirectoryScanner scanner) {
        this.scanner = scanner;
    }

    public TimeBasedDirectoryScanner getScanner() {
        return this.scanner;
    }

    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    public void checkpointed(long l) {
    }

    public void committed(long l) {
        try {
            this.windowDataManager.deleteUpTo(this.operatorId, l);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static enum Counters {
        PROCESSED_FILES;

    }

    @Deprecated
    protected static class FileInfo {
        protected final String directoryPath;
        protected final String relativeFilePath;
        protected final long modifiedTime;
        protected transient boolean lastFileOfScan;

        private FileInfo() {
            this.directoryPath = null;
            this.relativeFilePath = null;
            this.modifiedTime = -1L;
        }

        protected FileInfo(@Nullable String directoryPath, @NotNull String relativeFilePath, long modifiedTime) {
            this.directoryPath = directoryPath;
            this.relativeFilePath = relativeFilePath;
            this.modifiedTime = modifiedTime;
        }

        public String getDirectoryPath() {
            return this.directoryPath;
        }

        public String getRelativeFilePath() {
            return this.relativeFilePath;
        }

        public String getFilePath() {
            if (this.directoryPath == null) {
                return this.relativeFilePath;
            }
            return new Path(this.directoryPath, this.relativeFilePath).toUri().getPath();
        }

        public boolean isLastFileOfScan() {
            return this.lastFileOfScan;
        }
    }

    @Deprecated
    public static class TimeBasedDirectoryScanner
    implements Component<Context.OperatorContext>,
    Runnable {
        private static long DEF_SCAN_INTERVAL_MILLIS = 5000L;
        protected boolean recursive = true;
        protected volatile transient boolean trigger;
        @NotNull
        protected final Map<String, Long> lastModifiedTimes = Maps.newHashMap();
        @NotNull
        protected final Set<String> files;
        @Min(value=0L)
        protected long scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS;
        private String filePatternRegularExp;
        protected transient long lastScanMillis;
        protected transient FileSystem fs;
        protected final transient LinkedBlockingDeque<FileInfo> discoveredFiles;
        protected final transient ExecutorService scanService;
        protected final transient AtomicReference<Throwable> atomicThrowable;
        private volatile transient boolean running;
        protected final transient HashSet<String> ignoredFiles;
        protected transient Pattern regex;
        protected transient long sleepMillis;

        public TimeBasedDirectoryScanner() {
            this.files = Sets.newLinkedHashSet();
            this.scanService = Executors.newSingleThreadExecutor();
            this.discoveredFiles = new LinkedBlockingDeque();
            this.atomicThrowable = new AtomicReference();
            this.ignoredFiles = Sets.newHashSet();
        }

        public void setup(Context.OperatorContext context) {
            this.sleepMillis = ((Integer)context.getValue(Context.OperatorContext.SPIN_MILLIS)).intValue();
            if (this.filePatternRegularExp != null) {
                this.regex = Pattern.compile(this.filePatternRegularExp);
            }
            try {
                this.fs = this.getFSInstance();
            }
            catch (IOException e) {
                throw new RuntimeException("opening fs", e);
            }
            this.scanService.submit(this);
        }

        public void teardown() {
            this.running = false;
            this.scanService.shutdownNow();
            try {
                this.fs.close();
            }
            catch (IOException e) {
                throw new RuntimeException("closing fs", e);
            }
        }

        protected FileSystem getFSInstance() throws IOException {
            return FileSystem.newInstance((URI)new Path(this.files.iterator().next()).toUri(), (Configuration)new Configuration());
        }

        @Override
        public void run() {
            this.running = true;
            try {
                while (this.running) {
                    if (this.trigger || System.currentTimeMillis() - this.scanIntervalMillis >= this.lastScanMillis) {
                        this.trigger = false;
                        for (String afile : this.files) {
                            this.scan(new Path(afile), null);
                        }
                        this.scanComplete();
                        continue;
                    }
                    Thread.sleep(this.sleepMillis);
                }
            }
            catch (Throwable throwable) {
                LOG.error("service", throwable);
                this.running = false;
                this.atomicThrowable.set(throwable);
                DTThrowable.rethrow((Throwable)throwable);
            }
        }

        protected void scanComplete() {
            LOG.debug("scan complete {}", (Object)this.lastScanMillis);
            FileInfo fileInfo = this.discoveredFiles.peekLast();
            if (fileInfo != null) {
                fileInfo.lastFileOfScan = true;
            }
            this.lastScanMillis = System.currentTimeMillis();
        }

        protected void scan(@NotNull Path filePath, Path rootPath) {
            try {
                FileStatus[] childStatuses;
                FileStatus parentStatus = this.fs.getFileStatus(filePath);
                String parentPathStr = filePath.toUri().getPath();
                LOG.debug("scan {}", (Object)parentPathStr);
                Long oldModificationTime = this.lastModifiedTimes.get(parentPathStr);
                this.lastModifiedTimes.put(parentPathStr, parentStatus.getModificationTime());
                if (this.skipFile(filePath, parentStatus.getModificationTime(), oldModificationTime)) {
                    return;
                }
                LOG.debug("scan {}", (Object)filePath.toUri().getPath());
                for (FileStatus status : childStatuses = this.fs.listStatus(filePath)) {
                    Path childPath = status.getPath();
                    String childPathStr = childPath.toUri().getPath();
                    if (this.skipFile(childPath, status.getModificationTime(), oldModificationTime)) continue;
                    if (status.isDirectory() && this.recursive) {
                        this.scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath);
                    }
                    if (this.ignoredFiles.contains(childPathStr)) continue;
                    if (this.acceptFile(childPathStr)) {
                        FileInfo info;
                        LOG.debug("found {}", (Object)childPathStr);
                        if (rootPath == null) {
                            info = parentStatus.isDirectory() ? new FileInfo(parentPathStr, childPath.getName(), parentStatus.getModificationTime()) : new FileInfo(null, childPathStr, parentStatus.getModificationTime());
                        } else {
                            URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri());
                            info = new FileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(), parentStatus.getModificationTime());
                        }
                        this.discoveredFiles.add(info);
                        continue;
                    }
                    this.ignoredFiles.add(childPathStr);
                }
            }
            catch (FileNotFoundException fnf) {
                LOG.warn("Failed to list directory {}", (Object)filePath, (Object)fnf);
            }
            catch (IOException e) {
                throw new RuntimeException("listing files", e);
            }
        }

        protected boolean skipFile(@NotNull Path path, @NotNull Long modificationTime, Long lastModificationTime) throws IOException {
            return lastModificationTime != null && modificationTime <= lastModificationTime;
        }

        protected boolean acceptFile(String filePathStr) {
            Matcher matcher;
            return this.regex == null || (matcher = this.regex.matcher(filePathStr)).matches();
        }

        public FileInfo pollFile() {
            return this.discoveredFiles.poll();
        }

        public String getFilePatternRegularExp() {
            return this.filePatternRegularExp;
        }

        public void setFilePatternRegularExp(String filePatternRegexp) {
            this.filePatternRegularExp = filePatternRegexp;
        }

        public void setFiles(String files) {
            Iterables.addAll(this.files, (Iterable)Splitter.on((String)",").omitEmptyStrings().split((CharSequence)files));
        }

        public String getFiles() {
            return Joiner.on((String)",").join(this.files);
        }

        public void setRecursive(boolean recursive) {
            this.recursive = recursive;
        }

        public boolean isRecursive() {
            return this.recursive;
        }

        public void setTrigger(boolean trigger) {
            this.trigger = trigger;
        }

        public boolean isTrigger() {
            return this.trigger;
        }

        public long getScanIntervalMillis() {
            return this.scanIntervalMillis;
        }

        public void setScanIntervalMillis(long scanIntervalMillis) {
            this.scanIntervalMillis = scanIntervalMillis;
        }
    }

    @Deprecated
    public static class FileMetadata {
        @NotNull
        private String filePath;
        private String fileName;
        private int numberOfBlocks;
        private long dataOffset;
        private long fileLength;
        private long discoverTime;
        private long[] blockIds;
        private boolean isDirectory;

        protected FileMetadata() {
            this.filePath = null;
            this.discoverTime = System.currentTimeMillis();
        }

        public FileMetadata(@NotNull String filePath) {
            this.filePath = filePath;
            this.discoverTime = System.currentTimeMillis();
        }

        public int getNumberOfBlocks() {
            return this.numberOfBlocks;
        }

        public void setNumberOfBlocks(int numberOfBlocks) {
            this.numberOfBlocks = numberOfBlocks;
        }

        public String getFileName() {
            return this.fileName;
        }

        public void setFileName(String fileName) {
            this.fileName = fileName;
        }

        public void setFilePath(String filePath) {
            this.filePath = filePath;
        }

        public String getFilePath() {
            return this.filePath;
        }

        public long getDataOffset() {
            return this.dataOffset;
        }

        public void setDataOffset(long offset) {
            this.dataOffset = offset;
        }

        public long getFileLength() {
            return this.fileLength;
        }

        public void setFileLength(long fileLength) {
            this.fileLength = fileLength;
        }

        public long getDiscoverTime() {
            return this.discoverTime;
        }

        public void setDiscoverTime(long discoverTime) {
            this.discoverTime = discoverTime;
        }

        public long[] getBlockIds() {
            return this.blockIds;
        }

        public void setBlockIds(long[] blockIds) {
            this.blockIds = blockIds;
        }

        public void setDirectory(boolean isDirectory) {
            this.isDirectory = isDirectory;
        }

        public boolean isDirectory() {
            return this.isDirectory;
        }
    }

    public static class BlockMetadataIterator
    implements Iterator<BlockMetadata.FileBlockMetadata> {
        private final FileMetadata fileMetadata;
        private final long blockSize;
        private long pos;
        private int blockNumber;
        private final FileSplitter splitter;

        protected BlockMetadataIterator() {
            this.fileMetadata = null;
            this.blockSize = -1L;
            this.splitter = null;
        }

        public BlockMetadataIterator(FileSplitter splitter, FileMetadata fileMetadata, long blockSize) {
            this.splitter = splitter;
            this.fileMetadata = fileMetadata;
            this.blockSize = blockSize;
            this.pos = fileMetadata.getDataOffset();
            this.blockNumber = 0;
        }

        @Override
        public boolean hasNext() {
            return this.pos < this.fileMetadata.getFileLength();
        }

        @Override
        public BlockMetadata.FileBlockMetadata next() {
            long length;
            while ((length = this.blockSize * (long)(++this.blockNumber)) <= this.pos) {
            }
            boolean isLast = length >= this.fileMetadata.getFileLength();
            long lengthOfFileInBlock = isLast ? this.fileMetadata.getFileLength() : length;
            BlockMetadata.FileBlockMetadata fileBlock = this.splitter.createBlockMetadata(this.pos, lengthOfFileInBlock, this.blockNumber, this.fileMetadata, isLast);
            this.pos = lengthOfFileInBlock;
            return fileBlock;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("remove not supported");
        }
    }
}

