/*
 * Decompiled with CFR 0.152.
 */
package com.mware.core.model;

import com.codahale.metrics.Counter;
import com.mware.core.config.Configuration;
import com.mware.core.exception.BcException;
import com.mware.core.ingest.WorkerSpout;
import com.mware.core.ingest.WorkerTuple;
import com.mware.core.ingest.dataworker.WorkerItem;
import com.mware.core.model.workQueue.WebQueueRepository;
import com.mware.core.model.workQueue.WorkQueueRepository;
import com.mware.core.status.MetricsManager;
import com.mware.core.status.StatusServer;
import com.mware.core.util.BcLogger;
import com.mware.core.util.BcLoggerFactory;
import java.util.LinkedList;
import java.util.Queue;

/**
 * Base class for workers that pull tuples from a {@link WorkerSpout} and process them.
 *
 * <p>The thread that calls {@link #run()} polls the spout, converts each tuple into a
 * {@code TWorkerItem} and places it on an in-memory queue. A second thread, started by
 * {@link #run()}, drains that queue, calls {@link #process(WorkerItem)} and then acks the
 * tuple on the spout, or fails it if processing threw. The polling thread blocks while the
 * queue holds {@code tupleQueueSize} or more items.
 *
 * <p>Configuration keys read by the constructor:
 * <ul>
 *   <li>{@code <concrete class name>.exitOnNextTupleFailure} (default {@code true})</li>
 *   <li>{@code <concrete class name>.tupleQueueSize} (default {@code 10})</li>
 *   <li>{@code status.enabled} (default {@code true})</li>
 * </ul>
 *
 * @param <TWorkerItem> the type of item produced from a tuple's data and handed to
 *                      {@link #process(WorkerItem)}
 */
public abstract class WorkerBase<TWorkerItem extends WorkerItem> {
    /** How long to pause after a failed poll when the worker is configured not to exit. */
    private static final long NEXT_TUPLE_FAILURE_SLEEP_MS = 10000L;
    /** Maximum time {@link #stop()} waits for the process thread to finish. */
    private static final long PROCESS_THREAD_JOIN_TIMEOUT_MS = 10000L;

    private final boolean statusEnabled;
    private final boolean exitOnNextTupleFailure;
    private final Counter queueSizeMetric;
    private final MetricsManager metricsManager;
    private final String queueSizeMetricName;
    private final WorkQueueRepository workQueueRepository;
    private final WebQueueRepository webQueueRepository;
    private volatile boolean shouldRun;
    private StatusServer statusServer = null;
    /** Hand-off queue between the polling thread and the process thread; also used as the monitor. */
    private final Queue<WorkerItemWrapper<TWorkerItem>> tupleQueue = new LinkedList<>();
    private final int tupleQueueSize;
    private Thread processThread;

    /**
     * Reads the worker's settings from {@code configuration} and registers a queue-size counter
     * whose name includes the id of the constructing thread.
     *
     * @param workQueueRepository repository used to create the worker spout
     * @param webQueueRepository  repository exposed through {@link #getWebQueueRepository()}
     * @param configuration       source of the settings listed in the class documentation
     * @param metricsManager      used to create (and later remove) the queue-size counter
     */
    protected WorkerBase(WorkQueueRepository workQueueRepository, WebQueueRepository webQueueRepository, Configuration configuration, MetricsManager metricsManager) {
        this.workQueueRepository = workQueueRepository;
        this.webQueueRepository = webQueueRepository;
        this.metricsManager = metricsManager;
        String configPrefix = this.getClass().getName();
        this.exitOnNextTupleFailure = configuration.getBoolean(configPrefix + ".exitOnNextTupleFailure", true);
        this.tupleQueueSize = configuration.getInt(configPrefix + ".tupleQueueSize", 10);
        this.statusEnabled = configuration.getBoolean("status.enabled", true);
        this.queueSizeMetricName = metricsManager.getNamePrefix(this) + "queue-size-" + Thread.currentThread().getId();
        this.queueSizeMetric = metricsManager.counter(this.queueSizeMetricName);
    }

