/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.lib.io.fs;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.util.DTThrowable;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
public abstract class AbstractReconciler<INPUT, QUEUETUPLE>
extends BaseOperator
implements Operator.CheckpointListener,
Operator.IdleTimeHandler {
    private static final Logger logger = LoggerFactory.getLogger(AbstractReconciler.class);
    public transient DefaultInputPort<INPUT> input = new DefaultInputPort<INPUT>(){

        public void process(INPUT input) {
            AbstractReconciler.this.processTuple(input);
        }
    };
    protected transient ExecutorService executorService;
    protected long currentWindowId;
    protected transient int spinningTime = 10;
    private Map<Long, List<QUEUETUPLE>> currentWindowTuples = Maps.newConcurrentMap();
    private Queue<Long> currentWindows = Queues.newLinkedBlockingQueue();
    protected Queue<QUEUETUPLE> committedTuples = Queues.newLinkedBlockingQueue();
    protected transient Queue<QUEUETUPLE> doneTuples = Queues.newLinkedBlockingQueue();
    private transient Queue<QUEUETUPLE> waitingTuples = Queues.newLinkedBlockingQueue();
    private volatile transient boolean execute;
    private transient AtomicReference<Throwable> cause;

    public void setup(Context.OperatorContext context) {
        if (context != null) {
            this.spinningTime = (Integer)context.getValue(Context.OperatorContext.SPIN_MILLIS);
        }
        this.execute = true;
        this.cause = new AtomicReference();
        this.waitingTuples.addAll(this.committedTuples);
        this.executorService = Executors.newSingleThreadExecutor((ThreadFactory)new NameableThreadFactory("Reconciler-Helper"));
        this.executorService.submit(this.processEnqueuedData());
    }

    public void beginWindow(long windowId) {
        this.currentWindowId = windowId;
        this.currentWindowTuples.put(this.currentWindowId, new ArrayList());
        this.currentWindows.add(windowId);
    }

    public void endWindow() {
        while (this.doneTuples.peek() != null) {
            this.committedTuples.remove(this.doneTuples.poll());
        }
    }

    public void handleIdleTime() {
        if (this.execute) {
            try {
                Thread.sleep(this.spinningTime);
            }
            catch (InterruptedException ie) {
                throw new RuntimeException(ie);
            }
        } else {
            logger.error("Exception: ", this.cause);
            DTThrowable.rethrow((Throwable)this.cause.get());
        }
    }

    public void checkpointed(long l) {
    }

    public void committed(long l) {
        logger.debug(" current committed window {}", (Object)l);
        if (this.currentWindows.isEmpty()) {
            return;
        }
        long processedWindowId = this.currentWindows.peek();
        while (processedWindowId <= l) {
            List<QUEUETUPLE> outputDataList = this.currentWindowTuples.get(processedWindowId);
            if (outputDataList != null && !outputDataList.isEmpty()) {
                this.committedTuples.addAll(outputDataList);
                this.waitingTuples.addAll(outputDataList);
            }
            this.currentWindows.remove();
            this.currentWindowTuples.remove(processedWindowId);
            if (this.currentWindows.isEmpty()) {
                return;
            }
            processedWindowId = this.currentWindows.peek();
        }
    }

    public void teardown() {
        this.execute = false;
        this.executorService.shutdownNow();
    }

    private Runnable processEnqueuedData() {
        return new Runnable(){

            @Override
            public void run() {
                try {
                    while (AbstractReconciler.this.execute) {
                        while (AbstractReconciler.this.waitingTuples.isEmpty()) {
                            Thread.sleep(AbstractReconciler.this.spinningTime);
                        }
                        Object output = AbstractReconciler.this.waitingTuples.remove();
                        AbstractReconciler.this.processCommittedData(output);
                        AbstractReconciler.this.doneTuples.add(output);
                    }
                }
                catch (Throwable e) {
                    AbstractReconciler.this.cause.set(e);
                    AbstractReconciler.this.execute = false;
                }
            }
        };
    }

    protected void enqueueForProcessing(QUEUETUPLE queueTuple) {
        this.currentWindowTuples.get(this.currentWindowId).add(queueTuple);
    }

    protected abstract void processTuple(INPUT var1);

    protected abstract void processCommittedData(QUEUETUPLE var1);
}

