/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.connector.flink.sink;

import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObjType;
import com.alipay.oceanbase.rpc.table.ObDirectLoadBucket;
import com.alipay.oceanbase.rpc.table.ObTableDirectLoad;
import com.oceanbase.connector.flink.OceanBaseConnectorOptions;
import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider;
import com.oceanbase.connector.flink.connection.OceanBaseTablePartInfo;
import com.oceanbase.connector.flink.connection.OceanBaseUserInfo;
import com.oceanbase.connector.flink.connection.OceanBaseVersion;
import com.oceanbase.connector.flink.dialect.OceanBaseDialect;
import com.oceanbase.connector.flink.dialect.OceanBaseMySQLDialect;
import com.oceanbase.connector.flink.sink.RecordFlusher;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.SchemaChangeRecord;
import com.oceanbase.connector.flink.table.TableId;
import com.oceanbase.connector.flink.table.TableInfo;
import com.oceanbase.connector.flink.table.TransactionRecord;
import com.oceanbase.connector.flink.utils.TableCache;
import com.oceanbase.partition.calculator.enums.ObServerMode;
import com.oceanbase.partition.calculator.helper.TableEntryExtractor;
import com.oceanbase.partition.calculator.model.TableEntry;
import com.oceanbase.partition.calculator.model.TableEntryKey;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.flink.util.function.SerializableFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link RecordFlusher} that writes records to OceanBase.
 *
 * <p>Data records are written either with batched JDBC statements obtained from the
 * {@link OceanBaseDialect}, or, when direct load is enabled in the connector options, through
 * {@link ObTableDirectLoad}. Partition metadata and direct-load handles are cached per table
 * identifier.
 *
 * <p>NOTE(review): this source was recovered with a decompiler. The body of
 * {@code hasMemStoreReachedThreshold()} could not be recovered and is still a placeholder that
 * always throws; it must be restored from the original sources before the memstore check can be
 * enabled.
 */
public class OceanBaseRecordFlusher implements RecordFlusher {
    private static final Logger LOG = LoggerFactory.getLogger(OceanBaseRecordFlusher.class);
    private static final long serialVersionUID = 1L;

    /** Key used to group records for which no partition id could be calculated. */
    private static final long UNKNOWN_PART_ID = -1L;

    private final OceanBaseConnectorOptions options;
    private final OceanBaseConnectionProvider connectionProvider;
    private final OceanBaseDialect dialect;
    /** Partition metadata per table identifier; invalidated on schema changes. */
    private final TableCache<OceanBaseTablePartInfo> tablePartInfoCache;
    /** Direct-load handles per table identifier; closed in {@link #close()}. */
    private final TableCache<ObTableDirectLoad> directLoadCache;
    /** Wall-clock time (ms) of the last completed memstore check; 0 means "never checked". */
    private volatile long lastCheckMemStoreTime;

    /**
     * Creates a flusher with a connection provider built from the given options.
     *
     * @param options connector options
     */
    public OceanBaseRecordFlusher(OceanBaseConnectorOptions options) {
        this(options, new OceanBaseConnectionProvider(options));
    }

    /**
     * Creates a flusher using the given connection provider.
     *
     * @param options connector options
     * @param connectionProvider provider of connections, dialect, version and user info
     */
    public OceanBaseRecordFlusher(OceanBaseConnectorOptions options, OceanBaseConnectionProvider connectionProvider) {
        this.options = options;
        this.connectionProvider = connectionProvider;
        this.dialect = connectionProvider.getDialect();
        this.tablePartInfoCache = new TableCache<>();
        this.directLoadCache = new TableCache<>();
    }

    /**
     * Applies a transaction record to the direct-load handle of its table.
     *
     * <p>Does nothing unless direct load is enabled.
     *
     * @param record transaction record (BEGIN, COMMIT or ROLLBACK)
     * @throws IllegalArgumentException if the record type is not one of the above
     * @throws Exception if the direct-load operation fails
     */
    public void flush(@Nonnull TransactionRecord record) throws Exception {
        if (!this.options.getDirectLoadEnabled()) {
            return;
        }
        switch (record.getType()) {
            case BEGIN: {
                this.getTableDirectLoad(record.getTableId()).begin();
                break;
            }
            case COMMIT: {
                this.getTableDirectLoad(record.getTableId()).commit();
                break;
            }
            case ROLLBACK: {
                this.getTableDirectLoad(record.getTableId()).abort();
                break;
            }
            default: {
                throw new IllegalArgumentException("Unsupported transaction record type: " + record.getType());
            }
        }
    }

