/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.loaddump.directpath;

import com.alipay.oceanbase.rpc.property.Property;
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObLoadDupActionType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObTableLoadClientStatus;
import com.alipay.oceanbase.rpc.table.ObDirectLoadBucket;
import com.alipay.oceanbase.rpc.table.ObDirectLoadParameter;
import com.alipay.oceanbase.rpc.table.ObTable;
import com.alipay.oceanbase.rpc.table.ObTableDirectLoad;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.oceanbase.tools.loaddump.directpath.AbstractRestrictedConnection;
import com.oceanbase.tools.loaddump.directpath.DirectPathPreparedStatement;
import com.oceanbase.tools.loaddump.utils.StringUtils;
import com.oceanbase.tools.loaddump.utils.TimeUtils;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DirectPathConnection
extends AbstractRestrictedConnection {
    private static final Logger log = LoggerFactory.getLogger(DirectPathConnection.class);
    private static final int COMMIT_TIMEOUT = 3600000;
    private State state;
    private int commiters;
    private final int blocks;
    private final ObTableDirectLoad load;
    private final Object lock = new Object();

    private DirectPathConnection(ObTableDirectLoad load, int blocks) {
        this.load = load;
        this.blocks = blocks;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    DirectPathConnection begin() throws SQLException {
        Object object = this.lock;
        synchronized (object) {
            if (this.state == null || this.state == State.CLOSED) {
                try {
                    this.load.begin();
                    this.state = State.BEGIN;
                }
                catch (Exception ex) {
                    throw new SQLException(ex);
                }
            } else {
                throw new IllegalStateException("Begin transaction failed as connection state is already BEGIN");
            }
        }
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void compareAndCommit(AtomicLong expect, AtomicLong actual) throws SQLException {
        Object object = this.lock;
        synchronized (object) {
            if (this.state == State.BEGIN) {
                ++this.commiters;
                if (this.commiters == this.blocks) {
                    long begin = System.nanoTime();
                    long timeoutNanos = TimeUnit.SECONDS.toNanos(60L);
                    while (expect.get() != actual.get()) {
                        TimeUtils.sleep(TimeUnit.MILLISECONDS, 50L);
                        if (begin <= 0L || System.nanoTime() - begin < timeoutNanos) continue;
                        throw new IllegalStateException("Wait client consuming timeout. (>60s)");
                    }
                    try {
                        this.load.commit();
                        log.info("Commit load task on table \"{}\". This might take a few minutes. Please wait...", (Object)this.getTableName());
                        this.waitServerCommit();
                    }
                    catch (Exception e) {
                        throw new SQLException(e);
                    }
                } else if (this.commiters > this.blocks) {
                    throw new IllegalStateException("Your commit have exceed the limit. (" + this.commiters + ">" + this.blocks + ")");
                }
            } else {
                throw new IllegalStateException("Commit transaction failed as connection state is not BEGIN");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void rollback() throws SQLException {
        Object object = this.lock;
        synchronized (object) {
            if (this.state == State.BEGIN) {
                try {
                    this.load.abort();
                }
                catch (Exception ex) {
                    throw new SQLException(ex);
                }
            } else {
                throw new IllegalStateException("Rollback transaction failed as connection state is not BEGIN");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Object object = this.lock;
        synchronized (object) {
            if (this.state != null && this.state != State.CLOSED) {
                try {
                    ObTableLoadClientStatus status = this.load.getStatus();
                    if (status == ObTableLoadClientStatus.RUNNING || status == ObTableLoadClientStatus.ERROR) {
                        log.warn("Unexpected status: {}, aborting load task on table \"{}\"...", (Object)this.load.getStatus(), (Object)this.getTableName());
                        this.load.abort();
                    }
                }
                catch (Exception e) {
                    log.error("Close direct load connection failed. Error: {}", (Throwable)e);
                }
                this.load.getTable().close();
                this.state = State.CLOSED;
            }
        }
    }

    @Override
    public DirectPathPreparedStatement createStatement() throws SQLException {
        return this.prepareStatement(null);
    }

    @Override
    public DirectPathPreparedStatement prepareStatement(String sql) throws SQLException {
        if (this.state == State.BEGIN) {
            return new DirectPathPreparedStatement(this);
        }
        throw new IllegalStateException("Create statement failed as connection state is not BEGIN");
    }

    @Override
    public void commit() throws SQLException {
        throw new NotImplementedException("Method not implemented");
    }

    @Override
    public String getSchema() {
        if (this.state == State.BEGIN) {
            return this.load.getTable().getDatabase();
        }
        throw new IllegalStateException("Get schema failed as connection state is not BEGIN");
    }

    public String getTableName() {
        if (this.state == State.BEGIN) {
            return this.load.getTableName();
        }
        throw new IllegalStateException("Get table failed as connection state is not BEGIN");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isClosed() {
        Object object = this.lock;
        synchronized (object) {
            return this.state == State.CLOSED;
        }
    }

    int[] insert(ObDirectLoadBucket bucket) throws SQLException {
        try {
            this.load.insert(bucket);
            int[] result = new int[bucket.getRowCount()];
            Arrays.fill(result, 1);
            return result;
        }
        catch (Exception ex) {
            throw new SQLException(ex);
        }
    }

    private void waitServerCommit() throws SQLException {
        try {
            Stopwatch stopwatch = Stopwatch.createStarted();
            long begin = System.nanoTime();
            long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(3600000L);
            ObTableLoadClientStatus status = null;
            block9: while (true) {
                status = this.load.getStatus();
                switch (status) {
                    case ABORT: {
                        throw new IllegalStateException("Load aborted with error code " + this.load.getErrorCode());
                    }
                    case ERROR: 
                    case COMMITTING: {
                        break;
                    }
                    case COMMIT: {
                        break block9;
                    }
                    default: {
                        this.load.abort();
                        break block9;
                    }
                }
                if (begin > 0L && System.nanoTime() - begin >= timeoutNanos) {
                    throw new IllegalStateException("Wait server abort timeout. (>3600000ms)");
                }
                try {
                    Thread.sleep(200L);
                    begin += TimeUnit.MILLISECONDS.toNanos(50L);
                }
                catch (InterruptedException e) {
                    this.load.abort();
                    throw new IllegalStateException("Unexpected error occurred: " + e.getMessage());
                }
            }
            log.info("Load task on table \"{}\" is successfully committed! Elapsed: {}", (Object)this.getTableName(), (Object)stopwatch.stop());
        }
        catch (Exception ex) {
            throw new SQLException(ex);
        }
    }

    public static class Builder {
        private String host;
        private int port;
        private String user;
        private String tenant;
        private String password;
        private String schema;
        private String table;
        private int blocks;
        private int parallel;
        private long maxErrorCount;
        private ObLoadDupActionType duplicateKeyAction;
        private long serverTimeout;

        public Builder host(String host) {
            this.host = host;
            return this;
        }

        public Builder port(int port) {
            this.port = port;
            return this;
        }

        public Builder user(String user) {
            this.user = user;
            return this;
        }

        public Builder tenant(String tenant) {
            this.tenant = tenant;
            return this;
        }

        public Builder password(String password) {
            this.password = password;
            return this;
        }

        public Builder schema(String schema) {
            this.schema = schema;
            return this;
        }

        public Builder table(String table) {
            this.table = table;
            return this;
        }

        public Builder blocks(int blocks) {
            this.blocks = blocks;
            return this;
        }

        public Builder parallel(int parallel) {
            this.parallel = parallel;
            return this;
        }

        public Builder maxErrorCount(long maxErrorCount) {
            this.maxErrorCount = maxErrorCount;
            return this;
        }

        public Builder duplicateKeyAction(ObLoadDupActionType duplicateKeyAction) {
            this.duplicateKeyAction = duplicateKeyAction;
            return this;
        }

        public Builder serverTimeout(long serverTimeout) {
            this.serverTimeout = serverTimeout;
            return this;
        }

        public DirectPathConnection build() throws Exception {
            return this.createConnection(this.host, this.port, this.user, this.tenant, this.password, this.schema, this.table, this.blocks, this.parallel, this.maxErrorCount, this.duplicateKeyAction, this.serverTimeout).begin();
        }

        DirectPathConnection createConnection(String host, int port, String user, String tenant, String password, String schema, String table, int blocks, int parallel, long maxErrorCount, ObLoadDupActionType action, long serverTimeout) throws Exception {
            Preconditions.checkArgument((boolean)StringUtils.isNotBlank(host), (String)"Host is null.(host=%s)", (Object)host);
            Preconditions.checkArgument((port > 0 && port < 65535 ? 1 : 0) != 0, (String)"Port is invalid.(port=%s)", (int)port);
            Preconditions.checkArgument((boolean)StringUtils.isNotBlank(user), (String)"User Name is null.(user=%s)", (Object)user);
            Preconditions.checkArgument((boolean)StringUtils.isNotBlank(tenant), (String)"Tenant Name is null.(tenant=%s)", (Object)tenant);
            Preconditions.checkArgument((boolean)StringUtils.isNotBlank(schema), (String)"Schema Name is null.(schema=%s)", (Object)schema);
            Preconditions.checkArgument((boolean)StringUtils.isNotBlank(table), (String)"Table Name is null.(table=%s)", (Object)table);
            Preconditions.checkArgument((blocks > 0 ? 1 : 0) != 0, (String)"Client Blocks is invalid.(blocks=%s)", (int)blocks);
            Preconditions.checkArgument((parallel > 0 ? 1 : 0) != 0, (String)"Server Parallel is invalid.(parallel=%s)", (int)parallel);
            Preconditions.checkArgument((maxErrorCount > -1L ? 1 : 0) != 0, (String)"MaxErrorCount is invalid.(maxErrorCount=%s)", (long)maxErrorCount);
            Preconditions.checkArgument((action != null ? 1 : 0) != 0, (String)"ObLoadDupActionType is null.(obLoadDupActionType=%s)", (Object)action);
            Preconditions.checkArgument((serverTimeout > 0L ? 1 : 0) != 0, (String)"Server timeout is invalid.(timeout=%s)", (long)serverTimeout);
            ObDirectLoadParameter parameter = new ObDirectLoadParameter();
            parameter.setParallel(parallel);
            parameter.setMaxErrorRowCount(maxErrorCount);
            parameter.setDupAction(action);
            parameter.setTimeout(serverTimeout);
            Properties props = new Properties();
            props.setProperty(Property.RPC_CONNECT_TIMEOUT.getKey(), "3000");
            props.setProperty(Property.RPC_EXECUTE_TIMEOUT.getKey(), "10000");
            ObTable obTable = new ObTable.Builder(host, port).setLoginInfo(tenant, user, password, schema).setProperties(props).build();
            return new DirectPathConnection(new ObTableDirectLoad(obTable, table, parameter, true), blocks);
        }
    }

    static enum State {
        BEGIN,
        CLOSED;

    }
}

