/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.engine;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Sink;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.Pair;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.debug.MuxSink;
import com.datatorrent.stram.engine.GenericNode;
import com.datatorrent.stram.engine.InputNode;
import com.datatorrent.stram.engine.OiONode;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.engine.SweepableReservoir;
import com.datatorrent.stram.engine.UnifierNode;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.plan.logical.Operators;
import com.datatorrent.stram.tuple.EndStreamTuple;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.math.IntMath;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class Node<OPERATOR extends Operator>
implements Component<OperatorContext>,
Runnable {
    // String constants; not referenced elsewhere in this class
    // (presumably conventional port names used by callers - TODO confirm).
    public static final String INPUT = "input";
    public static final String OUTPUT = "output";
    // The four upper-case fields below are populated from context attributes in activate().
    // APPLICATION_WINDOW_COUNT is reduced to gcd(application window, slide-by window) when the latter is set.
    protected int APPLICATION_WINDOW_COUNT;
    protected int DAG_CHECKPOINT_WINDOW_COUNT;
    // Forced to 1 by activate() when the processing mode is EXACTLY_ONCE.
    protected int CHECKPOINT_WINDOW_COUNT;
    // True when a stats listener class (or the operator class) carries StatsListener.DataQueueSize.
    protected boolean DATA_TUPLE_AWARE;
    // Node id; 0 means "not yet assigned" (see setId).
    protected int id;
    // Sink currently attached to each output port, keyed by port name.
    protected final HashMap<String, Sink<Object>> outputs;
    // Array copy of outputs.values() rebuilt by activateSinks(); Sink.NO_SINKS when empty or deactivated.
    protected volatile Sink<Object>[] sinks = Sink.NO_SINKS;
    // Set to true by activate() and to false by shutdown().
    protected boolean alive;
    protected final OPERATOR operator;
    // Port description filled by Operators.describe(...) in the constructor.
    protected final Operators.PortMappingDescriptor descriptor;
    public long currentWindowId;
    // Copied into every PortStats.endWindowTimestamp by reportStats(); not written in this class.
    protected long endWindowEmitTime;
    // Thread CPU time observed by the previous reportStats() call.
    protected long lastSampleCpuTime;
    protected ThreadMXBean tmb;
    // Created in the constructor; not otherwise used in this class.
    protected HashMap<SweepableReservoir, Long> endWindowDequeueTimes;
    // Checkpoint produced by checkpoint(long) and handed over (then cleared) by reportStats().
    protected Checkpoint checkpoint;
    public int applicationWindowCount;
    public int checkpointWindowCount;
    // Computed by calculateNextCheckpointWindow().
    public int nextCheckpointWindowCount;
    public int dagCheckpointOffsetCount;
    // Control tuples emitted since the last reportStats(); subtracted from the per-port tuple counts there.
    protected int controlTupleCount;
    public final OperatorContext context;
    // Non-null responses produced by requests executed in handleRequests().
    public final BlockingQueue<StatsListener.OperatorResponse> commandResponse;
    // Fields and bean getters of the operator annotated with AutoMetric, cached by the constructor.
    private final List<Field> metricFields;
    private final Map<String, Method> metricMethods;
    // Single-threaded executor and FIFO of pending asynchronous checkpoint copies (see checkpoint(long)).
    private ExecutorService executorService;
    private Queue<Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo>> taskQueue;
    protected Stats.CheckpointStats checkpointStats;
    // Passed to WindowGenerator.getAheadWindowId(...) by emitEndWindow() for delay operators.
    public long firstWindowMillis;
    public long windowWidthMillis;
    protected Operator.ProcessingMode PROCESSING_MODE;
    // Set by shutdown(), cleared by setup(); deactivate() skips the end-of-stream tuple when it is set.
    protected volatile boolean shutdown;
    private static final Logger logger = LoggerFactory.getLogger(Node.class);

    public Node(OPERATOR operator, OperatorContext context) {
        this.operator = operator;
        this.context = context;
        this.executorService = Executors.newSingleThreadExecutor();
        this.taskQueue = new LinkedList<Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo>>();
        this.outputs = new HashMap();
        this.descriptor = new Operators.PortMappingDescriptor();
        Operators.describe(operator, this.descriptor);
        this.endWindowDequeueTimes = new HashMap();
        this.tmb = ManagementFactory.getThreadMXBean();
        this.commandResponse = new LinkedBlockingQueue<StatsListener.OperatorResponse>();
        this.metricFields = Lists.newArrayList();
        for (Field field : ReflectionUtils.getDeclaredFieldsIncludingInherited(operator.getClass())) {
            if (!field.isAnnotationPresent(AutoMetric.class)) continue;
            this.metricFields.add(field);
            field.setAccessible(true);
        }
        this.metricMethods = Maps.newHashMap();
        try {
            for (PropertyDescriptor pd : Introspector.getBeanInfo(operator.getClass()).getPropertyDescriptors()) {
                AutoMetric rfa;
                Method readMethod = pd.getReadMethod();
                if (readMethod == null || (rfa = readMethod.getAnnotation(AutoMetric.class)) == null) continue;
                this.metricMethods.put(pd.getName(), readMethod);
            }
        }
        catch (IntrospectionException e) {
            throw new RuntimeException("introspecting {}", e);
        }
    }

    /**
     * Returns the operator hosted by this node.
     *
     * @return the operator instance that was passed to the constructor
     */
    public Operator getOperator() {
        return operator;
    }

    /**
     * Clears the shutdown flag, logs the supplied context at debug level and
     * forwards it to the operator's own {@code setup}.
     *
     * @param context the context handed to the operator
     */
    public void setup(OperatorContext context) {
        shutdown = false;
        logger.debug("Operator Context = {}", context);
        operator.setup((Context)context);
    }

    /**
     * Tears down every input and output port, stops the checkpoint executor and
     * finally tears down the operator itself.
     *
     * <p>The executor gets 100 ms to finish queued work after {@code shutdown()};
     * if it has not terminated by then (or the wait is interrupted) it is stopped
     * with {@code shutdownNow()}. An interrupt received while waiting is restored
     * on the current thread once the operator has been torn down, so the caller
     * can still observe it.
     */
    public void teardown() {
        for (Operators.PortContextPair<Operator.InputPort<?>> inputPair : this.descriptor.inputPorts.values()) {
            ((Operator.InputPort)inputPair.component).teardown();
        }
        // Output ports are described by OutputPort pairs (the decompiled loop mistyped them as InputPort).
        for (Operators.PortContextPair<Operator.OutputPort<?>> outputPair : this.descriptor.outputPorts.values()) {
            ((Operator.OutputPort)outputPair.component).teardown();
        }

        boolean interrupted = false;
        if (this.executorService != null) {
            this.executorService.shutdown();
            boolean terminated = false;
            try {
                terminated = this.executorService.awaitTermination(100L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interruptedException) {
                // Remember the interrupt; it is re-asserted after the operator teardown below.
                interrupted = true;
                logger.debug("Wait for graceful executor service {} shutdown interrupted for node {}", new Object[]{this.executorService, this, interruptedException});
            }
            if (!terminated) {
                logger.warn("Shutting down executor service {} for node {}", (Object)this.executorService, (Object)this);
                this.executorService.shutdownNow();
            }
        }

        try {
            this.operator.teardown();
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the port description built for the operator in the constructor.
     *
     * @return this node's port mapping descriptor
     */
    public Operators.PortMappingDescriptor getPortMappingDescriptor() {
        return descriptor;
    }

    /**
     * Attaches {@code sink} to the named output port, or detaches the current
     * sink when {@code sink} is {@code null}. Unknown port names are ignored.
     *
     * <p>Only the port and the {@link #outputs} map are updated; the
     * {@link #sinks} array is not rebuilt here.
     *
     * @param port name of the output port
     * @param sink sink to attach, or {@code null} to disconnect the port
     */
    public void connectOutputPort(String port, Sink<Object> sink) {
        final Operators.PortContextPair<Operator.OutputPort<?>> pair = descriptor.outputPorts.get(port);
        if (pair == null) {
            return;
        }

        ((Operator.OutputPort)pair.component).setSink(sink);
        if (sink == null) {
            outputs.remove(port);
        } else {
            outputs.put(port, sink);
        }
    }

    /**
     * Connects a reservoir to an input port; implemented by the concrete node types.
     *
     * <p>NOTE(review): the parameter names were lost in decompilation. By analogy
     * with {@link #connectOutputPort(String, Sink)}, {@code var1} is presumably the
     * port name and {@code var2} the reservoir feeding it - confirm against the
     * subclasses.
     */
    public abstract void connectInputPort(String var1, SweepableReservoir var2);

    /**
     * Adds extra sinks to output ports without displacing the ones already attached.
     *
     * <p>Entries whose key is not an output port of this operator are skipped.
     * For a matching port: with no sink attached the new sink is attached directly;
     * if a {@link MuxSink} is attached the new sink is added to it; otherwise the
     * existing sink and the new one are wrapped in a fresh {@link MuxSink}.
     * {@link #activateSinks()} is called when at least one entry matched a port.
     *
     * @param sinks sinks to add, keyed by output port name
     */
    public void addSinks(Map<String, Sink<Object>> sinks) {
        boolean matchedAnyPort = false;
        for (Map.Entry<String, Sink<Object>> entry : sinks.entrySet()) {
            final String portName = entry.getKey();
            final Operators.PortContextPair<Operator.OutputPort<?>> port = this.descriptor.outputPorts.get(portName);
            if (port != null) {
                matchedAnyPort = true;
                final Sink<Object> added = entry.getValue();
                final Sink<Object> existing = this.outputs.get(portName);
                if (existing == null) {
                    ((Operator.OutputPort)port.component).setSink(added);
                    this.outputs.put(portName, added);
                } else if (existing instanceof MuxSink) {
                    ((MuxSink)existing).add(added);
                } else {
                    final MuxSink mux = new MuxSink(existing, added);
                    ((Operator.OutputPort)port.component).setSink((Sink)mux);
                    this.outputs.put(portName, mux);
                }
            }
        }
        if (matchedAnyPort) {
            this.activateSinks();
        }
    }

    /**
     * Removes previously added sinks from output ports.
     *
     * <p>Entries whose key is not an output port are skipped. If the attached sink
     * is the very instance being removed, the port is disconnected. If the attached
     * sink is a {@link MuxSink}, the sink is removed from it; the port is then
     * disconnected when the mux is empty, or re-pointed at the sole survivor when
     * exactly one sink remains. {@link #activateSinks()} is called only when the
     * {@link #outputs} map was modified.
     *
     * @param sinks sinks to remove, keyed by output port name
     */
    public void removeSinks(Map<String, Sink<Object>> sinks) {
        boolean outputsChanged = false;
        for (Map.Entry<String, Sink<Object>> entry : sinks.entrySet()) {
            final String portName = entry.getKey();
            final Operators.PortContextPair<Operator.OutputPort<?>> port = this.descriptor.outputPorts.get(portName);
            if (port == null) {
                continue;
            }

            final Sink<Object> removed = entry.getValue();
            final Sink<Object> attached = this.outputs.get(portName);
            if (attached == removed) {
                ((Operator.OutputPort)port.component).setSink(null);
                this.outputs.remove(portName);
                outputsChanged = true;
            } else if (attached instanceof MuxSink) {
                final MuxSink mux = (MuxSink)attached;
                mux.remove(removed);
                final Sink<Object>[] remaining = mux.getSinks();
                if (remaining.length == 0) {
                    ((Operator.OutputPort)port.component).setSink(null);
                    this.outputs.remove(portName);
                    outputsChanged = true;
                } else if (remaining.length == 1) {
                    // A mux with a single member is unwrapped back to that member.
                    ((Operator.OutputPort)port.component).setSink(remaining[0]);
                    this.outputs.put(portName, remaining[0]);
                    outputsChanged = true;
                }
            }
        }
        if (outputsChanged) {
            this.activateSinks();
        }
    }

    /**
     * Requests this node to stop.
     *
     * <p>Sets the {@link #shutdown} flag, clears {@link #alive} while holding this
     * node's monitor, and - when a context is available - submits a request through
     * {@code context.request(...)} whose execution clears {@link #alive} once more.
     * A warning is logged when there is no context.
     */
    public void shutdown() {
        this.shutdown = true;
        synchronized (this) {
            this.alive = false;
        }

        if (this.context != null) {
            this.context.request(new StatsListener.OperatorRequest(){

                @Override
                public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException {
                    Node.this.alive = false;
                    return null;
                }
            });
        } else {
            logger.warn("Shutdown requested when context is not available!");
        }
    }

    /**
     * Returns the node id rendered as a decimal string.
     *
     * @return the value of {@link #getId()} as text
     */
    @Override
    public String toString() {
        return Integer.toString(getId());
    }

    /**
     * Puts one {@link EndStreamTuple} for the current window into every sink in
     * {@link #outputs} and counts it as a single control tuple.
     */
    protected void emitEndStream() {
        final EndStreamTuple endOfStream = new EndStreamTuple(currentWindowId);
        final Iterator<Sink<Object>> connected = outputs.values().iterator();
        while (connected.hasNext()) {
            connected.next().put((Object)endOfStream);
        }
        controlTupleCount++;
    }

    /**
     * Puts one {@link EndWindowTuple} into every active sink and counts it as a
     * single control tuple.
     *
     * <p>For an {@link Operator.DelayOperator} the tuple carries the id of the
     * window one ahead of the current one (as computed by
     * {@link WindowGenerator#getAheadWindowId}); otherwise it carries
     * {@link #currentWindowId}.
     */
    protected void emitEndWindow() {
        final long windowId = this.operator instanceof Operator.DelayOperator
            ? WindowGenerator.getAheadWindowId(this.currentWindowId, this.firstWindowMillis, this.windowWidthMillis, 1)
            : this.currentWindowId;
        final EndWindowTuple ewt = new EndWindowTuple(windowId);

        // 'sinks' is volatile and replaced wholesale by activateSinks()/deactivateSinks().
        // Read it once so the length and the indexed elements come from the same array.
        final Sink<Object>[] activeSinks = this.sinks;
        for (int s = activeSinks.length; s-- > 0; ) {
            activeSinks[s].put((Object)ewt);
        }
        ++this.controlTupleCount;
    }

    /**
     * Executes the operator requests that are queued on the context at the time of
     * the call and collects their non-null responses in {@link #commandResponse}.
     *
     * <p>The number of requests to run is the queue size observed on entry;
     * requests added while this method runs are left for the next call.
     *
     * @param windowId window id passed to each request
     * @throws RuntimeException wrapping any {@link IOException} thrown by a request
     */
    protected void handleRequests(long windowId) {
        try {
            final BlockingQueue<StatsListener.OperatorRequest> pending = this.context.getRequests();
            for (int remaining = pending.size(); remaining > 0; remaining--) {
                final StatsListener.OperatorRequest request = (StatsListener.OperatorRequest)pending.remove();
                final StatsListener.OperatorResponse response = request.execute(this.operator, this.context.getId(), windowId);
                if (response != null) {
                    this.commandResponse.add(response);
                }
            }
        }
        catch (IOException e) {
            // Errors and unchecked exceptions propagate unchanged; only the checked one needs wrapping.
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads the current values of the operator's {@link AutoMetric} fields and getters.
     *
     * <p>When {@code context.metricsToSend} is non-null only the names it contains
     * are collected; when it is {@code null} every cached metric is collected.
     * Field metrics are keyed by field name, getter metrics by bean property name.
     * After collection {@code context.clearMetrics()} is called.
     *
     * @return a map from metric name to value, or {@code null} when the context
     *         reports that metrics are listed but none are selected for sending
     * @throws RuntimeException if a metric field or getter cannot be read
     */
    protected Map<String, Object> collectMetrics() {
        if (this.context.areMetricsListed()) {
            if (this.context.metricsToSend == null || this.context.metricsToSend.isEmpty()) {
                return null;
            }
        }

        final Map<String, Object> collected = Maps.newHashMap();
        try {
            for (Field metricField : this.metricFields) {
                final String name = metricField.getName();
                if (this.context.metricsToSend == null || this.context.metricsToSend.contains(name)) {
                    collected.put(name, metricField.get(this.operator));
                }
            }
            for (Map.Entry<String, Method> getter : this.metricMethods.entrySet()) {
                final String name = getter.getKey();
                if (this.context.metricsToSend == null || this.context.metricsToSend.contains(name)) {
                    collected.put(name, getter.getValue().invoke(this.operator));
                }
            }
            this.context.clearMetrics();
            return collected;
        }
        catch (IllegalAccessException | InvocationTargetException iae) {
            throw new RuntimeException(iae);
        }
    }

    /**
     * Fills in the output-port, CPU and checkpoint sections of {@code stats} and
     * reports them through the context.
     *
     * <p>Per output port the tuple count is the sink's count minus the control
     * tuples emitted since the previous report; the control tuple counter is then
     * reset. CPU time is the current thread's CPU time consumed since the previous
     * report.
     *
     * <p>Checkpoint information comes from one of two places: a checkpoint recorded
     * directly by {@code checkpoint(long)}, or - when none is pending - the oldest
     * queued asynchronous checkpoint task, provided it has completed. In the latter
     * case a {@link Operator.CheckpointListener} operator is notified here.
     *
     * @param stats    statistics object to populate
     * @param windowId window id passed on to {@code context.report}
     */
    protected void reportStats(Stats.OperatorStats stats, long windowId) {
        stats.outputPorts = new ArrayList<>();
        for (Map.Entry<String, Sink<Object>> output : outputs.entrySet()) {
            final Stats.OperatorStats.PortStats portStats = new Stats.OperatorStats.PortStats(output.getKey());
            portStats.tupleCount = output.getValue().getCount(true) - controlTupleCount;
            portStats.endWindowTimestamp = endWindowEmitTime;
            stats.outputPorts.add(portStats);
        }
        controlTupleCount = 0;

        final long cpuTimeNow = tmb.getCurrentThreadCpuTime();
        stats.cpuTimeUsed = cpuTimeNow - lastSampleCpuTime;
        lastSampleCpuTime = cpuTimeNow;

        if (checkpoint == null) {
            // No synchronous checkpoint pending: see whether the oldest async copy has finished.
            final Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo> oldest = taskQueue.peek();
            if (oldest != null) {
                final FutureTask<Stats.CheckpointStats> copyTask = oldest.getFirst();
                if (copyTask.isDone()) {
                    taskQueue.poll();
                    try {
                        final CheckpointWindowInfo info = oldest.getSecond();
                        stats.checkpointStats = copyTask.get();
                        stats.checkpoint = new Checkpoint(info.windowId, info.applicationWindowCount, info.checkpointWindowCount);
                        if (operator instanceof Operator.CheckpointListener) {
                            ((Operator.CheckpointListener)operator).checkpointed(info.windowId);
                        }
                    }
                    catch (Exception ex) {
                        throw Throwables.propagate((Throwable)ex);
                    }
                }
            }
        } else {
            stats.checkpoint = checkpoint;
            stats.checkpointStats = checkpointStats;
            checkpointStats = null;
            checkpoint = null;
        }

        context.report(stats, windowId);
    }

    /**
     * Rebuilds the {@link #sinks} array from the sinks currently held in
     * {@link #outputs}.
     *
     * <p>With no outputs the shared {@code Sink.NO_SINKS} array is used. Otherwise
     * a new array is filled from its last slot towards its first while iterating
     * {@code outputs.values()}, and published with a single assignment.
     */
    @SuppressWarnings("unchecked")
    protected void activateSinks() {
        final int count = outputs.size();
        if (count == 0) {
            sinks = Sink.NO_SINKS;
            return;
        }

        final Sink<Object>[] rebuilt = (Sink<Object>[])Array.newInstance(Sink.class, count);
        int slot = count;
        for (Sink<Object> sink : outputs.values()) {
            slot--;
            rebuilt[slot] = sink;
        }
        sinks = rebuilt;
    }

    /**
     * Replaces {@link #sinks} with the shared empty array {@code Sink.NO_SINKS};
     * the {@link #outputs} map is left untouched.
     */
    protected void deactivateSinks() {
        sinks = Sink.NO_SINKS;
    }

    /**
     * Checkpoints the operator for {@code windowId}.
     *
     * <p>For a non-stateless context the operator is first notified through
     * {@link Operator.CheckpointNotificationListener#beforeCheckpoint} (when it
     * implements that interface) and its state is saved with the configured
     * {@link StorageAgent}, if any. Three outcomes are possible afterwards:
     * <ul>
     *   <li>asynchronous agent, processing mode other than EXACTLY_ONCE: the copy
     *       to HDFS is queued on the executor and the method returns early; the
     *       checkpoint is reported later by {@code reportStats} once the task is done;</li>
     *   <li>asynchronous agent, EXACTLY_ONCE: the copy to HDFS is performed inline;</li>
     *   <li>any other agent, no agent, or stateless context: nothing further is copied.</li>
     * </ul>
     * In the last two cases the next checkpoint window is recalculated, the
     * {@link #checkpoint} field is set and a {@link Operator.CheckpointListener}
     * operator is notified immediately.
     *
     * @param windowId id of the window being checkpointed
     * @throws RuntimeException wrapping an {@link IOException} from the storage
     *         agent, after an attempt to delete the partially written checkpoint
     */
    void checkpoint(long windowId) {
        if (!this.context.stateless) {
            StorageAgent ba;
            if (this.operator instanceof Operator.CheckpointNotificationListener) {
                ((Operator.CheckpointNotificationListener)this.operator).beforeCheckpoint(windowId);
            }
            if ((ba = (StorageAgent)this.context.getValue(OperatorContext.STORAGE_AGENT)) != null) {
                try {
                    AsyncFSStorageAgent asyncFSStorageAgent;
                    // Timing starts before the save so checkpointTime covers save plus copy.
                    this.checkpointStats = new Stats.CheckpointStats();
                    this.checkpointStats.checkpointStartTime = System.currentTimeMillis();
                    ba.save(this.operator, this.id, windowId);
                    if (ba instanceof AsyncFSStorageAgent && !(asyncFSStorageAgent = (AsyncFSStorageAgent)ba).isSyncCheckpoint()) {
                        if (this.PROCESSING_MODE != Operator.ProcessingMode.EXACTLY_ONCE) {
                            // Capture the window counters as they are now; reportStats() builds the
                            // Checkpoint from this snapshot when the background copy completes.
                            CheckpointWindowInfo checkpointWindowInfo = new CheckpointWindowInfo();
                            checkpointWindowInfo.windowId = windowId;
                            checkpointWindowInfo.applicationWindowCount = this.applicationWindowCount;
                            checkpointWindowInfo.checkpointWindowCount = this.checkpointWindowCount;
                            CheckpointHandler checkpointHandler = new CheckpointHandler();
                            checkpointHandler.agent = asyncFSStorageAgent;
                            checkpointHandler.operatorId = this.id;
                            checkpointHandler.windowId = windowId;
                            checkpointHandler.stats = this.checkpointStats;
                            FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<Stats.CheckpointStats>(checkpointHandler);
                            this.taskQueue.add((Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo>)new Pair(futureTask, (Object)checkpointWindowInfo));
                            this.executorService.submit(futureTask);
                            // Nothing to report yet: the stats object now belongs to the queued task.
                            this.checkpoint = null;
                            this.checkpointStats = null;
                            // NOTE(review): this early return skips calculateNextCheckpointWindow() and the
                            // reset of dagCheckpointOffsetCount below - confirm callers account for that.
                            return;
                        }
                        // EXACTLY_ONCE: the copy must complete before processing continues.
                        asyncFSStorageAgent.copyToHDFS(this.id, windowId);
                    }
                    this.checkpointStats.checkpointTime = System.currentTimeMillis() - this.checkpointStats.checkpointStartTime;
                }
                catch (IOException ie) {
                    // Best-effort rollback: remove whatever was written for this window, then fail.
                    try {
                        logger.warn("Rolling back checkpoint {} for Operator {} due to the exception {}", new Object[]{Codec.getStringWindowId((long)windowId), this.operator, ie});
                        ba.delete(this.id, windowId);
                    }
                    catch (IOException ex) {
                        logger.warn("Error while rolling back checkpoint", (Throwable)ex);
                    }
                    throw new RuntimeException(ie);
                }
            }
        }
        this.calculateNextCheckpointWindow();
        this.dagCheckpointOffsetCount = 0;
        this.checkpoint = new Checkpoint(windowId, this.applicationWindowCount, this.checkpointWindowCount);
        if (this.operator instanceof Operator.CheckpointListener) {
            ((Operator.CheckpointListener)this.operator).checkpointed(windowId);
        }
    }

    /**
     * Recomputes {@link #nextCheckpointWindowCount}.
     *
     * <p>In EXACTLY_ONCE mode the result is always 1. Otherwise it is
     * {@code (DAG_CHECKPOINT_WINDOW_COUNT - dagCheckpointOffsetCount + CHECKPOINT_WINDOW_COUNT - 1)
     * / CHECKPOINT_WINDOW_COUNT * CHECKPOINT_WINDOW_COUNT} in integer arithmetic, which
     * for a non-negative remainder rounds the remaining DAG windows up to the next
     * multiple of the operator's checkpoint window count.
     */
    protected void calculateNextCheckpointWindow() {
        if (PROCESSING_MODE == Operator.ProcessingMode.EXACTLY_ONCE) {
            nextCheckpointWindowCount = 1;
            return;
        }

        final int remainingDagWindows = DAG_CHECKPOINT_WINDOW_COUNT - dagCheckpointOffsetCount;
        final int multiples = (remainingDagWindows + CHECKPOINT_WINDOW_COUNT - 1) / CHECKPOINT_WINDOW_COUNT;
        nextCheckpointWindowCount = multiples * CHECKPOINT_WINDOW_COUNT;
    }

    /**
     * Creates the node implementation matching an operator and its deployment type.
     *
     * <p>Selection order: an {@link InputOperator} deployed as INPUT gets an
     * {@link InputNode}; an {@link Operator.Unifier} deployed as UNIFIER gets a
     * {@link UnifierNode}; anything deployed as OIO gets an {@link OiONode};
     * everything else gets a {@link GenericNode}.
     *
     * @param operator the operator instance; must not be {@code null}
     * @param context  the context passed to the node constructor
     * @param type     the deployment type of the operator
     * @return a newly constructed node wrapping {@code operator}
     */
    public static Node<?> retrieveNode(Object operator, OperatorContext context, OperatorDeployInfo.OperatorType type) {
        logger.debug("type={}, operator class={}", type, operator.getClass());

        if (operator instanceof InputOperator && type == OperatorDeployInfo.OperatorType.INPUT) {
            return new InputNode((InputOperator)operator, context);
        }
        if (operator instanceof Operator.Unifier && type == OperatorDeployInfo.OperatorType.UNIFIER) {
            return new UnifierNode((Operator.Unifier<Object>)((Operator.Unifier)operator), context);
        }
        if (type == OperatorDeployInfo.OperatorType.OIO) {
            return new OiONode((Operator)operator, context);
        }
        return new GenericNode((Operator)operator, context);
    }

    /**
     * Returns the id assigned through {@link #setId(int)}.
     *
     * @return the node id, or 0 when none has been assigned yet
     */
    public int getId() {
        return id;
    }

    /**
     * Assigns the node id. The id can be assigned only while it still has its
     * initial value of 0.
     *
     * @param id the id to assign
     * @throws RuntimeException if a non-zero id has already been assigned
     */
    public void setId(int id) {
        if (this.id == 0) {
            this.id = id;
            return;
        }
        throw new RuntimeException("Id cannot be changed from " + this.id + " to " + id);
    }

    /**
     * Activates the node: marks it alive, loads the window and processing-mode
     * settings from the context, activates the sinks, notifies an
     * {@link Operator.ActivationListener} operator and executes any pending requests.
     *
     * <p>Details established here:
     * <ul>
     *   <li>{@code APPLICATION_WINDOW_COUNT} is the context's application window
     *       count, reduced to its gcd with the slide-by window count when one is set;</li>
     *   <li>a warning is logged when the checkpoint window count is not a multiple
     *       of the application window count;</li>
     *   <li>in EXACTLY_ONCE mode the checkpoint window count is forced to 1;</li>
     *   <li>{@code DATA_TUPLE_AWARE} becomes true when any configured stats listener
     *       class - or, failing that, the operator class when the operator is itself a
     *       {@link StatsListener} - carries {@link StatsListener.DataQueueSize}.</li>
     * </ul>
     */
    public void activate() {
        this.alive = true;
        this.APPLICATION_WINDOW_COUNT = (Integer)this.context.getValue(OperatorContext.APPLICATION_WINDOW_COUNT);
        if (this.context.getValue(OperatorContext.SLIDE_BY_WINDOW_COUNT) != null) {
            int slidingWindowCount = (Integer)this.context.getValue(OperatorContext.SLIDE_BY_WINDOW_COUNT);
            this.APPLICATION_WINDOW_COUNT = IntMath.gcd((int)this.APPLICATION_WINDOW_COUNT, (int)slidingWindowCount);
        }
        this.DAG_CHECKPOINT_WINDOW_COUNT = (Integer)this.context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
        this.CHECKPOINT_WINDOW_COUNT = (Integer)this.context.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT);
        Collection<?> statsListeners = (Collection<?>)this.context.getValue(OperatorContext.STATS_LISTENERS);
        if (this.CHECKPOINT_WINDOW_COUNT % this.APPLICATION_WINDOW_COUNT != 0) {
            logger.warn("{} is not exact multiple of {} for operator {}. This may cause side effects such as processing to begin without beginWindow preceding it in the first window after activation.", new Object[]{OperatorContext.CHECKPOINT_WINDOW_COUNT, OperatorContext.APPLICATION_WINDOW_COUNT, this.operator});
        }
        this.PROCESSING_MODE = (Operator.ProcessingMode)this.context.getValue(OperatorContext.PROCESSING_MODE);
        if (this.PROCESSING_MODE == Operator.ProcessingMode.EXACTLY_ONCE && this.CHECKPOINT_WINDOW_COUNT != 1) {
            logger.warn("Ignoring {} attribute in favor of {} processing mode", (Object)OperatorContext.CHECKPOINT_WINDOW_COUNT.getSimpleName(), (Object)Operator.ProcessingMode.EXACTLY_ONCE.name());
            this.CHECKPOINT_WINDOW_COUNT = 1;
        }
        this.activateSinks();
        if (this.operator instanceof Operator.ActivationListener) {
            ((Operator.ActivationListener)this.operator).activate((Context)this.context);
        }
        if (statsListeners != null) {
            for (Object listener : statsListeners) {
                this.DATA_TUPLE_AWARE = ((StatsListener)listener).getClass().isAnnotationPresent(StatsListener.DataQueueSize.class);
                if (this.DATA_TUPLE_AWARE) {
                    // Stop at the first annotated listener; without this a later,
                    // un-annotated listener would overwrite the flag with false.
                    break;
                }
            }
        }
        if (!this.DATA_TUPLE_AWARE && this.operator instanceof StatsListener) {
            this.DATA_TUPLE_AWARE = this.operator.getClass().isAnnotationPresent(StatsListener.DataQueueSize.class);
        }
        this.handleRequests(this.currentWindowId);
    }

    /**
     * Deactivates the node: notifies an {@link Operator.ActivationListener}
     * operator, emits an end-of-stream tuple when the node is no longer alive
     * but {@link #shutdown()} was not requested, and finally detaches the
     * active sinks.
     */
    public void deactivate() {
        if (operator instanceof Operator.ActivationListener) {
            ((Operator.ActivationListener)operator).deactivate();
        }

        final boolean endedWithoutShutdown = !(shutdown || alive);
        if (endedWithoutShutdown) {
            emitEndStream();
        }

        deactivateSinks();
    }

    private class CheckpointWindowInfo {
        public int applicationWindowCount;
        public int checkpointWindowCount;
        public long windowId;

        private CheckpointWindowInfo() {
        }
    }

    private class CheckpointHandler
    implements Callable<Stats.CheckpointStats> {
        public AsyncFSStorageAgent agent;
        public int operatorId;
        public long windowId;
        public Stats.CheckpointStats stats;

        private CheckpointHandler() {
        }

        @Override
        public Stats.CheckpointStats call() throws Exception {
            this.agent.copyToHDFS(Node.this.id, this.windowId);
            this.stats.checkpointTime = System.currentTimeMillis() - this.stats.checkpointStartTime;
            return this.stats;
        }
    }
}