    /**
     * Executes the SQL of a schema change record and, if the record requests it, drops the cached
     * partition info of the affected table.
     *
     * @param record schema change record
     * @throws UnsupportedOperationException if direct load is enabled
     * @throws Exception if the statement fails
     */
    public synchronized void flush(@Nonnull SchemaChangeRecord record) throws Exception {
        if (this.options.getDirectLoadEnabled()) {
            throw new UnsupportedOperationException("Schema change records are not supported when direct load is enabled");
        }
        try (Connection connection = this.connectionProvider.getConnection();
             Statement statement = connection.createStatement()) {
            statement.execute(record.getSql());
        }
        if (record.shouldRefreshSchema()) {
            this.tablePartInfoCache.remove(record.getTableId().identifier());
        }
        LOG.info("Flush SchemaChangeRecord successfully: {}", record);
    }

    /**
     * Writes a batch of data change records.
     *
     * <p>The table is taken from the first record. Upserts are written first (direct load, plain
     * insert for tables without key, or upsert statement otherwise), then deletes.
     *
     * @param records records to write; {@code null} or empty is a no-op
     * @throws RuntimeException if delete records are present for a table without key or while
     *     direct load is enabled, or if a JDBC batch fails
     * @throws Exception if the memstore check or direct load fails
     */
    public synchronized void flush(List<DataChangeRecord> records) throws Exception {
        if (records == null || records.isEmpty()) {
            return;
        }
        this.checkMemStore();
        TableInfo tableInfo = (TableInfo) records.get(0).getTable();
        TableId tableId = tableInfo.getTableId();

        List<DataChangeRecord> upsertBatch = new ArrayList<>();
        List<DataChangeRecord> deleteBatch = new ArrayList<>();
        for (DataChangeRecord record : records) {
            if (record.isUpsert()) {
                upsertBatch.add(record);
            } else {
                deleteBatch.add(record);
            }
        }

        if (!upsertBatch.isEmpty()) {
            if (this.options.getDirectLoadEnabled()) {
                this.directLoad(tableId, tableInfo.getFieldNames(), upsertBatch);
            } else if (CollectionUtils.isEmpty(tableInfo.getKey())) {
                String insertSql = this.dialect.getInsertIntoStatement(
                        tableId.getSchemaName(),
                        tableId.getTableName(),
                        tableInfo.getFieldNames(),
                        (SerializableFunction<String, String>) tableInfo.getPlaceholderFunc());
                this.flush(insertSql, tableInfo.getFieldNames(), upsertBatch);
            } else {
                String upsertSql = this.dialect.getUpsertStatement(
                        tableId.getSchemaName(),
                        tableId.getTableName(),
                        tableInfo.getFieldNames(),
                        tableInfo.getKey(),
                        (SerializableFunction<String, String>) tableInfo.getPlaceholderFunc());
                this.flush(upsertSql, tableInfo.getFieldNames(), upsertBatch);
            }
        }

        if (!deleteBatch.isEmpty()) {
            if (CollectionUtils.isEmpty(tableInfo.getKey())) {
                throw new RuntimeException("There should be no delete records when the table does not contain PK");
            }
            if (this.options.getDirectLoadEnabled()) {
                throw new RuntimeException("There should be no delete records when direct load is enabled");
            }
            String deleteSql = this.dialect.getDeleteStatement(
                    tableId.getSchemaName(), tableId.getTableName(), tableInfo.getKey());
            this.flush(deleteSql, tableInfo.getKey(), deleteBatch);
        }
    }

    /**
     * Blocks while the memstore threshold is reported as reached.
     *
     * <p>Skipped when the check is disabled, or when the previous check finished less than one
     * check interval ago.
     *
     * @throws SQLException if the threshold query fails
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt flag is
     *     restored before throwing
     */
    private void checkMemStore() throws SQLException {
        if (!this.options.getMemStoreCheckEnabled()) {
            return;
        }
        long now = System.currentTimeMillis();
        if (this.lastCheckMemStoreTime != 0L && now - this.lastCheckMemStoreTime < this.options.getMemStoreCheckInterval()) {
            return;
        }
        while (this.hasMemStoreReachedThreshold()) {
            LOG.warn("Memstore reaches threshold {}, thread will sleep {} milliseconds", this.options.getMemStoreThreshold(), this.options.getMemStoreCheckInterval());
            try {
                Thread.sleep(this.options.getMemStoreCheckInterval());
            } catch (InterruptedException e) {
                // Swallowing the interrupt would keep this loop waiting after a cancellation
                // request; restore the flag and stop instead.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for memstore usage to fall below the threshold", e);
            }
        }
        this.lastCheckMemStoreTime = System.currentTimeMillis();
    }

