/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.task.window.core;

import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.api.compute.TaskContext;
import edu.iu.dsc.tws.api.compute.modifiers.Closable;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.task.window.IWindowCompute;
import edu.iu.dsc.tws.task.window.api.GlobalStreamId;
import edu.iu.dsc.tws.task.window.api.IEvictionPolicy;
import edu.iu.dsc.tws.task.window.api.ITimestampExtractor;
import edu.iu.dsc.tws.task.window.api.IWindow;
import edu.iu.dsc.tws.task.window.api.IWindowMessage;
import edu.iu.dsc.tws.task.window.api.WindowLifeCycleListener;
import edu.iu.dsc.tws.task.window.config.WindowConfig;
import edu.iu.dsc.tws.task.window.core.AbstractSingleWindowDataSink;
import edu.iu.dsc.tws.task.window.event.WatermarkEventGenerator;
import edu.iu.dsc.tws.task.window.exceptions.InvalidWindow;
import edu.iu.dsc.tws.task.window.manage.WindowManager;
import edu.iu.dsc.tws.task.window.policy.eviction.count.CountEvictionPolicy;
import edu.iu.dsc.tws.task.window.policy.eviction.count.WatermarkCountEvictionPolicy;
import edu.iu.dsc.tws.task.window.policy.eviction.duration.DurationEvictionPolicy;
import edu.iu.dsc.tws.task.window.policy.eviction.duration.WatermarkDurationEvictionPolicy;
import edu.iu.dsc.tws.task.window.policy.trigger.IWindowingPolicy;
import edu.iu.dsc.tws.task.window.policy.trigger.count.CountWindowPolicy;
import edu.iu.dsc.tws.task.window.policy.trigger.count.WatermarkCountWindowPolicy;
import edu.iu.dsc.tws.task.window.policy.trigger.duration.DurationWindowPolicy;
import edu.iu.dsc.tws.task.window.policy.trigger.duration.WatermarkDurationWindowPolicy;
import edu.iu.dsc.tws.task.window.strategy.IWindowStrategy;
import edu.iu.dsc.tws.task.window.util.WindowParameter;
import edu.iu.dsc.tws.task.window.util.WindowUtils;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Base class for sinks that group incoming messages into windows.
 *
 * <p>Incoming messages are handed to a {@link WindowManager}; the manager's
 * {@link WindowLifeCycleListener} callbacks are routed back to the abstract
 * {@link #execute(IWindowMessage)} and {@link #getExpire(IWindowMessage)} hooks.
 * When a timestamp extractor is configured, a {@link WatermarkEventGenerator} is
 * created and messages it refuses to track are passed to
 * {@link #getLateMessages(IMessage)}.
 *
 * @param <T> payload type of the messages handled by this sink
 */
public abstract class BaseWindowedSink<T>
extends AbstractSingleWindowDataSink<T>
implements IWindowCompute<T>,
Closable {
    private static final Logger LOG = Logger.getLogger(BaseWindowedSink.class.getName());
    /** Watermark interval used when {@link #withWatermarkInterval(long, TimeUnit)} was not called. */
    protected static final long DEFAULT_WATERMARK_INTERVAL = 1000L;
    /** Maximum lag used when {@link #withAllowedLateness(long, TimeUnit)} was not called. */
    protected static final long DEFAULT_MAX_LAG = 0L;
    // NOTE(review): the field name suggests milliseconds; the unit of WindowConfig.Duration.value
    // is not visible in this file - confirm.
    protected long maxLagMs = 0L;
    protected WindowConfig.Duration watermarkInterval = null;
    protected WindowConfig.Duration allowedLateness = null;
    protected WindowManager<T> windowManager;
    protected IWindowingPolicy<T> windowingPolicy;
    protected WindowParameter windowParameter;
    protected WindowLifeCycleListener<T> windowLifeCycleListener;
    protected IEvictionPolicy<T> evictionPolicy;
    protected IWindow iWindow;
    protected T collectiveOutput;
    protected IWindowMessage<T> collectiveEvents;
    protected ITimestampExtractor<T> iTimestampExtractor;
    protected WatermarkEventGenerator<T> watermarkEventGenerator;

    /**
     * Invoked from the life-cycle listener's activation callback with the events
     * supplied by the window manager.
     *
     * @param var1 the events passed to the activation callback
     * @return implementation-defined status flag (ignored by the listener in this class)
     */
    @Override
    public abstract boolean execute(IWindowMessage<T> var1);

    /**
     * Invoked from the life-cycle listener's expiry callback.
     *
     * @param var1 the events passed to the expiry callback
     * @return implementation-defined status flag (ignored by the listener in this class)
     */
    public abstract boolean getExpire(IWindowMessage<T> var1);

    /**
     * Invoked for a timestamped message that the watermark generator declined to track.
     *
     * @param var1 the message that was not added to the window manager
     * @return implementation-defined status flag (ignored by {@link #execute(IMessage)})
     */
    public abstract boolean getLateMessages(IMessage<T> var1);

    protected BaseWindowedSink() {
    }

    /**
     * Delegates to the superclass, creates the life-cycle listener and the window
     * manager, then runs {@link #initialize(TaskContext)}.
     *
     * @param cfg configuration forwarded to the superclass
     * @param ctx task context forwarded to the superclass and to {@code initialize}
     */
    public void prepare(Config cfg, TaskContext ctx) {
        super.prepare(cfg, ctx);
        this.windowLifeCycleListener = this.newWindowLifeCycleListener();
        this.windowManager = new WindowManager<T>(this.windowLifeCycleListener);
        this.initialize(ctx);
    }

    /**
     * Resolves the window, optionally creates the watermark generator, installs the
     * policies and starts them.
     *
     * <p>An {@link InvalidWindow} raised along the way is logged and not rethrown, so
     * the remaining initialization steps are skipped in that case.
     *
     * @param context task context whose input edges are used as the generator's streams
     */
    public void initialize(TaskContext context) {
        try {
            if (this.iWindow == null) {
                // No explicit window supplied via withWindow: derive one from the window parameters.
                this.iWindow = WindowUtils.getWindow(this.windowParameter.getWindowCountSize(), this.windowParameter.getSlidingCountSize(), this.windowParameter.getWindowDurationSize(), this.windowParameter.getSldingDurationSize());
            }
            if (this.iTimestampExtractor != null) {
                long watermarkInt = this.watermarkInterval != null ? this.watermarkInterval.value : DEFAULT_WATERMARK_INTERVAL;
                this.maxLagMs = this.allowedLateness != null ? this.allowedLateness.value : DEFAULT_MAX_LAG;
                this.watermarkEventGenerator = new WatermarkEventGenerator<T>(this.windowManager, this.maxLagMs, watermarkInt, this.getComponentStreams(context));
            }
            this.setPolicies(this.iWindow.getWindowStrategy());
            this.start();
        }
        catch (InvalidWindow invalidWindow) {
            // Route the failure through the class logger (with the cause) instead of stderr.
            LOG.log(Level.SEVERE, "Window initialization failed", invalidWindow);
        }
    }

    /**
     * Feeds a single message into the window manager.
     *
     * <p>With a timestamp extractor configured, the extracted time is offered to the
     * watermark generator first; the message is added to the manager only when
     * {@code track} returns {@code true}, otherwise it goes to
     * {@link #getLateMessages(IMessage)}. Without an extractor the message is added
     * directly.
     *
     * @param message the incoming message
     * @return always {@code true}
     */
    @Override
    public boolean execute(IMessage<T> message) {
        if (this.isTimestamped()) {
            long time = this.iTimestampExtractor.extractTimestamp(message.getContent());
            GlobalStreamId streamId = new GlobalStreamId(message.edge());
            if (this.watermarkEventGenerator.track(streamId, time)) {
                this.windowManager.add(message, time);
            } else {
                this.getLateMessages(message);
            }
        } else {
            this.windowManager.add(message);
        }
        return true;
    }

    /**
     * Replaces the window parameters with a tumbling count window.
     *
     * @param tumblingCount value forwarded to {@code WindowParameter.withTumblingCountWindow}
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withTumblingCountWindow(long tumblingCount) {
        this.windowParameter = new WindowParameter();
        this.windowParameter.withTumblingCountWindow(tumblingCount);
        return this;
    }

    /**
     * Replaces the window parameters with a tumbling duration window.
     *
     * @param tumblingDuration duration value
     * @param timeUnit unit of {@code tumblingDuration}
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withTumblingDurationWindow(long tumblingDuration, TimeUnit timeUnit) {
        this.windowParameter = new WindowParameter();
        this.windowParameter.withTumblingDurationWindow(tumblingDuration, timeUnit);
        return this;
    }

    /**
     * Replaces the window parameters with a sliding count window.
     *
     * @param windowCount window length value
     * @param slidingCount sliding interval value
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withSlidingCountWindow(long windowCount, long slidingCount) {
        this.windowParameter = new WindowParameter();
        this.windowParameter.withSlidingingCountWindow(windowCount, slidingCount);
        return this;
    }

    /**
     * Replaces the window parameters with a sliding duration window.
     *
     * @param windowDuration window length value
     * @param windowTU unit of {@code windowDuration}
     * @param slidingDuration sliding interval value
     * @param slidingTU unit of {@code slidingDuration}
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withSlidingDurationWindow(long windowDuration, TimeUnit windowTU, long slidingDuration, TimeUnit slidingTU) {
        this.windowParameter = new WindowParameter();
        this.windowParameter.withSlidingDurationWindow(windowDuration, windowTU, slidingDuration, slidingTU);
        return this;
    }

    /**
     * Sets the timestamp extractor; a non-null extractor makes {@link #isTimestamped()}
     * return {@code true}.
     *
     * @param timestampExtractor extractor to use (raw type kept for caller compatibility)
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withCustomTimestampExtractor(ITimestampExtractor timestampExtractor) {
        this.iTimestampExtractor = timestampExtractor;
        return this;
    }

    /**
     * Clears the timestamp extractor (sets it to {@code null}).
     *
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withTimestampExtractor() {
        this.iTimestampExtractor = null;
        return this;
    }

    /**
     * Sets the allowed lateness, read by {@link #initialize(TaskContext)} as the
     * watermark generator's maximum lag.
     *
     * @param lateness lateness value
     * @param timeUnit unit of {@code lateness}
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withAllowedLateness(long lateness, TimeUnit timeUnit) {
        this.allowedLateness = new WindowConfig.Duration(lateness, timeUnit);
        return this;
    }

    /**
     * Sets the watermark interval, read by {@link #initialize(TaskContext)}.
     *
     * @param watermarkInt interval value
     * @param timeUnit unit of {@code watermarkInt}
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withWatermarkInterval(long watermarkInt, TimeUnit timeUnit) {
        this.watermarkInterval = new WindowConfig.Duration(watermarkInt, timeUnit);
        return this;
    }

    /**
     * Supplies the window directly; {@link #initialize(TaskContext)} then skips
     * deriving one from the window parameters.
     *
     * @param window window to use
     * @return this sink, for chaining
     */
    public BaseWindowedSink<T> withWindow(IWindow window) {
        this.iWindow = window;
        return this;
    }

    /**
     * Creates the listener that bridges window manager callbacks to this sink's
     * abstract hooks.
     *
     * @return a listener forwarding expiry to {@link #getExpire(IWindowMessage)} and
     *         activation to {@link #execute(IWindowMessage)}
     */
    protected WindowLifeCycleListener<T> newWindowLifeCycleListener() {
        return new WindowLifeCycleListener<T>(){

            @Override
            public void onExpiry(IWindowMessage<T> events) {
                BaseWindowedSink.this.getExpire(events);
            }

            @Override
            public void onActivation(IWindowMessage<T> events, IWindowMessage<T> newEvents, IWindowMessage<T> expired) {
                // Only the full event set is used; newEvents and expired are ignored here.
                BaseWindowedSink.this.collectiveEvents = events;
                BaseWindowedSink.this.execute(BaseWindowedSink.this.collectiveEvents);
            }
        };
    }

    /**
     * Builds a count-based windowing policy when a sliding count is given, otherwise a
     * duration-based one.
     *
     * @param slidingIntervalCount sliding count, or {@code null} to use the duration
     * @param slidingIntervalDuration sliding duration; dereferenced when the count is {@code null}
     * @param manager window manager passed to the policy
     * @param policy eviction policy passed to the policy
     * @return the created windowing policy
     */
    public IWindowingPolicy<T> getWindowingPolicy(WindowConfig.Count slidingIntervalCount, WindowConfig.Duration slidingIntervalDuration, WindowManager<T> manager, IEvictionPolicy<T> policy) {
        if (slidingIntervalCount != null) {
            return new CountWindowPolicy<T>(slidingIntervalCount.value, manager, policy);
        }
        return new DurationWindowPolicy<T>(slidingIntervalDuration.value, manager, policy);
    }

    /**
     * Builds a count-based eviction policy when a window count is given, otherwise a
     * duration-based one.
     *
     * @param windowLengthCount window count, or {@code null} to use the duration
     * @param windowLengthDuration window duration; dereferenced when the count is {@code null}
     * @return the created eviction policy
     */
    public IEvictionPolicy<T> getEvictionPolicy(WindowConfig.Count windowLengthCount, WindowConfig.Duration windowLengthDuration) {
        if (windowLengthCount != null) {
            return new CountEvictionPolicy(windowLengthCount.value);
        }
        return new DurationEvictionPolicy(windowLengthDuration.value);
    }

    /**
     * Selects the eviction and windowing policies and installs them on the window manager.
     *
     * <p>Without a timestamp extractor the strategy's own policies are used. With one,
     * count/duration policies from the strategy are replaced by their watermark
     * counterparts, sized from {@code windowParameter}.
     *
     * @param windowStrategy strategy supplying the default policies
     */
    public void setPolicies(IWindowStrategy<T> windowStrategy) {
        IEvictionPolicy<T> eviPolicy = windowStrategy.getEvictionPolicy();
        if (this.isTimestamped()) {
            // NOTE(review): windowParameter is only assigned by the with*Window(count/duration)
            // builders; a sink configured solely through withWindow(IWindow) appears to reach
            // this branch with windowParameter == null - confirm against callers.
            if (eviPolicy instanceof CountEvictionPolicy) {
                LOG.info("WatermarkCountEvictionPolicy selected");
                this.evictionPolicy = new WatermarkCountEvictionPolicy(this.windowParameter.getWindowCountSize().value);
            }
            if (eviPolicy instanceof DurationEvictionPolicy) {
                LOG.info("WatermarkDurationEvictionPolicy selected");
                this.evictionPolicy = new WatermarkDurationEvictionPolicy(this.windowParameter.getWindowDurationSize().value, this.maxLagMs);
            }
        } else {
            this.evictionPolicy = eviPolicy;
        }
        IWindowingPolicy<T> winPolicy = windowStrategy.getWindowingPolicy(this.windowManager, this.evictionPolicy);
        if (this.isTimestamped()) {
            if (winPolicy instanceof CountWindowPolicy) {
                LOG.info("WatermarkCountWindowingPolicy selected");
                this.windowingPolicy = new WatermarkCountWindowPolicy<T>(this.windowParameter.getSlidingCountSize().value, this.windowManager, this.evictionPolicy, this.windowManager);
            }
            if (winPolicy instanceof DurationWindowPolicy) {
                LOG.info("WatermarkDurationWindowingPolicy selected");
                this.windowingPolicy = new WatermarkDurationWindowPolicy<T>(this.windowParameter.getSldingDurationSize().value, this.windowManager, this.windowManager, this.evictionPolicy);
            }
        } else {
            this.windowingPolicy = winPolicy;
        }
        this.windowManager.setEvictionPolicy(this.evictionPolicy);
        this.windowManager.setWindowingPolicy(this.windowingPolicy);
    }

    /**
     * Starts the watermark generator, if one was created, and then the windowing policy.
     */
    public void start() {
        if (this.watermarkEventGenerator != null) {
            LOG.info("Starting WatermarkGenerator");
            LOG.log(Level.FINE, "Starting watermark generator");
            this.watermarkEventGenerator.start();
        }
        LOG.log(Level.FINE, "Starting windowing policy");
        this.windowingPolicy.start();
    }

    /**
     * Shuts down the watermark generator and the window manager, skipping whichever
     * was never created (for example when {@code close} is called before {@code prepare}).
     */
    public void close() {
        if (this.watermarkEventGenerator != null) {
            this.watermarkEventGenerator.shutdown();
        }
        if (this.windowManager != null) {
            this.windowManager.shutdown();
        }
    }

    /**
     * No-op in this base class.
     */
    public void reset() {
    }

    /**
     * @return {@code true} when a timestamp extractor is configured
     */
    protected boolean isTimestamped() {
        return this.iTimestampExtractor != null;
    }

    /**
     * Returns the stream ids for the context's input edges.
     *
     * @param context task context to read the input edges from
     * @return one {@link GlobalStreamId} per input edge key
     */
    private Set<GlobalStreamId> getComponentStreams(TaskContext context) {
        return this.wrapGlobalStreamId(context);
    }

    /**
     * Wraps every key of {@code context.getInEdges()} in a {@link GlobalStreamId}.
     *
     * @param context task context to read the input edges from
     * @return a new mutable set of stream ids
     */
    private Set<GlobalStreamId> wrapGlobalStreamId(TaskContext context) {
        Set<GlobalStreamId> streams = new HashSet<GlobalStreamId>();
        for (String s : context.getInEdges().keySet()) {
            streams.add(new GlobalStreamId(s));
        }
        return streams;
    }

    /**
     * Holder for a list of messages; not referenced elsewhere in this class.
     */
    private static class WindowedLateOutputCollector<T> {
        private final List<IMessage<T>> messageList;
        private IWindowMessage<T> iWindowMessage = null;

        WindowedLateOutputCollector(List<IMessage<T>> list) {
            this.messageList = list;
        }
    }
}

