/*
 * Decompiled with CFR 0.152.
 */
package com.pingcap.tikv.operation.iterator;

import com.pingcap.tidb.tipb.Chunk;
import com.pingcap.tidb.tipb.DAGRequest;
import com.pingcap.tidb.tipb.EncodeType;
import com.pingcap.tidb.tipb.SelectResponse;
import com.pingcap.tikv.ClientSession;
import com.pingcap.tikv.TiFlashClient;
import com.pingcap.tikv.meta.TiDAGRequest;
import com.pingcap.tikv.operation.SchemaInfer;
import com.pingcap.tikv.operation.iterator.CoprocessorIterator;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorCompletionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.exception.RegionTaskException;
import org.tikv.common.exception.TiClientInternalException;
import org.tikv.common.region.RegionStoreClient;
import org.tikv.common.region.TiRegion;
import org.tikv.common.region.TiStore;
import org.tikv.common.region.TiStoreType;
import org.tikv.common.util.BackOffFunction;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Coprocessor;

public abstract class DAGIterator<T>
extends CoprocessorIterator<T> {
    private static final Logger logger = LoggerFactory.getLogger((String)DAGIterator.class.getName());
    private final TiDAGRequest.PushDownType pushDownType;
    private final TiStoreType storeType;
    private final long startTs;
    protected EncodeType encodeType;
    private ExecutorCompletionService<Iterator<SelectResponse>> streamingService;
    private ExecutorCompletionService<SelectResponse> dagService;
    private SelectResponse response;
    private Iterator<SelectResponse> responseIterator;

    DAGIterator(DAGRequest req, List<RangeSplitter.RegionTask> regionTasks, ClientSession clientSession, SchemaInfer infer, TiDAGRequest.PushDownType pushDownType, TiStoreType storeType, long startTs) {
        super(req, regionTasks, clientSession, infer);
        this.pushDownType = pushDownType;
        this.storeType = storeType;
        this.startTs = startTs;
        switch (pushDownType) {
            case NORMAL: {
                this.dagService = new ExecutorCompletionService(clientSession.getTiKVSession().getThreadPoolForTableScan());
                break;
            }
            case STREAMING: {
                this.streamingService = new ExecutorCompletionService(clientSession.getTiKVSession().getThreadPoolForTableScan());
            }
        }
        this.submitTasks();
    }

    @Override
    void submitTasks() {
        for (RangeSplitter.RegionTask task : this.regionTasks) {
            switch (this.pushDownType) {
                case STREAMING: {
                    this.streamingService.submit(() -> this.processByStreaming(task));
                    break;
                }
                case NORMAL: {
                    this.dagService.submit(() -> this.process(task));
                }
            }
        }
    }

    @Override
    public boolean hasNext() {
        if (this.eof) {
            return false;
        }
        while (this.chunkList == null || this.chunkIndex >= this.chunkList.size() || this.dataInput.available() <= 0) {
            if (this.tryAdvanceChunkIndex()) {
                this.createDataInputReader();
                continue;
            }
            if (!(this.pushDownType == TiDAGRequest.PushDownType.STREAMING ? !this.advanceNextResponse() && !this.readNextRegionChunks() : !this.readNextRegionChunks())) continue;
            return false;
        }
        return true;
    }

    private boolean hasMoreResponse() {
        switch (this.pushDownType) {
            case STREAMING: {
                return this.responseIterator != null && this.responseIterator.hasNext();
            }
            case NORMAL: {
                return this.response != null;
            }
        }
        throw new IllegalArgumentException("Invalid push down type:" + (Object)((Object)this.pushDownType));
    }

    private boolean advanceNextResponse() {
        if (!this.hasMoreResponse()) {
            return false;
        }
        switch (this.pushDownType) {
            case STREAMING: {
                SelectResponse resp = this.responseIterator.next();
                this.chunkList = resp.getChunksList();
                this.encodeType = resp.getEncodeType();
                break;
            }
            case NORMAL: {
                this.chunkList = this.response.getChunksList();
                this.encodeType = this.response.getEncodeType();
            }
        }
        if (this.chunkList == null || this.chunkList.isEmpty()) {
            return false;
        }
        this.chunkIndex = 0;
        this.createDataInputReader();
        return true;
    }

    private boolean readNextRegionChunks() {
        while (this.hasNextRegionTask()) {
            if (!this.doReadNextRegionChunks()) continue;
            return true;
        }
        return false;
    }

    private boolean hasNextRegionTask() {
        return !this.eof && this.regionTasks != null && this.taskIndex < this.regionTasks.size();
    }

    private boolean doReadNextRegionChunks() {
        try {
            switch (this.pushDownType) {
                case STREAMING: {
                    this.responseIterator = this.streamingService.take().get();
                    break;
                }
                case NORMAL: {
                    this.response = this.dagService.take().get();
                }
            }
        }
        catch (Exception e) {
            throw new TiClientInternalException("Error reading region:", e);
        }
        ++this.taskIndex;
        return this.advanceNextResponse();
    }

    private SelectResponse process(RangeSplitter.RegionTask regionTask) {
        ArrayDeque<RangeSplitter.RegionTask> remainTasks = new ArrayDeque<RangeSplitter.RegionTask>();
        ArrayDeque<SelectResponse> responseQueue = new ArrayDeque<SelectResponse>();
        remainTasks.add(regionTask);
        ConcreteBackOffer backOffer = ConcreteBackOffer.newCopNextMaxBackOff();
        HashSet<Long> resolvedLocks = new HashSet<Long>();
        ConcreteBackOffer storeUnreachableBackOffer = ConcreteBackOffer.newCustomBackOff(60000);
        while (!remainTasks.isEmpty()) {
            RangeSplitter.RegionTask task = (RangeSplitter.RegionTask)remainTasks.poll();
            if (task == null) continue;
            List<Coprocessor.KeyRange> ranges = task.getRanges();
            TiRegion region = task.getRegion();
            TiStore store = task.getStore();
            try {
                if (store == null || !store.isReachable()) {
                    storeUnreachableBackOffer.doBackOffWithMaxSleep(BackOffFunction.BackOffFuncType.BoServerBusy, 2000L, new TiClientInternalException("retry timeout: store is null or unreachable"));
                    if (store == null) {
                        logger.warn("TiKV store is null, invalid cache and retry");
                    } else {
                        logger.warn("TiKV store " + store.getAddress() + " is unreachable, invalid cache and retry");
                    }
                    this.clientSession.getTiKVSession().getRegionManager().invalidateRegion(region);
                    try {
                        remainTasks.addAll(RangeSplitter.newSplitter(this.clientSession.getTiKVSession().getRegionManager()).splitRangeByRegion(ranges, this.storeType));
                    }
                    catch (Exception e) {
                        logger.warn("split range by region error, retry with the original task", (Throwable)e);
                        remainTasks.add(task);
                    }
                    continue;
                }
                RegionStoreClient client = this.clientSession.getTiKVSession().getRegionStoreClientBuilder().build(region, store, this.storeType);
                if (this.storeType == TiStoreType.TiFlash && !this.isMppStoreAlive(store.getAddress(), this.clientSession.getConf().getHealthCheckTimeout()).booleanValue()) {
                    logger.info("Re-splitting region task due to TiFlash is unavailable");
                    remainTasks.addAll(RangeSplitter.newSplitter(this.clientSession.getTiKVSession().getRegionManager()).splitRangeByRegion(ranges, this.storeType));
                    continue;
                }
                client.addResolvedLocks(this.startTs, resolvedLocks);
                List<RangeSplitter.RegionTask> tasks = client.coprocess(backOffer, this.dagRequest, ranges, responseQueue, this.startTs);
                if (tasks != null) {
                    remainTasks.addAll(tasks);
                }
                resolvedLocks.addAll(client.getResolvedLocks(this.startTs));
            }
            catch (Throwable e) {
                logger.error("Process region tasks failed, remain " + remainTasks.size() + " tasks not executed due to", e);
                throw new RegionTaskException("Handle region task failed:", e);
            }
        }
        ArrayList<Chunk> resultChunk = new ArrayList<Chunk>();
        EncodeType encodeType = null;
        while (!responseQueue.isEmpty()) {
            SelectResponse response = (SelectResponse)responseQueue.poll();
            if (response == null) continue;
            encodeType = response.getEncodeType();
            resultChunk.addAll(response.getChunksList());
        }
        return SelectResponse.newBuilder().addAllChunks(resultChunk).setEncodeType(encodeType).build();
    }

    private Iterator<SelectResponse> processByStreaming(RangeSplitter.RegionTask regionTask) {
        List<Coprocessor.KeyRange> ranges = regionTask.getRanges();
        TiRegion region = regionTask.getRegion();
        TiStore store = regionTask.getStore();
        try {
            RegionStoreClient client = this.clientSession.getTiKVSession().getRegionStoreClientBuilder().build(region, store, this.storeType);
            Iterator<SelectResponse> responseIterator = client.coprocessStreaming(this.dagRequest, ranges, this.startTs);
            if (responseIterator == null) {
                this.eof = true;
                return null;
            }
            return responseIterator;
        }
        catch (Exception e) {
            throw new TiClientInternalException("Error Closing Store client.", e);
        }
    }

    public Boolean isMppStoreAlive(String address, int timeout) {
        try {
            Map<String, Boolean> storeStatusCache = this.clientSession.getStoreStatusCache();
            return storeStatusCache.computeIfAbsent(address, key -> TiFlashClient.isMppAlive(this.clientSession.getTiKVSession().getChannelFactory().getChannel(address, this.clientSession.getTiKVSession().getPDClient().getHostMapping()), timeout));
        }
        catch (Exception e) {
            throw new TiClientInternalException("Error get MppStore Status.", e);
        }
    }
}

