/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.loaddump.loader.record;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alipay.oceanbase.rpc.ObGlobal;
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObLoadDupActionType;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.oceanbase.partition.calculator.ObPartIdCalculator;
import com.oceanbase.partition.calculator.cache.ObTableCache;
import com.oceanbase.partition.calculator.helper.TableEntryExtractor;
import com.oceanbase.partition.calculator.model.TableEntry;
import com.oceanbase.partition.calculator.model.TableEntryKey;
import com.oceanbase.tools.loaddump.base.State;
import com.oceanbase.tools.loaddump.common.enums.LoadStatus;
import com.oceanbase.tools.loaddump.common.enums.ObjectType;
import com.oceanbase.tools.loaddump.common.enums.ServerMode;
import com.oceanbase.tools.loaddump.common.enums.TaskType;
import com.oceanbase.tools.loaddump.common.model.Database;
import com.oceanbase.tools.loaddump.common.model.LoadParameter;
import com.oceanbase.tools.loaddump.common.model.ServerStatus;
import com.oceanbase.tools.loaddump.common.model.SubFile;
import com.oceanbase.tools.loaddump.common.model.TableInfo;
import com.oceanbase.tools.loaddump.common.model.TaskDetail;
import com.oceanbase.tools.loaddump.common.model.TaskState;
import com.oceanbase.tools.loaddump.concurrent.ExecutorTemplate;
import com.oceanbase.tools.loaddump.concurrent.NamedThreadFactory;
import com.oceanbase.tools.loaddump.concurrent.ThreadPoolBuilder;
import com.oceanbase.tools.loaddump.context.LoadContext;
import com.oceanbase.tools.loaddump.context.TaskContext;
import com.oceanbase.tools.loaddump.directpath.DirectPathConnection;
import com.oceanbase.tools.loaddump.loader.AbstractLoader;
import com.oceanbase.tools.loaddump.loader.ILoader;
import com.oceanbase.tools.loaddump.metrics.Meter;
import com.oceanbase.tools.loaddump.metrics.MetricRegistry;
import com.oceanbase.tools.loaddump.metrics.Slf4jReporter;
import com.oceanbase.tools.loaddump.parser.record.Record;
import com.oceanbase.tools.loaddump.parser.record.csv.CsvFormat;
import com.oceanbase.tools.loaddump.ringbuffer.RingBufferGroup;
import com.oceanbase.tools.loaddump.utils.CollectionUtils;
import com.oceanbase.tools.loaddump.utils.DBUtils;
import com.oceanbase.tools.loaddump.utils.ExceptionUtils;
import com.oceanbase.tools.loaddump.utils.NumberUtils;
import com.oceanbase.tools.loaddump.utils.SerializeUtils;
import com.oceanbase.tools.loaddump.utils.SqlUtils;
import com.oceanbase.tools.loaddump.utils.StringUtils;
import com.oceanbase.tools.loaddump.utils.TimeUtils;
import com.oceanbase.tools.loaddump.writer.oceanbase.AbstractOceanBaseWriter;
import com.oceanbase.tools.loaddump.writer.oceanbase.DirectPathWriter;
import com.oceanbase.tools.loaddump.writer.oceanbase.JdbcClientWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRecordFileLoader
extends AbstractLoader {
    private static final Logger log = LoggerFactory.getLogger(AbstractRecordFileLoader.class);
    private static final int SCHEMA_REFRESH_TIME = 10000;
    private final MetricRegistry registry = new MetricRegistry();
    protected long blockSize;
    protected Database database;
    protected double readWriteRatio;
    protected String checkpointPath;
    protected RingBufferGroup bufferGroup;
    protected ExecutorService readerExecutor;
    protected Map<String, CsvFormat> csvFormatMap;
    protected final Map<String, Optional<TableEntry>> tableEntryMap;
    protected final ScheduledExecutorService logReporterExecutor;
    protected final ScheduledExecutorService loadRefreshExecutor;
    protected final Map<String, Map<Long, String>> tablePartLeaderMap;
    protected final Map<String, Map<Long, LoadStatus>> tableLoadStatusMap;
    private final List<AbstractOceanBaseWriter> oceanBaseWriters = new ArrayList<AbstractOceanBaseWriter>();
    protected TableEntryExtractor extractor = new TableEntryExtractor();
    protected LoadContext loadCtx;
    private ConcurrentHashMap<String, DirectPathConnection> directLoadConnMap;
    private boolean noAuthOnQueryLeaderLocation;
    private boolean noAuthOnMonitorMemTable;

    public AbstractRecordFileLoader(LoadParameter parameter) {
        super(parameter);
        this.csvFormatMap = parameter.initCsvFormatMap();
        this.database = parameter.getDatabase();
        this.blockSize = parameter.getBlockSize();
        this.readWriteRatio = parameter.getReadWriteRatio();
        this.checkpointPath = parameter.getCheckpointPath();
        this.tableEntryMap = new ConcurrentHashMap<String, Optional<TableEntry>>(16);
        this.tablePartLeaderMap = new ConcurrentHashMap<String, Map<Long, String>>(16);
        this.tableLoadStatusMap = new ConcurrentHashMap<String, Map<Long, LoadStatus>>(16);
        this.logReporterExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("log-reporter-"), new ThreadPoolExecutor.DiscardOldestPolicy());
        this.loadRefreshExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("load-refresher-"), new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    @Override
    public ILoader prepare() throws Exception {
        ExecutorTemplate.setPoolSize(this.parameter.getThreads());
        if (this.parameter.isRetry()) {
            File checkpoint = new File(this.checkpointPath);
            if (!checkpoint.exists() || checkpoint.isDirectory()) {
                throw new FileNotFoundException("The file: \"" + this.checkpointPath + "\" is missing");
            }
            List<SubFile> persistFiles = SerializeUtils.deserializeListByKryo(this.checkpointPath);
            Preconditions.checkState((boolean)ObjectUtils.isNotEmpty(persistFiles), (Object)"No subfiles are recovered from the checkpoint file");
            ArrayList<SubFile> recoveryFiles = new ArrayList<SubFile>();
            for (SubFile persistFile : persistFiles) {
                if (TaskState.SUCCESS != persistFile.getTaskState()) {
                    persistFile.getParsedCount().set(0L);
                    persistFile.getResource().setStorageConfig(this.parameter.getStorageConfig());
                }
                recoveryFiles.add(persistFile);
            }
            Preconditions.checkState((boolean)ObjectUtils.isNotEmpty(recoveryFiles), (Object)"Maybe all the subfiles have already been load finished");
            this.parameter.setSubFiles(recoveryFiles);
            log.info("Recovery {} non-success subfiles finished", (Object)recoveryFiles.size());
        } else {
            this.parameter.setSubFiles(this.generateSubFiles());
            this.truncateTableIfNecessary();
            Preconditions.checkArgument((boolean)ObjectUtils.isNotEmpty(this.parameter.getSubFiles()), (String)"No subfiles are generated from path: %s", (Object)this.filePath);
            for (SubFile subFile : this.parameter.getSubFiles()) {
                subFile.setTaskState(TaskState.INITIAL);
                subFile.setObjectType(ObjectType.TABLE.getName());
                subFile.setSchemaName(this.database.getSchemaName());
                subFile.setTaskDetail(new TaskDetail());
                subFile.getTaskDetail().setState(TaskState.INITIAL);
                subFile.getTaskDetail().setTaskType(TaskType.LOAD_RECORD);
                subFile.getTaskDetail().setObject(subFile.getObjectName());
                subFile.getTaskDetail().setType(ObjectType.TABLE.getName());
            }
        }
        this.loadCtx = new LoadContext(this.parameter);
        if (this.parameter.isDirectMode()) {
            this.buildDirectLoadConnMaps();
            this.loadCtx.setDirectLoadConnMap(this.directLoadConnMap);
        } else {
            this.preloadTableEntryCollection();
        }
        try {
            SerializeUtils.serializeListByKryo(this.parameter.getSubFiles(), this.checkpointPath);
        }
        catch (Exception e) {
            this.parameter.setCanWrite(false);
            log.warn("Unable to serialize the checkpoint file. But it won't affect the data integrity. Reason: {}.", (Object)ExceptionUtils.getRootCauseMessage(e));
        }
        this.state = State.PREPARE;
        return this;
    }

    private void preloadTableEntryCollection() throws Exception {
        ServerMode serverMode = this.database.getServerMode();
        if (this.parameter.isReplaceData() && serverMode.isOracleMode() && !serverMode.isPrevious("2.2.76")) {
            log.info("Querying primary/unique constraints metadata for data replacing...");
            ExecutorTemplate queryConstraintTemplate = new ExecutorTemplate("query-table-constraints-", this.parameter.getThreads());
            this.database.getTableInfoMap().values().forEach(tableInfo -> queryConstraintTemplate.submit(() -> {
                String schemaName = tableInfo.getSchema();
                String tableName = tableInfo.getTable();
                try {
                    tableInfo.setPrimaryCols(this.metadataProvider.queryPrimaryKeyList(serverMode, schemaName, tableName));
                    tableInfo.setNotNullUniqueKeyMap(this.metadataProvider.queryNonNullUniqueKeyMap(serverMode, schemaName, tableName));
                }
                catch (Exception e) {
                    log.error("Querying primary/unique key failed. Reason: ", (Throwable)e);
                }
                tableInfo.preparePrimaryForReplace();
            }));
        }
        HashSet tableInfoCopies = Sets.newHashSet(this.database.getTableInfoMap().values());
        AtomicLong remain = new AtomicLong(tableInfoCopies.size());
        Iterator iter = tableInfoCopies.iterator();
        while (iter.hasNext()) {
            TableInfo tableInfo2 = (TableInfo)iter.next();
            if (tableInfo2.isWithDatafiles()) continue;
            iter.remove();
            remain.decrementAndGet();
            log.debug("Ignore to query table entry for table: \"{}\" without datafiles. Remain: {}", (Object)tableInfo2.getTable(), (Object)remain);
        }
        if (remain.get() <= 0L) {
            log.info("Ignore to query table entry for any tables as they are all without datafiles");
            return;
        }
        ExecutorTemplate<Void> template = new ExecutorTemplate<Void>("preload-table-entry-", this.parameter.getThreads());
        boolean isSubsequentV4 = serverMode.isSubsequent("4.0.0.0");
        boolean isPubCloud = this.connectionKey.hasNoSysPrivileges();
        for (TableInfo tableInfoCopy : tableInfoCopies) {
            template.submit(() -> {
                TableEntry tableEntry;
                String table = tableInfoCopy.getTable();
                TableEntryKey tableEntryKey = this.connectionKey.createTableEntryKey(table);
                if (isPubCloud && !isSubsequentV4) {
                    tableEntry = this.getTableEntryForPubCloud(tableEntryKey);
                } else if (isSubsequentV4) {
                    try {
                        tableEntry = this.extractor.queryTableEntry(this.sessionManager.getBusinessDataSource(), tableEntryKey, true);
                    }
                    catch (Exception e) {
                        tableEntry = null;
                        if (!this.noAuthOnQueryLeaderLocation) {
                            log.warn("No authorized to query system tables/views, partition-based load balance will be disabled. Detail message: {}", (Object)e.getMessage());
                            this.noAuthOnQueryLeaderLocation = true;
                        }
                    }
                } else {
                    tableEntry = this.extractor.queryTableEntry(this.sessionManager.getSystemDataSource(), tableEntryKey, false);
                }
                ObTableCache.getInstance().addTableEntry(tableEntry);
                this.tableEntryMap.put(table, Optional.ofNullable(tableEntry));
                this.tableLoadStatusMap.put(table, new ConcurrentHashMap(16));
                long r = remain.decrementAndGet();
                log.info("Query table entry for table: \"{}\" finished. Remain: {}", (Object)table, (Object)r);
                return null;
            });
        }
        template.waitForResult();
    }

    private TableEntry getTableEntryForPubCloud(TableEntryKey tableEntryKey) {
        TableEntry tableEntry = new TableEntry();
        tableEntry.setPartitionNum(Long.valueOf(1L));
        tableEntry.setTableName(tableEntryKey.getTableName());
        tableEntry.setTenantName(tableEntryKey.getTenantName());
        tableEntry.setClusterName(tableEntryKey.getClusterName());
        tableEntry.setDatabaseName(tableEntryKey.getSchemaName());
        tableEntry.setServerMode(tableEntryKey.getServerMode());
        return tableEntry;
    }

    private void truncateTableIfNecessary() {
        if (!this.parameter.isTruncatable() && !this.parameter.isDeleteable()) {
            log.debug("Ignore to clean any tables as --truncate-table or --delete-from-table is not specified");
            return;
        }
        HashSet tableInfoCopies = Sets.newHashSet(this.database.getTableInfoMap().values());
        AtomicLong remain = new AtomicLong(tableInfoCopies.size());
        Iterator iter = tableInfoCopies.iterator();
        while (iter.hasNext()) {
            TableInfo tableInfo = (TableInfo)iter.next();
            if (this.parameter.isTruncateWithDataFile()) {
                if (!tableInfo.isEmptyTable() && tableInfo.isWithDatafiles()) continue;
                log.info("Ignore to clean the empty table or table without data files: {}. Remain: {}", (Object)tableInfo.getSchemaTable(), (Object)remain);
            } else {
                if (!tableInfo.isEmptyTable()) continue;
                log.debug("Ignore to clean the empty table: {}. Remain: {}", (Object)tableInfo.getSchemaTable(), (Object)remain);
            }
            iter.remove();
            remain.decrementAndGet();
        }
        if (remain.get() <= 0L) {
            log.debug("Ignore to clean any tables as they are all empty in the schema");
            return;
        }
        ExecutorTemplate<Boolean> template = new ExecutorTemplate<Boolean>("truncate-table-");
        for (TableInfo tableInfoCopy : tableInfoCopies) {
            template.submit(() -> {
                String ddl = this.parameter.isTruncatable() ? "truncate table " + tableInfoCopy.getSchemaTable() : "delete from " + tableInfoCopy.getSchemaTable();
                try (Connection conn = this.sessionManager.getPooledBizConnection();){
                    try (PreparedStatement ps = conn.prepareStatement(ddl);){
                        Stopwatch elapsed = Stopwatch.createStarted();
                        ps.execute();
                        remain.decrementAndGet();
                        log.info("Exec {} finished. Elapsed: {}. Remain: {}", new Object[]{ddl, elapsed.stop(), remain});
                    }
                    Boolean bl = true;
                    return bl;
                }
                catch (Exception e) {
                    log.error("Exec {} failed. Error: {}", (Object)ddl, (Object)ExceptionUtils.getRootCauseMessage(e));
                    return false;
                }
            });
        }
        List results = template.waitForResult();
        if (results.stream().anyMatch(e -> e == false)) {
            throw new RuntimeException("Truncate some tables failed.");
        }
        int refreshTime = tableInfoCopies.size() > 10 ? 10000 : 5000;
        TimeUtils.sleep(TimeUnit.MILLISECONDS, refreshTime);
        log.info("Wait {} ms for observer to refresh schema finished", (Object)refreshTime);
    }

    protected abstract Record parseFirstRecord(SubFile var1);

    protected void reorganizeSubFiles() throws Exception {
        if (this.parameter.isDirectMode()) {
            this.groupAndSortByTable();
        } else {
            this.shuffleByLeaderLocation();
        }
    }

    protected void groupAndSortByTable() {
        List<SubFile> groupedAndSorted = this.parameter.getSubFiles().stream().collect(Collectors.groupingBy(SubFile::getObjectName)).entrySet().stream().sorted(Comparator.comparingInt(entry -> -((List)entry.getValue()).size())).flatMap(entry -> ((List)entry.getValue()).stream()).collect(Collectors.toList());
        this.parameter.setSubFiles(groupedAndSorted);
    }

    protected void shuffleByLeaderLocation() throws Exception {
        boolean noSys;
        HashSet tableNameCopies = Sets.newHashSet(this.database.getTableNames());
        boolean subsequentV4 = !this.database.getServerMode().isPrevious("4.0.0.0");
        AtomicInteger remain = new AtomicInteger(tableNameCopies.size());
        Iterator iter = tableNameCopies.iterator();
        while (iter.hasNext()) {
            String table2 = (String)iter.next();
            if (this.database.getTableInfo(table2).isWithDatafiles()) continue;
            iter.remove();
            remain.decrementAndGet();
            log.debug("Ignore to shuffle data files for table: \"{}\" without datafiles. Remain: {}", (Object)table2, (Object)remain);
        }
        if (remain.get() <= 0) {
            log.info("Ignore to shuffle data files for any tables as they are all without datafiles");
            return;
        }
        boolean bl = noSys = this.connectionKey.hasNoSysPrivileges() && !subsequentV4;
        if (noSys) {
            tableNameCopies.forEach(table -> {
                Map cfr_ignored_0 = this.tablePartLeaderMap.put((String)table, new HashMap());
            });
        } else {
            Optional<TableEntry> nullableEntry;
            DruidDataSource dataSource;
            ExecutorTemplate<Map> template = new ExecutorTemplate<Map>("query-leader-location-", this.parameter.getThreads());
            if (subsequentV4) {
                dataSource = this.sessionManager.getBusinessDataSource();
                for (String table3 : tableNameCopies) {
                    nullableEntry = this.tableEntryMap.get(table3);
                    template.submit(() -> {
                        HashMap<String, Map> map = new HashMap<String, Map>();
                        if (!nullableEntry.isPresent()) {
                            map.put(table3, new HashMap());
                            return map;
                        }
                        TableEntry entry = (TableEntry)nullableEntry.get();
                        try {
                            try (DruidPooledConnection conn = dataSource.getConnection();){
                                try {
                                    map.put(table3, this.extractor.queryLeaderLocationMap((Connection)conn, (TableEntry)nullableEntry.get(), (List)Lists.newArrayList(entry.getPartIdNameMap().keySet()), true));
                                }
                                catch (Exception e) {
                                    if (!this.noAuthOnQueryLeaderLocation) {
                                        log.warn("No authorized to query system tables/views, partition-based load balance will be disabled. Detail message: {}", (Object)e.getMessage());
                                        this.noAuthOnQueryLeaderLocation = true;
                                    }
                                    map.put(table3, new HashMap());
                                    HashMap<String, Map> hashMap = map;
                                    if (conn != null) {
                                        if (var8_8 != null) {
                                            try {
                                                conn.close();
                                            }
                                            catch (Throwable throwable) {
                                                var8_8.addSuppressed(throwable);
                                            }
                                        } else {
                                            conn.close();
                                        }
                                    }
                                    remain.decrementAndGet();
                                    log.info("Query leader location of table: \"{}\" finished. Remain: {}", (Object)table3, (Object)remain);
                                    return hashMap;
                                }
                                HashMap<String, Map> hashMap = map;
                                return hashMap;
                            }
                            {
                                catch (Throwable throwable) {
                                    throw throwable;
                                }
                            }
                        }
                        finally {
                            remain.decrementAndGet();
                            log.info("Query leader location of table: \"{}\" finished. Remain: {}", (Object)table3, (Object)remain);
                        }
                    });
                }
                this.tablePartLeaderMap.putAll(template.waitForResult().stream().flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
            } else {
                dataSource = this.sessionManager.getSystemDataSource();
                for (String table3 : tableNameCopies) {
                    nullableEntry = this.tableEntryMap.get(table3);
                    template.submit(() -> {
                        if (!nullableEntry.isPresent()) {
                            HashMap map = new HashMap();
                            map.put(table3, new HashMap());
                            return map;
                        }
                        TableEntry entry = (TableEntry)nullableEntry.get();
                        try {
                            try (DruidPooledConnection conn = dataSource.getConnection();){
                                HashMap<String, Map> map = new HashMap<String, Map>();
                                map.put(table3, this.extractor.queryLeaderLocationMap((Connection)conn, entry, (List)Lists.newArrayList(entry.getPartIdNameMap().keySet())));
                                HashMap<String, Map> hashMap = map;
                                return hashMap;
                            }
                            {
                                catch (Throwable throwable) {
                                    throw throwable;
                                }
                            }
                        }
                        finally {
                            remain.decrementAndGet();
                            log.info("Query leader location of table: \"{}\" finished. Remain: {}", (Object)table3, (Object)remain);
                        }
                    });
                }
                this.tablePartLeaderMap.putAll(template.waitForResult().stream().flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
            }
        }
        ArrayList<SubFile> subfiles = new ArrayList<SubFile>();
        List<SubFile> subFiles = this.parameter.getSubFiles();
        AtomicInteger total = new AtomicInteger(subFiles.size());
        ExecutorTemplate template = new ExecutorTemplate("parse-first-record", this.parameter.getThreads());
        HashMap locationMap = new HashMap(16);
        log.info("Waiting to calculate leader for {} subfiles....", (Object)subFiles.size());
        subFiles.stream().filter(Objects::nonNull).forEach(subFile -> template.submit(() -> {
            Record firstRecord = this.parseFirstRecord((SubFile)subFile);
            if (firstRecord == null) {
                log.warn("Ignore the first null record. File: {}", (Object)subFile.getUniquePath());
                subfiles.add((SubFile)subFile);
                return null;
            }
            String table = subFile.getObjectName();
            if (firstRecord.isParsed() && StringUtils.isNotBlank(firstRecord.getOriginContent())) {
                List<Record> recordList = SqlUtils.parseStatement(firstRecord, this.database.getTableInfoMap().get(table));
                firstRecord = recordList.get(0);
            }
            TableEntry tableEntry = this.tableEntryMap.get(table).orElse(null);
            ObPartIdCalculator calculator = this.connectionKey.createPartitionIdCalculator(tableEntry, subsequentV4);
            TableInfo tableInfo = this.database.getTableInfo(table);
            long partId = DBUtils.calculatePartitionId(tableInfo, calculator, firstRecord);
            subFile.setPartitionId(partId);
            String leader = this.tablePartLeaderMap.get(table).get(partId);
            if (StringUtils.isBlank(leader)) {
                if (!this.connectionKey.hasNoSysPrivileges() && partId != Long.MIN_VALUE) {
                    log.debug("The leader is null. table: \"{}\", part: {} ", (Object)table, (Object)partId);
                }
                leader = "0.0.0.0";
                this.tablePartLeaderMap.get(table).putIfAbsent(partId, "0.0.0.0");
            }
            Map map = locationMap;
            synchronized (map) {
                locationMap.putIfAbsent(leader, new ArrayList());
                ((List)locationMap.get(leader)).add(subFile);
            }
            int r = total.decrementAndGet();
            if (!"0.0.0.0".equals(leader)) {
                log.debug("Calculate leader: {} of table \"{}\", part: {}. Remain: {}", new Object[]{leader, table, partId, r});
            } else {
                log.warn("Calculate leader of table \"{}\" failed, treat it as non-partition table. Remain: {}", (Object)table, (Object)r);
            }
            return null;
        }));
        template.waitForResult();
        if (CollectionUtils.isNotEmpty(subfiles)) {
            Collections.shuffle(subfiles);
        }
        locationMap.values().stream().filter(Objects::nonNull).forEach(Collections::sort);
        ArrayList subfilesGroup = new ArrayList(locationMap.values());
        while (!subfilesGroup.isEmpty()) {
            int idx = 0;
            while (idx < subfilesGroup.size()) {
                if (((List)subfilesGroup.get(idx)).size() == 0) {
                    subfilesGroup.remove(idx);
                    continue;
                }
                SubFile subFile2 = (SubFile)((List)subfilesGroup.get(idx)).get(0);
                subfiles.add(subFile2);
                ((List)subfilesGroup.get(idx)).remove(0);
                ++idx;
            }
        }
        this.parameter.setSubFiles(subfiles);
    }

    @Override
    public TaskContext loadSchemaAsync() {
        throw new UnsupportedOperationException("Load schema is unsupported");
    }

    @Override
    public TaskContext loadRecordAsync() throws Exception {
        this.checkState();
        this.reorganizeSubFiles();
        this.startLoadRefresher();
        this.startWriter();
        this.startReader();
        this.startLogReporter();
        this.state = State.RUNNING;
        return this;
    }

    public int getReaderThreads() {
        int threads = this.parameter.getThreads();
        return (int)Math.max(this.readWriteRatio * (double)threads, 1.0);
    }

    public int getWriterThreads() {
        return Math.max(this.parameter.getThreads(), 1);
    }

    protected abstract void startReader() throws Exception;

    protected void startWriter() {
        Set<Object> leaderServers = this.parameter.isDirectMode() ? Sets.newHashSet((Object[])new String[]{"0.0.0.0"}) : this.tablePartLeaderMap.values().stream().flatMap(m -> m.values().stream()).collect(Collectors.toSet());
        int threads = this.getWriterThreads();
        this.meter = this.registerMeter("2. Dequeue Performance Monitor: ");
        int leaderBuffer = this.parameter.getBufferSize() / Math.max(leaderServers.size(), 1);
        int capacity = NumberUtils.computeTheNearestPowerOfTwo(leaderBuffer, 8, 8192);
        this.bufferGroup = new RingBufferGroup(capacity, threads);
        for (String string : leaderServers) {
            AbstractOceanBaseWriter[] writers = this.assembleWriters();
            this.oceanBaseWriters.addAll(Arrays.asList(writers));
            this.bufferGroup.register(string, writers, new ThreadPoolBuilder().setCorePoolSize(threads).setMaximumPoolSize(threads).setQueueSize(this.bufferGroup.getCapacity()).setThreadPrefixName(string.replace(':', '-') + "-writer-thread-").build());
            log.info("Start {} database writer threads finished. [{}]", (Object)threads, (Object)string);
        }
    }

    private AbstractOceanBaseWriter[] assembleWriters() {
        int writerNum = this.getWriterThreads();
        if (this.parameter.isDirectMode()) {
            AbstractOceanBaseWriter[] writers = new DirectPathWriter[writerNum];
            for (int i = 0; i < writerNum; ++i) {
                writers[i] = new DirectPathWriter(this.parameter, this.directLoadConnMap);
                writers[i].setMeter(this.meter);
                writers[i].setBufferGroup(this.bufferGroup);
                writers[i].setLoadCtx(this.loadCtx);
            }
            return writers;
        }
        AbstractOceanBaseWriter[] writers = new JdbcClientWriter[writerNum];
        for (int i = 0; i < writerNum; ++i) {
            writers[i] = new JdbcClientWriter(this.supervisor, this.parameter);
            writers[i].setMeter(this.meter);
            writers[i].setBufferGroup(this.bufferGroup);
            writers[i].setLoadCtx(this.loadCtx);
            ((JdbcClientWriter)writers[i]).setTableLoadStatusMap(this.tableLoadStatusMap);
            ((JdbcClientWriter)writers[i]).setConnectionFuture(CompletableFuture.supplyAsync(() -> {
                try {
                    return this.sessionManager.createNewConnection();
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }));
        }
        return writers;
    }

    protected void startLogReporter() {
        if (this.parameter.isDirectMode()) {
            Supplier<Boolean> stopEvent = () -> !this.directLoadConnMap.isEmpty() && this.directLoadConnMap.values().stream().allMatch(t -> t.isClosed() || t.isCommitting());
            Slf4jReporter.forRegistry(this.registry).scheduleOn(this.logReporterExecutor).shutdownExecutorOnStop(true).build().start(5L, TimeUnit.SECONDS, stopEvent);
        } else {
            Slf4jReporter.forRegistry(this.registry).scheduleOn(this.logReporterExecutor).shutdownExecutorOnStop(true).build().start(5L, TimeUnit.SECONDS);
        }
    }

    protected Meter registerMeter(String name) {
        return this.registry.meter(name);
    }

    protected void startLoadRefresher() {
        if (this.parameter.isDirectMode()) {
            return;
        }
        log.info("Waiting to refresh observer load status....");
        Stopwatch stopwatch = Stopwatch.createStarted();
        this.refreshObserverLoadStatus(true);
        log.debug("Refresh observer load status finished. Elapsed: {}", (Object)stopwatch);
        this.loadRefreshExecutor.scheduleAtFixedRate(() -> {
            if (this.supervisor.get()) {
                this.refreshObserverLoadStatus(false);
            }
        }, 0L, 200L, TimeUnit.MILLISECONDS);
    }

    protected void refreshObserverLoadStatus(boolean isStartup) {
        block12: {
            try {
                ServerMode serverMode = this.database.getServerMode();
                boolean subsequentV4 = !serverMode.isPreviousV4();
                Map<Object, Object> serverLoadStatusMap = new HashMap<String, LoadStatus>(3);
                if (this.parameter.hasNoSysPrivileges() && !subsequentV4) {
                    serverLoadStatusMap.put("0.0.0.0", new LoadStatus(false, 0.01));
                    this.noAuthOnMonitorMemTable = true;
                } else {
                    try {
                        serverLoadStatusMap = this.connectionKey.getMetadataProvider().queryObserverLoad(serverMode, this.connectionKey.getTenant());
                    }
                    catch (Exception e) {
                        if (!this.noAuthOnMonitorMemTable) {
                            log.warn("No authorized to query system tables/views, real-time monitor on mem-table will be disabled. Detail message: {}", (Object)e.getMessage());
                            this.noAuthOnMonitorMemTable = true;
                        }
                        serverLoadStatusMap.put("0.0.0.0", new LoadStatus(false, 0.01));
                    }
                }
                AtomicLong remain = new AtomicLong(this.tableLoadStatusMap.size());
                for (Map.Entry<String, Map<Long, LoadStatus>> tentry : this.tableLoadStatusMap.entrySet()) {
                    String table = tentry.getKey();
                    Map<Long, LoadStatus> partitionLoadStatusMap = tentry.getValue();
                    Map<Long, String> partitionLeaderMap = this.tablePartLeaderMap.get(table);
                    if (MapUtils.isEmpty(partitionLeaderMap)) continue;
                    for (Map.Entry<Long, String> pentry : partitionLeaderMap.entrySet()) {
                        LoadStatus expiredLoadStatus;
                        Long partId = pentry.getKey();
                        String leaderServer = pentry.getValue();
                        if (StringUtils.isBlank(leaderServer)) {
                            log.error("Refreshing observer load status failed as leader server is null. Table: \"{}\", Partition: {}", (Object)table, (Object)partId);
                            continue;
                        }
                        LoadStatus refreshedLoadStatus = (LoadStatus)serverLoadStatusMap.get(leaderServer);
                        if (refreshedLoadStatus == null) {
                            refreshedLoadStatus = new LoadStatus(false, 0.01);
                            if (!"0.0.0.0".equals(leaderServer)) {
                                log.warn("Refreshing observer load status failed. Maybe leader server is inactive? Leader Server: [{}], Table: \"{}\", Partition: {}", new Object[]{leaderServer, table, partId});
                            }
                        }
                        if ((expiredLoadStatus = partitionLoadStatusMap.putIfAbsent(partId, refreshedLoadStatus)) == null) continue;
                        expiredLoadStatus.setLeaderServer(leaderServer);
                        expiredLoadStatus.setMerging(refreshedLoadStatus.isMerging());
                        expiredLoadStatus.setMemUsedRatio(refreshedLoadStatus.getMemUsedRatio());
                        expiredLoadStatus.setSlowInsertThreshold(this.parameter.getSlowInsertThreshold());
                        expiredLoadStatus.setPauseInsertThreshold(this.parameter.getPauseInsertThreshold());
                    }
                    if (!isStartup) continue;
                    remain.decrementAndGet();
                    log.debug("Refresh observer load status success. Table: \"{}\". Remain: {}", (Object)table, (Object)remain);
                }
            }
            catch (Exception e) {
                if (new Random().nextInt(1000) <= 800) break block12;
                log.warn("Refresh observer load status failed. Error: {}", (Object)ExceptionUtils.getRootCauseMessage(e));
            }
        }
    }

    private synchronized void buildDirectLoadConnMaps() {
        if (this.directLoadConnMap != null) {
            return;
        }
        this.directLoadConnMap = new ConcurrentHashMap();
        Map<String, List<SubFile>> subFileMap = this.parameter.getSubFiles().stream().collect(Collectors.groupingBy(SubFile::getObjectName));
        ObLoadDupActionType dupAction = this.parameter.isReplaceData() ? ObLoadDupActionType.REPLACE : (this.parameter.getMaxDiscards() < 0 ? ObLoadDupActionType.IGNORE : ObLoadDupActionType.STOP_ON_DUP);
        Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+)\\.(\\d+)\\.(\\d+)");
        Matcher matcher = pattern.matcher(this.connectionKey.getServerMode().getVersion());
        if (matcher.find() && ObGlobal.OB_VERSION == 0L) {
            ObGlobal.OB_VERSION = ObGlobal.calcVersion((int)Integer.parseInt(matcher.group(1)), (short)((short)Integer.parseInt(matcher.group(2))), (byte)((byte)Integer.parseInt(matcher.group(3))), (byte)((byte)Integer.parseInt(matcher.group(4))));
        }
        subFileMap.forEach((t, subFiles) -> {
            try {
                DirectPathConnection conn = new DirectPathConnection.Builder().host(this.parameter.getHost()).port(this.parameter.getRpcPort()).tenant(this.parameter.getTenant()).user(this.parameter.getUser()).password(Optional.ofNullable(this.parameter.getPassword()).orElse("")).cluster(this.parameter.getCluster()).publicCloud(this.parameter.isPubCloud()).schema(this.parameter.getDatabaseName()).table((String)t).parallel(this.parameter.getParallel()).thread(this.parameter.getThreads()).maxErrorCount(this.parameter.getMaxErrors()).duplicateKeyAction(dupAction).directPathConfig(this.parameter.getSessionConfig().getDirectPathLoad()).build();
                this.directLoadConnMap.put((String)t, conn);
            }
            catch (Exception e) {
                log.error("Cannot establish connection to server. Table: {}. Reason: {}", t, (Object)e.getMessage());
                throw new RuntimeException(e);
            }
        });
    }

    @Override
    public ServerStatus getServerStatus() {
        return new ServerStatus(this.tableLoadStatusMap, this.noAuthOnMonitorMemTable);
    }

    @Override
    public boolean isThreadPoolAlive() {
        return this.isAlive(this.readerExecutor) && this.isAlive(this.writerExecutor);
    }

    @Override
    public void shutdown() throws Exception {
        this.supervisor.compareAndSet(true, false);
        if (this.bufferGroup != null) {
            this.bufferGroup.drainAndHalt();
        }
        this.shutdownInternal(false, this.readerExecutor);
        this.shutdownInternal(false, this.writerExecutor);
        this.shutdownInternal(true, this.loadRefreshExecutor);
        this.shutdownInternal(true, this.logReporterExecutor);
        if (this.directLoadConnMap != null) {
            this.taskDetailList.stream().filter(t -> t.getState() == TaskState.FAILURE).map(TaskDetail::getObject).flatMap(name -> this.taskDetailList.stream().filter(t -> name.equals(t.getObject()))).forEach(t -> {
                t.setState(TaskState.FAILURE);
                t.setCount(0L);
            });
            this.directLoadConnMap.values().forEach(c -> {
                log.info("[Timer] Table: {}, Write Elapsed: {}, Commit Elapsed: {}, Total Elapsed: {}", new Object[]{c.getTableName(), c.getWriteElapsed(), c.getCommitElapsed(), c.getTotalElapsed()});
                c.close();
            });
            this.directLoadConnMap.clear();
        }
        this.oceanBaseWriters.forEach(AbstractOceanBaseWriter::shutdown);
        this.state = State.TERMINATE;
    }

    @Override
    public void shutdownNow() throws Exception {
        this.supervisor.compareAndSet(true, false);
        if (this.bufferGroup != null) {
            this.bufferGroup.halt();
        }
        this.shutdownInternal(true, this.readerExecutor);
        this.shutdownInternal(true, this.writerExecutor);
        this.shutdownInternal(true, this.loadRefreshExecutor);
        this.shutdownInternal(true, this.logReporterExecutor);
        this.state = State.TERMINATE;
    }

    @Override
    public void stopLogReporter() {
        super.shutdownInternal(true, this.logReporterExecutor);
    }
}

