/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.loaddump.writer.oceanbase;

import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategy;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Stopwatch;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkHandler;
import com.oceanbase.tools.loaddump.common.enums.LoadStatus;
import com.oceanbase.tools.loaddump.common.enums.ServerMode;
import com.oceanbase.tools.loaddump.common.exception.NonRetryableException;
import com.oceanbase.tools.loaddump.common.exception.RetryableException;
import com.oceanbase.tools.loaddump.common.model.ConnectionKey;
import com.oceanbase.tools.loaddump.common.model.Database;
import com.oceanbase.tools.loaddump.common.model.Insertion;
import com.oceanbase.tools.loaddump.common.model.LoadParameter;
import com.oceanbase.tools.loaddump.common.model.SubFile;
import com.oceanbase.tools.loaddump.common.model.TableInfo;
import com.oceanbase.tools.loaddump.common.model.TaskState;
import com.oceanbase.tools.loaddump.concurrent.FlowLimiter;
import com.oceanbase.tools.loaddump.concurrent.SmartStopStrategy;
import com.oceanbase.tools.loaddump.context.GlobalContext;
import com.oceanbase.tools.loaddump.context.SqlContext;
import com.oceanbase.tools.loaddump.manager.session.SessionOption;
import com.oceanbase.tools.loaddump.manager.session.SessionProperties;
import com.oceanbase.tools.loaddump.metrics.Meter;
import com.oceanbase.tools.loaddump.parser.record.Record;
import com.oceanbase.tools.loaddump.ringbuffer.RingBufferGroup;
import com.oceanbase.tools.loaddump.utils.CollectionUtils;
import com.oceanbase.tools.loaddump.utils.ExceptionUtils;
import com.oceanbase.tools.loaddump.utils.JdbcUtils;
import com.oceanbase.tools.loaddump.utils.SqlUtils;
import com.oceanbase.tools.loaddump.vmoption.JvmArgs;
import com.oceanbase.tools.loaddump.writer.AbstractWriter;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.NonNull;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcClientWriter
extends AbstractWriter
implements WorkHandler<Insertion> {
    private static final Logger log = LoggerFactory.getLogger(JdbcClientWriter.class);
    private static final int MIN_RETRY_TIMES = 1;
    private static final int MAX_RETRY_TIMES = 5;
    private static final Logger BAD_RECORD_LOGGER = LoggerFactory.getLogger((String)"BadRecordLogger");
    private static final Logger DIS_RECORD_LOGGER = LoggerFactory.getLogger((String)"DisRecordLogger");
    protected final Retryer<Boolean> retryer;
    protected final RetryCallback retryCallback;
    protected Meter meter;
    protected Database database;
    protected boolean replaceData;
    protected LoadParameter parameter;
    protected FlowLimiter flowLimiter;
    protected AtomicBoolean supervisor;
    protected ConnectionKey connectionKey;
    protected Connection connectionHolder;
    protected RingBufferGroup bufferGroup;
    protected CompletableFuture<Connection> connectionFuture;
    protected Map<String, Map<Long, LoadStatus>> tableLoadStatusMap;
    protected final Cache<String, PreparedStatement> pstmtCache;

    public JdbcClientWriter(AtomicBoolean supervisor, LoadParameter parameter) {
        this.parameter = parameter;
        this.flowLimiter = parameter.createFlowLimiter();
        this.connectionKey = parameter.getConnectionKey();
        this.supervisor = supervisor;
        this.pstmtCache = this.createCache();
        this.retryCallback = new RetryCallback(this);
        this.database = parameter.getDatabase();
        this.replaceData = parameter.isReplaceData();
        this.retryer = this.createRetryer(parameter.isFailFast() ? 1 : 5);
    }

    /**
     * Builds the retryer used to run load attempts.
     *
     * <p>Only {@link RetryableException} triggers a retry. Waiting uses
     * {@code WaitStrategies.exponentialWait(1, 60, SECONDS)}; stopping is delegated to a
     * {@link SmartStopStrategy} built from the supervisor flag and {@code maxAttempts}.
     *
     * @param maxAttempts attempt limit passed to the stop strategy
     * @return the configured retryer
     */
    private Retryer<Boolean> createRetryer(int maxAttempts) {
        RetryerBuilder<Boolean> builder = RetryerBuilder.newBuilder();
        builder = builder.retryIfExceptionOfType(RetryableException.class);
        builder = builder.withWaitStrategy(WaitStrategies.exponentialWait(1L, 60L, TimeUnit.SECONDS));
        builder = builder.withStopStrategy(new SmartStopStrategy(this.supervisor, maxAttempts));
        return builder.build();
    }

    /**
     * Creates the prepared-statement cache keyed by SQL text.
     *
     * <p>Entries are softly referenced, capped at 8192, and expire five minutes after last access.
     * Whenever an entry is removed, its statement is closed and the removal cause is logged at
     * debug level.
     *
     * @return a new, empty cache
     */
    public Cache<String, PreparedStatement> createCache() {
        return CacheBuilder.newBuilder()
                .initialCapacity(64)
                .maximumSize(8192L)
                .softValues()
                .concurrencyLevel(256)
                .expireAfterAccess(5L, TimeUnit.MINUTES)
                .removalListener(notification -> {
                    // Close first so the statement is released even if logging is disabled.
                    JdbcUtils.close((AutoCloseable) notification.getValue());
                    log.debug("Remove the cached prepared statement as the key has been marked: {}", (Object) notification.getCause());
                })
                .build();
    }

    /**
     * Invalidates every cached prepared statement; the cache's removal listener closes each one.
     */
    public void expireCache() {
        Cache<String, PreparedStatement> cache = this.getPstmtCache();
        cache.invalidateAll();
    }

    /**
     * Handles one insertion event: applies load-status throttling, acquires flow-limiter permits
     * and then runs the insertion through the retryer.
     *
     * <p>Any exception is converted into a failed sub-file (state, message, done marker) instead
     * of being propagated. The insertion is always cleared on exit.
     *
     * <p>If the worker thread is interrupted while sleeping, the interrupt flag is restored so the
     * owning executor can still observe the interruption.
     *
     * <p>NOTE (decompiler): CFR reported "Removed try catching itself - possible behaviour change"
     * for this method.
     *
     * @param insertion the batch of records to load
     * @throws Exception declared by the handler contract; the visible body handles all exceptions
     */
    public void onEvent(Insertion insertion) throws Exception {
        String table = insertion.getTable();
        SubFile subFile = insertion.getSubFile();
        try {
            if (this.getGlobalContext().isExceedMaxErrors(table)) {
                subFile.markdone();
                return;
            }
            long partitionId = insertion.getPartitionId();
            LoadStatus loadStatus = this.tableLoadStatusMap.get(table).get(partitionId);
            if (loadStatus == null) {
                loadStatus = new LoadStatus(false, RandomUtils.nextDouble(0.3, 0.95));
                log.warn("Load status is not refreshed. Table: \"{}\", Partition: {}. Use random ratio instead.", (Object) table, (Object) partitionId);
            }
            if (LoadStatus.LoadMode.SLOW.equals((Object) loadStatus.getLoadMode())) {
                log.debug("[SLOW] {}, Table: \"{}\", Partition: {}", new Object[]{loadStatus, table, partitionId});
                Thread.sleep(100L);
            }
            // NOTE(review): loadStatus is not re-read from the map inside this loop; presumably the
            // LoadStatus instance is updated in place by another component - confirm.
            long pauseBeginMillis = 0L;
            while (LoadStatus.LoadMode.PAUSE.equals((Object) loadStatus.getLoadMode())) {
                log.debug("[PAUSE] {}, Table: \"{}\", Partition: {}", new Object[]{loadStatus, table, partitionId});
                Thread.sleep(100L);
                long currentTimeMillis = System.currentTimeMillis();
                if (pauseBeginMillis == 0L) {
                    // The pause clock starts after the first sleep, as in the original logic.
                    pauseBeginMillis = currentTimeMillis;
                }
                long elapsedTimeMillis = currentTimeMillis - pauseBeginMillis;
                if (elapsedTimeMillis < this.parameter.getMaxWaitTimeMillis()) {
                    continue;
                }
                super.getGlobalContext().incrementErrorCount(table);
                throw new IllegalStateException("The memory of leader server: " + loadStatus.getLeaderServer() + " is not released for more than " + elapsedTimeMillis + " millis. Table: \"" + table + "\", Partition: " + partitionId);
            }
            int recordCount = insertion.getRecordCount();
            while (!this.flowLimiter.tryAcquire(recordCount)) {
                Thread.sleep(50L);
                log.debug("Retry to acquire permits: {}. Maximum Permits: {}, Table: \"{}\", Partition: {}", new Object[]{recordCount, this.flowLimiter.getPermits(), table, partitionId});
            }
            this.retryer.call((Callable) this.retryCallback.withInsertion(insertion));
        }
        catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Thread.sleep cleared the flag when it threw; restore it rather than hiding the interrupt.
                Thread.currentThread().interrupt();
            }
            subFile.setTaskState(TaskState.FAILURE);
            subFile.setMessage(String.format("Load data into \"%s\".\"%s\" failed! %s", subFile.getSchemaName(), subFile.getObjectName(), org.apache.commons.lang3.exception.ExceptionUtils.getRootCauseMessage((Throwable) ex)));
            subFile.markdone();
        }
        finally {
            insertion.clearAll();
        }
    }

    /**
     * Returns an open connection for this writer.
     *
     * <p>If a connection future has been supplied, its result replaces the held connection. If the
     * held connection is missing or closed, a new one is created via {@link #resetConnection()}.
     * The future is consumed at most once: it is cleared on every exit path.
     *
     * @return the held connection after any replacement
     * @throws NonRetryableException if the thread was interrupted; the interrupt flag is restored
     * @throws RetryableException    for any other failure
     */
    public Connection getConnection() {
        try {
            if (this.connectionFuture != null) {
                this.connectionHolder = this.connectionFuture.get();
            }
            if (this.connectionHolder == null || this.connectionHolder.isClosed()) {
                this.resetConnection();
            }
            return this.connectionHolder;
        }
        catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new NonRetryableException(e);
        }
        catch (Throwable e) {
            throw new RetryableException(e);
        }
        finally {
            this.connectionFuture = null;
        }
    }

    /**
     * Replaces the held connection with a new direct connection when it is missing or closed.
     *
     * <p>Does nothing if the held connection is still open. Otherwise the old holder is closed, a
     * new connection is created with the options from {@link #buildSessionOption()}, and the
     * prepared-statement cache is invalidated.
     *
     * @throws Throwable the root cause of a creation failure, or the failure itself when it has no
     *                   distinct root cause
     */
    public void resetConnection() throws Throwable {
        if (this.connectionHolder != null && !this.connectionHolder.isClosed()) {
            return;
        }
        JdbcUtils.close(this.connectionHolder);
        log.warn("The connection holder is invalid or closed, reset a new direct connection now...");
        try {
            Stopwatch stopwatch = Stopwatch.createStarted();
            boolean supportBatch = !this.database.isLogicalDatabase();
            SessionOption sessionOpts = this.buildSessionOption();
            this.connectionHolder = this.connectionKey.getSessionManager().createNewConnection(true, supportBatch, sessionOpts);
            this.expireCache();
            log.warn("Reset a new direct connection and init session variables finished. Elapsed: {}", (Object) stopwatch);
        }
        catch (Exception e) {
            // Some commons-lang3 versions return null when the throwable has no cause; "throw null"
            // would raise a NullPointerException and hide the real failure.
            Throwable rootCause = org.apache.commons.lang3.exception.ExceptionUtils.getRootCause((Throwable) e);
            throw rootCause != null ? rootCause : e;
        }
    }

    /**
     * Builds the session options applied to newly created connections.
     *
     * <p>Always joins the auto-commit option with a query timeout read from
     * {@code ob.timeout.for.query.metadata} and a transaction timeout read from
     * {@code ob.timeout.for.exec.dml}. For an Oracle-mode database, the NLS date, timestamp and
     * timestamp-with-time-zone formats from the load parameter are joined as well.
     *
     * @return the combined session option
     */
    public SessionOption buildSessionOption() {
        SessionOption base = SessionOption.join(
                SessionOption.withOpenAutoCommit(),
                SessionOption.withQueryTimeout(SessionProperties.minToMicros("ob.timeout.for.query.metadata")),
                SessionOption.withTrxTimeout(SessionProperties.minToMicros("ob.timeout.for.exec.dml")));
        boolean oracleMode = ServerMode.ORACLE.equals((Object) this.database.getServerMode());
        if (!oracleMode) {
            return base;
        }
        return SessionOption.join(
                base,
                SessionOption.withNlsDateFormat(this.parameter.getNlsDateFormat()),
                SessionOption.withNlsTimestampFormat(this.parameter.getNlsTimestampFormat()),
                SessionOption.withNlsTimestampTzFormat(this.parameter.getNlsTimestampTzFormat()));
    }

    /**
     * Computes how many slots of the leader server's ring buffer are currently occupied.
     *
     * @param leaderServer key used to look up the ring buffer in the buffer group
     * @return buffer size minus remaining capacity
     */
    protected long getConsumedSlots(String leaderServer) {
        RingBuffer<Insertion> ring = this.bufferGroup.getBuffer(leaderServer);
        long totalSlots = ring.getBufferSize();
        long freeSlots = ring.remainingCapacity();
        return totalSlots - freeSlots;
    }

    /**
     * Returns the cached prepared statement for the SQL text, preparing it on the given
     * connection when the cache has no entry.
     *
     * @param conn connection used to prepare the statement on a cache miss
     * @param sql  SQL text, also used as the cache key
     * @return the cached or newly prepared statement
     * @throws Exception if the cache lookup or statement preparation fails
     */
    protected PreparedStatement cachedPreparedStatement(Connection conn, String sql) throws Exception {
        Callable<PreparedStatement> loader = () -> conn.prepareStatement(sql);
        Cache<String, PreparedStatement> cache = this.getPstmtCache();
        return cache.get(sql, loader);
    }

    /**
     * Sets the meter that successful load attempts report to.
     *
     * @param meter the meter to use
     */
    public void setMeter(Meter meter) {
        this.meter = meter;
    }

    /**
     * Sets the ring-buffer group consulted by {@code getConsumedSlots(String)}.
     *
     * @param bufferGroup the buffer group to use
     */
    public void setBufferGroup(RingBufferGroup bufferGroup) {
        this.bufferGroup = bufferGroup;
    }

    /**
     * Supplies a future whose result becomes the held connection on the next
     * {@code getConnection()} call; that call clears the future afterwards.
     *
     * @param connectionFuture future producing the connection
     */
    public void setConnectionFuture(CompletableFuture<Connection> connectionFuture) {
        this.connectionFuture = connectionFuture;
    }

    /**
     * Sets the load-status lookup read by {@code onEvent}: table name, then partition id, then status.
     *
     * @param tableLoadStatusMap the status map to use
     */
    public void setTableLoadStatusMap(Map<String, Map<Long, LoadStatus>> tableLoadStatusMap) {
        this.tableLoadStatusMap = tableLoadStatusMap;
    }

    /**
     * Returns the prepared-statement cache created in the constructor.
     *
     * @return the cache keyed by SQL text
     */
    public Cache<String, PreparedStatement> getPstmtCache() {
        return this.pstmtCache;
    }

    public static class RetryCallback
    implements Callable<Boolean> {
        protected JdbcClientWriter writer;
        protected Insertion insertion;

        /**
         * Creates a callback bound to the writer whose connection, cache, meter and flow limiter it uses.
         *
         * @param writer the owning writer
         */
        public RetryCallback(JdbcClientWriter writer) {
            this.writer = writer;
        }

        /**
         * Sets the insertion that the next {@code call()} will load.
         *
         * <p>NOTE(review): this mutates the callback in place; presumably each writer handles one
         * event at a time - confirm before sharing a callback across threads.
         *
         * @param insertion the insertion to load
         * @return this callback, for chaining
         */
        public RetryCallback withInsertion(Insertion insertion) {
            this.insertion = insertion;
            return this;
        }

        /**
         * Runs one load attempt for the insertion set via {@code withInsertion}.
         *
         * <p>Steps:
         * <ol>
         *   <li>If the table already exceeded its error limit, the sub-file is marked failed and done.</li>
         *   <li>With replace-data on an Oracle-mode table that has a primary or unique key, conflicting
         *       rows are deleted first (only when the insertion is not flagged as retried). On a
         *       MySQL-mode table the replace-into prefix is built instead.</li>
         *   <li>BATCH mode executes one statement or JDBC batch; failures go to
         *       {@code translateBatchModeException}, which always throws.</li>
         *   <li>Otherwise each record is executed on its own; failures go to
         *       {@code translateSerialModeException}, which may return so the loop continues.</li>
         *   <li>Metrics and sub-file counters are updated and the sub-file is marked done.</li>
         * </ol>
         *
         * <p>The number of deleted rows is kept apart from the insert count so that serial-mode
         * accumulation does not inflate the reported loaded count.
         *
         * <p>NOTE(review): permits are released in {@code finally} on every attempt, while
         * {@code onEvent} acquires them once per event - confirm the flow limiter tolerates repeated
         * releases across retries.
         *
         * <p>NOTE (decompiler): CFR reported "Removed try catching itself - possible behaviour change"
         * for this method.
         *
         * @return always {@code true} when the attempt finishes without throwing
         * @throws Exception a retryable or non-retryable exception produced by the translate helpers,
         *                   or any failure raised while obtaining the connection
         */
        @Override
        public Boolean call() throws Exception {
            String table = this.insertion.getTable();
            long bytes = this.insertion.getByteSize();
            SubFile subFile = this.insertion.getSubFile();
            String leaderServer = this.insertion.getLeaderServer();
            List<Record> recordList = this.insertion.getRecordList();
            GlobalContext globalContext = this.writer.getGlobalContext();
            TableInfo tableInfo = this.writer.database.getTableInfo(table);
            boolean status = true;
            int affectRows = 0;
            int recordCount = recordList.size();
            try {
                if (globalContext.isExceedMaxErrors(table)) {
                    log.error("Too many errors occurred while loading into table \"{}\"", (Object) table);
                    subFile.setTaskState(TaskState.FAILURE);
                    subFile.markdone();
                    return Boolean.TRUE;
                }
                Connection conn = this.writer.getConnection();
                boolean isReplaceData = this.writer.replaceData;
                if (isReplaceData && tableInfo.isOracleMode() && !this.insertion.isRetried()) {
                    if (tableInfo.hasPrimaryKey() || tableInfo.hasUniqueKey()) {
                        SqlContext ctx = this.buildBatchDeleteSqlContext(tableInfo, recordList);
                        // Kept separate from affectRows: deleted rows are not loaded rows.
                        int deletedRows = this.executeUpdate(conn, ctx);
                        log.info("Delete {} conflict records in oracle with --replace-data success", (Object) deletedRows);
                    }
                } else if (isReplaceData && tableInfo.isMySqlMode()) {
                    tableInfo.buildReplaceIntoPrefix();
                }
                long begin = 0L;
                if (JvmArgs.isDebugable) {
                    begin = System.nanoTime();
                }
                boolean logicalDatabase = this.writer.database.isLogicalDatabase();
                if (Insertion.Mode.BATCH == this.insertion.getMode()) {
                    SqlContext ctx = this.buildBatchSqlContext(tableInfo, recordList);
                    try {
                        if (logicalDatabase) {
                            affectRows = this.executeBatch(conn, ctx);
                        } else if (JvmArgs.isDisableImport) {
                            // Import disabled: report the context's count without executing anything.
                            affectRows = ctx.getCount();
                        } else {
                            affectRows = this.executeUpdate(conn, ctx);
                        }
                    }
                    catch (Exception ex) {
                        this.translateBatchModeException(tableInfo, ex);
                    }
                    status = ctx.isCompleted();
                } else {
                    for (SqlContext ctx : this.buildSerialSqlContext(tableInfo, recordList)) {
                        status &= ctx.isCompleted();
                        List<List<String>> batchArgs = ctx.getBatchArgs();
                        try {
                            affectRows += this.executeUpdate(conn, ctx);
                        }
                        catch (Exception ex) {
                            this.translateSerialModeException(tableInfo, batchArgs, Math.min(affectRows, recordCount), ex);
                        }
                        finally {
                            ctx.clearAll();
                        }
                    }
                }
                if (JvmArgs.isDebugable) {
                    String schemaTable = tableInfo.getSchemaTable();
                    String threadName = Thread.currentThread().getName();
                    long elapsed = (System.nanoTime() - begin) / 1000000L;
                    log.warn("[{}] JDBC insert {} rows elapsed: {} ms. Table: {}", new Object[]{threadName, recordCount, elapsed, schemaTable});
                }
                int loadedCount = Math.min(affectRows, recordCount);
                long consumed = this.writer.getConsumedSlots(leaderServer);
                this.writer.meter.mark(leaderServer, loadedCount, bytes, consumed);
                this.insertion.clearAll();
                subFile.addLoadedBytes(bytes);
                subFile.addLoadedCount(loadedCount);
                if (!status || loadedCount < recordCount) {
                    log.warn("Bad records were found in file: \"{}\". Check \"ob-loader-dumper.bad\" for details", (Object) subFile.getUniquePath());
                }
                subFile.markdone();
                return Boolean.TRUE;
            }
            finally {
                this.writer.flowLimiter.release(recordCount);
            }
        }

        /**
         * Translates the records into single-value insert contexts, one execution unit per element.
         *
         * @param tableInfo  metadata of the target table
         * @param recordList records to insert
         * @return the contexts produced by {@code SqlUtils.translateInsertSingleValueSqlContext}
         */
        protected List<SqlContext> buildSerialSqlContext(TableInfo tableInfo, List<Record> recordList) {
            List<SqlContext> perRecordContexts = SqlUtils.translateInsertSingleValueSqlContext(tableInfo, recordList);
            return perRecordContexts;
        }

        /**
         * Returns the SQL context for a batch insert, reusing the one stored on the insertion when
         * this is a retry and replace-data is off.
         *
         * <p>For a logical database the batch-value form is used. Otherwise the multi-values form is
         * tried first and the batch-value form is the fallback; the fallback reason is logged at debug
         * level instead of being dropped. A newly built context is stored on the insertion and the
         * insertion is flagged as retried.
         *
         * @param tableInfo  metadata of the target table
         * @param recordList records to insert
         * @return the reused or newly built context
         */
        protected SqlContext buildBatchSqlContext(TableInfo tableInfo, List<Record> recordList) {
            SqlContext ctx = this.insertion.getSqlContext();
            if (ctx != null && this.insertion.isRetried() && !this.writer.replaceData) {
                return ctx;
            }
            if (this.writer.database.isLogicalDatabase()) {
                ctx = SqlUtils.translateInsertBatchValueSqlContext(tableInfo, recordList);
            } else {
                try {
                    ctx = SqlUtils.translateInsertMultiValuesSqlContext(tableInfo, recordList);
                }
                catch (Exception e) {
                    // Deliberate best-effort fallback; keep the reason visible for troubleshooting.
                    log.debug("Translate multi-values insert failed, fall back to batch-value insert", (Throwable) e);
                    ctx = SqlUtils.translateInsertBatchValueSqlContext(tableInfo, recordList);
                }
            }
            this.insertion.setRetried(true);
            this.insertion.setSqlContext(ctx);
            return this.insertion.getSqlContext();
        }

        /**
         * Returns the SQL context that deletes rows conflicting with the given records.
         *
         * <p>The context stored on the insertion is reused when one exists, the insertion is flagged
         * as retried and replace-data is on. Otherwise a delete context is built, stored on the
         * insertion, and the insertion is flagged as retried.
         *
         * @param tableInfo  metadata of the target table
         * @param recordList records whose conflicting rows should be deleted
         * @return the reused or newly built context
         */
        protected SqlContext buildBatchDeleteSqlContext(TableInfo tableInfo, List<Record> recordList) {
            SqlContext stored = this.insertion.getSqlContext();
            boolean reusable = stored != null && this.insertion.isRetried() && this.writer.replaceData;
            if (!reusable) {
                SqlContext deleteContext = SqlUtils.translateDeleteSqlContext(tableInfo, recordList);
                this.insertion.setRetried(true);
                this.insertion.setSqlContext(deleteContext);
                return this.insertion.getSqlContext();
            }
            return stored;
        }

        /**
         * Executes the context's SQL once, binding every argument of every row in order as a single
         * flat parameter list.
         *
         * @param conn connection used to obtain the cached prepared statement
         * @param ctx  SQL text plus argument rows
         * @return {@code 0} when the context's count is zero, otherwise the statement's update count
         * @throws Exception a {@link SQLException} wrapping any binding or execution failure; failures
         *                   while obtaining or clearing the statement propagate unwrapped
         */
        public int executeUpdate(Connection conn, SqlContext ctx) throws Exception {
            if (ctx.getCount() == 0) {
                return 0;
            }
            String sql = ctx.getSql();
            List<List<String>> rows = ctx.getBatchArgs();
            PreparedStatement statement = this.writer.cachedPreparedStatement(conn, sql);
            statement.clearParameters();
            try {
                int position = 0;
                for (List<String> row : rows) {
                    for (String value : row) {
                        position++;
                        statement.setObject(position, value);
                    }
                }
                return statement.executeUpdate();
            }
            catch (Exception failure) {
                log.debug("SQL: {}", (Object) sql);
                throw new SQLException(failure);
            }
        }

        /**
         * Executes the context as a JDBC batch inside one transaction: one batch entry per argument row.
         *
         * <p>Auto-commit is switched off for the duration and restored on every exit path, including
         * a failure to obtain the cached statement. On failure the transaction is rolled back; a
         * rollback failure is attached as a suppressed exception so it cannot mask the original
         * error. The cached statement's batch is always cleared afterwards so leftover entries are
         * never resent by a later attempt.
         *
         * @param conn connection to execute on
         * @param ctx  SQL text plus argument rows
         * @return {@code 0} when the context's count is zero; otherwise the sum of the per-entry
         *         results, counting {@code SUCCESS_NO_INFO} as 1 and {@code EXECUTE_FAILED} as 0
         * @throws Exception a {@link SQLException} wrapping any binding, execution or commit failure;
         *                   failures while obtaining the statement propagate unwrapped
         */
        public int executeBatch(Connection conn, SqlContext ctx) throws Exception {
            if (ctx.getCount() == 0) {
                return 0;
            }
            String sql = ctx.getSql();
            List<List<String>> batchArgs = ctx.getBatchArgs();
            conn.setAutoCommit(false);
            try {
                PreparedStatement pstmt = this.writer.cachedPreparedStatement(conn, sql);
                pstmt.clearParameters();
                try {
                    for (List<String> args : batchArgs) {
                        for (int i = 0; i < args.size(); ++i) {
                            pstmt.setObject(i + 1, args.get(i));
                        }
                        pstmt.addBatch();
                    }
                    int[] results = pstmt.executeBatch();
                    conn.commit();
                    return Arrays.stream(results)
                            .map(r -> r == java.sql.Statement.SUCCESS_NO_INFO ? 1 : (r == java.sql.Statement.EXECUTE_FAILED ? 0 : r))
                            .sum();
                }
                catch (Exception ex) {
                    log.debug("SQL: {}", (Object) sql);
                    try {
                        conn.rollback();
                    }
                    catch (SQLException rollbackFailure) {
                        // Keep the original failure as the primary error.
                        ex.addSuppressed(rollbackFailure);
                    }
                    throw new SQLException(ex);
                }
                finally {
                    try {
                        // The statement is cached and reused; never leave batch entries behind.
                        pstmt.clearBatch();
                    }
                    catch (SQLException clearFailure) {
                        log.debug("Failed to clear the batch of the cached prepared statement", (Throwable) clearFailure);
                    }
                }
            }
            finally {
                conn.setAutoCommit(true);
            }
        }

        /**
         * Classifies a batch-mode failure and always throws.
         *
         * <ul>
         *   <li>A {@link NonRetryableException} is rethrown unchanged.</li>
         *   <li>A data error (bad record, or discard record while replace-data is off) switches the
         *       insertion to SERIAL mode for the retry.</li>
         *   <li>An observer or network error on a table with a non-empty primary-key map keeps BATCH
         *       mode for the retry.</li>
         *   <li>Anything else is logged and wrapped in a {@link NonRetryableException}.</li>
         * </ul>
         *
         * @param tableInfo metadata of the target table; must not be null
         * @param ex        the failure to classify; must not be null
         * @throws Exception a {@link RetryableException} for the retry cases, otherwise a
         *                   {@link NonRetryableException}
         */
        private void translateBatchModeException(@NonNull TableInfo tableInfo, @NonNull Exception ex) throws Exception {
            if (tableInfo == null) {
                throw new NullPointerException("tableInfo is marked non-null but is null");
            }
            if (ex == null) {
                throw new NullPointerException("ex is marked non-null but is null");
            }
            if (ex instanceof NonRetryableException) {
                throw ex;
            }
            String message = ExceptionUtils.getRootCauseMessage(ex);
            boolean dataError = ExceptionUtils.isBadRecord(message)
                    || (ExceptionUtils.isDiscardRecord(message) && !this.writer.replaceData);
            boolean envError = ExceptionUtils.isObserverError(message) || ExceptionUtils.isNetworkError(message);
            boolean retryable = dataError || (envError && MapUtils.isNotEmpty(tableInfo.getPrimaryKeyMap()));
            if (!retryable) {
                log.error("Other Error: ", (Throwable) ex);
                throw new NonRetryableException(ex);
            }
            if (dataError) {
                // Retry record by record so the offending rows can be isolated.
                this.insertion.setRetry(true);
                this.insertion.setRetried(false);
                this.insertion.setMode(Insertion.Mode.SERIAL);
            } else {
                this.insertion.setMode(Insertion.Mode.BATCH);
                this.insertion.setRetried(true);
                this.insertion.setRetry(true);
            }
            log.warn("Retry Table: \"{}\", Partition: {}. Records: {}. Error: {}. Retry Mode: {}.", new Object[]{
                    this.insertion.getTable(),
                    this.insertion.getPartitionId(),
                    this.insertion.getRecordCount(),
                    message,
                    this.insertion.getMode()});
            throw new RetryableException(ex);
        }

        /**
         * Handles the failure of a single-record insert in serial mode.
         *
         * <p>The first argument row is rendered as a comma-separated value list, skipping arguments
         * whose physical offset is {@code null}. Separators are written only between rendered values,
         * so a skipped trailing argument no longer leaves a dangling comma.
         *
         * <ul>
         *   <li>Discard record: written to the discard logger; throws once the discard limit is exceeded.</li>
         *   <li>Bad record: written to the bad-record logger with the cause; throws once the error
         *       limit is exceeded.</li>
         *   <li>Anything else: logged and wrapped in a {@link NonRetryableException}.</li>
         * </ul>
         *
         * <p>Returns normally when the record was logged and the limit is not exceeded, so the caller
         * can continue with the next record.
         *
         * @param tableInfo  metadata of the target table
         * @param batchArgs  argument rows of the failed context; only the first row is rendered
         * @param affectRows rows loaded so far, recorded on the sub-file when a limit is exceeded
         * @param ex         the failure to classify
         * @throws Exception {@code ex} itself if it is already non-retryable, otherwise a
         *                   {@link NonRetryableException} as described above
         */
        private void translateSerialModeException(TableInfo tableInfo, List<List<String>> batchArgs, int affectRows, Exception ex) throws Exception {
            if (ex instanceof NonRetryableException) {
                throw ex;
            }
            SubFile subFile = this.insertion.getSubFile();
            GlobalContext ctx = this.writer.getGlobalContext();
            String message = ExceptionUtils.getRootCauseMessage(ex);
            List<String> args = new ArrayList<>();
            if (CollectionUtils.isEmpty(batchArgs)) {
                log.error("Other Error: {}", (Object) message);
            } else {
                args.addAll(batchArgs.get(0));
            }
            StringBuilder sb = new StringBuilder(256);
            for (int i = 0; i < args.size(); ++i) {
                Integer offset = tableInfo.getPhysicOffset(i);
                if (offset == null) {
                    continue;
                }
                if (sb.length() > 0) {
                    // Separator goes before each value after the first one actually rendered.
                    sb.append(",");
                }
                String data = args.get(i);
                sb.append(data == null ? "null" : "'" + data + "'");
            }
            if (ExceptionUtils.isDiscardRecord(message)) {
                DIS_RECORD_LOGGER.error("{} VALUES ({});", (Object) tableInfo.getInsertPrefix(), (Object) sb);
                if (ctx.incrementAndIsExceedMaxDiscards(tableInfo.getTable())) {
                    subFile.addLoadedBytes(this.insertion.getByteSize());
                    subFile.addLoadedCount(affectRows);
                    throw new NonRetryableException("Too many discard records. Table: {}", tableInfo.getSchemaTable());
                }
            } else if (ExceptionUtils.isBadRecord(message)) {
                BAD_RECORD_LOGGER.error("{} VALUES ({});\nCause: {}\n", new Object[]{tableInfo.getInsertPrefix(), sb, message});
                if (ctx.incrementAndIsExceedMaxErrors(tableInfo.getTable())) {
                    subFile.addLoadedBytes(this.insertion.getByteSize());
                    subFile.addLoadedCount(affectRows);
                    throw new NonRetryableException("Too many bad records. Table: {}", tableInfo.getSchemaTable());
                }
            } else {
                log.error("Other Error: \"{}\". SQL: {} VALUES ({});", new Object[]{message, tableInfo.getInsertPrefix(), sb});
                throw new NonRetryableException(ex);
            }
        }
    }
}

