/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.commons.lang3.ClassUtils;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Base class for input operators that expose a single, buffering output port.
 *
 * <p>If the concrete subclass also implements {@link Runnable}, {@link #activate} runs it on a
 * dedicated thread named {@code io-<ShortClassName>} and {@link #deactivate} interrupts that
 * thread. Tuples handed to {@link #outputPort} are queued and are only passed on to the
 * underlying {@code DefaultOutputPort} when {@link #emitTuples()} flushes the queue.
 *
 * @param <T> type of the tuples emitted on the output port
 */
@InterfaceStability.Evolving
public abstract class SimpleSinglePortInputOperator<T>
extends BaseOperator
implements InputOperator,
Operator.ActivationListener<Context.OperatorContext> {
    /** Queue capacity used when the caller does not supply one. */
    private static final int DEFAULT_PORT_CAPACITY = 1024;

    /** Thread running the subclass's {@code run()} method; stays {@code null} if the subclass is not a {@link Runnable}. */
    private transient Thread ioThread;

    /**
     * Activation flag. Declared {@code volatile} because it is written by the thread that calls
     * {@link #activate}/{@link #deactivate} and may be read from the I/O thread via {@link #isActive()}.
     */
    private transient volatile boolean isActive = false;

    /** The single output port; buffers tuples until {@link #emitTuples()} is called. */
    public final transient BufferingOutputPort<T> outputPort;

    /**
     * Creates the operator with an output buffer of the given size.
     *
     * @param portCapacity maximum number of tuples the output port buffers before {@code emit} blocks
     */
    public SimpleSinglePortInputOperator(int portCapacity) {
        this.outputPort = new BufferingOutputPort<T>((Operator) this, portCapacity);
    }

    /** Creates the operator with the default output buffer size of 1024 tuples. */
    public SimpleSinglePortInputOperator() {
        this(DEFAULT_PORT_CAPACITY);
    }

    /**
     * Marks the operator active and, when the subclass implements {@link Runnable}, starts it on
     * a new thread.
     *
     * @param ctx operator context (not used by this implementation)
     */
    public final void activate(Context.OperatorContext ctx) {
        this.isActive = true;
        if (this instanceof Runnable) {
            String threadName = "io-" + ClassUtils.getShortClassName(this.getClass());
            this.ioThread = new Thread((Runnable) this, threadName);
            this.ioThread.start();
        }
    }

    /**
     * Marks the operator inactive and interrupts the I/O thread, if one was started, so that it
     * can leave a sleep or blocking call.
     */
    public final void deactivate() {
        this.isActive = false;
        if (this.ioThread != null) {
            this.ioThread.interrupt();
        }
    }

    /**
     * @return {@code true} between {@link #activate} and {@link #deactivate}
     */
    public final boolean isActive() {
        return this.isActive;
    }

    /** Forwards every tuple currently buffered in {@link #outputPort}. */
    public void emitTuples() {
        this.outputPort.flush(Integer.MAX_VALUE);
    }

    /**
     * Output port that queues emitted tuples in a bounded {@link ArrayBlockingQueue} and passes
     * them to {@code DefaultOutputPort.emit} only when {@link #flush(int)} is called.
     *
     * @param <T> tuple type
     */
    public static class BufferingOutputPort<T>
    extends DefaultOutputPort<T> {
        /** Tuples waiting to be flushed. */
        public final ArrayBlockingQueue<T> tuples;

        /**
         * Creates a port with the default capacity of 1024 tuples.
         *
         * @param operator owning operator (not used by this implementation)
         */
        public BufferingOutputPort(Operator operator) {
            this(operator, DEFAULT_PORT_CAPACITY);
        }

        /**
         * Creates a port with the given buffer capacity.
         *
         * @param operator owning operator (not used by this implementation)
         * @param capacity maximum number of buffered tuples; must be positive
         */
        public BufferingOutputPort(Operator operator, int capacity) {
            this.tuples = new ArrayBlockingQueue<T>(capacity);
        }

        /**
         * Queues a tuple, blocking while the buffer is full.
         *
         * @param tuple tuple to buffer
         * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
         *         is interrupted while waiting; the thread's interrupt flag is set again first
         */
        public void emit(T tuple) {
            try {
                this.tuples.put(tuple);
            }
            catch (InterruptedException ex) {
                // Restore the flag so callers that catch the RuntimeException can still
                // observe the interruption (e.g. the one sent by deactivate()).
                Thread.currentThread().interrupt();
                throw new RuntimeException(ex);
            }
        }

        /**
         * Removes up to {@code count} buffered tuples, oldest first, and emits each one through
         * the superclass port. Stops early when the buffer is empty.
         *
         * @param count maximum number of tuples to emit; values {@code <= 0} emit nothing
         */
        public void flush(int count) {
            T tuple;
            // poll() never blocks and returns null once the queue is empty.
            while (count-- > 0 && (tuple = this.tuples.poll()) != null) {
                super.emit(tuple);
            }
        }
    }
}

