/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.cluster.task.runner;

import com.antgroup.geaflow.cluster.task.runner.ITaskRunner;
import com.antgroup.geaflow.common.exception.GeaflowInterruptedException;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base task runner that drains an internal queue on the thread executing {@link #run()}
 * and hands each dequeued task to {@link #process(Object)}.
 *
 * <p>Tasks are submitted with {@link #add(Object)}. The loop keeps polling until
 * {@link #shutdown()} clears the {@code running} flag or an error terminates it.
 *
 * @param <TASK> type of the tasks handled by this runner
 */
public abstract class AbstractTaskRunner<TASK>
implements ITaskRunner<TASK> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTaskRunner.class);

    /** Maximum time, in milliseconds, a single poll waits before {@code running} is re-checked. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /** Pending tasks, consumed by {@link #run()}. */
    private final LinkedBlockingQueue<TASK> taskQueue = new LinkedBlockingQueue<>();

    /** Loop flag read by {@link #run()}; set to {@code false} by {@link #shutdown()}. */
    protected volatile boolean running = true;

    /**
     * Polls the queue and processes tasks one at a time while {@code running} is {@code true}.
     *
     * <p>A task dequeued after {@code running} has been cleared is not processed.
     *
     * @throws GeaflowInterruptedException if the thread is interrupted while waiting for a task;
     *     the thread's interrupt status is restored before the exception is thrown
     * @throws GeaflowRuntimeException wrapping any other {@link Throwable} raised inside the loop
     *     (it is logged before being rethrown)
     */
    @Override
    public void run() {
        while (this.running) {
            try {
                TASK task = this.taskQueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                // poll() returns null on timeout; re-check the flag so a shutdown that
                // happened while waiting prevents the dequeued task from being processed.
                if (this.running && task != null) {
                    this.process(task);
                }
            }
            catch (InterruptedException e) {
                // poll() cleared the interrupt status when it threw; restore it so code
                // further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new GeaflowInterruptedException((Throwable)e);
            }
            catch (Throwable t) {
                LOGGER.error(t.getMessage(), t);
                throw new GeaflowRuntimeException(t);
            }
        }
    }

    /**
     * Enqueues a task for later processing by {@link #run()}.
     *
     * @param task the task to enqueue
     */
    @Override
    public void add(TASK task) {
        this.taskQueue.add(task);
    }

    /**
     * Handles a single task taken from the queue.
     *
     * @param var1 the task to process
     */
    protected abstract void process(TASK var1);

    /**
     * Does nothing in this base class.
     */
    @Override
    public void interrupt() {
    }

    /**
     * Clears the {@code running} flag so that {@link #run()} leaves its loop at the next check.
     * Tasks still in the queue are left unprocessed.
     */
    @Override
    public void shutdown() {
        this.running = false;
    }
}