    /**
     * Reports whether the memstore threshold has been reached.
     *
     * <p>NOTE(review): the decompiler (CFR 0.152) failed on this method with
     * {@code ConfusedCFRException: Started 2 blocks at once}, so the real logic is missing. The
     * placeholder below always throws; restore the implementation from the original sources.
     *
     * @return never returns normally in this placeholder
     * @throws SQLException declared by the original signature
     * @throws IllegalStateException always, until the implementation is restored
     */
    private boolean hasMemStoreReachedThreshold() throws SQLException {
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Inserts the given records through the direct-load handle of the table.
     *
     * @param tableId target table
     * @param fields field names, in the order the row values are emitted
     * @param records records to insert
     * @throws Exception if the direct-load insert fails
     */
    private void directLoad(TableId tableId, List<String> fields, List<DataChangeRecord> records) throws Exception {
        ObDirectLoadBucket bucket = new ObDirectLoadBucket();
        for (DataChangeRecord record : records) {
            ObObj[] row = fields.stream()
                    .map(field -> this.toObObj(record.getFieldValue(field)))
                    .toArray(ObObj[]::new);
            bucket.addRow(row);
        }
        this.getTableDirectLoad(tableId).insert(bucket);
    }

    /**
     * Returns the cached direct-load handle of the table, asking the connection provider for one
     * on a cache miss.
     */
    private ObTableDirectLoad getTableDirectLoad(TableId tableId) {
        return this.directLoadCache.get(tableId.identifier(), () -> this.connectionProvider.getDirectLoad(tableId));
    }

    /** Wraps a field value in an {@link ObObj} whose meta is derived from the converted value. */
    private ObObj toObObj(Object value) {
        Object obObjValue = this.toObObjValue(value);
        return new ObObj(ObObjType.valueOfType(obObjValue).getDefaultObjMeta(), obObjValue);
    }

    /**
     * Converts a field value for direct load: {@link Time} becomes a {@link Timestamp} with the
     * same epoch millis, {@link BigDecimal} and {@link BigInteger} become their string form, and
     * everything else (including {@code null}) is passed through unchanged.
     */
    private Object toObObjValue(Object obj) {
        if (obj == null) {
            return null;
        }
        if (obj instanceof Time) {
            return new Timestamp(((Time) obj).getTime());
        }
        if (obj instanceof BigDecimal || obj instanceof BigInteger) {
            return obj.toString();
        }
        return obj;
    }

    /**
     * Executes the statement once per partition group, as one JDBC batch per group, on a single
     * connection and prepared statement.
     *
     * @param sql parameterized statement
     * @param statementFields field names bound to the statement parameters, in order
     * @param records records to write; empty is a no-op
     * @throws RuntimeException if a batch fails; the SQL and the group's records are included in
     *     the message and the {@link SQLException} is kept as cause
     * @throws Exception if the connection or statement cannot be obtained or closed
     */
    private void flush(String sql, List<String> statementFields, List<DataChangeRecord> records) throws Exception {
        Map<Long, List<DataChangeRecord>> group = this.groupRecords(records);
        if (group == null) {
            return;
        }
        try (Connection connection = this.connectionProvider.getConnection();
             PreparedStatement statement = connection.prepareStatement(sql)) {
            for (List<DataChangeRecord> groupRecords : group.values()) {
                try {
                    for (DataChangeRecord record : groupRecords) {
                        for (int i = 0; i < statementFields.size(); ++i) {
                            // JDBC parameter indexes are 1-based.
                            statement.setObject(i + 1, record.getFieldValue(statementFields.get(i)));
                        }
                        statement.addBatch();
                    }
                    statement.executeBatch();
                } catch (SQLException e) {
                    throw new RuntimeException("Failed to execute batch with sql: " + sql + ", records: " + groupRecords, e);
                }
            }
        }
    }

    /**
     * Groups records by calculated partition id; records without a partition id share one group.
     *
     * @return the groups, or {@code null} if {@code records} is empty
     */
    private Map<Long, List<DataChangeRecord>> groupRecords(List<DataChangeRecord> records) {
        if (CollectionUtils.isEmpty(records)) {
            return null;
        }
        Map<Long, List<DataChangeRecord>> group = new HashMap<>();
        for (DataChangeRecord record : records) {
            Long partId = this.getPartId(record);
            long groupKey = partId == null ? UNKNOWN_PART_ID : partId;
            group.computeIfAbsent(groupKey, k -> new ArrayList<>()).add(record);
        }
        return group;
    }

    /**
     * Calculates the partition id of a record.
     *
     * @return the partition id, or {@code null} if no partition info is available for the table
     *     or it has no partition columns
     */
    private Long getPartId(DataChangeRecord record) {
        TableInfo tableInfo = (TableInfo) record.getTable();
        OceanBaseTablePartInfo tablePartInfo = this.getTablePartInfo(tableInfo.getTableId());
        if (tablePartInfo == null || MapUtils.isEmpty(tablePartInfo.getPartColumnIndexMap())) {
            return null;
        }
        // Only the slots of partition columns are filled; all other slots stay null.
        Object[] rowValues = new Object[tableInfo.getFieldNames().size()];
        for (Map.Entry<String, Integer> entry : tablePartInfo.getPartColumnIndexMap().entrySet()) {
            rowValues[entry.getValue()] = record.getFieldValue(entry.getKey());
        }
        return tablePartInfo.getPartIdCalculator().calculatePartId(rowValues);
    }

    /**
     * Returns the cached partition info of the table, querying it on a cache miss.
     *
     * @return the partition info, or {@code null} if sync write is enabled or partitioning is
     *     disabled
     */
    private OceanBaseTablePartInfo getTablePartInfo(TableId tableId) {
        if (this.options.getSyncWrite() || !this.options.getPartitionEnabled()) {
            return null;
        }
        return this.tablePartInfoCache.get(tableId.identifier(), () -> this.queryTablePartInfo(tableId.getSchemaName(), tableId.getTableName()));
    }

    /**
     * Queries the table entry of the given table and builds its partition info.
     *
     * @return the partition info, or {@code null} (after logging a warning) when the version is V4
     *     and the tenant is "sys", or the version is not V4 and the tenant is not "sys"
     * @throws RuntimeException if the table entry cannot be obtained
     */
    private OceanBaseTablePartInfo queryTablePartInfo(String schemaName, String tableName) {
        OceanBaseVersion version = this.connectionProvider.getVersion();
        OceanBaseUserInfo userInfo = this.connectionProvider.getUserInfo();
        boolean isV4 = version.isV4();
        boolean isSysTenant = "sys".equalsIgnoreCase(userInfo.getTenant());
        if (isV4 == isSysTenant) {
            LOG.warn("Can't query table entry on OceanBase version {} with account of tenant {}.", version.getVersion(), userInfo.getTenant());
            return null;
        }
        ObServerMode serverMode = this.dialect instanceof OceanBaseMySQLDialect
                ? ObServerMode.fromMySql((String) version.getVersion())
                : ObServerMode.fromOracle((String) version.getVersion());
        TableEntryKey tableEntryKey = new TableEntryKey(userInfo.getCluster(), userInfo.getTenant(), schemaName, tableName, serverMode);
        LOG.debug("Query table entry by tableEntryKey: {}", tableEntryKey);
        try (Connection connection = this.connectionProvider.getConnection()) {
            TableEntry tableEntry = new TableEntryExtractor().queryTableEntry(connection, tableEntryKey, isV4);
            if (tableEntry == null) {
                throw new RuntimeException("Failed to get table entry with key: " + tableEntryKey);
            }
            LOG.info("Query tableEntry by {}, got: {}", tableEntryKey, tableEntry);
            return new OceanBaseTablePartInfo(tableEntry, isV4);
        } catch (Exception e) {
            throw new RuntimeException("Failed to get table partition info", e);
        }
    }

    /**
     * Closes the connection provider and every cached direct-load table, and clears both caches.
     *
     * <p>All resources are attempted even if one of them fails to close. The first failure is
     * rethrown at the end with later failures attached as suppressed exceptions.
     *
     * @throws Exception the first failure encountered while closing
     */
    public void close() throws Exception {
        Exception failure = null;
        try {
            this.connectionProvider.close();
        } catch (Exception e) {
            failure = e;
        }
        this.tablePartInfoCache.clear();
        for (ObTableDirectLoad directLoad : this.directLoadCache.getAll()) {
            try {
                directLoad.getTable().close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        this.directLoadCache.clear();
        if (failure != null) {
            throw failure;
        }
    }
}

