/*
 * Decompiled with CFR 0.152.
 */
package akka.stream.alpakka.file.impl;

import akka.NotUsed;
import akka.annotation.InternalApi;
import akka.japi.Pair;
import akka.stream.Attributes;
import akka.stream.Outlet;
import akka.stream.Shape;
import akka.stream.SourceShape;
import akka.stream.alpakka.file.DirectoryChange;
import akka.stream.javadsl.Source;
import akka.stream.stage.AbstractOutHandler;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.TimerGraphStageLogic;
import com.sun.nio.file.SensitivityWatchEventModifier;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BiFunction;
import scala.concurrent.duration.FiniteDuration;

@InternalApi
public final class DirectoryChangesSource<T>
extends GraphStage<SourceShape<T>> {
    private static final Attributes DEFAULT_ATTRIBUTES = Attributes.name((String)"DirectoryChangesSource");
    private final Path directoryPath;
    private final FiniteDuration pollInterval;
    private final int maxBufferSize;
    private final BiFunction<Path, DirectoryChange, T> combiner;
    public final Outlet<T> out = Outlet.create((String)"DirectoryChangesSource.out");
    private final SourceShape<T> shape = SourceShape.of(this.out);

    public DirectoryChangesSource(Path path, FiniteDuration finiteDuration, int n, BiFunction<Path, DirectoryChange, T> biFunction) {
        this.directoryPath = path;
        this.pollInterval = finiteDuration;
        this.maxBufferSize = n;
        this.combiner = biFunction;
    }

    public SourceShape<T> shape() {
        return this.shape;
    }

    public Attributes initialAttributes() {
        return DEFAULT_ATTRIBUTES;
    }

    public GraphStageLogic createLogic(Attributes attributes) throws IOException {
        if (!Files.exists(this.directoryPath, new LinkOption[0])) {
            throw new IllegalArgumentException("The path: '" + this.directoryPath + "' does not exist");
        }
        if (!Files.isDirectory(this.directoryPath, new LinkOption[0])) {
            throw new IllegalArgumentException("The path '" + this.directoryPath + "' is not a directory");
        }
        return new TimerGraphStageLogic((Shape)this.shape){
            private final Queue<T> buffer;
            private final WatchService service;
            private final WatchKey watchKey;
            {
                this.buffer = new ArrayDeque();
                this.service = DirectoryChangesSource.this.directoryPath.getFileSystem().newWatchService();
                this.watchKey = DirectoryChangesSource.this.directoryPath.register(this.service, new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.OVERFLOW}, SensitivityWatchEventModifier.HIGH);
                this.setHandler(DirectoryChangesSource.this.out, (OutHandler)new AbstractOutHandler(){

                    public void onPull() throws Exception {
                        if (!buffer.isEmpty()) {
                            this.pushHead();
                        } else {
                            this.doPoll();
                            if (!buffer.isEmpty()) {
                                this.pushHead();
                            } else {
                                this.schedulePoll();
                            }
                        }
                    }
                });
            }

            public void onTimer(Object object) {
                if (!this.isClosed(DirectoryChangesSource.this.out)) {
                    this.doPoll();
                    if (!this.buffer.isEmpty()) {
                        this.pushHead();
                    } else {
                        this.schedulePoll();
                    }
                }
            }

            public void postStop() {
                try {
                    if (this.watchKey.isValid()) {
                        this.watchKey.cancel();
                    }
                    this.service.close();
                }
                catch (Exception exception) {
                    throw new RuntimeException(exception);
                }
            }

            private void pushHead() {
                Object t = this.buffer.poll();
                if (t != null) {
                    this.push(DirectoryChangesSource.this.out, t);
                }
            }

            private void schedulePoll() {
                this.scheduleOnce("poll", DirectoryChangesSource.this.pollInterval);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            private void doPoll() {
                try {
                    for (WatchEvent<?> watchEvent : this.watchKey.pollEvents()) {
                        WatchEvent.Kind<?> kind = watchEvent.kind();
                        if (StandardWatchEventKinds.OVERFLOW.equals(kind)) {
                            this.failStage(new RuntimeException("Overflow from watch service: '" + DirectoryChangesSource.this.directoryPath + "'"));
                            continue;
                        }
                        Path path = (Path)watchEvent.context();
                        Path path2 = DirectoryChangesSource.this.directoryPath.resolve(path);
                        DirectoryChange directoryChange = this.kindToChange(kind);
                        this.buffer.add(DirectoryChangesSource.this.combiner.apply(path2, directoryChange));
                        if (this.buffer.size() <= DirectoryChangesSource.this.maxBufferSize) continue;
                        this.failStage(new RuntimeException("Max event buffer size " + DirectoryChangesSource.this.maxBufferSize + " reached for " + path));
                    }
                }
                finally {
                    if (!this.watchKey.reset()) {
                        this.completeStage();
                    }
                }
            }

            private DirectoryChange kindToChange(WatchEvent.Kind<?> kind) {
                DirectoryChange directoryChange;
                if (kind.equals(StandardWatchEventKinds.ENTRY_CREATE)) {
                    directoryChange = DirectoryChange.Creation;
                } else if (kind.equals(StandardWatchEventKinds.ENTRY_DELETE)) {
                    directoryChange = DirectoryChange.Deletion;
                } else if (kind.equals(StandardWatchEventKinds.ENTRY_MODIFY)) {
                    directoryChange = DirectoryChange.Modification;
                } else {
                    throw new RuntimeException("Unexpected kind of event gotten from watch service for path '" + DirectoryChangesSource.this.directoryPath + "': " + kind);
                }
                return directoryChange;
            }
        };
    }

    public String toString() {
        return "DirectoryChangesSource(" + this.directoryPath + ')';
    }

    public static Source<Pair<Path, DirectoryChange>, NotUsed> create(Path path, FiniteDuration finiteDuration, int n) {
        return Source.fromGraph(new DirectoryChangesSource<Pair>(path, finiteDuration, n, Pair::apply));
    }
}

