/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.stream;

import com.datatorrent.stram.engine.AbstractReservoir;
import com.datatorrent.stram.engine.Stream;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.engine.SweepableReservoir;
import com.datatorrent.stram.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InlineStream
implements Stream {
    private int count;
    private AbstractReservoir reservoir;
    private static final Logger logger = LoggerFactory.getLogger(InlineStream.class);

    public InlineStream(int capacity) {
        this.reservoir = AbstractReservoir.newReservoir("InlineStream", capacity);
    }

    public SweepableReservoir getReservoir() {
        return this.reservoir;
    }

    public void setup(StreamContext context) {
        this.reservoir.setId(context.getId());
    }

    public void activate(StreamContext context) {
    }

    public void deactivate() {
    }

    public void teardown() {
    }

    public void put(Object tuple) {
        try {
            this.reservoir.put(tuple);
            if (!(tuple instanceof Tuple)) {
                ++this.count;
            }
        }
        catch (InterruptedException ie) {
            logger.debug("Interrupted", (Throwable)ie);
            throw new RuntimeException(ie);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getCount(boolean reset) {
        try {
            int n = this.count;
            return n;
        }
        finally {
            if (reset) {
                this.count = 0;
            }
        }
    }

    public String toString() {
        return this.getClass().getName() + '@' + Integer.toHexString(this.hashCode()) + "{reservoir=" + this.getReservoir().toString() + '}';
    }
}

