/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.loaddump.writer.oceanbase;

import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategy;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Stopwatch;
import com.lmax.disruptor.RingBuffer;
import com.oceanbase.tools.loaddump.common.enums.LoadStatus;
import com.oceanbase.tools.loaddump.common.exception.NonRetryableException;
import com.oceanbase.tools.loaddump.common.exception.RetryableException;
import com.oceanbase.tools.loaddump.common.model.ConnectionKey;
import com.oceanbase.tools.loaddump.common.model.Database;
import com.oceanbase.tools.loaddump.common.model.Insertion;
import com.oceanbase.tools.loaddump.common.model.LoadParameter;
import com.oceanbase.tools.loaddump.common.model.SubFile;
import com.oceanbase.tools.loaddump.common.model.TableInfo;
import com.oceanbase.tools.loaddump.common.model.TaskState;
import com.oceanbase.tools.loaddump.concurrent.FlowLimiter;
import com.oceanbase.tools.loaddump.concurrent.SmartStopStrategy;
import com.oceanbase.tools.loaddump.context.LoadContext;
import com.oceanbase.tools.loaddump.context.SqlContext;
import com.oceanbase.tools.loaddump.parser.record.Record;
import com.oceanbase.tools.loaddump.utils.CollectionUtils;
import com.oceanbase.tools.loaddump.utils.ExceptionUtils;
import com.oceanbase.tools.loaddump.utils.JdbcUtils;
import com.oceanbase.tools.loaddump.utils.SqlUtils;
import com.oceanbase.tools.loaddump.vmoption.JavaOpts;
import com.oceanbase.tools.loaddump.writer.oceanbase.AbstractOceanBaseWriter;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcClientWriter
extends AbstractOceanBaseWriter {
    private static final Logger log = LoggerFactory.getLogger(JdbcClientWriter.class);
    private static final int MIN_RETRY_TIMES = 1;
    private static final int MAX_RETRY_TIMES = 5;
    private static final Logger BAD_RECORD_LOGGER = LoggerFactory.getLogger((String)"BadRecordLogger");
    private static final Logger DIS_RECORD_LOGGER = LoggerFactory.getLogger((String)"DisRecordLogger");
    protected final Retryer<Boolean> retryer;
    protected final RetryCallback retryCallback;
    protected Database database;
    protected boolean replaceData;
    protected FlowLimiter flowLimiter;
    protected AtomicBoolean supervisor;
    protected ConnectionKey connectionKey;
    protected Connection connectionHolder;
    protected CompletableFuture<Connection> connectionFuture;
    protected Map<String, Map<Long, LoadStatus>> tableLoadStatusMap;

    public JdbcClientWriter(AtomicBoolean supervisor, LoadParameter parameter) {
        super(parameter);
        this.flowLimiter = parameter.createFlowLimiter();
        this.connectionKey = parameter.getConnectionKey();
        this.supervisor = supervisor;
        this.retryCallback = new RetryCallback(this);
        this.database = parameter.getDatabase();
        this.replaceData = parameter.isReplaceData();
        this.retryer = this.createRetryer(parameter.isFailFast() ? 1 : 5);
    }

    private Retryer<Boolean> createRetryer(int maxAttempts) {
        return RetryerBuilder.newBuilder().retryIfExceptionOfType(RetryableException.class).withWaitStrategy(WaitStrategies.exponentialWait((long)1L, (long)60L, (TimeUnit)TimeUnit.SECONDS)).withStopStrategy((StopStrategy)new SmartStopStrategy(this.supervisor, maxAttempts)).build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onEvent(Insertion insertion) throws Exception {
        String table = insertion.getTable();
        SubFile subFile = insertion.getSubFile();
        try {
            if (this.loadCtx.isExceedMaxErrors(table)) {
                this.loadCtx.batchConsumed(insertion);
                return;
            }
            long partitionId = insertion.getPartitionId();
            LoadStatus loadStatus = this.tableLoadStatusMap.get(table).get(partitionId);
            if (LoadStatus.LoadMode.SLOW.equals((Object)loadStatus.getLoadMode())) {
                log.debug("[SLOW] {}, Table: \"{}\", Partition: {}", new Object[]{loadStatus, table, partitionId});
                Thread.sleep(100L);
            }
            AtomicLong beginTimeMillis = new AtomicLong(0L);
            while (LoadStatus.LoadMode.PAUSE.equals((Object)loadStatus.getLoadMode())) {
                log.debug("[PAUSE] {}, Table: \"{}\", Partition: {}", new Object[]{loadStatus, table, partitionId});
                Thread.sleep(100L);
                long currentTimeMillis = System.currentTimeMillis();
                beginTimeMillis.compareAndSet(0L, currentTimeMillis);
                long elapsedTimeMillis = currentTimeMillis - beginTimeMillis.get();
                if (elapsedTimeMillis < this.parameter.getMaxWaitTimeMillis()) continue;
                throw new IllegalStateException("The memory of leader server: " + loadStatus.getLeaderServer() + " is not released for more than " + elapsedTimeMillis + " millis. Table: \"" + table + "\", Partition: " + partitionId);
            }
            int recordCount = insertion.getRecordCount();
            while (!this.flowLimiter.tryAcquire(recordCount)) {
                Thread.sleep(50L);
                if (!log.isDebugEnabled()) continue;
                log.debug("Retry to acquire permits: {}. Maximum Permits: {}, Table: \"{}\", Partition: {}", new Object[]{recordCount, this.flowLimiter.getPermits(), table, partitionId});
            }
            this.retryer.call((Callable)this.retryCallback.withInsertion(insertion));
        }
        catch (Exception ex) {
            this.loadCtx.batchConsumed(insertion);
            subFile.setTaskState(TaskState.FAILURE);
            subFile.setMessage(MessageFormat.format("Load data into \"{0}\".\"{1}\" failed! {2}", subFile.getSchemaName(), subFile.getObjectName(), org.apache.commons.lang3.exception.ExceptionUtils.getRootCauseMessage((Throwable)ex)));
        }
        finally {
            insertion.clearAll();
        }
    }

    public Connection getConnection() {
        try {
            if (this.connectionFuture != null) {
                this.connectionHolder = this.connectionFuture.get();
            }
            if (this.connectionHolder != null && !this.connectionHolder.isClosed()) {
                Connection connection = this.connectionHolder;
                return connection;
            }
            this.resetConnection();
            Connection connection = this.connectionHolder;
            return connection;
        }
        catch (Throwable e) {
            if (e instanceof InterruptedException) {
                throw new NonRetryableException(e);
            }
            throw new RetryableException(e);
        }
        finally {
            this.connectionFuture = null;
        }
    }

    public void resetConnection() throws Throwable {
        if (this.connectionHolder != null && !this.connectionHolder.isClosed()) {
            return;
        }
        JdbcUtils.close(this.connectionHolder);
        log.warn("The connection holder is invalid or closed, reset a new direct connection now...");
        try {
            Stopwatch stopwatch = Stopwatch.createStarted();
            this.connectionHolder = this.connectionKey.getSessionManager().createNewConnection();
            log.warn("Reset a new direct connection and init session variables finished. Elapsed: {}", (Object)stopwatch);
        }
        catch (Exception e) {
            throw org.apache.commons.lang3.exception.ExceptionUtils.getRootCause((Throwable)e);
        }
    }

    @Override
    protected long getConsumedSlots(String leaderServer) {
        RingBuffer<Insertion> buffer = this.bufferGroup.getBuffer(leaderServer);
        return (long)buffer.getBufferSize() - buffer.remainingCapacity();
    }

    @Override
    public void shutdown() {
        JdbcUtils.close(this.getConnection());
    }

    public void setConnectionFuture(CompletableFuture<Connection> connectionFuture) {
        this.connectionFuture = connectionFuture;
    }

    public void setTableLoadStatusMap(Map<String, Map<Long, LoadStatus>> tableLoadStatusMap) {
        this.tableLoadStatusMap = tableLoadStatusMap;
    }

    public static class RetryCallback
    implements Callable<Boolean> {
        protected JdbcClientWriter writer;
        protected Insertion insertion;

        public RetryCallback(JdbcClientWriter writer) {
            this.writer = writer;
        }

        public RetryCallback withInsertion(Insertion insertion) {
            this.insertion = insertion;
            return this;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public Boolean call() throws Exception {
            SubFile subFile = this.insertion.getSubFile();
            List<Record> recordList = this.insertion.getRecordList();
            LoadContext loadCtx = this.writer.loadCtx;
            if (CollectionUtils.isEmpty(recordList)) {
                loadCtx.batchConsumed(this.insertion);
                return true;
            }
            String table = this.insertion.getTable();
            long bytes = this.insertion.getByteSize();
            String leaderServer = this.insertion.getLeaderServer();
            TableInfo tableInfo = this.writer.database.getTableInfo(table);
            boolean status = true;
            int affectRows = 0;
            int recordCount = recordList.size();
            try {
                if (loadCtx.isExceedMaxErrors(table)) {
                    log.error("Too many errors occurred while loading into table \"{}\"", (Object)table);
                    subFile.setTaskState(TaskState.FAILURE);
                    loadCtx.batchConsumed(this.insertion);
                    Boolean bl = true;
                    return bl;
                }
                Connection conn = this.writer.getConnection();
                boolean isReplaceData = this.writer.replaceData;
                if (isReplaceData && tableInfo.isOracleMode() && tableInfo.getServer().isPrevious("2.2.76") && !this.insertion.isRetried()) {
                    if (tableInfo.hasPrimaryKey() || tableInfo.hasUniqueKey()) {
                        SqlContext ctx = this.buildBatchDeleteSqlContext(tableInfo, recordList);
                        affectRows = this.executeUpdate(conn, ctx);
                        log.info("Delete {} conflict records in oracle with --replace-data success", (Object)affectRows);
                    }
                } else if (isReplaceData && tableInfo.isMySqlMode()) {
                    tableInfo.buildReplaceIntoPrefix();
                }
                long begin = 0L;
                if (JavaOpts.isDebugable) {
                    begin = System.nanoTime();
                }
                boolean logicalDatabase = this.writer.database.isLogicalDatabase();
                if (Insertion.Mode.BATCH == this.insertion.getMode()) {
                    SqlContext ctx = this.buildBatchSqlContext(tableInfo, recordList);
                    try {
                        affectRows = logicalDatabase ? this.executeBatch(conn, ctx) : (JavaOpts.isDryRunMode ? ctx.getCount() : this.executeUpdate(conn, ctx));
                    }
                    catch (Exception ex) {
                        this.translateBatchModeException(tableInfo, ex);
                    }
                    status = ctx.isCompleted();
                } else {
                    for (SqlContext ctx : this.buildSerialSqlContext(tableInfo, recordList)) {
                        status &= ctx.isCompleted();
                        List<List<String>> batchArgs = ctx.getBatchArgs();
                        try {
                            affectRows += Math.min(this.executeUpdate(conn, ctx), 1);
                        }
                        catch (Exception ex) {
                            this.translateSerialModeException(tableInfo, batchArgs, affectRows, ex);
                        }
                        finally {
                            ctx.clearAll();
                        }
                    }
                }
                if (JavaOpts.isDebugable) {
                    String schemaTable = tableInfo.getSchemaTable();
                    String threadName = Thread.currentThread().getName();
                    long elapsed = (System.nanoTime() - begin) / 1000000L;
                    log.warn("[{}] JDBC insert {} rows elapsed: {} ms. Table: {}", new Object[]{threadName, recordCount, elapsed, schemaTable});
                }
                int loadedCount = Math.min(affectRows, recordCount);
                long consumed = this.writer.getConsumedSlots(leaderServer);
                this.writer.meter.mark(leaderServer, loadedCount, bytes, consumed);
                subFile.addLoadedBytes(bytes);
                subFile.addLoadedCount(loadedCount);
                loadCtx.batchConsumed(this.insertion);
                this.insertion.clearAll();
                Boolean bl = true;
                return bl;
            }
            finally {
                this.writer.flowLimiter.release(recordCount);
            }
        }

        protected List<SqlContext> buildSerialSqlContext(TableInfo tableInfo, List<Record> recordList) {
            if (this.writer.replaceData && tableInfo.isOracleMode() && !tableInfo.getServer().isPrevious("2.2.76")) {
                return SqlUtils.translateMergeIntoSingleStatement(tableInfo, recordList);
            }
            return SqlUtils.translateInsertSingleValueSqlContext(tableInfo, recordList);
        }

        protected SqlContext buildBatchSqlContext(TableInfo tableInfo, List<Record> recordList) {
            SqlContext ctx = this.insertion.getSqlContext();
            if (ctx != null && this.insertion.isRetried() && !this.writer.replaceData) {
                return ctx;
            }
            if (this.writer.database.isLogicalDatabase()) {
                ctx = SqlUtils.translateInsertBatchValueSqlContext(tableInfo, recordList);
            } else {
                try {
                    ctx = SqlUtils.translateInsertMultiValuesSqlContext(tableInfo, recordList);
                }
                catch (Exception e) {
                    ctx = SqlUtils.translateInsertBatchValueSqlContext(tableInfo, recordList);
                }
            }
            this.insertion.setRetried(true);
            this.insertion.setSqlContext(ctx);
            return this.insertion.getSqlContext();
        }

        protected SqlContext buildBatchDeleteSqlContext(TableInfo tableInfo, List<Record> recordList) {
            SqlContext ctx = this.insertion.getSqlContext();
            if (ctx != null && this.insertion.isRetried() && this.writer.replaceData) {
                return ctx;
            }
            ctx = SqlUtils.translateDeleteSqlContext(tableInfo, recordList);
            this.insertion.setRetried(true);
            this.insertion.setSqlContext(ctx);
            return this.insertion.getSqlContext();
        }

        /*
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        public int executeUpdate(Connection conn, SqlContext ctx) throws Exception {
            if (ctx.getCount() == 0) {
                return 0;
            }
            String sql = ctx.getSql();
            List<List<String>> batchArgs = ctx.getBatchArgs();
            try (PreparedStatement pstmt = conn.prepareStatement(sql);){
                int index = 1;
                for (List<String> args : batchArgs) {
                    for (String arg : args) {
                        pstmt.setObject(index++, arg);
                    }
                }
                int n = pstmt.executeUpdate();
                return n;
            }
            catch (Exception ex) {
                if (!log.isDebugEnabled()) throw new SQLException(ex);
                log.debug("SQL: {}, args: {}", (Object)sql, batchArgs);
                throw new SQLException(ex);
            }
        }

        /*
         * Exception decompiling
         */
        public int executeBatch(Connection conn, SqlContext ctx) throws Exception {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }

        private void translateBatchModeException(@NonNull TableInfo tableInfo, @NonNull Exception ex) throws Exception {
            if (tableInfo == null) {
                throw new NullPointerException("tableInfo is marked non-null but is null");
            }
            if (ex == null) {
                throw new NullPointerException("ex is marked non-null but is null");
            }
            if (ex instanceof NonRetryableException) {
                throw ex;
            }
            String message = ExceptionUtils.getRootCauseMessage(ex);
            if (ExceptionUtils.isBadRecord(message) || ExceptionUtils.isDiscardRecord(message)) {
                boolean shouldRetry;
                boolean bl = shouldRetry = !ExceptionUtils.isDuplicateRecord(message) || !this.writer.replaceData || tableInfo.isOracleMode() && !tableInfo.getServer().isPrevious("2.2.76");
                if (shouldRetry) {
                    this.insertion.setRetry(true);
                    this.insertion.setRetried(false);
                    this.insertion.setMode(Insertion.Mode.SERIAL);
                }
            } else if (ExceptionUtils.isObserverError(message) || ExceptionUtils.isNetworkError(message) && CollectionUtils.isNotEmpty(tableInfo.getPrimaryCols())) {
                this.insertion.setMode(Insertion.Mode.BATCH);
                this.insertion.setRetried(true);
                this.insertion.setRetry(true);
            } else {
                log.error("Other Error: ", (Throwable)ex);
                throw new NonRetryableException(ex);
            }
            String table = this.insertion.getTable();
            long partId = this.insertion.getPartitionId();
            int count = this.insertion.getRecordCount();
            Insertion.Mode mode = this.insertion.getMode();
            log.warn("Retry Table: \"{}\", Partition: {}. Records: {}. Error: {}. Retry Mode: {}.", new Object[]{table, partId, count, message, mode});
            throw new RetryableException(ex);
        }

        private void translateSerialModeException(TableInfo tableInfo, List<List<String>> batchArgs, int affectRows, Exception ex) throws Exception {
            if (ex instanceof NonRetryableException) {
                throw ex;
            }
            SubFile subFile = this.insertion.getSubFile();
            LoadContext ctx = this.writer.loadCtx;
            String message = ExceptionUtils.getRootCauseMessage(ex);
            ArrayList args = new ArrayList();
            if (CollectionUtils.isEmpty(batchArgs)) {
                log.error("Other Error: {}", (Object)message);
            } else {
                args.addAll(batchArgs.get(0));
            }
            StringBuilder sb = new StringBuilder(256);
            for (int i = 0; i < args.size(); ++i) {
                Integer offset = tableInfo.getPhysicOffset(i);
                if (offset == null) continue;
                String data = (String)args.get(i);
                sb.append(data == null ? "null" : "'" + data + "'");
                if (i == args.size() - 1) continue;
                sb.append(",");
            }
            if (ExceptionUtils.isDiscardRecord(message)) {
                DIS_RECORD_LOGGER.error("{} VALUES ({});", (Object)tableInfo.getInsertPrefix(), (Object)sb);
                if (ctx.incrementAndIsExceedMaxDiscards(tableInfo.getTable())) {
                    subFile.addLoadedBytes(this.insertion.getByteSize());
                    subFile.addLoadedCount(affectRows);
                    throw new NonRetryableException("Too many discard records. Table: {}", tableInfo.getSchemaTable());
                }
            } else if (ExceptionUtils.isBadRecord(message)) {
                BAD_RECORD_LOGGER.error("{} VALUES ({});\nCause: {}\n", new Object[]{tableInfo.getInsertPrefix(), sb, message});
                if (ctx.incrementAndIsExceedMaxErrors(tableInfo.getTable())) {
                    subFile.addLoadedBytes(this.insertion.getByteSize());
                    subFile.addLoadedCount(affectRows);
                    throw new NonRetryableException("Too many bad records. Table: {}", tableInfo.getSchemaTable());
                }
            } else {
                log.error("Other Error: \"{}\". SQL: {} VALUES ({});", new Object[]{message, tableInfo.getInsertPrefix(), sb});
                throw new NonRetryableException(ex);
            }
            args.clear();
            sb.setLength(0);
        }

        private static /* synthetic */ int lambda$executeBatch$0(int s) {
            return s == -2 ? 1 : (s == -3 ? 0 : s);
        }
    }
}

