/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.loaddump.reader.record;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.lmax.disruptor.RingBuffer;
import com.oceanbase.partition.calculator.ObPartIdCalculator;
import com.oceanbase.partition.calculator.model.TableEntry;
import com.oceanbase.tools.loaddump.common.model.Insertion;
import com.oceanbase.tools.loaddump.common.model.LoadParameter;
import com.oceanbase.tools.loaddump.common.model.MapObject;
import com.oceanbase.tools.loaddump.common.model.SubFile;
import com.oceanbase.tools.loaddump.common.model.TableInfo;
import com.oceanbase.tools.loaddump.common.model.TaskState;
import com.oceanbase.tools.loaddump.context.LoadContext;
import com.oceanbase.tools.loaddump.manager.ControlManager;
import com.oceanbase.tools.loaddump.metrics.Meter;
import com.oceanbase.tools.loaddump.parser.record.AbstractRecordParser;
import com.oceanbase.tools.loaddump.parser.record.Record;
import com.oceanbase.tools.loaddump.reader.AbstractFileReader;
import com.oceanbase.tools.loaddump.resource.Payload;
import com.oceanbase.tools.loaddump.ringbuffer.RingBufferGroup;
import com.oceanbase.tools.loaddump.strategy.AdaptiveBatchStrategy;
import com.oceanbase.tools.loaddump.utils.CollectionUtils;
import com.oceanbase.tools.loaddump.utils.DBUtils;
import com.oceanbase.tools.loaddump.utils.ExceptionUtils;
import com.oceanbase.tools.loaddump.utils.SerializeUtils;
import com.oceanbase.tools.loaddump.utils.SqlUtils;
import com.oceanbase.tools.loaddump.utils.StringUtils;
import java.io.File;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * File reader that iterates over the records of a single {@link SubFile}, groups them
 * into per-partition {@link Payload} batches and publishes every full batch as an
 * {@link Insertion} event on the ring buffer that belongs to the partition's leader server.
 *
 * <p>The collaborators ({@code meter}, {@code subFile}, {@code tableInfo}, {@code tableEntry},
 * {@code bufferGroup}, {@code adaptiveBatchStrategy}, {@code tablePartLeaderMap} and
 * {@code loadCtx}) are injected through the setters at the bottom of this class and must be
 * set before {@link #run()} is invoked; only {@code subFiles}, {@code controlManager} and
 * {@code batchSize} are taken from the {@link LoadParameter} in the constructor.
 */
public class RecordFileReader
extends AbstractFileReader {
    private static final Logger log = LoggerFactory.getLogger(RecordFileReader.class);
    protected Meter meter;
    protected SubFile subFile;
    protected TableInfo tableInfo;
    protected TableEntry tableEntry;
    protected RingBufferGroup bufferGroup;
    protected ControlManager controlManager;
    protected AdaptiveBatchStrategy adaptiveBatchStrategy;
    protected Map<String, Map<Long, String>> tablePartLeaderMap;
    protected int batchSize;
    protected LoadContext loadCtx;

    /**
     * Creates a reader configured from the given load parameter.
     *
     * @param parameter source of the sub-file list, the control manager and the initial batch size
     */
    public RecordFileReader(LoadParameter parameter) {
        super(parameter);
        this.subFiles = parameter.getSubFiles();
        this.controlManager = parameter.getControlManager();
        this.batchSize = parameter.getBatchSize();
    }

    /**
     * Loads the configured sub-file.
     *
     * <p>Files already marked successful are skipped. Any exception raised while loading is
     * recorded on the sub-file (message + {@link TaskState#FAILURE}) instead of being
     * propagated. The final task state and the checkpoint are always refreshed afterwards,
     * including on the early-return paths.
     */
    @Override
    public void run() {
        try {
            if (this.subFile.isSuccess()) {
                return;
            }
            this.subFile.setTaskState(TaskState.RUNNING);
            if (!this.subFile.isVisited()) {
                this.subFile.setVisited(true);
                this.updateCheckpoint();
            }
            // NOTE(review): the visited flag has just been forced to true above, so the
            // isVisited() term looks always true here - confirm whether the flag should be
            // sampled before it is updated.
            boolean isShouldRetry = this.parameter.isRetry() && this.subFile.isVisited();
            if (isShouldRetry && CollectionUtils.isEmpty(this.tableInfo.getPrimaryCols())) {
                log.error("Ignore to retry insert into table: \"{}\" without primary key....", this.subFile.getObjectName());
                return;
            }
            int totalBatch = this.runInternal(isShouldRetry);
            log.info("File: \"{}\" has been parsed finished", this.subFile.getUniquePath());
            if (this.supervisor.get()) {
                this.loadCtx.waitForAllBatchesConsumed(this.subFile, totalBatch);
            }
        }
        catch (Exception e) {
            String cause = ExceptionUtils.getRootCauseMessage(e);
            this.subFile.setMessage(cause);
            this.subFile.setTaskState(TaskState.FAILURE);
            log.error("Load data file: \"{}\" failed. Error: {}", this.subFile.getUniquePath(), cause);
            // Keep the full stack trace available for troubleshooting without flooding the error log.
            log.debug("Stack trace of the failure while loading \"{}\"", this.subFile.getUniquePath(), e);
        }
        finally {
            this.updateCurrentTaskState();
            this.updateCheckpoint();
        }
    }

    /**
     * Parses the sub-file and enqueues its records in batches.
     *
     * <p>Records are accumulated in one {@link Payload} per partition id (always {@code 0} in
     * direct mode). A payload is enqueued as soon as it reports that it should be committed
     * for the current batch size; payloads that still hold records when parsing stops
     * normally are flushed at the end. Parsing stops early when the supervisor flag is
     * cleared, the sub-file is marked failed, the load context requests a failure for the
     * table, or the bad-record limit is exceeded. Batches pending at such an early return
     * are not flushed.
     *
     * @param isShouldRetry whether the produced insertions are flagged as retries
     * @return the number of batches that were enqueued
     * @throws Exception if parsing, partition calculation or enqueueing fails
     */
    @Override
    protected int runInternal(boolean isShouldRetry) throws Exception {
        int totalBatch = 0;
        String tableName = this.tableInfo.getTable();
        String schemaName = this.tableInfo.getSchema();
        Map<Long, Payload> payloadMap = new HashMap<>(16);
        boolean isDirectMode = this.parameter.isDirectMode();
        // No partition calculator is created in direct mode: every record goes to partition 0.
        ObPartIdCalculator calculator = isDirectMode
                ? null
                : this.connectionKey.createPartitionIdCalculator(this.tableEntry, this.subsequentV4);
        try (AbstractRecordParser currentRecordParser = this.subFile.createRecordParser(this.parameter)) {
            Iterator<Record> iter = currentRecordParser.iterator();
            boolean isFirstRecord = true;
            while (this.supervisor.get() && !this.subFile.isFailure() && iter.hasNext()) {
                if (this.loadCtx.shouldFail(tableName)) {
                    log.error("Fatal error occurred. Stop loading \"{}\" into table: \"{}\".\"{}\"", this.subFile.getFilePath(), schemaName, tableName);
                    return totalBatch;
                }
                Record record = iter.next();
                // The footer is the last record of the file; drop it when requested.
                if (this.subFile.isSkipFooter() && !iter.hasNext()) {
                    String footer = record == null
                            ? "<No Footer>"
                            : (record.isValid() ? record.getValues().toString() : record.getOriginContent());
                    log.debug("Skip footer: \"{}\" finish", footer);
                    continue;
                }
                this.subFile.addParsedCount(1);
                if (record.isParsed() && StringUtils.isNotBlank(record.getOriginContent())) {
                    SqlUtils.parseStatement(record, this.tableInfo);
                }
                if (isFirstRecord) {
                    // The batch size is derived once per file from the first record.
                    this.batchSize = this.adaptiveBatchStrategy.getBatchSize(tableName, this.getByteSize(record, this.fileEncoding), this.tableInfo.getColumnIndexMapping().size(), this.parameter.getBatchSize());
                    isFirstRecord = false;
                    log.debug("Batch size is set to {} for loading table \"{}\"", this.batchSize, tableName);
                }
                // The partition id is computed from the record as parsed, before cleaning.
                long partId = isDirectMode ? 0L : DBUtils.calculatePartitionId(this.tableInfo, calculator, record);
                record = this.doCleaning(schemaName, tableName, record);
                if (!record.isValid() || record.getValues().size() != this.tableInfo.getColumnIndexMapping().size()) {
                    String errRecord = this.describeBadRecord(record);
                    String badCause = org.apache.commons.lang3.StringUtils.isEmpty(record.getBadCause())
                            ? "The number of columns parsed does not match the number of columns in the table"
                            : record.getBadCause();
                    BAD_RECORD_LOGGER.error("{}\nCause: {}\n", errRecord, badCause);
                    if (!this.loadCtx.incrementAndIsExceedMaxErrors(tableName)) {
                        continue;
                    }
                    String errMsg = "Too many bad records were found in file: " + this.subFile.getFilePath() + ". See the bad record in \"ob-loader-dumper.bad\"";
                    log.error(errMsg);
                    this.subFile.setMessage(errMsg);
                    return totalBatch;
                }
                Payload payload = payloadMap.computeIfAbsent(partId, key -> new Payload());
                payload.addRecord(record);
                payload.addByteSize(this.getByteSize(record, this.fileEncoding));
                if (!payload.isShouldCommit(this.batchSize)) {
                    continue;
                }
                this.enqueue(payload, partId, isDirectMode, isShouldRetry);
                ++totalBatch;
            }
            // Flush the partially filled batches that remain after the loop.
            for (Map.Entry<Long, Payload> entry : payloadMap.entrySet()) {
                Payload remaining = entry.getValue();
                if (remaining.getRecordCount() == 0) {
                    continue;
                }
                this.enqueue(remaining, entry.getKey(), isDirectMode, isShouldRetry);
                ++totalBatch;
            }
            return totalBatch;
        }
        finally {
            payloadMap.clear();
        }
    }

    /**
     * Renders a rejected record for the bad-record log: the origin content when the record
     * carries no values, otherwise an {@code INSERT}-like statement built from its values.
     */
    private String describeBadRecord(Record record) {
        List<String> values = record.getValues();
        if (values == null) {
            // Nothing was parsed into values; the raw content is the only thing left to show.
            return record.getOriginContent();
        }
        String joined = values.stream()
                .map(value -> value == null ? "null" : "'" + value + "'")
                .collect(Collectors.joining(","));
        return MessageFormat.format("{0} VALUES ({1});", this.tableInfo.getInsertPrefix(), joined);
    }

    /**
     * Copies the given insertion into the next free slot of the ring buffer associated with
     * its leader server and publishes that slot.
     *
     * <p>The claimed sequence is published in a {@code finally} block, so it is released
     * even if populating the event or updating the meter throws.
     *
     * @param insertion the batch to publish; must not be {@code null}
     * @throws IllegalArgumentException if {@code insertion} is {@code null}
     * @throws Exception declared for subclasses; see the callees for concrete failures
     */
    protected void enqueue(Insertion insertion) throws Exception {
        Preconditions.checkArgument(insertion != null, (Object)"The insertion is null");
        String leaderServer = insertion.getLeaderServer();
        RingBuffer<Insertion> buffer = this.bufferGroup.getBuffer(leaderServer);
        long sequence = buffer.next();
        try {
            Insertion event = buffer.get(sequence);
            event.setRetry(insertion.isRetry());
            event.setSubFile(insertion.getSubFile());
            event.setByteSize(insertion.getByteSize());
            event.setRecordList(insertion.getRecordList());
            event.setPartitionId(insertion.getPartitionId());
            event.setLeaderServer(insertion.getLeaderServer());
            event.setCommit(insertion.isCommit());
            // Number of slots currently occupied in the buffer.
            long produced = (long)buffer.getBufferSize() - buffer.remainingCapacity();
            this.meter.mark(leaderServer, insertion.getRecordCount(), insertion.getByteSize(), produced);
        }
        finally {
            buffer.publish(sequence);
        }
    }

    /**
     * Turns the payload into an {@link Insertion}, resets the payload for reuse, registers
     * the batch with the load context and publishes it.
     *
     * @param payload      accumulated records of one partition; reset by this call
     * @param partId       partition id the payload belongs to
     * @param isDirectMode when {@code true} the leader server is fixed to {@code "0.0.0.0"}
     * @param isRetryMode  retry flag passed to the created insertion
     * @throws Exception if the leader server cannot be resolved or publishing fails
     */
    protected void enqueue(Payload payload, long partId, boolean isDirectMode, boolean isRetryMode) throws Exception {
        long payloadSize = payload.getByteSize();
        Insertion insertion = new Insertion(this.subFile, partId, isRetryMode);
        insertion.setRecordList(payload.getRecordList());
        insertion.setByteSize(payloadSize);
        insertion.setLeaderServer(isDirectMode ? "0.0.0.0" : this.getLeaderServer(this.tableInfo.getTable(), partId));
        // NOTE(review): presumably reset() swaps in a fresh record list rather than clearing
        // the one just handed to the insertion - confirm in Payload.
        payload.reset();
        this.loadCtx.batchProduced(insertion);
        this.enqueue(insertion);
    }

    /**
     * Applies the control-file transformations to a valid record.
     *
     * <p>The record is returned untouched when it is invalid, when no control manager is
     * configured, or when no control is defined for the table. Otherwise one value is
     * produced per entry of the table's column mapping (either the generated definition's
     * value or the transformed source value) and the record's values are replaced. A failure
     * does not propagate: the record is flagged as not parsed and its bad cause is set.
     *
     * @param schemaName schema of the target table
     * @param table      target table name
     * @param record     record to clean; modified in place
     * @return the same record instance
     * @throws Exception declared for subclasses; failures of the transformation itself are caught
     */
    protected Record doCleaning(String schemaName, String table, Record record) throws Exception {
        if (!record.isValid() || this.controlManager == null || !this.controlManager.isControlDefined(schemaName, table)) {
            return record;
        }
        Map<String, MapObject> columnIndexMapping = this.tableInfo.getColumnIndexMapping();
        // Left raw on purpose: the element types returned by the generated definition and by
        // the control manager are not visible from this file.
        ArrayList newValues = Lists.newArrayListWithCapacity(columnIndexMapping.size());
        try {
            for (Map.Entry<String, MapObject> entry : columnIndexMapping.entrySet()) {
                String columnName = entry.getKey();
                MapObject mapObj = entry.getValue();
                Integer srcOffset = mapObj.getSource();
                if (srcOffset == null && mapObj.getGeneratedDefine() != null) {
                    // Column has no source field; its value comes from the generated definition.
                    newValues.add(mapObj.getGeneratedDefine().getValue());
                    continue;
                }
                Objects.requireNonNull(srcOffset, MessageFormat.format("Cannot find a match for column {0} of table {1}. It may caused by invalid definitions in control file", columnName, this.tableInfo.getSchemaTable()));
                newValues.add(this.controlManager.transform(schemaName, table, columnName, record.get(srcOffset)));
            }
            record.replaceAll(newValues);
        }
        catch (Exception e) {
            String rootCause = ExceptionUtils.getRootCauseMessage(e);
            record.setParsed(false);
            record.setBadCause(rootCause);
            // Guard against a null value list so that logging cannot mask the original failure.
            List<String> currentValues = record.getValues();
            String values = currentValues == null ? record.getOriginContent() : String.join(",", currentValues);
            log.error("Clean data failed. Record: [{}]. Reason: {}.", values, rootCause);
        }
        return record;
    }

    /**
     * Derives the final state of the sub-file once loading has stopped.
     *
     * <ul>
     *   <li>A non-blank message means a fatal error: the state moves from RUNNING to FAILURE
     *       and the message is wrapped with file and table information.</li>
     *   <li>Otherwise, if fewer records were loaded than parsed, a warning is logged and the
     *       state becomes FAILURE in strict mode or SUCCESS otherwise.</li>
     *   <li>Otherwise the state moves from RUNNING to SUCCESS.</li>
     * </ul>
     * State transitions only happen from RUNNING, so a state set elsewhere is preserved.
     */
    protected void updateCurrentTaskState() {
        long failedRecordCnt = this.subFile.getParsedCount().get() - this.subFile.getLoadedCount().get();
        if (StringUtils.isNotBlank(this.subFile.getMessage())) {
            this.subFile.compareAndSetState(TaskState.RUNNING, TaskState.FAILURE);
            String errMsg = MessageFormat.format("Fatal error occurred while loading data from \"{0}\" into table {1}. Reason: {2}", this.subFile.getFilePath(), this.tableInfo.getSchemaTable(), this.subFile.getMessage());
            log.error(errMsg);
            this.subFile.setMessage(errMsg);
        } else if (failedRecordCnt > 0L) {
            String errMsg = MessageFormat.format("Failed to load {0} records from \"{1}\" into table {2}. Check \"ob-loader-dumper.bad\" and \"ob-loader-dumper.discard\" for details", failedRecordCnt, this.subFile.getFilePath(), this.tableInfo.getSchemaTable());
            log.warn(errMsg);
            if (org.apache.commons.lang3.StringUtils.isEmpty(this.subFile.getMessage())) {
                this.subFile.setMessage(errMsg);
            }
            this.subFile.compareAndSetState(TaskState.RUNNING, this.loadCtx.isStrict() ? TaskState.FAILURE : TaskState.SUCCESS);
        } else {
            this.subFile.compareAndSetState(TaskState.RUNNING, TaskState.SUCCESS);
        }
        this.updateTaskDetail(this.subFile);
    }

    /**
     * Rewrites the checkpoint file with the current sub-file list, but only when the
     * checkpoint file already exists. Serialization failures are logged and ignored.
     */
    private void updateCheckpoint() {
        if (!new File(this.checkpointPath).exists()) {
            return;
        }
        try {
            SerializeUtils.serializeListByKryo(this.subFiles, this.checkpointPath);
        }
        catch (Exception e) {
            log.warn("Unable to serialize the checkpoint file. But it won't affect the data integrity. Reason: {}", ExceptionUtils.getRootCauseMessage(e));
        }
    }

    /**
     * Resolves the leader server of a table partition.
     *
     * <p>Without sys privileges, in direct mode, or for the partition id
     * {@link Long#MIN_VALUE}, a missing entry is filled in with {@code "0.0.0.0"} and
     * returned. In all other cases the partition must already have a non-blank leader.
     *
     * @param table  table name used as key of the leader map
     * @param partId partition id; must not be {@code null}
     * @return the leader server address
     * @throws IllegalStateException if the table has no leader map or the partition has no leader
     */
    protected String getLeaderServer(String table, Long partId) {
        Map<Long, String> partLeaderMap = this.tablePartLeaderMap.get(table);
        Preconditions.checkState(MapUtils.isNotEmpty(partLeaderMap), "Leader server map is empty. Table: \"%s\"", table);
        if (this.parameter.hasNoSysPrivileges() || partId == Long.MIN_VALUE || this.parameter.isDirectMode()) {
            return partLeaderMap.computeIfAbsent(partId, key -> "0.0.0.0");
        }
        String leaderServer = partLeaderMap.get(partId);
        Preconditions.checkState(StringUtils.isNotBlank(leaderServer), "Get leader server is null. Table: \"%s\", Part: %s", table, partId);
        return leaderServer;
    }

    /**
     * Estimates the size of a record in bytes.
     *
     * <p>The estimate is {@code 2 * valueCount + 4} plus, per value, its encoded length in
     * the given charset ({@code 1} for a {@code null} value). A blank charset means UTF-8.
     * If a value cannot be encoded (for example because the charset name is unsupported),
     * {@code 1.5} bytes per character are assumed for it.
     *
     * @param record  record whose values are measured
     * @param charset charset name; blank falls back to {@code "UTF-8"}
     * @return the estimated size, or {@code 0} when the record has no values
     */
    protected int getByteSize(Record record, String charset) {
        List<String> values = record.getValues();
        if (CollectionUtils.isEmpty(values)) {
            return 0;
        }
        // The encoding does not change per value, so resolve its name once.
        String encoding = StringUtils.isBlank(charset) ? "UTF-8" : charset;
        int total = values.size() * 2 + 4;
        for (String value : values) {
            total += RecordFileReader.estimateValueSize(value, encoding);
        }
        return total;
    }

    /**
     * Returns the encoded length of one value, {@code 1} for {@code null}, or a
     * 1.5-bytes-per-character estimate when encoding fails.
     */
    private static int estimateValueSize(String value, String encoding) {
        if (value == null) {
            return 1;
        }
        try {
            return value.getBytes(encoding).length;
        }
        catch (Exception e) {
            return (int)((double)value.length() * 1.5);
        }
    }

    public void setMeter(Meter meter) {
        this.meter = meter;
    }

    public void setSubFile(SubFile subFile) {
        this.subFile = subFile;
    }

    public void setTableInfo(TableInfo tableInfo) {
        this.tableInfo = tableInfo;
    }

    public void setTableEntry(TableEntry tableEntry) {
        this.tableEntry = tableEntry;
    }

    public void setBufferGroup(RingBufferGroup bufferGroup) {
        this.bufferGroup = bufferGroup;
    }

    public void setControlManager(ControlManager controlManager) {
        this.controlManager = controlManager;
    }

    public void setAdaptiveBatchStrategy(AdaptiveBatchStrategy adaptiveBatchStrategy) {
        this.adaptiveBatchStrategy = adaptiveBatchStrategy;
    }

    public void setTablePartLeaderMap(Map<String, Map<Long, String>> tablePartLeaderMap) {
        this.tablePartLeaderMap = tablePartLeaderMap;
    }

    public void setLoadCtx(LoadContext loadCtx) {
        this.loadCtx = loadCtx;
    }
}

