/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hudi.org.apache.hadoop.hbase.RegionLocations;
import org.apache.hudi.org.apache.hadoop.hbase.ServerName;
import org.apache.hudi.org.apache.hadoop.hbase.TableName;
import org.apache.hudi.org.apache.hadoop.hbase.client.AbstractResponse;
import org.apache.hudi.org.apache.hadoop.hbase.client.Action;
import org.apache.hudi.org.apache.hadoop.hbase.client.Append;
import org.apache.hudi.org.apache.hadoop.hbase.client.AsyncProcessTask;
import org.apache.hudi.org.apache.hadoop.hbase.client.AsyncRequestFuture;
import org.apache.hudi.org.apache.hadoop.hbase.client.AsyncRequestFutureImpl;
import org.apache.hudi.org.apache.hadoop.hbase.client.CancellableRegionServerCallable;
import org.apache.hudi.org.apache.hadoop.hbase.client.CheckAndMutate;
import org.apache.hudi.org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hudi.org.apache.hadoop.hbase.client.ConnectionImplementation;
import org.apache.hudi.org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hudi.org.apache.hadoop.hbase.client.Consistency;
import org.apache.hudi.org.apache.hadoop.hbase.client.Get;
import org.apache.hudi.org.apache.hadoop.hbase.client.Increment;
import org.apache.hudi.org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hudi.org.apache.hadoop.hbase.client.Mutation;
import org.apache.hudi.org.apache.hadoop.hbase.client.NonceGenerator;
import org.apache.hudi.org.apache.hadoop.hbase.client.Put;
import org.apache.hudi.org.apache.hadoop.hbase.client.RequestController;
import org.apache.hudi.org.apache.hadoop.hbase.client.RequestControllerFactory;
import org.apache.hudi.org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hudi.org.apache.hadoop.hbase.client.Row;
import org.apache.hudi.org.apache.hadoop.hbase.client.RowAccess;
import org.apache.hudi.org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hudi.org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hudi.org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hudi.org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hudi.org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Evolving
class AsyncProcess {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncProcess.class);
    private static final AtomicLong COUNTER = new AtomicLong();
    public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
    public static final String START_LOG_ERRORS_AFTER_COUNT_KEY = "hbase.client.start.log.errors.counter";
    public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 5;
    public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
    private static final AsyncRequestFuture NO_REQS_RESULT = new AsyncRequestFuture(){
        final Object[] result = new Object[0];

        @Override
        public boolean hasError() {
            return false;
        }

        @Override
        public RetriesExhaustedWithDetailsException getErrors() {
            return null;
        }

        @Override
        public List<? extends Row> getFailedOperations() {
            return null;
        }

        @Override
        public Object[] getResults() {
            return this.result;
        }

        @Override
        public void waitUntilDone() throws InterruptedIOException {
        }
    };
    final long id;
    final ClusterConnection connection;
    private final RpcRetryingCallerFactory rpcCallerFactory;
    final RpcControllerFactory rpcFactory;
    final int startLogErrorsCnt;
    final long pause;
    final long pauseForCQTBE;
    final int numTries;
    long serverTrackerTimeout;
    final long primaryCallTimeoutMicroseconds;
    final boolean logBatchErrorDetails;
    final RequestController requestController;
    public static final String LOG_DETAILS_PERIOD = "hbase.client.log.detail.period.ms";
    private static final int DEFAULT_LOG_DETAILS_PERIOD = 10000;
    private final int periodToLog;

    AsyncProcess(ClusterConnection hc, Configuration conf, RpcRetryingCallerFactory rpcCaller, RpcControllerFactory rpcFactory) {
        if (hc == null) {
            throw new IllegalArgumentException("ClusterConnection cannot be null.");
        }
        this.connection = hc;
        this.id = COUNTER.incrementAndGet();
        this.pause = conf.getLong("hbase.client.pause", 100L);
        long configuredPauseForCQTBE = conf.getLong("hbase.client.pause.cqtbe", this.pause);
        if (configuredPauseForCQTBE < this.pause) {
            LOG.warn("The hbase.client.pause.cqtbe setting: " + configuredPauseForCQTBE + " is smaller than " + "hbase.client.pause" + ", will use " + this.pause + " instead.");
            this.pauseForCQTBE = this.pause;
        } else {
            this.pauseForCQTBE = configuredPauseForCQTBE;
        }
        this.numTries = conf.getInt("hbase.client.retries.number", 15) + 1;
        this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
        this.startLogErrorsCnt = conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 5);
        this.periodToLog = conf.getInt(LOG_DETAILS_PERIOD, 10000);
        this.serverTrackerTimeout = 0L;
        for (int i = 0; i < this.numTries; ++i) {
            this.serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
        }
        this.rpcCallerFactory = rpcCaller;
        this.rpcFactory = rpcFactory;
        this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
        this.requestController = RequestControllerFactory.create(conf);
    }

    public <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task) throws InterruptedIOException {
        AsyncRequestFuture reqFuture = this.checkTask(task);
        if (reqFuture != null) {
            return reqFuture;
        }
        AsyncProcessTask.SubmittedRows submittedRows = task.getSubmittedRows() == null ? AsyncProcessTask.SubmittedRows.ALL : task.getSubmittedRows();
        switch (submittedRows) {
            case ALL: {
                return this.submitAll(task);
            }
            case AT_LEAST_ONE: {
                return this.submit(task, true);
            }
        }
        return this.submit(task, false);
    }

    private <CResult> AsyncRequestFuture submit(AsyncProcessTask<CResult> task, boolean atLeastOne) throws InterruptedIOException {
        TableName tableName = task.getTableName();
        RowAccess<Row> rows = task.getRowAccess();
        HashMap<ServerName, MultiAction> actionsByServer = new HashMap<ServerName, MultiAction>();
        ArrayList<Action> retainedActions = new ArrayList<Action>(rows.size());
        NonceGenerator ng = this.connection.getNonceGenerator();
        long nonceGroup = ng.getNonceGroup();
        ArrayList<Exception> locationErrors = null;
        ArrayList<Integer> locationErrorRows = null;
        RequestController.Checker checker = this.requestController.newChecker();
        boolean firstIter = true;
        do {
            this.requestController.waitForFreeSlot(this.id, this.periodToLog, this.getLogger(tableName, -1L));
            int posInList = -1;
            if (!firstIter) {
                checker.reset();
            }
            Iterator it = rows.iterator();
            while (it.hasNext()) {
                int priority;
                HRegionLocation loc;
                Row r = (Row)it.next();
                try {
                    if (r == null) {
                        throw new IllegalArgumentException("#" + this.id + ", row cannot be null");
                    }
                    RegionLocations locs = this.connection.locateRegion(tableName, r.getRow(), true, true, 0);
                    if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
                        throw new IOException("#" + this.id + ", no location found, aborting submit for tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
                    }
                    loc = locs.getDefaultRegionLocation();
                }
                catch (IOException ex) {
                    locationErrors = new ArrayList<Exception>(1);
                    locationErrorRows = new ArrayList<Integer>(1);
                    LOG.error("Failed to get region location ", (Throwable)ex);
                    priority = 0;
                    if (r instanceof Mutation) {
                        priority = ((Mutation)r).getPriority();
                    }
                    retainedActions.add(new Action(r, ++posInList, priority));
                    locationErrors.add(ex);
                    locationErrorRows.add(posInList);
                    it.remove();
                    break;
                }
                RequestController.ReturnCode code = checker.canTakeRow(loc, r);
                if (code == RequestController.ReturnCode.END) break;
                if (code != RequestController.ReturnCode.INCLUDE) continue;
                priority = 0;
                if (r instanceof Mutation) {
                    priority = ((Mutation)r).getPriority();
                }
                Action action = new Action(r, ++posInList, priority);
                this.setNonce(ng, r, action);
                retainedActions.add(action);
                byte[] regionName = loc.getRegionInfo().getRegionName();
                AsyncProcess.addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup);
                it.remove();
            }
            firstIter = false;
        } while (retainedActions.isEmpty() && atLeastOne && locationErrors == null);
        if (retainedActions.isEmpty()) {
            return NO_REQS_RESULT;
        }
        return this.submitMultiActions(task, retainedActions, nonceGroup, locationErrors, locationErrorRows, actionsByServer);
    }

    <CResult> AsyncRequestFuture submitMultiActions(AsyncProcessTask task, List<Action> retainedActions, long nonceGroup, List<Exception> locationErrors, List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer) {
        AsyncRequestFutureImpl<CResult> ars = this.createAsyncRequestFuture(task, retainedActions, nonceGroup);
        if (locationErrors != null) {
            for (int i = 0; i < locationErrors.size(); ++i) {
                int originalIndex = locationErrorRows.get(i);
                Row row = retainedActions.get(originalIndex).getAction();
                ars.manageError(originalIndex, row, AsyncRequestFutureImpl.Retry.NO_LOCATION_PROBLEM, locationErrors.get(i), null);
            }
        }
        ars.sendMultiAction(actionsByServer, 1, null, false);
        return ars;
    }

    static void addAction(ServerName server, byte[] regionName, Action action, Map<ServerName, MultiAction> actionsByServer, long nonceGroup) {
        MultiAction multiAction = actionsByServer.get(server);
        if (multiAction == null) {
            multiAction = new MultiAction();
            actionsByServer.put(server, multiAction);
        }
        if (action.hasNonce() && !multiAction.hasNonceGroup()) {
            multiAction.setNonceGroup(nonceGroup);
        }
        multiAction.add(regionName, action);
    }

    private <CResult> AsyncRequestFuture submitAll(AsyncProcessTask task) {
        RowAccess<Row> rows = task.getRowAccess();
        ArrayList<Action> actions = new ArrayList<Action>(rows.size());
        int posInList = -1;
        NonceGenerator ng = this.connection.getNonceGenerator();
        int highestPriority = -1;
        for (Row r : rows) {
            ++posInList;
            if (r instanceof Put) {
                Put put2 = (Put)r;
                if (put2.isEmpty()) {
                    throw new IllegalArgumentException("No columns to insert for #" + (posInList + 1) + " item");
                }
                highestPriority = Math.max(put2.getPriority(), highestPriority);
            }
            Action action = new Action(r, posInList, highestPriority);
            this.setNonce(ng, r, action);
            actions.add(action);
        }
        AsyncRequestFutureImpl<CResult> ars = this.createAsyncRequestFuture(task, actions, ng.getNonceGroup());
        ars.groupAndSendMultiAction(actions, 1);
        return ars;
    }

    private <CResult> AsyncRequestFuture checkTask(AsyncProcessTask<CResult> task) {
        if (task.getRowAccess() == null || task.getRowAccess().isEmpty()) {
            return NO_REQS_RESULT;
        }
        Objects.requireNonNull(task.getPool(), "The pool can't be NULL");
        this.checkOperationTimeout(task.getOperationTimeout());
        this.checkRpcTimeout(task.getRpcTimeout());
        return null;
    }

    private void setNonce(NonceGenerator ng, Row r, Action action) {
        if (AsyncProcess.hasIncrementOrAppend(r)) {
            action.setNonce(ng.newNonce());
        }
    }

    private static boolean hasIncrementOrAppend(Row action) {
        if (action instanceof Append || action instanceof Increment) {
            return true;
        }
        if (action instanceof RowMutations) {
            return AsyncProcess.hasIncrementOrAppend((RowMutations)action);
        }
        if (action instanceof CheckAndMutate) {
            return AsyncProcess.hasIncrementOrAppend(((CheckAndMutate)action).getAction());
        }
        return false;
    }

    private static boolean hasIncrementOrAppend(RowMutations mutations) {
        for (Mutation mutation : mutations.getMutations()) {
            if (!(mutation instanceof Append) && !(mutation instanceof Increment)) continue;
            return true;
        }
        return false;
    }

    private int checkTimeout(String name, int timeout) {
        if (timeout < 0) {
            throw new RuntimeException("The " + name + " must be bigger than zero,current value is" + timeout);
        }
        return timeout;
    }

    private int checkOperationTimeout(int operationTimeout) {
        return this.checkTimeout("operation timeout", operationTimeout);
    }

    private int checkRpcTimeout(int rpcTimeout) {
        return this.checkTimeout("rpc timeout", rpcTimeout);
    }

    <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(AsyncProcessTask task, List<Action> actions, long nonceGroup) {
        return new AsyncRequestFutureImpl(task, actions, nonceGroup, this);
    }

    protected void waitForMaximumCurrentTasks(int max, TableName tableName) throws InterruptedIOException {
        this.requestController.waitForMaximumCurrentTasks(max, this.id, this.periodToLog, this.getLogger(tableName, max));
    }

    private Consumer<Long> getLogger(TableName tableName, long max) {
        return currentInProgress -> LOG.info("#" + this.id + (max < 0L ? ", waiting for any free slot" : ", waiting for some tasks to finish. Expected max=" + max) + ", tasksInProgress=" + currentInProgress + (tableName == null ? "" : ", tableName=" + tableName));
    }

    void incTaskCounters(Collection<byte[]> regions, ServerName sn) {
        this.requestController.incTaskCounters(regions, sn);
    }

    void decTaskCounters(Collection<byte[]> regions, ServerName sn) {
        this.requestController.decTaskCounters(regions, sn);
    }

    protected RpcRetryingCaller<AbstractResponse> createCaller(CancellableRegionServerCallable callable, int rpcTimeout) {
        return this.rpcCallerFactory.newCaller(this.checkRpcTimeout(rpcTimeout));
    }

    ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
        return new ConnectionImplementation.ServerErrorTracker(this.serverTrackerTimeout, this.numTries);
    }

    static boolean isReplicaGet(Row row) {
        return row instanceof Get && ((Get)row).getConsistency() == Consistency.TIMELINE;
    }
}

