/*
 * Decompiled with CFR 0.152.
 */
package com.ksyun.kmr.hadoop.fs.ks3.parallel;

import com.ksyun.kmr.hadoop.fs.ks3.Utils;
import com.ksyun.kmr.hadoop.fs.ks3.bean.Event;
import com.ksyun.kmr.hadoop.fs.ks3.parallel.EngineShutter;
import com.ksyun.kmr.hadoop.fs.ks3.parallel.EventHandler;
import com.ksyun.kmr.hadoop.fs.ks3.parallel.EventProducer;
import com.ksyun.kmr.hadoop.fs.ks3.parallel.ThreadFactoryBuilder;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import shadedforhadoopks3.com.google.common.util.concurrent.RateLimiter;
import shadedforhadoopks3.com.lmax.disruptor.BlockingWaitStrategy;
import shadedforhadoopks3.com.lmax.disruptor.RingBuffer;
import shadedforhadoopks3.com.lmax.disruptor.WaitStrategy;
import shadedforhadoopks3.com.lmax.disruptor.dsl.Disruptor;
import shadedforhadoopks3.com.lmax.disruptor.dsl.ProducerType;
import shadedforhadoopks3.org.apache.commons.lang3.tuple.Pair;

public class MultiActionEngine
implements EngineShutter {
    private Disruptor<Event> disruptor;
    private String name;
    private int bufferSize;
    private int parallelNum;
    private EventHandler.Processor processor;
    private EventProducer producer;
    private AtomicInteger receivedNum = new AtomicInteger(0);
    private AtomicInteger handledNum = new AtomicInteger(0);
    private boolean beginShutdown = false;
    private RateLimiter rateLimiter;
    public AtomicReference<Exception> exception;
    private ThreadFactoryBuilder threadFactoryBuilder;
    public static Pair<Integer, TimeUnit> defaultTimeoutConf = Pair.of(10, TimeUnit.MINUTES);
    public static Pair<Integer, TimeUnit> timeoutConf = defaultTimeoutConf;

    public static void resetTimeoutConf() {
        timeoutConf = defaultTimeoutConf;
    }

    public MultiActionEngine(String name, int bufferSize, int parallelNum, RateLimiter rateLimiter, AtomicReference<Exception> exception, EventHandler.Processor processor) {
        this.name = name;
        this.bufferSize = bufferSize;
        this.parallelNum = parallelNum;
        this.rateLimiter = rateLimiter;
        this.processor = processor;
        this.exception = exception;
        this.initDisruptor();
    }

    public MultiActionEngine(String name, int bufferSize, int parallelNum, int rateLimit, EventHandler.Processor processor) {
        this(name, bufferSize, parallelNum, RateLimiter.create(rateLimit), new AtomicReference<Exception>(), processor);
    }

    public MultiActionEngine(String name, int bufferSize, int parallelNum, int rateLimit, AtomicReference<Exception> exception, EventHandler.Processor processor) {
        this(name, bufferSize, parallelNum, RateLimiter.create(rateLimit), exception, processor);
    }

    public RateLimiter getRateLimiter() {
        return this.rateLimiter;
    }

    public boolean fail() {
        return this.exception.get() != null;
    }

    public boolean notFail() {
        return this.exception.get() == null;
    }

    public void increHandledNum() {
        this.handledNum.incrementAndGet();
    }

    public boolean getBeginShutdown() {
        return this.beginShutdown;
    }

    private void initDisruptor() {
        Disruptor<Event> disruptor = null;
        try {
            ThreadFactoryBuilder builder = new ThreadFactoryBuilder(this.name + "-multi-action-engine-%d");
            ThreadFactory threadFactory = builder.setDaemon(true).build();
            this.threadFactoryBuilder = builder;
            disruptor = new Disruptor<Event>(Event::new, this.bufferSize, threadFactory, ProducerType.MULTI, (WaitStrategy)new BlockingWaitStrategy());
            EventHandler[] eventHandlers = new EventHandler[this.parallelNum];
            for (int i = 0; i < this.parallelNum; ++i) {
                eventHandlers[i] = new EventHandler(this, this.processor);
            }
            disruptor.handleEventsWithWorkerPool(eventHandlers);
            disruptor.start();
            RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();
            this.producer = new EventProducer(ringBuffer);
            this.disruptor = disruptor;
        }
        catch (RuntimeException e) {
            try {
                if (disruptor != null) {
                    disruptor.shutdown(2L, TimeUnit.SECONDS);
                }
            }
            catch (Exception e2) {
                RuntimeException re = new RuntimeException("shutdown disruptor fail when init disruptor fail");
                e2.initCause(e);
                re.initCause(e2);
                throw re;
            }
            throw e;
        }
    }

    public boolean sendData(Map<String, Object> data) {
        this.receivedNum.incrementAndGet();
        if (this.notFail()) {
            this.producer.onData(data);
            return true;
        }
        return false;
    }

    private void initCause(Exception e) {
        Exception ce = this.exception.get();
        if (ce != null) {
            e.initCause(ce);
        }
    }

    private void throwException(String message) {
        RuntimeException ex = new RuntimeException(message);
        this.initCause(ex);
        throw ex;
    }

    @Override
    public void shutdown() {
        if (!this.beginShutdown) {
            this.beginShutdown = true;
            Exception shutdownException = null;
            try {
                if (this.disruptor != null) {
                    this.disruptor.shutdown(timeoutConf.getLeft().intValue(), timeoutConf.getRight());
                }
            }
            catch (Exception e) {
                this.initCause(e);
                shutdownException = e;
                try {
                    if (this.disruptor != null) {
                        this.disruptor.halt();
                    }
                    if (this.threadFactoryBuilder != null) {
                        for (Thread t : this.threadFactoryBuilder.getRecords().values()) {
                            t.interrupt();
                        }
                    }
                    Thread.sleep(1000L);
                }
                catch (Exception e2) {
                    e2.initCause(e);
                    shutdownException = e2;
                }
            }
            int recvNum = this.receivedNum.get();
            int handleNum = this.handledNum.get();
            String info = " recvnum " + recvNum + " handlenum " + handleNum;
            if (shutdownException != null) {
                Utils.rethrowRuntimeEx(shutdownException, "multi action engine shutdown fail," + info);
            }
            if (this.fail() || recvNum != handleNum) {
                this.throwException("multi action engine job fail," + info);
            }
        } else {
            this.throwException("multi action engine shutdown again");
        }
    }
}

