/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.datamocker.schedule;

import com.oceanbase.tools.datamocker.core.Dispatcher;
import com.oceanbase.tools.datamocker.core.task.AbstractCallBack;
import com.oceanbase.tools.datamocker.core.task.TableTask;
import com.oceanbase.tools.datamocker.core.task.TableTaskContext;
import com.oceanbase.tools.datamocker.core.task.TableTaskInfo;
import com.oceanbase.tools.datamocker.core.write.output.MockerDataSource;
import com.oceanbase.tools.datamocker.core.write.output.MockerFile;
import com.oceanbase.tools.datamocker.model.enums.MockTaskStatus;
import com.oceanbase.tools.datamocker.model.exception.MockerError;
import com.oceanbase.tools.datamocker.model.exception.MockerException;
import com.oceanbase.tools.datamocker.schedule.MockContext;
import com.oceanbase.tools.datamocker.schedule.MockExecutorService;
import com.oceanbase.tools.datamocker.util.PrintUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class AbstractScheduler {
    private static final Logger log = LoggerFactory.getLogger(AbstractScheduler.class);
    private static final int CORE_POOL_SIZE;
    private static final int MAX_POOL_SIZE;
    private final MockExecutorService service;
    private final long startTimestamp;

    public AbstractScheduler() {
        ThreadPoolExecutor executor = this.pool();
        if (executor == null) {
            executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
        }
        Validate.isTrue((executor.getCorePoolSize() == executor.getMaximumPoolSize() ? 1 : 0) != 0, (String)"core pool size has to be equal to max pool size");
        Validate.isTrue((executor.getCorePoolSize() >= 5 ? 1 : 0) != 0, (String)"core pool size of thread pool can not be smaller than 5");
        this.service = new MockExecutorService(executor);
        this.startTimestamp = System.currentTimeMillis();
    }

    public MockContext execute(Dispatcher<TableTaskInfo> dispatcher) {
        log.info("Thread pool's initialization has been done. coreSize={},maxSize={}", (Object)CORE_POOL_SIZE, (Object)MAX_POOL_SIZE);
        MockContext context = new MockContext(this.service, dispatcher.getTaskId(), dispatcher.getName(), dispatcher.totalCount());
        final AbstractScheduler thisScheduler = this;
        int concurrentCount = dispatcher.getConcurrent();
        final boolean[] flags = new boolean[concurrentCount];
        for (int i = 0; i < concurrentCount; ++i) {
            flags[i] = true;
        }
        Callable<Integer> scheduleTask = () -> {
            MDC.put((String)"mocktask.workspace", (String)context.getTaskId());
            int totalCount = 0;
            int total = 0;
            long maxTimeout = 0L;
            Long timeoutSum = 0L;
            for (int i = 0; i < dispatcher.getConcurrent(); ++i) {
                for (int j = 0; j < dispatcher.getTaskSize(i); ++j) {
                    TableTaskInfo tableTask = (TableTaskInfo)dispatcher.getObj(i, j);
                    Long timeout = tableTask.getMetaData().getTimeoutMilliseconds();
                    timeoutSum = timeoutSum + timeout;
                    if (maxTimeout >= timeout) continue;
                    maxTimeout = timeout;
                }
            }
            long failCount = 0L;
            long maxFailCount = maxTimeout / 5000L + 36L;
            while (!Thread.currentThread().isInterrupted() && this.interval() < timeoutSum) {
                for (int i = 0; i < concurrentCount; ++i) {
                    if (!flags[i]) continue;
                    TableTaskInfo task = (TableTaskInfo)dispatcher.getObj(i, 0);
                    if (task != null) {
                        Map<Set<String>, Integer> dataGroups = this.scheduleDataTask(task.dataWriteGroups(), this.service.getActiveCount(), this.service.getCorePoolSize(), this.service.getMaximumPoolSize());
                        if (dataGroups == null) {
                            Thread.sleep(5000L);
                            log.warn("Insufficient thread resources, will retry, schema={}, tableName={}", task.getMetaData().getTableSchema(), (Object)task.getMetaData().getTableName());
                            if (failCount++ <= maxFailCount) continue;
                            log.warn("Task scheduling operation timed out, the scheduling thread will exit, timeout={} min", (Object)(maxTimeout / 60000L + 3L));
                            this.clearResource(dispatcher);
                            return totalCount;
                        }
                        int currentActive = 0;
                        for (Map.Entry<Set<String>, Integer> entry : dataGroups.entrySet()) {
                            currentActive += entry.getValue().intValue();
                        }
                        Set<Set<String>> columnGroups = this.scheduleColumnTask(task.columnGroups(), this.service.getActiveCount() + currentActive, this.service.getCorePoolSize(), this.service.getMaximumPoolSize());
                        if (columnGroups == null) {
                            Thread.sleep(5000L);
                            log.warn("Insufficient thread resources, will retry, schema={}, tableName={}", task.getMetaData().getTableSchema(), (Object)task.getMetaData().getTableName());
                            if (failCount++ <= maxFailCount) continue;
                            log.warn("Task scheduling operation timed out, the scheduling thread will exit, timeout={} min", (Object)(maxTimeout / 60000L + 3L));
                            this.clearResource(dispatcher);
                            return totalCount;
                        }
                        this.validateSet(columnGroups);
                        failCount = 0L;
                        if (!this.validateThreadResource(columnGroups, dataGroups, this.service.getActiveCount(), this.service.getMaximumPoolSize())) {
                            int required = columnGroups.size();
                            Set<Map.Entry<Set<String>, Integer>> entrySet = dataGroups.entrySet();
                            Iterator<Map.Entry<Set<String>, Integer>> iterator = entrySet.iterator();
                            while (iterator.hasNext()) {
                                Map.Entry<Set<String>, Integer> entry = iterator.next();
                                required += entry.getValue().intValue();
                            }
                            log.warn("The thread resource requirements given by the custom scheduling algorithm exceed the currently available thread resources, requiredThreadCount={}, availableThreadCount={}", (Object)required, (Object)(this.service.getMaximumPoolSize() - this.service.getActiveCount()));
                            this.clearResource(dispatcher);
                            return totalCount;
                        }
                        dispatcher.pop(i);
                        flags[i] = false;
                        TableTask mockTaskBean = new TableTask(task, columnGroups, dataGroups, dispatcher.getName(), i);
                        mockTaskBean.getContext().setStatus(MockTaskStatus.PENDING);
                        mockTaskBean.init(this.service, new AbstractCallBack<TableTaskContext>(){

                            @Override
                            public void doOnSuccess(TableTaskContext param) {
                                ((MockerDataSource)param.getDataSource()).clear();
                                for (MockerFile fileManager : param.getFileManagers()) {
                                    fileManager.close();
                                }
                                flags[param.getTopIndex()] = true;
                                try {
                                    thisScheduler.onSuccess(param);
                                }
                                catch (Throwable e) {
                                    log.error("Some errors happened when scheduler onSuccess executed", e);
                                }
                            }

                            @Override
                            public void doOnFailure(TableTaskContext param, Throwable e) {
                                ((MockerDataSource)param.getDataSource()).clear();
                                for (MockerFile fileManager : param.getFileManagers()) {
                                    fileManager.close();
                                }
                                flags[param.getTopIndex()] = true;
                                try {
                                    thisScheduler.onFailure(param, e);
                                }
                                catch (Throwable e1) {
                                    log.error("Some errors happened when scheduler onFailure executed", e);
                                }
                            }
                        });
                        if (this.service.isShutdown()) {
                            ((MockerDataSource)task.getDataSource()).clear();
                            for (MockerFile manager : task.getFileManagers()) {
                                manager.close();
                            }
                            this.clearResource(dispatcher);
                            log.warn("Task has been shutdown, total task executed is {}", (Object)totalCount);
                            return totalCount;
                        }
                        ++totalCount;
                        TableTaskContext mockContext = this.service.submit(mockTaskBean);
                        context.appendContext(mockContext);
                        continue;
                    }
                    ++total;
                    flags[i] = false;
                }
                if (total < concurrentCount) continue;
            }
            this.clearResource(dispatcher);
            log.info("Scheduled task execution completed, totalTaskExecuted={}, duration={}", (Object)totalCount, (Object)PrintUtil.convertToReadableTimeString(this.interval(), TimeUnit.MILLISECONDS, TimeUnit.MINUTES, TimeUnit.SECONDS));
            MDC.clear();
            return totalCount;
        };
        this.service.submitCallable(scheduleTask);
        return context;
    }

    private void clearResource(Dispatcher<TableTaskInfo> dispatcher) throws Exception {
        for (int i = 0; i < dispatcher.getConcurrent(); ++i) {
            for (int j = 0; j < dispatcher.getTaskSize(i); ++j) {
                TableTaskInfo bean = dispatcher.getObj(i, j);
                ((MockerDataSource)bean.getDataSource()).clear();
                for (MockerFile manager : bean.getFileManagers()) {
                    manager.close();
                }
            }
        }
    }

    private boolean validateThreadResource(Set<Set<String>> columnGroups, Map<Set<String>, Integer> dataGroups, int active, int max) {
        int freeResource = max - active;
        if (freeResource < 0) {
            throw new MockerException(MockerError.UNKNOWN_ERROR, "Free resource thread pool size is smaller than zero");
        }
        int required = columnGroups.size();
        Set<Map.Entry<Set<String>, Integer>> entrySet = dataGroups.entrySet();
        for (Map.Entry<Set<String>, Integer> entry : entrySet) {
            if (entry.getValue() <= 0) {
                throw new MockerException(MockerError.PARAMETER_ERROR, "Thread count can not be smaller than zero");
            }
            required += entry.getValue().intValue();
        }
        return required < freeResource;
    }

    private void validateSet(Set<Set<String>> input) {
        ArrayList<Set<String>> middle = new ArrayList<Set<String>>(input);
        for (int i = 0; i < middle.size(); ++i) {
            HashSet copyObj = new HashSet((Collection)middle.get(i));
            for (int j = i + 1; j < middle.size(); ++j) {
                copyObj.retainAll((Collection)middle.get(j));
                if (copyObj.size() == 0) continue;
                throw new MockerException(MockerError.PARAMETER_ERROR, "Column group set is illegal");
            }
        }
    }

    private long interval() {
        return System.currentTimeMillis() - this.startTimestamp;
    }

    protected abstract Set<Set<String>> scheduleColumnTask(Set<String> var1, int var2, int var3, int var4);

    protected abstract Map<Set<String>, Integer> scheduleDataTask(Set<String> var1, int var2, int var3, int var4);

    protected abstract void onSuccess(TableTaskContext var1);

    protected abstract void onFailure(TableTaskContext var1, Throwable var2);

    public abstract ThreadPoolExecutor pool();

    static {
        MAX_POOL_SIZE = CORE_POOL_SIZE = Math.max(Runtime.getRuntime().availableProcessors(), 5);
    }
}

