/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.loaddump.ringbuffer;

import com.google.common.base.Preconditions;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LiteBlockingWaitStrategy;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.oceanbase.tools.loaddump.common.constants.Constants;
import com.oceanbase.tools.loaddump.common.model.Insertion;
import com.oceanbase.tools.loaddump.ringbuffer.InsertionEventFactory;
import com.oceanbase.tools.loaddump.ringbuffer.Slf4jExceptionHandler;
import com.oceanbase.tools.loaddump.utils.StringUtils;
import com.oceanbase.tools.loaddump.vmoption.JavaOpts;
import com.oceanbase.tools.loaddump.writer.oceanbase.AbstractOceanBaseWriter;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A group of Disruptor ring buffers keyed by leader server.
 *
 * <p>Each registered leader server owns one multi-producer {@link RingBuffer}, one
 * {@link WorkerPool} whose work handlers are the supplied writers, and the
 * {@link ThreadPoolExecutor} on which that pool runs. Registrations are kept in
 * {@link ConcurrentHashMap}s.
 */
public class RingBufferGroup {
    private static final Logger log = LoggerFactory.getLogger(RingBufferGroup.class);

    /** Below this core count {@link LiteBlockingWaitStrategy} is used. */
    private static final int LITE_BLOCKING_MAX_CORES = 16;

    /** Below this core count (and at or above the previous one) {@link SleepingWaitStrategy} is used. */
    private static final int SLEEPING_MAX_CORES = 48;

    private final int capacity;
    private final int threads;
    private final Map<String, ThreadPoolExecutor> executorGroup;
    private final Map<String, WorkerPool<Insertion>> workerGroup;
    private final Map<String, RingBuffer<Insertion>> bufferGroup;

    /**
     * Creates an empty group.
     *
     * @param capacity number of slots allocated for every ring buffer created by {@link #register}
     * @param threads  thread count reported by {@link #getThreads()}; not otherwise used by this class
     */
    public RingBufferGroup(int capacity, int threads) {
        this.threads = threads;
        this.capacity = capacity;
        this.workerGroup = new ConcurrentHashMap<>();
        this.bufferGroup = new ConcurrentHashMap<>();
        this.executorGroup = new ConcurrentHashMap<>();
    }

    /**
     * Creates a ring buffer and a worker pool for the given leader server and starts the pool
     * on the supplied executor.
     *
     * <p>If the leader server is already registered the call is a no-op (a warning is logged).
     * The first registration always wins, so building and starting a second worker pool would
     * only leave running workers that {@link #halt()} and {@link #drainAndHalt()} could never reach.
     *
     * @param leaderServer key of the buffer; must not be blank
     * @param writers      work handlers consuming the buffer; must contain at least one element
     * @param executor     executor the worker pool is started on; must not be {@code null}
     * @throws IllegalArgumentException if any argument violates the constraints above
     */
    public void register(String leaderServer, AbstractOceanBaseWriter[] writers, ThreadPoolExecutor executor) {
        Preconditions.checkArgument(StringUtils.isNotBlank(leaderServer), "The leader server is null");
        Preconditions.checkArgument(writers != null && writers.length > 0, "The writers is null");
        Preconditions.checkArgument(executor != null, "The thread pool is null");

        if (this.bufferGroup.containsKey(leaderServer)) {
            log.warn("The buffer: [{}] is already registered, ignore the duplicated registration", leaderServer);
            return;
        }

        WaitStrategy waitStrategy = RingBufferGroup.findBestWaitStrategy();
        InsertionEventFactory factory = new InsertionEventFactory();
        RingBuffer<Insertion> buffer = RingBuffer.createMultiProducer(factory, this.capacity, waitStrategy);
        log.info("Create {} slots for ring buffer finished. [{}]", this.capacity, leaderServer);

        SequenceBarrier barrier = buffer.newBarrier();
        Slf4jExceptionHandler handler = new Slf4jExceptionHandler();
        WorkerPool<Insertion> pool = new WorkerPool<>(buffer, barrier, handler, writers);
        // Producers must not overtake the slowest worker.
        buffer.addGatingSequences(pool.getWorkerSequences());
        // Start the workers before the buffer becomes visible through getBuffer().
        pool.start(executor);

        this.workerGroup.putIfAbsent(leaderServer, pool);
        this.bufferGroup.putIfAbsent(leaderServer, buffer);
        this.executorGroup.putIfAbsent(leaderServer, executor);
    }

