/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.loaddump.loader.record;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.oceanbase.partition.calculator.ObPartIdCalculator;
import com.oceanbase.partition.calculator.cache.ObTableCache;
import com.oceanbase.partition.calculator.helper.TableEntryExtractor;
import com.oceanbase.partition.calculator.model.TableEntry;
import com.oceanbase.partition.calculator.model.TableEntryKey;
import com.oceanbase.tools.loaddump.base.State;
import com.oceanbase.tools.loaddump.common.enums.LoadStatus;
import com.oceanbase.tools.loaddump.common.enums.ObjectType;
import com.oceanbase.tools.loaddump.common.enums.ServerMode;
import com.oceanbase.tools.loaddump.common.enums.TaskType;
import com.oceanbase.tools.loaddump.common.model.Database;
import com.oceanbase.tools.loaddump.common.model.LoadParameter;
import com.oceanbase.tools.loaddump.common.model.ServerStatus;
import com.oceanbase.tools.loaddump.common.model.SubFile;
import com.oceanbase.tools.loaddump.common.model.TableInfo;
import com.oceanbase.tools.loaddump.common.model.TaskDetail;
import com.oceanbase.tools.loaddump.common.model.TaskState;
import com.oceanbase.tools.loaddump.concurrent.ExecutorTemplate;
import com.oceanbase.tools.loaddump.concurrent.NamedThreadFactory;
import com.oceanbase.tools.loaddump.concurrent.ThreadPoolBuilder;
import com.oceanbase.tools.loaddump.context.TaskContext;
import com.oceanbase.tools.loaddump.loader.AbstractLoader;
import com.oceanbase.tools.loaddump.loader.ILoader;
import com.oceanbase.tools.loaddump.manager.session.SessionOption;
import com.oceanbase.tools.loaddump.manager.session.SessionProperties;
import com.oceanbase.tools.loaddump.metrics.Meter;
import com.oceanbase.tools.loaddump.metrics.MetricRegistry;
import com.oceanbase.tools.loaddump.metrics.Slf4jReporter;
import com.oceanbase.tools.loaddump.parser.record.Record;
import com.oceanbase.tools.loaddump.parser.record.csv.CsvFormat;
import com.oceanbase.tools.loaddump.ringbuffer.RingBufferGroup;
import com.oceanbase.tools.loaddump.utils.CollectionUtils;
import com.oceanbase.tools.loaddump.utils.DBUtils;
import com.oceanbase.tools.loaddump.utils.ExceptionUtils;
import com.oceanbase.tools.loaddump.utils.SerializeUtils;
import com.oceanbase.tools.loaddump.utils.SqlUtils;
import com.oceanbase.tools.loaddump.utils.StringUtils;
import com.oceanbase.tools.loaddump.utils.TimeUtils;
import com.oceanbase.tools.loaddump.writer.oceanbase.JdbcClientWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractRecordFileLoader
extends AbstractLoader {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(AbstractRecordFileLoader.class);
    /** Wait-time constant with value 10000. NOTE(review): presumably milliseconds for the observer schema refresh — confirm. */
    private static final int SCHEMA_REFRESH_TIME = 10000;
    /** Registry that hands out meters (see registerMeter) and feeds the log reporter (see startLogReporter). */
    private final MetricRegistry registry = new MetricRegistry();
    /** Copied from {@code LoadParameter.getBlockSize()} in the constructor. */
    protected long blockSize;
    /** Target database model, copied from {@code LoadParameter.getDatabase()}. */
    protected Database database;
    /** Multiplied with the configured thread count to size the reader threads (see getReaderThreads). */
    protected double readWriteRatio;
    /** Path of the checkpoint file that prepare() reads on retry and writes the sub file list to. */
    protected String checkpointPath;
    /** Ring buffer group created by startWriter() and drained/halted on shutdown. */
    protected RingBufferGroup bufferGroup;
    /** Reader executor; not assigned in this class, only checked and shut down here. */
    protected ExecutorService readerExecutor;
    /** Result of {@code LoadParameter.initCsvFormatMap()}. NOTE(review): key meaning is not visible here — confirm. */
    protected Map<String, CsvFormat> csvFormatMap;
    /** Every writer created by startWriter(); their caches are expired on shutdown. */
    protected List<JdbcClientWriter> temporaryWriters;
    /** Table name -> table entry queried during prepare(); the Optional is empty when the query yielded null. */
    protected final Map<String, Optional<TableEntry>> tableEntryMap;
    /** Single-threaded scheduler handed to the Slf4jReporter. */
    protected final ScheduledExecutorService logReporterExecutor;
    /** Single-threaded scheduler that periodically runs refreshObserverLoadStatus(false). */
    protected final ScheduledExecutorService loadRefreshExecutor;
    /** Table name -> (partition id -> leader server). */
    protected final Map<String, Map<Long, String>> tablePartLeaderMap;
    /** Table name -> (partition id -> load status); shared with the writers and exposed through getServerStatus(). */
    protected final Map<String, Map<Long, LoadStatus>> tableLoadStatusMap;
    /** Helper used to query table entries and leader locations. */
    protected TableEntryExtractor extractor = new TableEntryExtractor();

    public AbstractRecordFileLoader(LoadParameter parameter) {
        super(parameter);
        this.csvFormatMap = parameter.initCsvFormatMap();
        this.database = parameter.getDatabase();
        this.blockSize = parameter.getBlockSize();
        this.readWriteRatio = parameter.getReadWriteRatio();
        this.checkpointPath = parameter.getCheckpointPath();
        this.temporaryWriters = new ArrayList<JdbcClientWriter>(16);
        this.tableEntryMap = new ConcurrentHashMap<String, Optional<TableEntry>>(16);
        this.tablePartLeaderMap = new ConcurrentHashMap<String, Map<Long, String>>(16);
        this.tableLoadStatusMap = new ConcurrentHashMap<String, Map<Long, LoadStatus>>(16);
        this.logReporterExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("log-reporter-"), new ThreadPoolExecutor.DiscardOldestPolicy());
        this.loadRefreshExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("load-refresher-"), new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    /**
     * Prepares the list of sub files to load and persists it to the checkpoint file.
     *
     * <p>On retry the sub files are restored from the checkpoint file; the parsed count of
     * every sub file that is not in {@code SUCCESS} state is reset to zero. Otherwise the sub
     * files are generated, the target tables are cleaned if requested, and each sub file is
     * initialised with a fresh task detail. In both cases the table entries are preloaded
     * afterwards and the state becomes {@code PREPARE}.
     *
     * @return this loader
     * @throws FileNotFoundException if retrying and the checkpoint file is missing or is a directory
     * @throws Exception if recovering, generating or preloading fails
     */
    @Override
    public ILoader prepare() throws Exception {
        ExecutorTemplate.setPoolSize(this.parameter.getThreads());
        if (this.parameter.isRetry()) {
            File checkpoint = new File(this.checkpointPath);
            if (!checkpoint.exists() || checkpoint.isDirectory()) {
                throw new FileNotFoundException("The file: \"" + this.checkpointPath + "\" is missing");
            }
            List<SubFile> persistFiles = SerializeUtils.deserializeListByKryo(this.checkpointPath);
            Preconditions.checkState(ObjectUtils.isNotEmpty(persistFiles), (Object) "No subfiles are recovered from the checkpoint file");
            List<SubFile> recoveryFiles = new ArrayList<>(persistFiles.size());
            for (SubFile persistFile : persistFiles) {
                if (TaskState.SUCCESS != persistFile.getTaskState()) {
                    // Unfinished files are replayed from the beginning.
                    persistFile.getParsedCount().set(0L);
                }
                // NOTE(review): successful files are kept in the list as well, although the
                // messages below speak of "non-success" files — confirm this is intended.
                recoveryFiles.add(persistFile);
                this.globalContext.initContext(persistFile.getObjectName());
            }
            Preconditions.checkState(ObjectUtils.isNotEmpty(recoveryFiles), (Object) "Maybe all the subfiles have already been load finished");
            this.parameter.setSubFiles(recoveryFiles);
            log.info("Recovery {} non-success subfiles finished", recoveryFiles.size());
        } else {
            this.parameter.setSubFiles(this.generateSubFiles());
            // Validate BEFORE cleaning the tables: truncating or deleting is destructive and
            // must not happen when there is nothing to load afterwards.
            Preconditions.checkArgument(ObjectUtils.isNotEmpty(this.parameter.getSubFiles()), "No subfiles are generated from path: %s", (Object) this.filePath);
            this.truncateTableIfNecessary();
            for (SubFile subFile : this.parameter.getSubFiles()) {
                subFile.setTaskState(TaskState.INITIAL);
                subFile.setObjectType(ObjectType.TABLE.getName());
                subFile.setSchemaName(this.database.getSchemaName());
                subFile.setTaskDetail(new TaskDetail());
                subFile.getTaskDetail().setState(TaskState.INITIAL);
                subFile.getTaskDetail().setTaskType(TaskType.LOAD_RECORD);
                subFile.getTaskDetail().setObject(subFile.getObjectName());
                subFile.getTaskDetail().setType(ObjectType.TABLE.getName());
                this.globalContext.initContext(subFile.getObjectName());
            }
        }
        this.preloadTableEntryCollection();
        SerializeUtils.serializeListByKryoSafely(this.parameter.getSubFiles(), this.checkpointPath);
        this.state = State.PREPARE;
        return this;
    }

    /**
     * Queries the table entry, the primary key and (when replacing data) the unique keys of
     * every table that has data files, in parallel.
     *
     * <p>For each such table the entry is stored in {@link #tableEntryMap} and an empty
     * partition load status map is registered in {@link #tableLoadStatusMap}. Tables without
     * data files are skipped.
     *
     * @throws Exception if waiting for the parallel queries fails
     */
    private void preloadTableEntryCollection() throws Exception {
        Set<TableInfo> tableInfoCopies = Sets.newHashSet(this.database.getTableInfoMap().values());
        AtomicLong remain = new AtomicLong(tableInfoCopies.size());
        Iterator<TableInfo> iter = tableInfoCopies.iterator();
        while (iter.hasNext()) {
            TableInfo tableInfo = iter.next();
            if (tableInfo.isWithDatafiles()) {
                continue;
            }
            iter.remove();
            remain.decrementAndGet();
            log.info("Ignore to query table entry for table: \"{}\" without datafiles. Remain: {}", tableInfo.getTable(), remain);
        }
        if (remain.get() <= 0L) {
            log.info("Ignore to query table entry for any tables as they are all without datafiles");
            return;
        }
        ExecutorTemplate<Void> template = new ExecutorTemplate<>("preload-table-entry-");
        ServerMode serverMode = this.database.getServerMode();
        boolean isSubsequentV4 = serverMode.isSubsequent("4.0.0.0");
        boolean isPubCloud = this.connectionKey.hasNoSysPrivileges();
        for (TableInfo tableInfoCopy : tableInfoCopies) {
            template.submit(() -> {
                String table = tableInfoCopy.getTable();
                TableEntryKey tableEntryKey = this.connectionKey.createTableEntryKey(table);
                TableEntry tableEntry;
                if (isPubCloud && !isSubsequentV4) {
                    // Without sys privileges on a pre-4.0 server a local single-partition entry is built.
                    tableEntry = this.getTableEntryForPubCloud(tableEntryKey);
                } else if (isSubsequentV4) {
                    tableEntry = this.extractor.queryTableEntry(this.sessionManager.getBusinessDataSource(), tableEntryKey, true);
                } else {
                    tableEntry = this.extractor.queryTableEntry(this.sessionManager.getSystemDataSource(), tableEntryKey, false);
                }
                // The entry is treated as nullable below (Optional.ofNullable), so never hand a
                // null to the shared cache.
                if (tableEntry != null) {
                    ObTableCache.getInstance().addTableEntry(tableEntry);
                }
                this.tableEntryMap.put(table, Optional.ofNullable(tableEntry));
                this.tableLoadStatusMap.put(table, new ConcurrentHashMap<>(16));
                this.fillingPrimaryKey(tableInfoCopy);
                if (this.parameter.isReplaceData()) {
                    this.fillingUniqueKey(tableInfoCopy);
                }
                long r = remain.decrementAndGet();
                log.info("Query table entry and primary key for table: \"{}\" finished. Remain: {}", table, r);
                return null;
            });
        }
        template.waitForResult();
    }

    /**
     * Builds a table entry locally from the given key instead of querying it: the partition
     * number is fixed to 1 and the names and server mode are copied from the key.
     *
     * @param tableEntryKey key that identifies the table
     * @return the locally built table entry
     */
    private TableEntry getTableEntryForPubCloud(TableEntryKey tableEntryKey) {
        final TableEntry entry = new TableEntry();
        entry.setPartitionNum(Long.valueOf(1L));
        // Identity of the table, taken over from the key.
        entry.setTableName(tableEntryKey.getTableName());
        entry.setTenantName(tableEntryKey.getTenantName());
        entry.setClusterName(tableEntryKey.getClusterName());
        entry.setDatabaseName(tableEntryKey.getSchemaName());
        entry.setServerMode(tableEntryKey.getServerMode());
        return entry;
    }

    /**
     * Cleans the target tables in parallel when truncating or deleting was requested, then
     * sleeps so the server can refresh its schema.
     *
     * <p>Empty tables are always skipped. When {@code isTruncateWithDataFile()} is set, tables
     * without data files are skipped too. A failed statement is logged and does not abort the
     * remaining ones.
     */
    private void truncateTableIfNecessary() {
        if (!this.parameter.isTruncatable() && !this.parameter.isDeleteable()) {
            log.info("Ignore to clean any tables as --truncate-table or --delete-from-table is not specified");
            return;
        }
        Set<TableInfo> tableInfoCopies = Sets.newHashSet(this.database.getTableInfoMap().values());
        AtomicLong remain = new AtomicLong(tableInfoCopies.size());
        Iterator<TableInfo> iter = tableInfoCopies.iterator();
        while (iter.hasNext()) {
            TableInfo tableInfo = iter.next();
            boolean onlyWithDataFiles = this.parameter.isTruncateWithDataFile();
            boolean cleanable = onlyWithDataFiles
                    ? !tableInfo.isEmptyTable() && tableInfo.isWithDatafiles()
                    : !tableInfo.isEmptyTable();
            if (cleanable) {
                continue;
            }
            iter.remove();
            // Decrement first so the logged "Remain" excludes the table just skipped,
            // consistent with the other progress messages of this class.
            remain.decrementAndGet();
            if (onlyWithDataFiles) {
                log.info("Ignore to clean the empty table or table without data files: {}. Remain: {}", tableInfo.getSchemaTable(), remain);
            } else {
                log.info("Ignore to clean the empty table: {}. Remain: {}", tableInfo.getSchemaTable(), remain);
            }
        }
        if (remain.get() <= 0L) {
            log.info("Ignore to clean any tables as they are all empty in the schema");
            return;
        }
        ExecutorTemplate<Void> template = new ExecutorTemplate<>("truncate-table-");
        for (TableInfo tableInfoCopy : tableInfoCopies) {
            template.submit(() -> {
                // The table identifier comes from the database model; identifiers cannot be bound as parameters.
                String sql = this.parameter.isTruncatable() ? "truncate table " + tableInfoCopy.getSchemaTable() : "delete from " + tableInfoCopy.getSchemaTable();
                long timeout = SessionProperties.hourToMicros("ob.timeout.for.exec.dml");
                try (Connection conn = this.sessionManager.createNewConnection(true, SessionOption.withQueryTimeout(timeout));
                     PreparedStatement ps = conn.prepareStatement(sql)) {
                    Stopwatch elapsed = Stopwatch.createStarted();
                    ps.execute();
                    remain.decrementAndGet();
                    log.info("Exec {} finished. Elapsed: {}. Remain: {}", sql, elapsed.stop(), remain);
                } catch (Exception e) {
                    // Best effort: a failure on one table must not stop the others.
                    log.error("Exec {} failed. Error: {}", sql, ExceptionUtils.getRootCauseMessage(e));
                }
            });
        }
        template.waitForResult();
        // More than ten cleaned tables get the full wait, fewer get half of it.
        int refreshTime = tableInfoCopies.size() > 10 ? SCHEMA_REFRESH_TIME : SCHEMA_REFRESH_TIME / 2;
        TimeUtils.sleep(TimeUnit.MILLISECONDS, refreshTime);
        log.info("Wait {} ms for observer to refresh schema finished", refreshTime);
    }

    /**
     * Queries the primary key map of the given table through the metadata provider, stores it
     * on the table info and calls {@code preparePrimary()}. Any failure is logged and
     * swallowed.
     *
     * @param tableInfo the table whose primary key is filled in
     */
    private void fillingPrimaryKey(TableInfo tableInfo) {
        try {
            tableInfo.setPrimaryKeyMap(
                    this.metadataProvider.queryPrimaryKeyMap(
                            this.database.getServerMode(), tableInfo.getSchema(), tableInfo.getTable()));
            tableInfo.preparePrimary();
        } catch (Exception e) {
            String reason = ExceptionUtils.getRootCauseMessage(e);
            log.error("Get primary key for {} failed. Error: {}", tableInfo.getSchemaTable(), reason);
        }
    }

    /**
     * Queries the unique key map of the given table through the metadata provider and stores
     * it on the table info. Any failure is logged and swallowed.
     *
     * @param tableInfo the table whose unique keys are filled in
     */
    private void fillingUniqueKey(TableInfo tableInfo) {
        try {
            tableInfo.setUniqueKeyMap(
                    this.metadataProvider.queryUniqueKeyMap(
                            this.database.getServerMode(), tableInfo.getSchema(), tableInfo.getTable(), true));
        } catch (Exception e) {
            String reason = ExceptionUtils.getRootCauseMessage(e);
            log.error("Get unique key for {} failed. Error: {}", tableInfo.getSchemaTable(), reason);
        }
    }

    /**
     * Parses the first record of the given sub file. The result is used by
     * shuffleByLeaderLocation() to calculate the partition id of the file; a {@code null}
     * result is tolerated there.
     *
     * @param var1 the sub file to read from
     * @return the first record, or {@code null}
     */
    protected abstract Record parseFirstRecord(SubFile var1);

    protected void shuffleByLeaderLocation() throws Exception {
        HashSet tableNameCopies = Sets.newHashSet(this.database.getTableNames());
        boolean subsequentV4 = !this.database.getServerMode().isPrevious("4.0.0.0");
        AtomicInteger remain = new AtomicInteger(tableNameCopies.size());
        Iterator iter = tableNameCopies.iterator();
        while (iter.hasNext()) {
            String table = (String)iter.next();
            if (this.database.getTableInfo(table).isWithDatafiles()) continue;
            iter.remove();
            remain.decrementAndGet();
            log.info("Ignore to shuffle data files for table: \"{}\" without datafiles. Remain: {}", (Object)table, (Object)remain);
        }
        if (remain.get() <= 0) {
            log.info("Ignore to shuffle data files for any tables as they are all without datafiles");
            return;
        }
        if (this.connectionKey.hasNoSysPrivileges() && !subsequentV4) {
            for (String table : tableNameCopies) {
                this.tablePartLeaderMap.put(table, new HashMap());
            }
        } else {
            Optional<TableEntry> nullableEntry;
            DruidDataSource dataSource;
            ExecutorTemplate<Map> template = new ExecutorTemplate<Map>("query-leader-location-");
            if (subsequentV4) {
                dataSource = this.sessionManager.getBusinessDataSource();
                for (String table : tableNameCopies) {
                    this.globalContext.initContext(table);
                    nullableEntry = this.tableEntryMap.get(table);
                    template.submit(() -> {
                        if (!nullableEntry.isPresent()) {
                            HashMap map = new HashMap();
                            map.put(table, new HashMap());
                            return map;
                        }
                        TableEntry entry = (TableEntry)nullableEntry.get();
                        try {
                            try (DruidPooledConnection conn = dataSource.getConnection();){
                                HashMap<String, Map> map = new HashMap<String, Map>();
                                map.put(table, this.extractor.queryLeaderLocationMap((Connection)conn, (TableEntry)nullableEntry.get(), (List)Lists.newArrayList(entry.getPartIdNameMap().keySet()), subsequentV4));
                                HashMap<String, Map> hashMap = map;
                                return hashMap;
                            }
                            {
                                catch (Throwable throwable) {
                                    throw throwable;
                                }
                            }
                        }
                        finally {
                            remain.decrementAndGet();
                            log.info("Query the leader location of \"{}\" finished. Remain: {}", (Object)table, (Object)remain);
                        }
                    });
                }
                this.tablePartLeaderMap.putAll(template.waitForResult().stream().flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
            } else {
                dataSource = this.sessionManager.getSystemDataSource();
                for (String table : tableNameCopies) {
                    this.globalContext.initContext(table);
                    nullableEntry = this.tableEntryMap.get(table);
                    template.submit(() -> {
                        if (!nullableEntry.isPresent()) {
                            HashMap map = new HashMap();
                            map.put(table, new HashMap());
                            return map;
                        }
                        TableEntry entry = (TableEntry)nullableEntry.get();
                        try {
                            try (DruidPooledConnection conn = dataSource.getConnection();){
                                HashMap<String, Map> map = new HashMap<String, Map>();
                                map.put(table, this.extractor.queryLeaderLocationMap((Connection)conn, entry, (List)Lists.newArrayList(entry.getPartIdNameMap().keySet())));
                                HashMap<String, Map> hashMap = map;
                                return hashMap;
                            }
                            {
                                catch (Throwable throwable) {
                                    throw throwable;
                                }
                            }
                        }
                        finally {
                            remain.decrementAndGet();
                            log.info("Query the leader location of \"{}\" finished. Remain: {}", (Object)table, (Object)remain);
                        }
                    });
                }
                this.tablePartLeaderMap.putAll(template.waitForResult().stream().flatMap(e -> e.entrySet().stream()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
            }
        }
        ArrayList<SubFile> subfiles = new ArrayList<SubFile>();
        List<SubFile> subFiles = this.parameter.getSubFiles();
        AtomicInteger total = new AtomicInteger(subFiles.size());
        ExecutorTemplate template = new ExecutorTemplate("parse-first-record", Runtime.getRuntime().availableProcessors());
        HashMap locationMap = new HashMap(16);
        subFiles.stream().filter(Objects::nonNull).forEach(subFile -> template.submit(() -> {
            Record firstRecord = this.parseFirstRecord((SubFile)subFile);
            if (firstRecord == null) {
                log.warn("Ignore the first null record. File: {}", (Object)subFile.getUniquePath());
                subfiles.add((SubFile)subFile);
                return null;
            }
            String table = subFile.getObjectName();
            if (firstRecord.isParsed() && StringUtils.isNotBlank(firstRecord.getOriginContent())) {
                SqlUtils.parseStatement(firstRecord, this.database.getTableInfoMap().get(table));
            }
            TableEntry tableEntry = this.tableEntryMap.get(table).orElse(null);
            ObPartIdCalculator calculator = this.connectionKey.createPartitionIdCalculator(tableEntry, subsequentV4);
            TableInfo tableInfo = this.database.getTableInfo(table);
            long partId = DBUtils.calculatePartitionId(tableInfo, calculator, firstRecord);
            subFile.setPartitionId(partId);
            String leader = this.tablePartLeaderMap.get(table).get(partId);
            if (StringUtils.isBlank(leader)) {
                if (!this.connectionKey.hasNoSysPrivileges() && partId != Long.MIN_VALUE) {
                    log.debug("The leader is null. table: \"{}\", part: {} ", (Object)table, (Object)partId);
                }
                leader = "0.0.0.0";
                this.tablePartLeaderMap.get(table).putIfAbsent(partId, "0.0.0.0");
            }
            Map map = locationMap;
            synchronized (map) {
                locationMap.putIfAbsent(leader, new ArrayList());
                ((List)locationMap.get(leader)).add(subFile);
            }
            int r = total.decrementAndGet();
            if (!"0.0.0.0".equals(leader)) {
                log.info("Calculate leader: {} of table \"{}\", part: {}. Remain: {}", new Object[]{leader, table, partId, r});
            } else {
                log.warn("Calculate leader of table \"{}\" failed, treat it as non-partition table. Remain: {}", (Object)table, (Object)r);
            }
            return null;
        }));
        template.waitForResult();
        if (CollectionUtils.isNotEmpty(subfiles)) {
            Collections.shuffle(subfiles);
        }
        locationMap.values().stream().filter(Objects::nonNull).forEach(Collections::sort);
        ArrayList subfilesGroup = new ArrayList(locationMap.values());
        while (!subfilesGroup.isEmpty()) {
            int idx = 0;
            while (idx < subfilesGroup.size()) {
                if (((List)subfilesGroup.get(idx)).size() == 0) {
                    subfilesGroup.remove(idx);
                    continue;
                }
                SubFile subFile2 = (SubFile)((List)subfilesGroup.get(idx)).get(0);
                subfiles.add(subFile2);
                ((List)subfilesGroup.get(idx)).remove(0);
                ++idx;
            }
        }
        this.parameter.setSubFiles(subfiles);
    }

    /**
     * Not supported by record file loaders.
     *
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public TaskContext loadSchemaAsync() {
        final String reason = "Load schema is unsupported";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Starts the record load: checks the state, reorders the sub files by leader location,
     * then starts the load refresher, the writers, the readers and the log reporter, in that
     * order, and finally marks the loader as {@code RUNNING}.
     *
     * @return this loader
     * @throws Exception if any of the start-up steps fails
     */
    @Override
    public TaskContext loadRecordAsync() throws Exception {
        this.checkState();
        // Must run before the writers start: startWriter() derives the leader servers from
        // the map this step fills.
        this.shuffleByLeaderLocation();
        this.startLoadRefresher();
        this.startWriter();
        this.startReader();
        this.startLogReporter();
        this.state = State.RUNNING;
        return this;
    }

    /**
     * Returns the number of reader threads: the configured thread count scaled by the
     * read/write ratio, truncated to an int, with a floor of one.
     *
     * @return the reader thread count
     */
    public int getReaderThreads() {
        double scaled = this.readWriteRatio * (double) this.parameter.getThreads();
        double floored = Math.max(scaled, 1.0);
        return (int) floored;
    }

    /**
     * Returns the number of writer threads: the configured thread count, but at least one.
     *
     * @return the writer thread count
     */
    public int getWriterThreads() {
        int configured = this.parameter.getThreads();
        return configured < 1 ? 1 : configured;
    }

    /**
     * Starts the reader side of the load. Called by loadRecordAsync() after the writers have
     * been started.
     *
     * @throws Exception if the readers cannot be started
     */
    protected abstract void startReader() throws Exception;

    /**
     * Creates the ring buffer group and registers one set of JDBC writers per leader server.
     *
     * <p>The leader servers are the distinct values found in {@link #tablePartLeaderMap}. For
     * every leader, {@code getWriterThreads()} writers are created, each with a connection
     * that is opened asynchronously, and registered together with a dedicated thread pool.
     * Every writer is also remembered in {@link #temporaryWriters}.
     */
    protected void startWriter() {
        Set<String> leaderServers = this.tablePartLeaderMap.values().stream()
                .flatMap(partLeaders -> partLeaders.values().stream())
                .collect(Collectors.toSet());
        boolean supportBatch = !this.database.isLogicalDatabase();
        int threads = this.getWriterThreads();
        int bufferSize = this.parameter.getBufferSize();
        int totalBatchSize = Math.max(leaderServers.size(), 1) * this.parameter.getBatchSize();
        this.meter = this.registerMeter("2. Dequeue Performance Monitor: ");
        // NOTE(review): integer division — a batch size of 0 throws here and a buffer size
        // below totalBatchSize yields 0; confirm the parameters are validated upstream.
        this.bufferGroup = new RingBufferGroup(bufferSize / totalBatchSize, threads);
        for (String leaderServer : leaderServers) {
            JdbcClientWriter[] writers = new JdbcClientWriter[threads];
            for (int i = 0; i < threads; ++i) {
                JdbcClientWriter writer = new JdbcClientWriter(this.supervisor, this.parameter);
                writer.setMeter(this.meter);
                writer.setBufferGroup(this.bufferGroup);
                writer.setGlobalContext(this.globalContext);
                writer.setTableLoadStatusMap(this.tableLoadStatusMap);
                SessionOption sessionOpts = writer.buildSessionOption();
                writer.setConnectionFuture(CompletableFuture.supplyAsync(() -> this.sessionManager.createNewConnection(true, supportBatch, sessionOpts)));
                writers[i] = writer;
                this.temporaryWriters.add(writer);
            }
            this.bufferGroup.register(leaderServer, writers, new ThreadPoolBuilder().setCorePoolSize(threads).setMaximumPoolSize(threads).setQueueSize(this.bufferGroup.getCapacity()).setThreadPrefixName(leaderServer.replace(':', '-') + "-writer-thread-").build());
            log.info("Start {} database writer threads finished. [{}]", threads, leaderServer);
        }
    }

    /**
     * Builds an Slf4jReporter over this loader's metric registry, scheduled on
     * {@code logReporterExecutor} and configured with {@code shutdownExecutorOnStop(true)},
     * and starts it with the arguments {@code (5, SECONDS)}.
     */
    protected void startLogReporter() {
        Slf4jReporter.forRegistry(this.registry).scheduleOn(this.logReporterExecutor).shutdownExecutorOnStop(true).build().start(5L, TimeUnit.SECONDS);
    }

    /**
     * Looks up the meter with the given name in this loader's metric registry.
     *
     * @param name the meter name
     * @return the meter returned by the registry for that name
     */
    protected Meter registerMeter(String name) {
        Meter registered = this.registry.meter(name);
        return registered;
    }

    /**
     * Refreshes the observer load status once synchronously (start-up mode) and then schedules
     * a refresh at a fixed rate of 50 ms with no initial delay. A scheduled run only refreshes
     * while the supervisor flag is set.
     */
    protected void startLoadRefresher() {
        log.info("Waiting to refresh observer load status ......");
        Stopwatch timer = Stopwatch.createStarted();
        this.refreshObserverLoadStatus(true);
        log.info("Refresh observer load status finished. Elapsed: {}", timer);

        Runnable periodicRefresh = () -> {
            if (!this.supervisor.get()) {
                return;
            }
            this.refreshObserverLoadStatus(false);
        };
        this.loadRefreshExecutor.scheduleAtFixedRate(periodicRefresh, 0L, 50L, TimeUnit.MILLISECONDS);
    }

    /**
     * Refreshes the load status of every known partition from the load of its leader server.
     *
     * <p>The per-server load is queried through the metadata provider; without sys privileges
     * on a pre-4.0 server a single placeholder entry for {@code "0.0.0.0"} with a random
     * memory ratio is used instead. A leader that has no entry in the queried map also gets a
     * placeholder status. An existing partition status is updated in place; a missing one is
     * inserted.
     *
     * <p>Failures never propagate; only about one in five of them is logged.
     *
     * @param isStartup when {@code true}, progress is logged per table
     */
    protected void refreshObserverLoadStatus(boolean isStartup) {
        try {
            ServerMode serverMode = this.database.getServerMode();
            boolean subsequentV4 = !serverMode.isPreviousV4();
            Map<String, LoadStatus> serverLoadStatusMap;
            if (this.parameter.hasNoSysPrivileges() && !subsequentV4) {
                serverLoadStatusMap = new HashMap<>(3);
                serverLoadStatusMap.put("0.0.0.0", new LoadStatus(false, RandomUtils.nextDouble(0.3, 0.95)));
            } else {
                serverLoadStatusMap = this.connectionKey.getMetadataProvider().queryObserverLoad(serverMode, this.connectionKey.getTenant());
            }
            long remain = this.tableLoadStatusMap.size();
            for (Map.Entry<String, Map<Long, LoadStatus>> tentry : this.tableLoadStatusMap.entrySet()) {
                String table = tentry.getKey();
                Map<Long, LoadStatus> partitionLoadStatusMap = tentry.getValue();
                Map<Long, String> partitionLeaderMap = this.tablePartLeaderMap.get(table);
                if (MapUtils.isEmpty(partitionLeaderMap)) {
                    continue;
                }
                for (Map.Entry<Long, String> pentry : partitionLeaderMap.entrySet()) {
                    Long partId = pentry.getKey();
                    String leaderServer = pentry.getValue();
                    if (StringUtils.isBlank(leaderServer)) {
                        log.error("Refreshing observer load status failed as leader server is null. Table: \"{}\", Partition: {}", table, partId);
                        continue;
                    }
                    LoadStatus refreshedLoadStatus = serverLoadStatusMap.get(leaderServer);
                    if (refreshedLoadStatus == null) {
                        refreshedLoadStatus = new LoadStatus(false, RandomUtils.nextDouble(0.3, 0.95));
                        if (!"0.0.0.0".equals(leaderServer)) {
                            log.warn("Refreshing observer load status failed. Maybe leader server is inactive? Leader Server: [{}], Table: \"{}\", Partition: {}", leaderServer, table, partId);
                        }
                    }
                    LoadStatus expiredLoadStatus = partitionLoadStatusMap.putIfAbsent(partId, refreshedLoadStatus);
                    if (expiredLoadStatus == null) {
                        // NOTE(review): a freshly inserted status gets its leader server and
                        // thresholds only on the next refresh — confirm this is intended.
                        continue;
                    }
                    expiredLoadStatus.setLeaderServer(leaderServer);
                    expiredLoadStatus.setMerging(refreshedLoadStatus.isMerging());
                    expiredLoadStatus.setMemUsedRatio(refreshedLoadStatus.getMemUsedRatio());
                    expiredLoadStatus.setSlowInsertThreshold(this.parameter.getSlowInsertThreshold());
                    expiredLoadStatus.setPauseInsertThreshold(this.parameter.getPauseInsertThreshold());
                }
                if (!isStartup) {
                    continue;
                }
                --remain;
                log.info("Refresh observer load status success. Table: \"{}\". Remain: {}", table, remain);
            }
        } catch (Exception e) {
            // This runs on a short fixed period; sample the warning (values 801..999 of
            // 0..999) so a persistent failure does not flood the log.
            if (RandomUtils.nextInt(0, 1000) > 800) {
                log.warn("Refresh observer load status failed. Error: {}", ExceptionUtils.getRootCauseMessage(e));
            }
        }
    }

    /**
     * Wraps the table load status map of this loader in a new server status object.
     *
     * @return a new {@code ServerStatus} built from {@code tableLoadStatusMap}
     */
    @Override
    public ServerStatus getServerStatus() {
        ServerStatus snapshot = new ServerStatus(this.tableLoadStatusMap);
        return snapshot;
    }

    /**
     * Tells whether both the reader and the writer executor are alive.
     *
     * @return {@code true} only if {@code isAlive} holds for both executors
     */
    @Override
    public boolean isThreadPoolAlive() {
        if (!this.isAlive(this.readerExecutor)) {
            return false;
        }
        return this.isAlive(this.writerExecutor);
    }

    /**
     * Shuts the loader down: clears the supervisor flag, drains and halts the buffer group
     * if there is one, shuts down the reader and writer executors with flag {@code false} and
     * the two schedulers with flag {@code true}, expires the writer caches, closes the session
     * manager and marks the loader as {@code TERMINATE}.
     *
     * @throws Exception if any shutdown step fails
     */
    @Override
    public void shutdown() throws Exception {
        this.supervisor.compareAndSet(true, false);
        RingBufferGroup group = this.bufferGroup;
        if (group != null) {
            group.drainAndHalt();
        }

        this.shutdownInternal(false, this.readerExecutor);
        this.shutdownInternal(false, this.writerExecutor);
        this.shutdownInternal(true, this.loadRefreshExecutor);
        this.shutdownInternal(true, this.logReporterExecutor);

        for (JdbcClientWriter writer : this.temporaryWriters) {
            writer.expireCache();
        }
        this.sessionManager.close();
        this.state = State.TERMINATE;
    }

    /**
     * Shuts the loader down immediately: clears the supervisor flag, halts the buffer group
     * without draining if there is one, shuts down all four executors with flag {@code true},
     * expires the writer caches, closes the session manager and marks the loader as
     * {@code TERMINATE}.
     *
     * @throws Exception if any shutdown step fails
     */
    @Override
    public void shutdownNow() throws Exception {
        this.supervisor.compareAndSet(true, false);
        RingBufferGroup group = this.bufferGroup;
        if (group != null) {
            group.halt();
        }

        this.shutdownInternal(true, this.readerExecutor);
        this.shutdownInternal(true, this.writerExecutor);
        this.shutdownInternal(true, this.loadRefreshExecutor);
        this.shutdownInternal(true, this.logReporterExecutor);

        for (JdbcClientWriter writer : this.temporaryWriters) {
            writer.expireCache();
        }
        this.sessionManager.close();
        this.state = State.TERMINATE;
    }

    /**
     * Stops the log reporting by shutting down {@code logReporterExecutor} through the
     * inherited {@code shutdownInternal} with flag {@code true}.
     */
    @Override
    public void stopLogReporter() {
        super.shutdownInternal(true, this.logReporterExecutor);
    }
}

