/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.loaddump.reader.record;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.lmax.disruptor.RingBuffer;
import com.oceanbase.partition.calculator.ObPartIdCalculator;
import com.oceanbase.partition.calculator.model.TableEntry;
import com.oceanbase.tools.loaddump.common.model.Insertion;
import com.oceanbase.tools.loaddump.common.model.LoadParameter;
import com.oceanbase.tools.loaddump.common.model.MapObject;
import com.oceanbase.tools.loaddump.common.model.SubFile;
import com.oceanbase.tools.loaddump.common.model.TableInfo;
import com.oceanbase.tools.loaddump.common.model.TaskState;
import com.oceanbase.tools.loaddump.manager.ControlManager;
import com.oceanbase.tools.loaddump.metrics.Meter;
import com.oceanbase.tools.loaddump.parser.record.AbstractRecordParser;
import com.oceanbase.tools.loaddump.parser.record.Record;
import com.oceanbase.tools.loaddump.reader.AbstractFileReader;
import com.oceanbase.tools.loaddump.resource.Payload;
import com.oceanbase.tools.loaddump.ringbuffer.RingBufferGroup;
import com.oceanbase.tools.loaddump.strategy.AdaptiveBatchStrategy;
import com.oceanbase.tools.loaddump.utils.CollectionUtils;
import com.oceanbase.tools.loaddump.utils.DBUtils;
import com.oceanbase.tools.loaddump.utils.ExceptionUtils;
import com.oceanbase.tools.loaddump.utils.SerializeUtils;
import com.oceanbase.tools.loaddump.utils.SqlUtils;
import com.oceanbase.tools.loaddump.utils.StringUtils;
import com.oceanbase.tools.loaddump.utils.TimeUtils;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * File reader that parses one {@link SubFile} record by record, groups the
 * records per partition id into {@link Payload}s and publishes every full
 * payload as an {@link Insertion} into the ring buffer selected by the
 * partition's leader server.
 *
 * <p>Collaborators ({@code meter}, {@code subFile}, {@code tableInfo},
 * {@code tableEntry}, {@code bufferGroup}, {@code tablePartLeaderMap}) are
 * injected through setters and are dereferenced without null checks by
 * {@link #run()}, so they have to be set before the reader is executed.
 */
public class RecordFileReader extends AbstractFileReader {
    private static final Logger log = LoggerFactory.getLogger(RecordFileReader.class);
    protected Meter meter;
    protected SubFile subFile;
    protected TableInfo tableInfo;
    protected TableEntry tableEntry;
    protected RingBufferGroup bufferGroup;
    protected ControlManager controlManager;
    protected AdaptiveBatchStrategy adaptiveBatchStrategy;
    /** Table name -> (partition id -> leader server). */
    protected Map<String, Map<Long, String>> tablePartLeaderMap;

    /**
     * Creates a reader and takes the sub file list and the control manager
     * from the given parameter.
     *
     * @param parameter load parameter handed to the super class
     */
    public RecordFileReader(LoadParameter parameter) {
        super(parameter);
        this.subFiles = parameter.getSubFiles();
        this.controlManager = parameter.getControlManager();
    }

    /**
     * Loads the current sub file.
     *
     * <p>Files already marked successful are not parsed again. Any exception
     * raised while parsing is recorded on the sub file (message + FAILURE
     * state) and counted in the global context instead of being rethrown.
     * The {@code finally} section runs on every exit path, including the
     * early returns: it waits for the consumers, updates the task state and
     * serializes the sub file list to the checkpoint path.
     */
    @Override
    public void run() {
        int totalBatch = 0;
        try {
            if (this.subFile.isSuccess()) {
                return;
            }
            this.subFile.setTaskState(TaskState.RUNNING);
            if (!this.subFile.isVisited()) {
                this.subFile.setVisited(true);
                SerializeUtils.serializeListByKryoSafely(this.subFiles, this.checkpointPath);
            }
            // NOTE(review): setVisited(true) is applied just above, so isVisited() is
            // presumably always true here and this flag effectively mirrors
            // parameter.isRetry(). Confirm whether the pre-update visited state was intended.
            boolean isShouldRetry = this.parameter.isRetry() && this.subFile.isVisited();
            if (isShouldRetry && MapUtils.isEmpty(this.tableInfo.getPrimaryKeyMap())) {
                log.error("Ignore to retry insert into table: \"{}\" without primary key....", this.subFile.getObjectName());
                return;
            }
            totalBatch = this.runInternal(isShouldRetry);
        } catch (Exception e) {
            String cause = ExceptionUtils.getRootCauseMessage(e);
            this.subFile.setMessage(cause);
            this.subFile.setTaskState(TaskState.FAILURE);
            this.globalContext.incrementErrorCount(this.subFile.getObjectName());
            log.error("Load data file: \"{}\" failed. Error: {}", this.subFile.getUniquePath(), cause);
        } finally {
            log.info("File: \"{}\" has been parsed finished", this.subFile.getUniquePath());
            this.waitConsumerDone(totalBatch);
            this.updateCurrentTaskState();
            SerializeUtils.serializeListByKryoSafely(this.subFiles, this.checkpointPath);
        }
    }

    /**
     * Parses the sub file and publishes its records in per-partition batches.
     *
     * <p>Records are buffered in one {@link Payload} per partition id. A
     * payload is published as soon as it reports {@code isShouldCommit()};
     * whatever is still buffered when the loop ends is published afterwards.
     * When the global context reports that the table should fail, the method
     * returns immediately and the still-buffered payloads are discarded.
     *
     * @param isShouldRetry retry flag copied onto every published insertion
     * @return number of batches published
     * @throws Exception if parsing, cleaning or publishing fails
     */
    @Override
    protected int runInternal(boolean isShouldRetry) throws Exception {
        int totalBatch = 0;
        final String tableName = this.tableInfo.getTable();
        final String schemaName = this.tableInfo.getSchema();
        final Map<Long, Payload> payloadMap = new HashMap<>(16);
        ObPartIdCalculator calculator = this.connectionKey.createPartitionIdCalculator(this.tableEntry, this.subsequentV4);
        try (AbstractRecordParser currentRecordParser = this.subFile.createRecordParser(this.parameter)) {
            Iterator<Record> iter = currentRecordParser.iterator();
            while (this.supervisor.get() && !this.subFile.isFailure() && iter.hasNext()) {
                if (this.globalContext.shouldFail(this.tableInfo.getTable())) {
                    log.error("Too many errors. Table: {}.{}", schemaName, tableName);
                    return totalBatch;
                }
                Record record = iter.next();
                // The last record of a file is dropped when the sub file asks to skip its footer.
                if (this.subFile.isSkipFooter() && !iter.hasNext()) {
                    String footer = record == null ? "<No Footer>" : (record.isValid() ? record.getValues().toString() : record.getOriginContent());
                    log.debug("Skip footer: \"{}\" finish", footer);
                    continue;
                }
                this.subFile.addParsedCount(1);
                if (record.isParsed() && StringUtils.isNotBlank(record.getOriginContent())) {
                    SqlUtils.parseStatement(record, this.tableInfo);
                }
                // The partition id is computed from the record as parsed, before cleaning.
                long partId = DBUtils.calculatePartitionId(this.tableInfo, calculator, record);
                record = this.doCleaning(schemaName, tableName, record);
                if (!record.isValid() || record.getValues().size() != this.tableInfo.getColumnIndexMapping().size()) {
                    BAD_RECORD_LOGGER.error("{} VALUES ({});\nCause: {}\n", new Object[]{this.tableInfo.getInsertPrefix(), RecordFileReader.formatBadValues(record), record.getBadCause()});
                    if (!this.globalContext.incrementAndIsExceedMaxErrors(tableName)) {
                        continue;
                    }
                    this.subFile.setMessage("Too many bad records were found in file: " + this.subFile.getFilePath() + ". Check \"ob-loader-dumper.bad\" for details");
                    break;
                }
                Payload payload = payloadMap.computeIfAbsent(partId, v -> new Payload(this.parameter.getBatchSize()));
                payload.addRecord(record);
                payload.addByteSize(this.getByteSize(record, this.fileEncoding));
                if (!payload.isShouldCommit()) {
                    continue;
                }
                // Detach the full payload so the next record of this partition starts a new one.
                payloadMap.remove(partId);
                this.publishBatch(tableName, partId, payload, isShouldRetry);
                ++totalBatch;
            }
            // Flush the partially filled payloads left over after the loop.
            for (Map.Entry<Long, Payload> entry : payloadMap.entrySet()) {
                this.publishBatch(tableName, entry.getKey(), entry.getValue(), isShouldRetry);
                ++totalBatch;
            }
        } finally {
            payloadMap.clear();
        }
        return totalBatch;
    }

    /** Wraps one payload into an {@link Insertion} and enqueues it. */
    private void publishBatch(String tableName, long partId, Payload payload, boolean isShouldRetry) throws Exception {
        Insertion insertion = new Insertion(this.subFile, partId, isShouldRetry);
        insertion.setRecordList(payload.getRecordList());
        insertion.setByteSize(payload.getBatchByteSize());
        insertion.setLeaderServer(this.getLeaderServer(tableName, partId));
        this.enqueue(insertion);
    }

    /**
     * Renders the values of a rejected record for the bad-record log: every
     * value is single-quoted, null values become {@code null}, joined by commas.
     * A record without a value list yields an empty string rather than an NPE.
     */
    private static String formatBadValues(Record record) {
        List<String> values = record.getValues();
        if (values == null) {
            return "";
        }
        return values.stream().map(e -> e == null ? "null" : "'" + e + "'").collect(Collectors.joining(","));
    }

    /**
     * Copies the insertion into the next free slot of the ring buffer that
     * belongs to the insertion's leader server and publishes that slot.
     *
     * <p>The claimed sequence is published in a {@code finally} block, i.e.
     * also when filling the slot or updating the meter throws.
     *
     * @param insertion batch to hand over; must not be null
     * @throws IllegalArgumentException if {@code insertion} is null
     * @throws Exception if filling the slot fails
     */
    protected void enqueue(Insertion insertion) throws Exception {
        Preconditions.checkArgument(insertion != null, "The insertion is null");
        String leaderServer = insertion.getLeaderServer();
        RingBuffer<Insertion> buffer = this.bufferGroup.getBuffer(leaderServer);
        long sequence = buffer.next();
        try {
            Insertion event = buffer.get(sequence);
            event.setRetry(insertion.isRetry());
            event.setSubFile(insertion.getSubFile());
            event.setByteSize(insertion.getByteSize());
            event.setRecordList(insertion.getRecordList());
            event.setPartitionId(insertion.getPartitionId());
            event.setLeaderServer(insertion.getLeaderServer());
            // Number of slots currently occupied in the buffer.
            long produced = (long) buffer.getBufferSize() - buffer.remainingCapacity();
            this.meter.mark(leaderServer, insertion.getRecordCount(), insertion.getByteSize(), produced);
        } finally {
            buffer.publish(sequence);
        }
    }

    /**
     * Rebuilds the record's values according to the table's column index
     * mapping, passing each source value through the control manager.
     *
     * <p>The record is returned untouched when it is invalid, when no control
     * manager is set or when no control is defined for the table. A failure
     * during the rebuild does not propagate: the record is flagged via
     * {@code setParsed(false)}, gets the exception message as bad cause and
     * the failure is logged.
     *
     * @param schemaName schema of the target table
     * @param table      name of the target table
     * @param record     record to clean
     * @return the same record instance
     * @throws Exception declared for overriding implementations
     */
    protected Record doCleaning(String schemaName, String table, Record record) throws Exception {
        if (!record.isValid() || this.controlManager == null || !this.controlManager.isControlDefined(schemaName, table)) {
            return record;
        }
        Map<String, MapObject> columnIndexMapping = this.tableInfo.getColumnIndexMapping();
        // Raw type kept on purpose: the element types produced below are declared outside this file.
        ArrayList newValues = Lists.newArrayListWithCapacity(columnIndexMapping.size());
        try {
            for (Map.Entry<String, MapObject> entry : columnIndexMapping.entrySet()) {
                String columnName = entry.getKey();
                MapObject mapObj = entry.getValue();
                Integer srcOffset = mapObj.getSource();
                if (srcOffset == null && mapObj.getGeneratedDefine() != null) {
                    newValues.add(mapObj.getGeneratedDefine().getValue());
                    continue;
                }
                // Supplier overload: the message is only formatted when the check fails,
                // not once per column of every record.
                Objects.requireNonNull(srcOffset, () -> MessageFormat.format("Cannot find a match for column {0} of table {1}. It may caused by invalid definition in control file", columnName, this.tableInfo.getSchemaTable()));
                newValues.add(this.controlManager.transform(schemaName, table, columnName, record.get(srcOffset)));
            }
            record.replaceAll(newValues);
        } catch (Exception e) {
            record.setParsed(false);
            record.setBadCause(e.getMessage());
            String values = String.join(",", record.getValues());
            log.error("Clean data failed. Record: [{}]. Reason: {}.", values, ExceptionUtils.getRootCauseMessage(e));
        }
        return record;
    }

    /**
     * Moves the sub file out of the RUNNING state and refreshes its task detail.
     *
     * <ul>
     *   <li>error count &gt; 0 for the table and an error message present: FAILURE</li>
     *   <li>otherwise, more records parsed than loaded: a warning is logged, the
     *       message is set if still empty, and the state becomes FAILURE in
     *       strict mode, SUCCESS otherwise</li>
     *   <li>otherwise: SUCCESS</li>
     * </ul>
     * The transition is a compare-and-set from RUNNING, so a state set
     * elsewhere in the meantime is left as it is.
     */
    protected void updateCurrentTaskState() {
        String table = this.subFile.getObjectName();
        AtomicInteger threshold = this.globalContext.getErrorCount(table);
        boolean hasErrCnt = threshold != null && threshold.get() > 0;
        boolean hasErrMsg = StringUtils.isNotBlank(this.subFile.getMessage());
        long invalidRecordCnt = this.subFile.getParsedCount().get() - this.subFile.getLoadedCount().get();
        if (hasErrCnt && hasErrMsg) {
            this.subFile.compareAndSetState(TaskState.RUNNING, TaskState.FAILURE);
        } else if (invalidRecordCnt > 0L) {
            String errMsg = String.format("Skipped %s problematic records for table %s. Check \"logs\" folder for details", invalidRecordCnt, this.tableInfo.getSchemaTable());
            log.warn(errMsg);
            if (org.apache.commons.lang3.StringUtils.isEmpty(this.subFile.getMessage())) {
                this.subFile.setMessage(errMsg);
            }
            this.subFile.compareAndSetState(TaskState.RUNNING, this.globalContext.isStrict() ? TaskState.FAILURE : TaskState.SUCCESS);
        } else {
            this.subFile.compareAndSetState(TaskState.RUNNING, TaskState.SUCCESS);
        }
        this.updateTaskDetail(this.subFile);
    }

    /**
     * Looks up the leader server of a partition.
     *
     * <p>Without sys privileges, or for the partition id {@code Long.MIN_VALUE},
     * a missing entry is filled with the placeholder {@code "0.0.0.0"} (the
     * table's map is modified in that case). Otherwise the entry must exist.
     *
     * @param table  table name used as key of the leader map
     * @param partId partition id; must not be null
     * @return leader server registered for the partition
     * @throws IllegalStateException if the table has no leader map or the
     *                               partition has no (non-blank) leader
     */
    protected String getLeaderServer(String table, Long partId) {
        Map<Long, String> partLeaderMap = this.tablePartLeaderMap.get(table);
        Preconditions.checkState(MapUtils.isNotEmpty(partLeaderMap), "Leader server map is empty. Table: \"%s\"", table);
        if (this.parameter.hasNoSysPrivileges() || partId == Long.MIN_VALUE) {
            return partLeaderMap.computeIfAbsent(partId, v -> "0.0.0.0");
        }
        String leaderServer = partLeaderMap.get(partId);
        Preconditions.checkState(StringUtils.isNotBlank(leaderServer), "Get leader server is null. Table: \"%s\", Part: %s", table, partId);
        return leaderServer;
    }

    /**
     * Estimates the size of a record in bytes: a fixed overhead of
     * {@code 2 * valueCount + 4} plus the encoded length of every value.
     * A null value counts as 1 byte. If a value cannot be encoded with the
     * requested charset, {@code 1.5 * length} is used for it instead.
     *
     * @param record  record to measure
     * @param charset charset name; blank falls back to UTF-8
     * @return estimated size, 0 for a record without values
     */
    protected int getByteSize(Record record, String charset) {
        List<String> values = record.getValues();
        if (CollectionUtils.isEmpty(values)) {
            return 0;
        }
        // Loop-invariant: resolve the effective encoding once per record.
        final String encoding = StringUtils.isBlank(charset) ? "UTF-8" : charset;
        int total = values.size() * 2 + 4;
        for (String value : values) {
            int size;
            try {
                size = value == null ? 1 : value.getBytes(encoding).length;
            } catch (Exception e) {
                size = (int) (value == null ? 1.0 : (double) value.length() * 1.5);
            }
            total += size;
        }
        return total;
    }

    /**
     * Polls every 50 ms until the supervisor flag is cleared, the sub file is
     * finished, or the sub file's mark-done counter equals {@code totalBatch}.
     *
     * @param totalBatch number of batches published for the sub file
     */
    public void waitConsumerDone(int totalBatch) {
        while (this.supervisor.get() && !this.subFile.isFinished() && totalBatch != this.subFile.getMarkdone().get()) {
            TimeUtils.sleep(TimeUnit.MILLISECONDS, 50L);
        }
    }

    /** @param meter meter updated on every enqueued batch */
    public void setMeter(Meter meter) {
        this.meter = meter;
    }

    /** @param subFile sub file this reader loads */
    public void setSubFile(SubFile subFile) {
        this.subFile = subFile;
    }

    /** @param tableInfo metadata of the target table */
    public void setTableInfo(TableInfo tableInfo) {
        this.tableInfo = tableInfo;
    }

    /** @param tableEntry table entry used to create the partition id calculator */
    public void setTableEntry(TableEntry tableEntry) {
        this.tableEntry = tableEntry;
    }

    /** @param bufferGroup ring buffers looked up by leader server */
    public void setBufferGroup(RingBufferGroup bufferGroup) {
        this.bufferGroup = bufferGroup;
    }

    /** @param controlManager control manager used for record cleaning; may be null */
    public void setControlManager(ControlManager controlManager) {
        this.controlManager = controlManager;
    }

    /** @param adaptiveBatchStrategy batch strategy stored on this reader */
    public void setAdaptiveBatchStrategy(AdaptiveBatchStrategy adaptiveBatchStrategy) {
        this.adaptiveBatchStrategy = adaptiveBatchStrategy;
    }

    /** @param tablePartLeaderMap table name -> (partition id -> leader server) */
    public void setTablePartLeaderMap(Map<String, Map<Long, String>> tablePartLeaderMap) {
        this.tablePartLeaderMap = tablePartLeaderMap;
    }
}

