/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.lib.io.fs.AbstractFileSplitter;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@OperatorAnnotation(checkpointableWithinAppWindow=false)
public class FileSplitterInput
extends AbstractFileSplitter
implements InputOperator,
Operator.CheckpointListener {
    @NotNull
    private WindowDataManager windowDataManager;
    @NotNull
    protected final transient LinkedList<ScannedFileInfo> currentWindowRecoveryState = Lists.newLinkedList();
    @Valid
    @NotNull
    private TimeBasedDirectoryScanner scanner;
    @NotNull
    private Map<String, Map<String, Long>> referenceTimes;
    private transient long sleepMillis;
    private static final Logger LOG = LoggerFactory.getLogger(FileSplitterInput.class);

    public FileSplitterInput() {
        this.windowDataManager = new WindowDataManager.NoopWindowDataManager();
        this.referenceTimes = Maps.newHashMap();
        this.scanner = new TimeBasedDirectoryScanner();
    }

    @Override
    public void setup(Context.OperatorContext context) {
        this.sleepMillis = ((Integer)context.getValue(Context.OperatorContext.SPIN_MILLIS)).intValue();
        this.scanner.setup(context);
        this.windowDataManager.setup((Context)context);
        super.setup(context);
        long largestRecoveryWindow = this.windowDataManager.getLargestRecoveryWindow();
        if (largestRecoveryWindow == -1L || (Long)context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) {
            this.scanner.startScanning(Collections.unmodifiableMap(this.referenceTimes));
        }
    }

    @Override
    public void beginWindow(long windowId) {
        super.beginWindow(windowId);
        if (windowId <= this.windowDataManager.getLargestRecoveryWindow()) {
            this.replay(windowId);
        }
    }

    protected void replay(long windowId) {
        try {
            LinkedList recoveredData = (LinkedList)this.windowDataManager.load(this.operatorId, windowId);
            if (recoveredData == null) {
                return;
            }
            if (this.blockMetadataIterator != null) {
                this.emitBlockMetadata();
            }
            for (ScannedFileInfo info : recoveredData) {
                this.updateReferenceTimes(info);
                AbstractFileSplitter.FileMetadata fileMetadata = this.buildFileMetadata(info);
                this.filesMetadataOutput.emit((Object)fileMetadata);
                this.blockMetadataIterator = new AbstractFileSplitter.BlockMetadataIterator(this, fileMetadata, this.blockSize);
                if (this.emitBlockMetadata()) continue;
                break;
            }
        }
        catch (IOException e) {
            throw new RuntimeException("replay", e);
        }
        if (windowId == this.windowDataManager.getLargestRecoveryWindow()) {
            this.scanner.startScanning(Collections.unmodifiableMap(this.referenceTimes));
        }
    }

    public void emitTuples() {
        if (this.currentWindowId <= this.windowDataManager.getLargestRecoveryWindow()) {
            return;
        }
        Throwable throwable = this.scanner.atomicThrowable.get();
        if (throwable != null) {
            DTThrowable.rethrow((Throwable)throwable);
        }
        if (this.blockMetadataIterator == null && this.scanner.discoveredFiles.isEmpty()) {
            try {
                Thread.sleep(this.sleepMillis);
            }
            catch (InterruptedException e) {
                throw new RuntimeException("waiting for work", e);
            }
        }
        this.process();
    }

    @Override
    protected AbstractFileSplitter.FileInfo getFileInfo() {
        return this.scanner.pollFile();
    }

    @Override
    protected boolean processFileInfo(AbstractFileSplitter.FileInfo fileInfo) {
        ScannedFileInfo scannedFileInfo = (ScannedFileInfo)fileInfo;
        this.currentWindowRecoveryState.add(scannedFileInfo);
        this.updateReferenceTimes(scannedFileInfo);
        return super.processFileInfo(fileInfo);
    }

    protected void updateReferenceTimes(ScannedFileInfo fileInfo) {
        HashMap referenceTimePerInputDir = this.referenceTimes.get(fileInfo.getDirectoryPath());
        if (referenceTimePerInputDir == null) {
            referenceTimePerInputDir = Maps.newHashMap();
        }
        referenceTimePerInputDir.put(fileInfo.getFilePath(), fileInfo.modifiedTime);
        this.referenceTimes.put(fileInfo.getDirectoryPath(), referenceTimePerInputDir);
    }

    public void endWindow() {
        if (this.currentWindowId > this.windowDataManager.getLargestRecoveryWindow()) {
            try {
                this.windowDataManager.save(this.currentWindowRecoveryState, this.operatorId, this.currentWindowId);
            }
            catch (IOException e) {
                throw new RuntimeException("saving recovery", e);
            }
        }
        this.currentWindowRecoveryState.clear();
    }

    @Override
    protected long getDefaultBlockSize() {
        return this.scanner.fs.getDefaultBlockSize(new Path((String)this.scanner.files.iterator().next()));
    }

    @Override
    protected FileStatus getFileStatus(Path path) throws IOException {
        return this.scanner.fs.getFileStatus(path);
    }

    public void checkpointed(long l) {
    }

    public void committed(long l) {
        try {
            this.windowDataManager.deleteUpTo(this.operatorId, l);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void teardown() {
        this.scanner.teardown();
    }

    public void setWindowDataManager(WindowDataManager windowDataManager) {
        this.windowDataManager = windowDataManager;
    }

    public WindowDataManager getWindowDataManager() {
        return this.windowDataManager;
    }

    public void setScanner(TimeBasedDirectoryScanner scanner) {
        this.scanner = scanner;
    }

    public TimeBasedDirectoryScanner getScanner() {
        return this.scanner;
    }

    public static class ScannedFileInfo
    extends AbstractFileSplitter.FileInfo {
        protected final long modifiedTime;

        protected ScannedFileInfo() {
            this.modifiedTime = -1L;
        }

        public ScannedFileInfo(@Nullable String directoryPath, @NotNull String relativeFilePath, long modifiedTime) {
            super(directoryPath, relativeFilePath);
            this.modifiedTime = modifiedTime;
        }

        public long getModifiedTime() {
            return this.modifiedTime;
        }
    }

    public static class TimeBasedDirectoryScanner
    implements Runnable,
    Component<Context.OperatorContext> {
        private static long DEF_SCAN_INTERVAL_MILLIS = 5000L;
        private static String FILE_BEING_COPIED = "_COPYING_";
        private boolean recursive = true;
        private volatile transient boolean trigger;
        @NotNull
        @Size(min=1)
        private final Set<String> files;
        @Min(value=0L)
        private long scanIntervalMillis = DEF_SCAN_INTERVAL_MILLIS;
        private String filePatternRegularExp;
        private String ignoreFilePatternRegularExp;
        protected transient long lastScanMillis;
        protected transient FileSystem fs;
        protected final transient LinkedBlockingDeque<ScannedFileInfo> discoveredFiles;
        protected final transient ExecutorService scanService;
        protected final transient AtomicReference<Throwable> atomicThrowable;
        private volatile transient boolean running;
        protected final transient HashSet<String> ignoredFiles;
        protected transient Pattern regex;
        private transient Pattern ignoreRegex;
        protected transient long sleepMillis;
        protected transient Map<String, Map<String, Long>> referenceTimes;
        private transient ScannedFileInfo lastScannedInfo;
        private transient int numDiscoveredPerIteration;

        public TimeBasedDirectoryScanner() {
            this.files = Sets.newLinkedHashSet();
            this.scanService = Executors.newSingleThreadExecutor();
            this.discoveredFiles = new LinkedBlockingDeque();
            this.atomicThrowable = new AtomicReference();
            this.ignoredFiles = Sets.newHashSet();
        }

        public void setup(Context.OperatorContext context) {
            this.sleepMillis = ((Integer)context.getValue(Context.OperatorContext.SPIN_MILLIS)).intValue();
            if (this.filePatternRegularExp != null) {
                this.regex = Pattern.compile(this.filePatternRegularExp);
            }
            if (this.ignoreFilePatternRegularExp != null) {
                this.ignoreRegex = Pattern.compile(this.ignoreFilePatternRegularExp);
            }
            try {
                this.fs = this.getFSInstance();
            }
            catch (IOException e) {
                throw new RuntimeException("opening fs", e);
            }
        }

        protected void startScanning(Map<String, Map<String, Long>> referenceTimes) {
            this.referenceTimes = (Map)Preconditions.checkNotNull(referenceTimes);
            this.scanService.submit(this);
        }

        protected void stopScanning() {
            this.running = false;
        }

        public void teardown() {
            this.stopScanning();
            this.scanService.shutdownNow();
            try {
                this.fs.close();
            }
            catch (IOException e) {
                throw new RuntimeException("closing fs", e);
            }
        }

        protected FileSystem getFSInstance() throws IOException {
            return FileSystem.newInstance((URI)new Path(this.files.iterator().next()).toUri(), (Configuration)new Configuration());
        }

        @Override
        public void run() {
            this.running = true;
            try {
                while (this.running) {
                    if ((this.trigger || System.currentTimeMillis() - this.scanIntervalMillis >= this.lastScanMillis) && this.isIterationCompleted()) {
                        this.trigger = false;
                        this.lastScannedInfo = null;
                        this.numDiscoveredPerIteration = 0;
                        for (String afile : this.files) {
                            String filePath = new File(afile).getAbsolutePath();
                            LOG.debug("Scan started for input {}", (Object)filePath);
                            Map<String, Long> lastModifiedTimesForInputDir = this.referenceTimes.get(filePath);
                            this.scan(new Path(afile), null, lastModifiedTimesForInputDir);
                        }
                        this.scanIterationComplete();
                        continue;
                    }
                    Thread.sleep(this.sleepMillis);
                }
            }
            catch (Throwable throwable) {
                LOG.error("service", throwable);
                this.running = false;
                this.atomicThrowable.set(throwable);
                DTThrowable.rethrow((Throwable)throwable);
            }
        }

        private boolean isIterationCompleted() {
            if (this.lastScannedInfo == null) {
                return true;
            }
            Map<String, Long> referenceTime = this.referenceTimes.get(this.lastScannedInfo.getDirectoryPath());
            if (referenceTime != null) {
                return referenceTime.get(this.lastScannedInfo.getFilePath()) != null;
            }
            return false;
        }

        protected void scanIterationComplete() {
            LOG.debug("scan complete {} {}", (Object)this.lastScanMillis, (Object)this.numDiscoveredPerIteration);
            this.lastScanMillis = System.currentTimeMillis();
        }

        protected void scan(@NotNull Path filePath, Path rootPath) {
            Map<String, Long> lastModifiedTimesForInputDir = this.referenceTimes.get(filePath.toUri().getPath());
            this.scan(filePath, rootPath, lastModifiedTimesForInputDir);
        }

        private void scan(Path filePath, Path rootPath, Map<String, Long> lastModifiedTimesForInputDir) {
            try {
                FileStatus parentStatus = this.fs.getFileStatus(filePath);
                String parentPathStr = filePath.toUri().getPath();
                LOG.debug("scan {}", (Object)parentPathStr);
                FileStatus[] childStatuses = this.fs.listStatus(filePath);
                if (childStatuses.length == 0 && rootPath == null && (lastModifiedTimesForInputDir == null || lastModifiedTimesForInputDir.get(parentPathStr) == null)) {
                    ScannedFileInfo info = new ScannedFileInfo(null, filePath.toString(), parentStatus.getModificationTime());
                    this.processDiscoveredFile(info);
                }
                for (FileStatus childStatus : childStatuses) {
                    Path childPath = childStatus.getPath();
                    String childPathStr = childPath.toUri().getPath();
                    if (childStatus.isDirectory() && this.isRecursive()) {
                        this.addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
                        this.scan(childPath, rootPath == null ? parentStatus.getPath() : rootPath, lastModifiedTimesForInputDir);
                        continue;
                    }
                    if (this.acceptFile(childPathStr)) {
                        this.addToDiscoveredFiles(rootPath, parentStatus, childStatus, lastModifiedTimesForInputDir);
                        continue;
                    }
                    this.ignoredFiles.add(childPathStr);
                }
            }
            catch (FileNotFoundException fnf) {
                LOG.warn("Failed to list directory {}", (Object)filePath, (Object)fnf);
            }
            catch (IOException e) {
                throw new RuntimeException("listing files", e);
            }
        }

        private void addToDiscoveredFiles(Path rootPath, FileStatus parentStatus, FileStatus childStatus, Map<String, Long> lastModifiedTimesForInputDir) throws IOException {
            Path childPath = childStatus.getPath();
            String childPathStr = childPath.toUri().getPath();
            Long oldModificationTime = null;
            if (lastModifiedTimesForInputDir != null) {
                oldModificationTime = lastModifiedTimesForInputDir.get(childPathStr);
            }
            if (TimeBasedDirectoryScanner.skipFile(childPath, childStatus.getModificationTime(), oldModificationTime) || childStatus.isDirectory() && oldModificationTime != null) {
                return;
            }
            if (this.ignoredFiles.contains(childPathStr)) {
                return;
            }
            ScannedFileInfo info = this.createScannedFileInfo(parentStatus.getPath(), parentStatus, childPath, childStatus, rootPath);
            LOG.debug("Processing file: " + info.getFilePath());
            this.processDiscoveredFile(info);
        }

        protected void processDiscoveredFile(ScannedFileInfo info) {
            ++this.numDiscoveredPerIteration;
            this.lastScannedInfo = info;
            this.discoveredFiles.add(info);
        }

        protected ScannedFileInfo createScannedFileInfo(Path parentPath, FileStatus parentStatus, Path childPath, FileStatus childStatus, Path rootPath) {
            ScannedFileInfo info;
            if (rootPath == null) {
                info = parentStatus.isDirectory() ? new ScannedFileInfo(parentPath.toUri().getPath(), childPath.getName(), childStatus.getModificationTime()) : new ScannedFileInfo(null, childPath.toUri().getPath(), childStatus.getModificationTime());
            } else {
                URI relativeChildURI = rootPath.toUri().relativize(childPath.toUri());
                info = new ScannedFileInfo(rootPath.toUri().getPath(), relativeChildURI.getPath(), childStatus.getModificationTime());
            }
            return info;
        }

        protected static boolean skipFile(@NotNull Path path, @NotNull Long modificationTime, Long lastModificationTime) throws IOException {
            return lastModificationTime != null && modificationTime <= lastModificationTime;
        }

        protected boolean acceptFile(String filePathStr) {
            Matcher matcher;
            if (this.fs.getScheme().equalsIgnoreCase("hdfs") && filePathStr.endsWith(FILE_BEING_COPIED)) {
                return false;
            }
            if (this.regex != null && !(matcher = this.regex.matcher(filePathStr)).matches()) {
                return false;
            }
            return this.ignoreRegex == null || !(matcher = this.ignoreRegex.matcher(filePathStr)).matches();
        }

        public AbstractFileSplitter.FileInfo pollFile() {
            return this.discoveredFiles.poll();
        }

        protected int getNumDiscoveredPerIteration() {
            return this.numDiscoveredPerIteration;
        }

        public String getFilePatternRegularExp() {
            return this.filePatternRegularExp;
        }

        public void setFilePatternRegularExp(String filePatternRegexp) {
            this.filePatternRegularExp = filePatternRegexp;
        }

        public String getIgnoreFilePatternRegularExp() {
            return this.ignoreFilePatternRegularExp;
        }

        public void setIgnoreFilePatternRegularExp(String ignoreFilePatternRegex) {
            this.ignoreFilePatternRegularExp = ignoreFilePatternRegex;
        }

        public void setFiles(String files) {
            Iterables.addAll(this.files, (Iterable)Splitter.on((String)",").omitEmptyStrings().split((CharSequence)files));
        }

        public String getFiles() {
            return Joiner.on((String)",").join(this.files);
        }

        public void setRecursive(boolean recursive) {
            this.recursive = recursive;
        }

        public boolean isRecursive() {
            return this.recursive;
        }

        public void setTrigger(boolean trigger) {
            this.trigger = trigger;
        }

        public boolean isTrigger() {
            return this.trigger;
        }

        public long getScanIntervalMillis() {
            return this.scanIntervalMillis;
        }

        public void setScanIntervalMillis(long scanIntervalMillis) {
            this.scanIntervalMillis = scanIntervalMillis;
        }
    }
}

