/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.loaddump.directpath;

import com.alipay.oceanbase.rpc.property.Property;
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObLoadDupActionType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableLoadClientStatus;
import com.alipay.oceanbase.rpc.table.ObDirectLoadBucket;
import com.alipay.oceanbase.rpc.table.ObDirectLoadParameter;
import com.alipay.oceanbase.rpc.table.ObTable;
import com.alipay.oceanbase.rpc.table.ObTableDirectLoad;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.oceanbase.tools.loaddump.directpath.AbstractRestrictedConnection;
import com.oceanbase.tools.loaddump.directpath.DirectPathPreparedStatement;
import com.oceanbase.tools.loaddump.utils.StringUtils;
import com.oceanbase.tools.loaddump.utils.TimeUtils;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A restricted connection that drives a single direct-load task through an
 * {@link ObTableDirectLoad} handle.
 *
 * <p>Lifecycle as implemented here: {@code INIT -> begin() -> BEGIN -> commit() -> COMMITTING},
 * and {@link #close()} moves any non-closed state to {@code CLOSED}. There is no separate
 * "committed" state, so the connection stays in {@code COMMITTING} after a successful commit.
 *
 * <p>All state transitions happen while holding {@link #lock}; {@link #state} is additionally
 * {@code volatile} so the lock-free readers ({@link #prepareStatement(String)},
 * {@link #isClosed()}, {@link #isCommitting()}, {@link #isInit()}) observe the latest value.
 */
public final class DirectPathConnection
extends AbstractRestrictedConnection {
    private static final Logger log = LoggerFactory.getLogger(DirectPathConnection.class);

    /** Interval between two status polls while waiting for the server-side commit. */
    private static final long COMMIT_POLL_INTERVAL_MILLIS = 500L;

    /** Current lifecycle state; written only under {@link #lock}. */
    private volatile State state;
    private final ObTableDirectLoad load;
    private final Object lock = new Object();
    /** Started by {@link #begin()}; {@code null} until then. */
    private Stopwatch sw;
    private long commitElapsed;
    private long writeElapsed;

    private DirectPathConnection(ObTableDirectLoad load) {
        this.load = load;
        this.state = State.INIT;
    }

    /**
     * Begins the load task if the connection is still in {@code INIT} and starts the timer.
     * Calling it in {@code BEGIN} or {@code COMMITTING} is a no-op.
     *
     * @throws SQLException          if the underlying {@code begin()} call fails
     * @throws IllegalStateException if the connection is already closed
     */
    public void begin() throws SQLException {
        synchronized (this.lock) {
            if (this.state == State.CLOSED) {
                throw new IllegalStateException("Attempt to begin a closed connection");
            }
            if (this.state != State.INIT) {
                return;
            }
            try {
                this.load.begin();
                this.state = State.BEGIN;
                this.sw = Stopwatch.createStarted();
            }
            catch (Exception ex) {
                throw new SQLException(ex);
            }
        }
    }

    /**
     * Commits the load task and blocks until the server reports the commit as finished.
     * The timer is stopped afterwards regardless of the outcome.
     *
     * @throws SQLException          if committing or waiting for the server fails
     * @throws IllegalStateException if the connection is not in {@code BEGIN}
     */
    @Override
    public void commit() throws SQLException {
        synchronized (this.lock) {
            if (this.state != State.BEGIN) {
                throw new IllegalStateException("Commit transaction failed as connection state is not BEGIN");
            }
            try {
                this.writeElapsed = this.sw.elapsed(TimeUnit.MILLISECONDS);
                this.load.commit();
                this.state = State.COMMITTING;
                log.info("Commit load task on table \"{}\". This might take a while. Please wait...", this.getTableName());
                this.waitServerCommit();
                log.info("Load task on table \"{}\" is committed successfully! Elapsed: {}", this.getTableName(), this.getCommitElapsed());
            }
            catch (Exception e) {
                throw new SQLException(e);
            }
            finally {
                // Guarded stop: an unconditional stop() on an already stopped stopwatch would
                // throw and mask the exception raised above.
                this.stopTimerIfRunning();
            }
        }
    }

    /**
     * Aborts the load task. The connection state is left unchanged.
     *
     * @throws SQLException          if the underlying {@code abort()} call fails
     * @throws IllegalStateException if the connection is neither in {@code BEGIN} nor {@code COMMITTING}
     */
    @Override
    public void rollback() throws SQLException {
        synchronized (this.lock) {
            // The timer may be absent (begin() never ran) or already stopped (a failed commit()
            // stops it in its finally block); neither case may prevent the abort below.
            this.stopTimerIfRunning();
            if (this.state != State.BEGIN && this.state != State.COMMITTING) {
                throw new IllegalStateException("Rollback transaction failed as connection state is not BEGIN or COMMITTING");
            }
            try {
                this.load.abort();
            }
            catch (Exception ex) {
                throw new SQLException(ex);
            }
        }
    }

    /**
     * Closes the connection. A task still reported as {@code RUNNING} or {@code ERROR} is aborted
     * first (failures of that step are logged, not thrown), then the underlying table is closed.
     * The connection is marked {@code CLOSED} even if closing the table throws.
     */
    @Override
    public void close() {
        synchronized (this.lock) {
            if (this.state == null || this.state == State.CLOSED) {
                return;
            }
            try {
                ObTableLoadClientStatus status = this.load.getStatus();
                if (status == ObTableLoadClientStatus.RUNNING || status == ObTableLoadClientStatus.ERROR) {
                    log.warn("Unexpected status: {}, aborting load task on table \"{}\"...", status, this.getTableName());
                    this.load.abort();
                }
            }
            catch (Exception e) {
                log.error("Close direct load connection failed. Error: ", e);
            }
            try {
                this.load.getTable().close();
            }
            finally {
                this.state = State.CLOSED;
            }
        }
    }

    /** Equivalent to {@code prepareStatement(null)}. */
    @Override
    public DirectPathPreparedStatement createStatement() throws SQLException {
        return this.prepareStatement(null);
    }

    /**
     * Creates a statement bound to this connection. The {@code sql} argument is not used.
     *
     * @throws IllegalStateException if the connection is not in {@code BEGIN}
     */
    @Override
    public DirectPathPreparedStatement prepareStatement(String sql) throws SQLException {
        if (this.state != State.BEGIN) {
            throw new IllegalStateException("Create statement failed as connection state is not BEGIN");
        }
        return new DirectPathPreparedStatement(this);
    }

    /** Returns the database name of the underlying table handle. */
    @Override
    public String getSchema() {
        return this.load.getTable().getDatabase();
    }

    /** Returns the name of the table this load task targets. */
    public String getTableName() {
        return this.load.getTableName();
    }

    @Override
    public boolean isClosed() {
        return this.state == State.CLOSED;
    }

    public boolean isCommitting() {
        return this.state == State.COMMITTING;
    }

    public boolean isInit() {
        return this.state == State.INIT;
    }

    /**
     * Sends one bucket of rows to the load task.
     *
     * @return an array with one entry per row in {@code bucket}, every entry set to {@code 1}
     * @throws SQLException if the underlying insert fails
     */
    int[] insert(ObDirectLoadBucket bucket) throws SQLException {
        try {
            this.load.insert(bucket);
            int[] affected = new int[bucket.getRowCount()];
            Arrays.fill(affected, 1);
            return affected;
        }
        catch (Exception ex) {
            throw new SQLException(ex);
        }
    }

    /**
     * Polls the load status at a fixed interval, without a retry limit, until it is
     * {@code COMMIT}, then records the commit duration.
     *
     * <p>{@code ERROR} and {@code ABORT} fail the wait; any status other than
     * {@code COMMITTING}/{@code COMMIT} aborts the task and fails the wait.
     *
     * @throws SQLException wrapping whatever made the wait fail
     */
    private void waitServerCommit() throws SQLException {
        try {
            Retryer<Boolean> retryer = RetryerBuilder.<Boolean>newBuilder()
                    .retryIfResult(result -> Objects.equals(result, Boolean.FALSE))
                    .withWaitStrategy(WaitStrategies.fixedWait(COMMIT_POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS))
                    .withStopStrategy(StopStrategies.neverStop())
                    .build();
            retryer.call(() -> {
                ObTableLoadClientStatus status = this.load.getStatus();
                switch (status) {
                    case ERROR:
                    case ABORT:
                        throw new IllegalStateException("Load task is aborted with error code: " + this.load.getErrorCode());
                    case COMMITTING:
                        return false;
                    case COMMIT:
                        return true;
                    default:
                        this.load.abort();
                        throw new IllegalStateException("Unexpected load status: " + status);
                }
            });
            this.commitElapsed = this.sw.elapsed(TimeUnit.MILLISECONDS) - this.writeElapsed;
        }
        catch (Exception ex) {
            throw new SQLException(ex);
        }
    }

    /** Stops the timer when it exists and is running; otherwise does nothing. */
    private void stopTimerIfRunning() {
        if (this.sw != null && this.sw.isRunning()) {
            this.sw.stop();
        }
    }

    /** Returns the formatted time measured by the timer, or a formatted zero if {@link #begin()} never ran. */
    public String getTotalElapsed() {
        Stopwatch timer = this.sw;
        long millis = timer == null ? 0L : timer.elapsed(TimeUnit.MILLISECONDS);
        return TimeUtils.formatMilliseconds(millis);
    }

    /** Returns the formatted duration recorded while waiting for the server-side commit. */
    public String getCommitElapsed() {
        return TimeUtils.formatMilliseconds(this.commitElapsed);
    }

    /** Returns the formatted time between {@link #begin()} and the start of {@link #commit()}. */
    public String getWriteElapsed() {
        return TimeUtils.formatMilliseconds(this.writeElapsed);
    }

    /**
     * Fluent builder for {@link DirectPathConnection}. Arguments are validated in
     * {@link #createConnection}, not in the individual setters.
     */
    public static class Builder {
        /** Highest valid TCP port number. */
        private static final int MAX_PORT = 65535;

        private String host;
        private int port;
        private String user;
        private String tenant;
        private String password;
        private String schema;
        private String table;
        private int parallel;
        private int thread;
        private long maxErrorCount;
        private ObLoadDupActionType duplicateKeyAction;
        private long serverTimeout;
        private long heartBeatTimeout;
        private String cluster;
        private boolean publicCloud;

        public Builder host(String host) {
            this.host = host;
            return this;
        }

        public Builder port(int port) {
            this.port = port;
            return this;
        }

        public Builder user(String user) {
            this.user = user;
            return this;
        }

        public Builder tenant(String tenant) {
            this.tenant = tenant;
            return this;
        }

        public Builder password(String password) {
            this.password = password;
            return this;
        }

        public Builder schema(String schema) {
            this.schema = schema;
            return this;
        }

        public Builder table(String table) {
            this.table = table;
            return this;
        }

        public Builder parallel(int parallel) {
            this.parallel = parallel;
            return this;
        }

        public Builder thread(int thread) {
            this.thread = thread;
            return this;
        }

        public Builder maxErrorCount(long maxErrorCount) {
            this.maxErrorCount = maxErrorCount;
            return this;
        }

        public Builder duplicateKeyAction(ObLoadDupActionType duplicateKeyAction) {
            this.duplicateKeyAction = duplicateKeyAction;
            return this;
        }

        public Builder serverTimeout(long serverTimeout) {
            this.serverTimeout = serverTimeout;
            return this;
        }

        public Builder heartBeatTimeout(long heartBeatTimeout) {
            this.heartBeatTimeout = heartBeatTimeout;
            return this;
        }

        public Builder cluster(String cluster) {
            this.cluster = cluster;
            return this;
        }

        public Builder publicCloud(boolean publicCloud) {
            this.publicCloud = publicCloud;
            return this;
        }

        /**
         * Creates the connection from the configured values. When a cluster is set and the target
         * is not a public cloud, the login name becomes {@code user@tenant#cluster}.
         *
         * <p>The builder's own fields are not modified, so it can be built from repeatedly.
         *
         * @throws Exception if validation fails or the underlying table/load handle cannot be created
         */
        public DirectPathConnection build() throws Exception {
            // Compute the login name locally: writing it back to this.user would append the
            // suffix again on every subsequent build() call.
            String loginUser = this.user;
            if (this.cluster != null && !this.publicCloud) {
                loginUser = this.user + "@" + this.tenant + "#" + this.cluster;
            }
            return this.createConnection(this.host, this.port, loginUser, this.tenant, this.password, this.schema, this.table, this.parallel, this.thread, this.maxErrorCount, this.duplicateKeyAction, this.serverTimeout, this.heartBeatTimeout);
        }

        /**
         * Validates the arguments, builds the RPC table handle and wraps it in a connection.
         * If creating the load handle fails, the table handle is closed before rethrowing.
         *
         * <p>{@code password}, {@code thread} and {@code heartBeatTimeout} are passed through
         * without validation.
         *
         * @throws IllegalArgumentException if a validated argument is blank or out of range
         * @throws Exception                if the table or load handle cannot be created
         */
        DirectPathConnection createConnection(String host, int port, String user, String tenant, String password, String schema, String table, int parallel, int thread, long maxErrorCount, ObLoadDupActionType action, long serverTimeout, long heartBeatTimeout) throws Exception {
            Preconditions.checkArgument(StringUtils.isNotBlank(host), "Host is null.(host=%s)", host);
            Preconditions.checkArgument(port > 0 && port <= MAX_PORT, "Port is invalid.(port=%s)", port);
            Preconditions.checkArgument(StringUtils.isNotBlank(user), "User Name is null.(user=%s)", user);
            Preconditions.checkArgument(StringUtils.isNotBlank(tenant), "Tenant Name is null.(tenant=%s)", tenant);
            Preconditions.checkArgument(StringUtils.isNotBlank(schema), "Schema Name is null.(schema=%s)", schema);
            Preconditions.checkArgument(StringUtils.isNotBlank(table), "Table Name is null.(table=%s)", table);
            Preconditions.checkArgument(parallel > 0, "Server Parallel is invalid.(--parallel=%s)", parallel);
            Preconditions.checkArgument(maxErrorCount > -1L, "MaxErrorCount is invalid.(--max-errors=%s)", maxErrorCount);
            Preconditions.checkArgument(action != null, "ObLoadDupActionType is null.(obLoadDupActionType=%s)", action);
            Preconditions.checkArgument(serverTimeout > 0L, "Server timeout is invalid.(timeout=%s)", serverTimeout);

            ObDirectLoadParameter parameter = new ObDirectLoadParameter();
            parameter.setParallel(parallel);
            parameter.setMaxErrorRowCount(maxErrorCount);
            parameter.setDupAction(action);
            parameter.setTimeout(serverTimeout);
            parameter.setHeartBeatTimeout(heartBeatTimeout);

            Properties props = new Properties();
            props.setProperty(Property.RPC_CONNECT_TIMEOUT.getKey(), "15000");
            props.setProperty(Property.RPC_EXECUTE_TIMEOUT.getKey(), "20000");
            props.setProperty(Property.SERVER_CONNECTION_POOL_SIZE.getKey(), String.valueOf(thread));
            props.setProperty(Property.RUNTIME_RETRY_TIMES.getKey(), "5");
            props.setProperty(Property.RUNTIME_RETRY_INTERVAL.getKey(), "50");
            // 0x4000000 == 67108864 (64 * 1024 * 1024)
            props.setProperty(Property.NETTY_BUFFER_HIGH_WATERMARK.getKey(), String.valueOf(0x4000000));
            props.setProperty(Property.NETTY_BLOCKING_WAIT_INTERVAL.getKey(), "10");

            ObTable obTable = new ObTable.Builder(host, port).setLoginInfo(tenant, user, password, schema).setProperties(props).build();
            try {
                return new DirectPathConnection(new ObTableDirectLoad(obTable, table, parameter, true));
            }
            catch (Exception e) {
                try {
                    obTable.close();
                }
                catch (Exception closeFailure) {
                    // Keep the original failure as the primary exception.
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
        }
    }

    /** Lifecycle states of a {@link DirectPathConnection}. */
    enum State {
        /** Freshly constructed; {@code begin()} has not succeeded yet. */
        INIT,
        /** {@code begin()} succeeded; statements may be created. */
        BEGIN,
        /** The load task's {@code commit()} call has returned; also the state after a successful commit. */
        COMMITTING,
        /** {@code close()} has run. */
        CLOSED
    }
}