    /**
     * Chooses a wait strategy from the configured parallelism.
     *
     * <p>The parallelism is read from the {@code parallel.count} system property and defaults to
     * {@code Constants.AVAILABLE_CPUS}. Fewer than 16 selects {@link LiteBlockingWaitStrategy},
     * fewer than 48 selects {@link SleepingWaitStrategy}, anything else selects a
     * {@link PhasedBackoffWaitStrategy} that falls back to {@link BlockingWaitStrategy}.
     *
     * @return a new wait strategy instance
     */
    public static WaitStrategy findBestWaitStrategy() {
        int cores = Integer.getInteger("parallel.count", Constants.AVAILABLE_CPUS);
        final WaitStrategy waitStrategy;
        if (cores < LITE_BLOCKING_MAX_CORES) {
            waitStrategy = new LiteBlockingWaitStrategy();
        } else if (cores < SLEEPING_MAX_CORES) {
            waitStrategy = new SleepingWaitStrategy();
        } else {
            waitStrategy = new PhasedBackoffWaitStrategy(100L, 100L, TimeUnit.NANOSECONDS, new BlockingWaitStrategy());
        }
        log.debug("Use {} as available cpu(s) is {}", ClassUtils.getAbbreviatedName(waitStrategy.getClass(), 5), cores);
        return waitStrategy;
    }

    /**
     * Returns the ring buffer to publish to for the given leader server.
     *
     * <p>When no buffer is registered under {@code leaderServer}, the first buffer found in the
     * group is used instead. When {@code JavaOpts.isRebalanceImport} is set and the selected
     * buffer has no remaining capacity, another registered buffer that still has free slots is
     * returned if one exists.
     *
     * @param leaderServer key of the wanted buffer; must not be {@code null}
     * @return the selected ring buffer, never {@code null}
     * @throws IllegalArgumentException if {@code leaderServer} is {@code null}
     * @throws IllegalStateException    if no buffer has been registered at all
     */
    public RingBuffer<Insertion> getBuffer(String leaderServer) {
        Preconditions.checkArgument(leaderServer != null, "Input leader server is null");
        RingBuffer<Insertion> buffer = this.bufferGroup.get(leaderServer);
        if (buffer == null) {
            // Unknown leader: fall back to any registered buffer.
            buffer = this.bufferGroup.values().stream().findFirst().orElse(null);
        }
        Preconditions.checkState(buffer != null, "The buffer: [%s] is unregistered", leaderServer);

        if (JavaOpts.isRebalanceImport && buffer.remainingCapacity() == 0L) {
            // The selected buffer is full, so any other buffer with free slots is a better target.
            // The previous comparison only accepted candidates with *less* remaining capacity
            // than the full buffer, so this rebalance never took effect.
            for (Map.Entry<String, RingBuffer<Insertion>> entry : this.bufferGroup.entrySet()) {
                RingBuffer<Insertion> candidate = entry.getValue();
                if (candidate != buffer && candidate.remainingCapacity() > 0L) {
                    log.debug("The buffer: [{}] is full, rebalance to an idle buffer: [{}]", leaderServer, entry.getKey());
                    return candidate;
                }
            }
        }
        return buffer;
    }

    /**
     * Calls {@code shutdownNow()} on every registered executor and then {@code halt()} on every
     * worker pool. Failures are logged and ignored so that the remaining members are still processed.
     */
    public void halt() {
        for (ThreadPoolExecutor executor : this.executorGroup.values()) {
            try {
                executor.shutdownNow();
            } catch (Exception ex) {
                // Best effort: keep shutting down the remaining executors.
                log.warn("Shutdown executor group now failed, ignore it", ex);
            }
        }
        for (WorkerPool<Insertion> pool : this.workerGroup.values()) {
            try {
                pool.halt();
            } catch (Exception ex) {
                // Best effort: keep halting the remaining pools.
                log.warn("Halt worker group failed, ignore it", ex);
            }
        }
        log.info("Halt all workers immediately at the end of their current cycle");
    }

    /**
     * Calls {@code shutdown()} on every registered executor and then {@code drainAndHalt()} on
     * every worker pool. Failures are logged and ignored so that the remaining members are still
     * processed.
     */
    public void drainAndHalt() {
        for (ThreadPoolExecutor executor : this.executorGroup.values()) {
            try {
                executor.shutdown();
            } catch (Exception ex) {
                // Best effort: keep shutting down the remaining executors.
                log.warn("Shutdown executor group failed, ignore it", ex);
            }
        }
        for (WorkerPool<Insertion> pool : this.workerGroup.values()) {
            try {
                pool.drainAndHalt();
            } catch (Exception ex) {
                // Best effort: keep draining the remaining pools.
                log.warn("Drain and halt worker group failed, ignore it", ex);
            }
        }
        log.debug("Drain and halt the worker group finished");
    }

    /**
     * @return the number of slots used for every ring buffer in this group
     */
    public int getCapacity() {
        return this.capacity;
    }

    /**
     * @return the thread count this group was constructed with
     */
    public int getThreads() {
        return this.threads;
    }
}

