/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.appdata.query;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.base.Preconditions;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a {@link Runnable} repeatedly on a dedicated background thread, but only
 * while a semaphore permit is available.
 *
 * <p>{@link #beginWindow(long)} releases one permit and {@link #endWindow()}
 * acquires one. The worker acquires the permit before each invocation of the
 * runnable and releases it afterwards, so when the caller alternates
 * {@code beginWindow}/{@code endWindow} the runnable only executes between the
 * two calls, and {@code endWindow} waits for an in-flight invocation to finish.
 *
 * <p>Invocations are spaced by at least {@code executeIntervalMillis}
 * milliseconds, measured with {@link System#currentTimeMillis()}.
 */
public class WindowBoundedService
implements Component<Context.OperatorContext> {
    /** Interval, in milliseconds, used when none is passed to the constructor. */
    public static final long DEFAULT_FLUSH_INTERVAL_MILLIS = 10L;
    private final long executeIntervalMillis;
    private final Runnable runnable;
    /** Single-thread executor hosting the worker; created in {@link #setup}. */
    protected transient ExecutorService executorThread;
    /** Starts with zero permits, so the worker blocks until {@link #beginWindow(long)}. */
    private final transient Semaphore mutex = new Semaphore(0);
    private volatile boolean terminated = false;
    private static final Logger LOG = LoggerFactory.getLogger(WindowBoundedService.class);

    /**
     * Creates a service that uses {@link #DEFAULT_FLUSH_INTERVAL_MILLIS}.
     *
     * @param runnable the work to execute; must not be {@code null}
     */
    public WindowBoundedService(Runnable runnable) {
        this.executeIntervalMillis = DEFAULT_FLUSH_INTERVAL_MILLIS;
        this.runnable = Preconditions.checkNotNull(runnable);
    }

    /**
     * Creates a service with an explicit execution interval.
     *
     * @param executeIntervalMillis minimum spacing between invocations, in
     *        milliseconds; must be positive
     * @param runnable the work to execute; must not be {@code null}
     */
    public WindowBoundedService(long executeIntervalMillis, Runnable runnable) {
        Preconditions.checkArgument(executeIntervalMillis > 0L, (Object)"The executeIntervalMillis must be positive");
        this.executeIntervalMillis = executeIntervalMillis;
        this.runnable = Preconditions.checkNotNull(runnable);
    }

    /**
     * Creates the single-thread executor and submits the worker to it.
     *
     * @param context the operator context; not read by this method
     */
    public void setup(Context.OperatorContext context) {
        this.executorThread = Executors.newSingleThreadExecutor((ThreadFactory)new NameableThreadFactory("Query Executor Thread"));
        this.executorThread.submit(new AsynchExecutorThread());
    }

    /**
     * Releases one permit, allowing the worker to run the runnable.
     *
     * @param windowId the window id; not read by this method
     */
    public void beginWindow(long windowId) {
        this.mutex.release();
    }

    /**
     * Acquires one permit, blocking while the worker currently holds it.
     */
    public void endWindow() {
        try {
            this.mutex.acquire();
        }
        catch (InterruptedException ex) {
            // NOTE(review): presumably wrapIfChecked rethrows the exception wrapped
            // in a RuntimeException -- confirm against DTThrowable.
            DTThrowable.wrapIfChecked((Exception)ex);
        }
    }

    /**
     * Signals the worker to stop, then waits up to ten seconds plus one
     * execution interval for the executor to terminate.
     */
    public void teardown() {
        LOG.info("Shutting down");
        this.terminated = true;
        // Wake the worker in case it is blocked waiting for a permit.
        this.mutex.release();
        if (this.executorThread == null) {
            // setup() was never called, so there is no executor to shut down.
            return;
        }
        this.executorThread.shutdown();
        try {
            this.executorThread.awaitTermination(10000L + this.executeIntervalMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException interruptedException) {
            // Stop waiting, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * The worker task: loops until the enclosing service is terminated, running
     * the runnable whenever the interval has elapsed and a permit is available.
     */
    public class AsynchExecutorThread
    implements Callable<Void> {
        /** Wall-clock time of the last attempt; zero makes the first attempt immediate. */
        private long lastExecuteTime = 0L;

        public AsynchExecutorThread() {
        }

        /**
         * @param mainThread ignored
         * @deprecated the argument is unused; use {@link #AsynchExecutorThread()}.
         */
        @Deprecated
        public AsynchExecutorThread(Thread mainThread) {
        }

        /**
         * Runs the worker loop and logs any failure that ends it.
         *
         * @return always {@code null}
         */
        @Override
        public Void call() throws Exception {
            try {
                this.loop();
            }
            catch (Exception e) {
                // The permit, if held, has already been released by loop().
                LOG.error("Exception thrown while processing:", e);
            }
            catch (Error e) {
                // The Future returned by submit() is discarded, so log before rethrowing.
                LOG.error("Exception thrown while processing:", e);
                throw e;
            }
            return null;
        }

        /**
         * Repeats: when the interval has elapsed, take the permit, run the
         * runnable and return the permit; otherwise sleep for the remainder of
         * the interval. Returns once {@code terminated} is observed.
         */
        private void loop() throws Exception {
            while (true) {
                long currentTime = System.currentTimeMillis();
                long diff = currentTime - this.lastExecuteTime;
                // A negative diff means the wall clock moved backwards; treat the
                // run as due rather than sleeping for the size of the jump.
                if (diff < 0L || diff > WindowBoundedService.this.executeIntervalMillis) {
                    this.lastExecuteTime = currentTime;
                    WindowBoundedService.this.mutex.acquireUninterruptibly();
                    if (WindowBoundedService.this.terminated) {
                        LOG.info("Terminated");
                        return;
                    }
                    try {
                        WindowBoundedService.this.runnable.run();
                    }
                    finally {
                        // Always hand the permit back, even if the runnable throws an
                        // Error; otherwise endWindow() would block forever.
                        WindowBoundedService.this.mutex.release();
                    }
                    continue;
                }
                if (WindowBoundedService.this.terminated) {
                    LOG.info("Terminated");
                    return;
                }
                Thread.sleep(WindowBoundedService.this.executeIntervalMillis - diff);
            }
        }
    }
}