    /**
     * Removes the queue-size metric registered by the constructor.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            this.metricsManager.removeMetric(this.queueSizeMetricName);
        } finally {
            // Always chain to the superclass, even if metric removal fails.
            super.finalize();
        }
    }

    /**
     * Opens the worker spout, optionally creates the status server, starts the process thread
     * and then polls the spout on the calling thread until {@link #shouldRun()} is false or
     * polling throws.
     *
     * @throws Exception if creating the status server or polling the spout fails
     */
    public void run() throws Exception {
        BcLogger logger = BcLoggerFactory.getLogger(this.getClass());
        logger.info("begin runner");
        WorkerSpout workerSpout = this.prepareWorkerSpout();
        this.shouldRun = true;
        if (this.statusEnabled) {
            this.statusServer = this.createStatusServer();
        }
        this.startProcessThread(logger, workerSpout);
        this.pollWorkerSpout(logger, workerSpout);
        logger.info("end runner");
    }

    /**
     * Starts the thread that drains {@link #tupleQueue}, processes each item, and acks or
     * fails the corresponding tuple on {@code workerSpout}.
     */
    private void startProcessThread(BcLogger logger, WorkerSpout workerSpout) {
        this.processThread = new Thread(() -> {
            while (this.shouldRun) {
                WorkerItemWrapper<TWorkerItem> workerItemWrapper;
                try {
                    workerItemWrapper = this.takeNextItem();
                } catch (InterruptedException ex) {
                    // Restore the interrupt flag before surfacing the failure.
                    Thread.currentThread().interrupt();
                    throw new BcException("Could not get next workerItem", ex);
                } catch (Exception ex) {
                    throw new BcException("Could not get next workerItem", ex);
                }
                if (workerItemWrapper == null || !this.shouldRun) {
                    return;
                }
                try {
                    logger.debug("start processing");
                    long startTime = System.currentTimeMillis();
                    this.process(workerItemWrapper.getWorkerItem());
                    long endTime = System.currentTimeMillis();
                    logger.debug("completed processing in (%dms)", endTime - startTime);
                    workerSpout.ack(workerItemWrapper.getWorkerTuple());
                } catch (Throwable ex) {
                    logger.error("Could not process tuple: %s", workerItemWrapper, ex);
                    workerSpout.fail(workerItemWrapper.getWorkerTuple());
                }
            }
        });
        this.processThread.setName(Thread.currentThread().getName() + "-process");
        this.processThread.start();
    }

    /**
     * Blocks until an item is available on {@link #tupleQueue} or the worker is stopped.
     *
     * @return the next queued item, or {@code null} if the worker has been asked to stop
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private WorkerItemWrapper<TWorkerItem> takeNextItem() throws InterruptedException {
        synchronized (this.tupleQueue) {
            while (true) {
                if (!this.shouldRun) {
                    return null;
                }
                WorkerItemWrapper<TWorkerItem> next = this.tupleQueue.poll();
                if (next != null) {
                    this.queueSizeMetric.dec();
                    // Wake the polling thread in case it is waiting for free capacity.
                    this.tupleQueue.notifyAll();
                    return next;
                }
                this.tupleQueue.wait();
            }
        }
    }

    /**
     * Polls {@code workerSpout} while the worker is running, enqueueing each converted tuple
     * and waiting while the queue is at or above its configured size.
     *
     * @throws InterruptedException if the thread is interrupted while polling, sleeping after a
     *                              failure, or waiting for queue capacity
     */
    private void pollWorkerSpout(BcLogger logger, WorkerSpout workerSpout) throws InterruptedException {
        while (this.shouldRun) {
            WorkerItemWrapper<TWorkerItem> workerItemWrapper;
            WorkerTuple tuple = null;
            try {
                tuple = workerSpout.nextTuple();
                if (tuple == null) {
                    workerItemWrapper = null;
                } else {
                    TWorkerItem workerItem = this.tupleDataToWorkerItem(tuple.getData());
                    workerItemWrapper = new WorkerItemWrapper<>(workerItem, tuple);
                }
            } catch (InterruptedException ex) {
                if (tuple != null) {
                    workerSpout.fail(tuple);
                }
                throw ex;
            } catch (Exception ex) {
                // The tuple was fetched but could not be converted: fail it before reporting.
                if (tuple != null) {
                    workerSpout.fail(tuple);
                }
                this.handleNextTupleException(logger, ex);
                continue;
            }
            if (workerItemWrapper == null) {
                continue;
            }
            synchronized (this.tupleQueue) {
                this.tupleQueue.add(workerItemWrapper);
                this.queueSizeMetric.inc();
                this.tupleQueue.notifyAll();
                while (this.shouldRun && this.tupleQueue.size() >= this.tupleQueueSize) {
                    this.tupleQueue.wait();
                }
            }
        }
    }

