/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.stream;

import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.stram.tuple.Tuple;
import java.util.Set;

public class PartitionAwareSink<T>
implements Sink<T> {
    private final StreamCodec<T> serde;
    private final Set<Integer> partitions;
    private final int mask;
    private volatile Sink<T> output;
    private int count;

    public PartitionAwareSink(StreamCodec<T> serde, Set<Integer> partitions, int mask, Sink<T> output) {
        this.serde = serde;
        this.partitions = partitions;
        this.output = output;
        this.mask = mask;
    }

    public void put(T payload) {
        if (payload instanceof Tuple) {
            ++this.count;
            this.output.put(payload);
        } else if (this.canSendToOutput(payload)) {
            ++this.count;
            this.output.put(payload);
        }
    }

    protected boolean canSendToOutput(T payload) {
        return this.partitions.contains(this.serde.getPartition(payload) & this.mask);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getCount(boolean reset) {
        try {
            int n = this.count;
            return n;
        }
        finally {
            if (reset) {
                this.count = 0;
            }
        }
    }
}

