/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.engine;

import com.datatorrent.api.Sink;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.stram.engine.SweepableReservoir;
import com.datatorrent.stram.tuple.EndStreamTuple;
import com.datatorrent.stram.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WindowIdActivatedReservoir
implements SweepableReservoir {
    private Sink<Object> sink;
    private final String identifier;
    private final SweepableReservoir reservoir;
    private final long windowId;
    EndStreamTuple est;
    private static final Logger logger = LoggerFactory.getLogger(WindowIdActivatedReservoir.class);

    public WindowIdActivatedReservoir(String identifier, SweepableReservoir reservoir, long windowId) {
        this.identifier = identifier;
        this.reservoir = reservoir;
        this.windowId = windowId;
        reservoir.setSink((Sink<Object>)Sink.BLACKHOLE);
    }

    @Override
    public int size(boolean dataTupleAware) {
        return this.reservoir.size(dataTupleAware);
    }

    @Override
    public boolean isEmpty() {
        return this.reservoir.isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Object remove() {
        if (this.est == null) {
            return this.reservoir.remove();
        }
        try {
            EndStreamTuple endStreamTuple = this.est;
            return endStreamTuple;
        }
        finally {
            this.est = null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Sink<Object> setSink(Sink<Object> sink) {
        try {
            Sink<Object> sink2 = this.sink;
            return sink2;
        }
        finally {
            this.sink = sink;
        }
    }

    @Override
    public Tuple sweep() {
        Tuple t;
        while ((t = this.reservoir.sweep()) != null) {
            if (t.getType() == MessageType.BEGIN_WINDOW && t.getWindowId() > this.windowId) {
                this.reservoir.setSink(this.sink);
                this.est = new EndStreamTuple(this.windowId);
                return this.est;
            }
            this.reservoir.remove();
        }
        return null;
    }

    @Override
    public int getCount(boolean reset) {
        return 0;
    }

    public String toString() {
        return "WindowIdActivatedReservoir{identifier=" + this.identifier + ", windowId=" + this.windowId + '}';
    }
}