    /**
     * Handles a failure to fetch or convert the next tuple. Throws when
     * {@code exitOnNextTupleFailure} is set; otherwise logs the error and sleeps before the
     * caller retries.
     *
     * @param logger logger used to report the failure
     * @param ex     the failure raised while getting the next tuple
     * @throws BcException          if the worker is configured to exit on such failures
     * @throws InterruptedException if interrupted while sleeping
     */
    protected void handleNextTupleException(BcLogger logger, Exception ex) throws InterruptedException {
        if (this.exitOnNextTupleFailure) {
            throw new BcException("Failed to get next tuple", ex);
        }
        logger.error("Failed to get next tuple", ex);
        Thread.sleep(NEXT_TUPLE_FAILURE_SLEEP_MS);
    }

    /**
     * Creates the status server; called from {@link #run()} only when status is enabled.
     */
    protected abstract StatusServer createStatusServer() throws Exception;

    /**
     * Processes a single work item on the process thread.
     *
     * @param workerItem the item produced by {@link #tupleDataToWorkerItem(byte[])}
     */
    protected abstract void process(TWorkerItem workerItem) throws Exception;

    /**
     * Converts the raw data of a tuple into a work item.
     *
     * @param data the bytes returned by the tuple's {@code getData()}
     */
    protected abstract TWorkerItem tupleDataToWorkerItem(byte[] data);

    /**
     * Signals both threads to stop, shuts down the status server if one was created, wakes any
     * thread waiting on the queue and waits up to ten seconds for the process thread to finish.
     *
     * @throws BcException if the calling thread is interrupted while waiting for the process
     *                     thread; the interrupt flag is restored before throwing
     */
    public void stop() {
        this.shouldRun = false;
        if (this.statusServer != null) {
            this.statusServer.shutdown();
        }
        synchronized (this.tupleQueue) {
            this.tupleQueue.notifyAll();
        }
        Thread thread = this.processThread;
        if (thread == null) {
            return;
        }
        try {
            thread.join(PROCESS_THREAD_JOIN_TIMEOUT_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new BcException("Could not stop process thread: " + thread.getName(), e);
        }
    }

    /**
     * Creates and opens the spout for the queue named by {@link #getQueueName()}.
     */
    protected WorkerSpout prepareWorkerSpout() {
        WorkerSpout spout = this.workQueueRepository.createWorkerSpout(this.getQueueName());
        spout.open();
        return spout;
    }

    /**
     * Returns the name of the queue passed to the work queue repository when creating the spout.
     */
    protected abstract String getQueueName();

    protected WorkQueueRepository getWorkQueueRepository() {
        return this.workQueueRepository;
    }

    public WebQueueRepository getWebQueueRepository() {
        return this.webQueueRepository;
    }

    /**
     * Returns whether the worker is currently flagged as running; set by {@link #run()} and
     * cleared by {@link #stop()}.
     */
    public boolean shouldRun() {
        return this.shouldRun;
    }

    /**
     * Pairs a converted work item with the tuple it came from so the tuple can be acked or
     * failed after processing.
     *
     * @param <T> the work item type
     */
    private static final class WorkerItemWrapper<T extends WorkerItem> {
        private final T workerItem;
        private final WorkerTuple workerTuple;

        WorkerItemWrapper(T workerItem, WorkerTuple workerTuple) {
            this.workerItem = workerItem;
            this.workerTuple = workerTuple;
        }

        public Object getMessageId() {
            return this.workerTuple.getMessageId();
        }

        public WorkerTuple getWorkerTuple() {
            return this.workerTuple;
        }

        public T getWorkerItem() {
            return this.workerItem;
        }

        @Override
        public String toString() {
            return "WorkerItemWrapper{messageId=" + this.getMessageId() + ", workerItem=" + this.workerItem + '}';
        }
    }
}

