/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.engine;

import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.common.util.ScheduledExecutorService;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.stram.engine.MuxReservoir;
import com.datatorrent.stram.engine.Stream;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.ResetWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Emits window control tuples into an internal queue on a fixed schedule.
 *
 * <p>Once {@link #activate(StreamContext)} is called, the generator uses the supplied
 * scheduler to enqueue a {@code ResetWindowTuple} followed by {@code BEGIN_WINDOW}, and then,
 * every {@code windowWidthMillis}, an {@code EndWindowTuple} (optionally followed by a
 * {@code CHECKPOINT} tuple) and the next {@code BEGIN_WINDOW}.
 *
 * <p>A 64-bit window id, as produced here, holds the base time in seconds in its upper
 * 32 bits and the index of the window relative to that base in its low bits
 * (see {@link #WINDOW_MASK}). The index runs from {@link #MIN_WINDOW_ID} to
 * {@link #MAX_WINDOW_ID}; after that a new base is established and a reset tuple is emitted.
 *
 * <p>Configure the instance with {@link #setFirstWindow(long)}, {@link #setResetWindow(long)}
 * and {@link #setWindowWidth(int)} before activating it.
 */
public class WindowGenerator extends MuxReservoir implements Stream, Runnable {
    /** Mask that extracts the window index (low 14 bits) from a window id. */
    public static final int WINDOW_MASK = 16383;
    /** Smallest window index within one base period. */
    public static final int MIN_WINDOW_ID = 0;
    /** Largest window index within one base period; the next window triggers a reset. */
    public static final int MAX_WINDOW_ID = 15999;
    /** Largest window width, in milliseconds, accepted by {@link #setWindowWidth(int)}. */
    public static final int MAX_WINDOW_WIDTH = 503079487;

    /** Number of windows covered by one base period (indices 0 through MAX_WINDOW_ID). */
    private static final long WINDOWS_PER_RESET = MAX_WINDOW_ID + 1L;

    private static final Logger logger = LoggerFactory.getLogger(WindowGenerator.class);

    private final ScheduledExecutorService ses;
    private final BlockingQueue<Tuple> queue;
    private long firstWindowMillis;
    private int windowWidthMillis;
    private long currentWindowMillis;
    /** Base time in seconds, already shifted into the upper 32 bits. */
    private long baseSeconds;
    private int windowId;
    private long resetWindowMillis;
    /** Windows ended since the last checkpoint tuple (or the configured offset). */
    private int checkPointWindowCount;
    /** Number of ended windows after which a checkpoint tuple is emitted. */
    private int checkpointCount = 60;

    /**
     * Creates a generator that schedules its work on {@code service}.
     *
     * @param service scheduler used for all timed work and as the source of the current time
     * @param capacity capacity passed to the backing {@code CircularBuffer}
     */
    public WindowGenerator(ScheduledExecutorService service, int capacity) {
        ses = service;
        queue = new CircularBuffer<>(capacity);
    }

    /**
     * Moves the generator state to the next window: adds one window width to the current
     * window time and increments the window index. No tuple is emitted.
     */
    public final void advanceWindow() {
        currentWindowMillis += windowWidthMillis;
        ++windowId;
    }

    /**
     * Re-bases the window id on the current window time and enqueues a reset tuple followed
     * by the begin-window tuple of the current window.
     *
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    private void resetBeginNewWindow() throws InterruptedException {
        long resetPeriodMillis = WINDOWS_PER_RESET * windowWidthMillis;
        // Snap the reset time to the latest period boundary at or before the current window.
        long sinceLastReset = (currentWindowMillis - resetWindowMillis) % resetPeriodMillis;
        resetWindowMillis = currentWindowMillis - sinceLastReset;
        windowId = (int) ((currentWindowMillis - resetWindowMillis) / windowWidthMillis);
        baseSeconds = (resetWindowMillis / 1000L) << 32;

        // The reset tuple carries the base in its upper half and the window width in its lower half.
        queue.put(new ResetWindowTuple(baseSeconds | windowWidthMillis));
        queue.put(new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | windowId));
    }

    /**
     * Ends the current window, emits a checkpoint tuple when the checkpoint interval has been
     * reached, and begins the next window. When the window that just ended had index
     * {@link #MAX_WINDOW_ID}, the next window is started through {@link #run()} so that a
     * reset tuple precedes it.
     *
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    private void endCurrentBeginNewWindow() throws InterruptedException {
        long endedWindowId = baseSeconds | windowId;
        queue.put(new EndWindowTuple(endedWindowId));

        if (++checkPointWindowCount == checkpointCount) {
            queue.put(new Tuple(MessageType.CHECKPOINT, endedWindowId));
            checkPointWindowCount = 0;
        }

        boolean wasLastWindowOfPeriod = windowId == MAX_WINDOW_ID;
        advanceWindow();
        if (wasLastWindowOfPeriod) {
            run();
        } else {
            queue.put(new Tuple(MessageType.BEGIN_WINDOW, baseSeconds | windowId));
        }
    }

    /**
     * Emits the reset tuple and the begin-window tuple for the current window. An
     * interruption is routed to {@link #handleException(Exception)} instead of propagating.
     */
    @Override
    public final void run() {
        try {
            resetBeginNewWindow();
        } catch (InterruptedException ie) {
            handleException(ie);
        }
    }

    /**
     * Sets the time of the first window.
     *
     * @param millis start of the first window, in milliseconds
     */
    public void setFirstWindow(long millis) {
        firstWindowMillis = millis;
    }

    /**
     * Sets the reference time from which base (reset) boundaries are computed.
     *
     * @param millis reset reference time, in milliseconds
     */
    public void setResetWindow(long millis) {
        resetWindowMillis = millis;
    }

    /**
     * Sets the width of each window.
     *
     * @param millis window width in milliseconds; must be between 1 and {@link #MAX_WINDOW_WIDTH}
     * @throws IllegalArgumentException if {@code millis} is outside that range
     */
    public void setWindowWidth(int millis) {
        if (millis < 1 || millis > MAX_WINDOW_WIDTH) {
            throw new IllegalArgumentException(String.format("Window width %d is invalid as it's not in the range 1 to %d", millis, MAX_WINDOW_WIDTH));
        }
        windowWidthMillis = millis;
    }

    /**
     * Configures checkpoint emission.
     *
     * @param streamingWindowCount number of ended windows after which a checkpoint tuple is emitted
     * @param offset initial value of the ended-window counter
     */
    public void setCheckpointCount(int streamingWindowCount, int offset) {
        logger.debug("setCheckpointCount: {} {}", streamingWindowCount, offset);
        checkpointCount = streamingWindowCount;
        checkPointWindowCount = offset;
    }

    /**
     * Does no configuration; only logs a hint to use the setters instead.
     *
     * @param context ignored
     */
    public void setup(StreamContext context) {
        logger.info("WindowGenerator::setup does not do anything useful, please use setFirstWindow/setResetWindow/setWindowWidth do set properties.");
    }

    /**
     * Starts window generation on the scheduler.
     *
     * <p>If the first window lies in the past, a task is scheduled immediately that emits the
     * initial reset/begin tuples and then ends and begins windows until the window time has
     * caught up with the scheduler's clock. Otherwise this generator itself is scheduled to
     * run when the first window is due. In both cases a fixed-rate task is then scheduled
     * that ends the current window and begins the next one every window width.
     *
     * @param context ignored
     */
    public void activate(StreamContext context) {
        currentWindowMillis = firstWindowMillis;

        Runnable subsequentRun = new Runnable() {
            @Override
            public void run() {
                try {
                    endCurrentBeginNewWindow();
                } catch (InterruptedException ie) {
                    handleException(ie);
                }
            }
        };

        final long currentTms = ses.getCurrentTimeMillis();
        if (currentWindowMillis < currentTms) {
            logger.info("Catching up from {} to {}", currentWindowMillis, currentTms);
            Runnable catchUp = new Runnable() {
                @Override
                public void run() {
                    try {
                        resetBeginNewWindow();
                        do {
                            endCurrentBeginNewWindow();
                        } while (currentWindowMillis < ses.getCurrentTimeMillis());
                    } catch (InterruptedException ie) {
                        handleException(ie);
                    }
                }
            };
            ses.schedule(catchUp, 0L, TimeUnit.MILLISECONDS);
        } else {
            logger.info("The input will start to be sliced in {} milliseconds", currentWindowMillis - currentTms);
            ses.schedule(this, currentWindowMillis - currentTms, TimeUnit.MILLISECONDS);
        }

        // NOTE(review): currentWindowMillis is read here after the catch-up task may already
        // have been handed to the scheduler; whether that task can run concurrently depends on
        // the scheduler implementation - confirm before changing this expression.
        ses.scheduleAtFixedRate(subsequentRun, currentWindowMillis - currentTms + windowWidthMillis, windowWidthMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops window generation by shutting down the scheduler. */
    public void deactivate() {
        ses.shutdown();
    }

    /**
     * Central handler for failures in scheduled work. An interruption shuts the scheduler
     * down and re-asserts the thread's interrupt status so that code further up the stack
     * can still observe it; any other exception is rethrown wrapped in a
     * {@link RuntimeException} with the cause preserved.
     *
     * @param e the exception to handle
     */
    private void handleException(Exception e) {
        if (e instanceof InterruptedException) {
            ses.shutdown();
            // Catching InterruptedException cleared the flag; restore it.
            Thread.currentThread().interrupt();
            return;
        }
        throw new RuntimeException(e);
    }

    /** Nothing to release. */
    public void teardown() {
    }

    /**
     * Not supported.
     *
     * @param tuple ignored
     * @throws UnsupportedOperationException always
     */
    public void put(Object tuple) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    /**
     * Exposes the queue that receives the generated control tuples.
     *
     * @return the backing queue
     */
    @Override
    protected Queue getQueue() {
        return queue;
    }

    /**
     * Not supported.
     *
     * @param reset ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public int getCount(boolean reset) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    /**
     * Computes how many whole windows fit between two points in time.
     *
     * @param millis the later time, in milliseconds
     * @param firstMillis the start time, in milliseconds
     * @param widthMillis the window width, in milliseconds
     * @return {@code (millis - firstMillis) / widthMillis} using integer division
     */
    public static long getWindowCount(long millis, long firstMillis, long widthMillis) {
        return (millis - firstMillis) / widthMillis;
    }

    /**
     * Builds the window id for a point in time.
     *
     * @param millis the time to convert, in milliseconds
     * @param firstWindowMillis time of the first window, in milliseconds
     * @param windowWidthMillis window width, in milliseconds
     * @return the window id: base seconds in the upper 32 bits combined with the window index
     */
    public static long getWindowId(long millis, long firstWindowMillis, long windowWidthMillis) {
        long sinceFirstWindow = millis - firstWindowMillis;
        // Offset of millis within its base period.
        long sinceBase = sinceFirstWindow % (windowWidthMillis * WINDOWS_PER_RESET);
        long baseSeconds = (millis - sinceBase) / 1000L;
        long windowIndex = sinceBase / windowWidthMillis;
        return baseSeconds << 32 | windowIndex;
    }

    /**
     * Computes the start time of the window that follows the given one.
     *
     * @param windowId id of the reference window
     * @param firstWindowMillis time of the first window, in milliseconds
     * @param windowWidthMillis window width, in milliseconds
     * @return start time of the given window plus one window width, in milliseconds
     */
    public static long getNextWindowMillis(long windowId, long firstWindowMillis, long windowWidthMillis) {
        return getWindowMillis(windowId, firstWindowMillis, windowWidthMillis) + windowWidthMillis;
    }

    /**
     * Computes the id of the window that follows the given one.
     *
     * @param windowId id of the reference window
     * @param firstWindowMillis time of the first window, in milliseconds
     * @param windowWidthMillis window width, in milliseconds
     * @return id of the window one step ahead
     */
    public static long getNextWindowId(long windowId, long firstWindowMillis, long windowWidthMillis) {
        return getAheadWindowId(windowId, firstWindowMillis, windowWidthMillis, 1);
    }

    /**
     * Computes the id of the window a number of steps away from the given one.
     *
     * @param windowId id of the reference window
     * @param firstWindowMillis time of the first window, in milliseconds
     * @param windowWidthMillis window width, in milliseconds
     * @param ahead number of windows to move by
     * @return id of the window {@code ahead} steps from {@code windowId}
     */
    public static long getAheadWindowId(long windowId, long firstWindowMillis, long windowWidthMillis, int ahead) {
        long startMillis = getWindowMillis(windowId, firstWindowMillis, windowWidthMillis);
        long targetMillis = startMillis + (long) ahead * windowWidthMillis;
        return getWindowId(targetMillis, firstWindowMillis, windowWidthMillis);
    }

    /**
     * Measures the distance between two window ids in units of windows.
     *
     * @param windowIdA first window id
     * @param windowIdB second window id
     * @param windowWidthMillis window width, in milliseconds
     * @return number of windows from B to A; negative when A precedes B
     */
    public static long compareWindowId(long windowIdA, long windowIdB, long windowWidthMillis) {
        long millisA = getWindowMillis(windowIdA, 0L, windowWidthMillis);
        long millisB = getWindowMillis(windowIdB, 0L, windowWidthMillis);
        return (millisA - millisB) / windowWidthMillis;
    }

    /**
     * Converts a window id back to the start time of that window.
     *
     * @param windowId the window id; {@code -1} is treated as "no window yet"
     * @param firstWindowMillis time of the first window, in milliseconds
     * @param windowWidthMillis window width, in milliseconds
     * @return start time of the window in milliseconds, or {@code firstWindowMillis} when
     *         {@code windowId} is {@code -1}
     */
    public static long getWindowMillis(long windowId, long firstWindowMillis, long windowWidthMillis) {
        if (windowId == -1L) {
            return firstWindowMillis;
        }

        // Unsigned shift, consistent with getBaseSecondsFromWindowId: the base-seconds field
        // fills the whole upper half, so a set top bit must not be read as a sign.
        long baseMillis = (windowId >>> 32) * 1000L;
        long diff = baseMillis - firstWindowMillis;
        long baseChangeInterval = windowWidthMillis * WINDOWS_PER_RESET;
        assert (baseChangeInterval > 0L);

        // The base is stored with second precision, so it may fall short of the real period
        // boundary; round the number of elapsed periods up to compensate.
        long multiplier = diff / baseChangeInterval;
        if (diff % baseChangeInterval > 0L) {
            ++multiplier;
        }
        assert (multiplier >= 0L);

        long windowIndex = windowId & WINDOW_MASK;
        return firstWindowMillis + multiplier * baseChangeInterval + windowIndex * windowWidthMillis;
    }

    /**
     * Extracts the base time from a window id.
     *
     * @param windowId the window id
     * @return the upper 32 bits of {@code windowId}, read as an unsigned value
     */
    public static long getBaseSecondsFromWindowId(long windowId) {
        return windowId >>> 32;
    }
}

