/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.storage.stream.rpc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.zip.DataFormatException;
import org.apache.commons.codec.binary.Base64;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.metadata.filter.IFilterCodeSystem;
import org.apache.kylin.metadata.filter.StringCodeSystem;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilterSerializer;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.shaded.com.google.common.base.Stopwatch;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.storage.stream.rpc.IStreamDataSearchClient;
import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
import org.apache.kylin.stream.core.model.DataRequest;
import org.apache.kylin.stream.core.model.DataResponse;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.query.ResponseResultSchema;
import org.apache.kylin.stream.core.query.StreamingTupleConverter;
import org.apache.kylin.stream.core.query.StreamingTupleIterator;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.apache.kylin.stream.core.util.RecordsSerializer;
import org.apache.kylin.stream.core.util.RestService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link IStreamDataSearchClient} that queries streaming receiver nodes over HTTP.
 *
 * <p>For every replica set assigned to a cube, one task is submitted to a shared thread pool.
 * Each task posts a JSON {@link DataRequest} to one node of the replica set, falls back to the
 * other nodes of that set on {@link IOException}, and hands the decoded tuples to a
 * {@link QueuedStreamingTupleIterator} that the caller consumes.
 */
public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
    public static final Logger logger = LoggerFactory.getLogger(HttpStreamDataSearchClient.class);

    /** How long (ms) a node that failed is skipped as the preferred node for a cube. */
    public static final long WAIT_DURATION = 120000L;

    // NOTE(review): a ThreadPoolExecutor only grows past its core size when the work queue rejects
    // an offer; with an unbounded LinkedBlockingQueue that never happens, so the maximum of 100
    // threads (and the 60 ms keep-alive for non-core threads) is effectively unused. Left as-is
    // because changing the pool sizing is a capacity decision, not a local fix.
    private static final ExecutorService executorService = new ThreadPoolExecutor(20, 100, 60L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("stream-rpc-pool-t"));

    private final AssignmentsCache assignmentsCache;
    private final RestService restService;

    /** Node -> timestamp (ms) of its most recent failed request; entries are removed on success. */
    private final Map<Node, Long> failedReceivers = Maps.newConcurrentMap();

    /** Creates a client whose REST service uses the connection/read timeouts from the environment config. */
    public HttpStreamDataSearchClient() {
        this.assignmentsCache = AssignmentsCache.getInstance();
        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
        int connectionTimeout = kylinConfig.getStreamingRPCHttpConnTimeout();
        int readTimeout = kylinConfig.getStreamingRPCHttpReadTimeout();
        this.restService = new RestService(connectionTimeout, readTimeout);
    }

    /**
     * Starts an asynchronous query against every replica set of the cube and returns immediately
     * with an iterator over the combined results.
     *
     * <p>The returned iterator waits for at most twice the cube's configured streaming RPC read
     * timeout, measured from this call. Failures of individual tasks are recorded on the iterator
     * and surface from its {@code hasNext()}.
     *
     * @param minSegmentTime forwarded to the receivers in the request
     * @param cube cube being queried; its name selects the replica sets
     * @param tupleInfo layout of the tuples to produce
     * @param tupleFilter filter, serialized into the request
     * @param dimensions dimension columns; their canonical names are sent in the request
     * @param groups group-by columns; their canonical names are sent in the request
     * @param metrics metric functions sent in the request
     * @param storagePushDownLimit forwarded to the receivers in the request
     * @param allowStorageAggregation forwarded to the receivers in the request
     * @return iterator that yields tuples as replica-set responses arrive
     */
    @Override
    public ITupleIterator search(long minSegmentTime, final CubeInstance cube, final TupleInfo tupleInfo, TupleFilter tupleFilter, Set<TblColRef> dimensions, Set<TblColRef> groups, Set<FunctionDesc> metrics, int storagePushDownLimit, boolean allowStorageAggregation) {
        List<ReplicaSet> replicaSetsOfCube = this.assignmentsCache.getReplicaSetsByCube(cube.getName());
        int timeout = cube.getConfig().getStreamingRPCHttpReadTimeout() * 2;
        final QueuedStreamingTupleIterator result = new QueuedStreamingTupleIterator(replicaSetsOfCube.size(), timeout);
        QueryContext query = QueryContextFacade.current();
        CubeDesc cubeDesc = cube.getDescriptor();
        ResponseResultSchema schema = new ResponseResultSchema(cubeDesc, dimensions, metrics);
        final StreamingTupleConverter tupleConverter = new StreamingTupleConverter(schema, tupleInfo);
        final RecordsSerializer recordsSerializer = new RecordsSerializer(schema);
        // NOTE(review): this single request object is shared by all tasks below, and doSearch()
        // mutates its deadline before serializing it — confirm that concurrent updates are acceptable.
        final DataRequest dataRequest = this.createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime, tupleInfo, tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation);
        logger.info("Query-{}:send request to stream receivers", query.getQueryId());
        for (final ReplicaSet rs : replicaSetsOfCube) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Iterator<ITuple> tuplesBlock = HttpStreamDataSearchClient.this.search(dataRequest, cube, tupleConverter, recordsSerializer, rs, tupleInfo);
                        result.addBlock(tuplesBlock);
                    } catch (Exception e) {
                        // Surface the failure to the consumer instead of letting it wait for the timeout.
                        result.setEndpointException(e);
                    }
                }
            });
        }
        return result;
    }

    /**
     * Queries one replica set: tries the preferred node first and, on {@link IOException}, every
     * other node of the set in turn.
     *
     * @return tuples returned by the first node that answers successfully
     * @throws Exception the last {@link IOException} when every node failed, or any non-I/O
     *         exception raised by a single attempt (which aborts the retries)
     */
    public Iterator<ITuple> search(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter, RecordsSerializer recordsSerializer, ReplicaSet rs, TupleInfo tupleInfo) throws Exception {
        List<Node> receivers = Lists.newArrayList(rs.getNodes());
        Node queryReceiver = this.findBestReceiverServeQuery(receivers, cube.getName());
        try {
            return this.doSearch(dataRequest, cube, tupleConverter, recordsSerializer, queryReceiver, tupleInfo);
        } catch (IOException e) {
            IOException exception = e;
            this.failedReceivers.put(queryReceiver, System.currentTimeMillis());
            logger.error("exception throws for receiver:{} retry another receiver", queryReceiver);
            for (Node receiver : receivers) {
                if (receiver.equals(queryReceiver)) {
                    continue; // already tried above
                }
                try {
                    return this.doSearch(dataRequest, cube, tupleConverter, recordsSerializer, receiver, tupleInfo);
                } catch (IOException e2) {
                    exception = e2;
                    this.failedReceivers.put(receiver, System.currentTimeMillis());
                    logger.error("exception throws for receiver:{} retry another receiver", receiver);
                }
            }
            throw exception;
        }
    }

    /**
     * Picks the node that should serve a cube's query: the node at index
     * {@code |hash(cubeName) % size|}, unless that node failed within the last
     * {@link #WAIT_DURATION} ms, in which case the next node (cyclically) is returned.
     *
     * @throws IllegalStateException if {@code receivers} is empty
     */
    private Node findBestReceiverServeQuery(List<Node> receivers, String cubeName) {
        int receiversSize = receivers.size();
        if (receiversSize == 0) {
            throw new IllegalStateException("no receiver available to serve query for cube:" + cubeName);
        }
        // Reduce before taking the absolute value: Math.abs(Integer.MIN_VALUE) is still negative,
        // which would produce a negative index. For every other hash this selects the same node
        // as Math.abs(hash) % size.
        int receiverNo = Math.abs(cubeName.hashCode() % receiversSize);
        Node foundReceiver = receivers.get(receiverNo);
        Long lastFailTime = this.failedReceivers.get(foundReceiver);
        if (lastFailTime == null || System.currentTimeMillis() - lastFailTime > WAIT_DURATION) {
            return foundReceiver;
        }
        return receivers.get((receiverNo + 1) % receiversSize);
    }

    /**
     * Posts the request to a single node and decodes its response.
     *
     * <p>Before sending, the request's deadline is set to now plus 1.5 times the cube's read
     * timeout. A node that answers is removed from the failed-node map.
     *
     * @throws Exception any failure while serializing, sending, or decoding; it is logged and rethrown
     */
    public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter, RecordsSerializer recordsSerializer, Node receiver, TupleInfo tupleInfo) throws Exception {
        String queryId = dataRequest.getQueryId();
        String url = "http://" + receiver.getHost() + ":" + receiver.getPort() + "/kylin/api/data/query";
        try {
            int connTimeout = cube.getConfig().getStreamingRPCHttpConnTimeout();
            int readTimeout = cube.getConfig().getStreamingRPCHttpReadTimeout();
            dataRequest.setDeadline(System.currentTimeMillis() + (long) ((int) (readTimeout * 1.5)));
            String content = JsonUtil.writeValueAsString(dataRequest);
            Stopwatch sw = Stopwatch.createUnstarted();
            sw.start();
            String msg = this.restService.postRequest(url, content, connTimeout, readTimeout);
            logger.info("query-{}: receive response from {} take time:{}", queryId, receiver, sw.elapsed(TimeUnit.MILLISECONDS));
            // The node answered, so it is healthy again; remove() is a no-op when it was not marked.
            this.failedReceivers.remove(receiver);
            DataResponse response = JsonUtil.readValue(msg, DataResponse.class);
            logger.info("query-{}: receiver {} profile info:{}", queryId, receiver, response.getProfile());
            return this.deserializeResponse(tupleConverter, recordsSerializer, cube.getName(), tupleInfo, response);
        } catch (Exception e) {
            logger.error("error when search data from receiver:" + url, e);
            throw e;
        }
    }

    /**
     * Decodes the Base64 payload of a response into records and wraps them in a tuple iterator.
     *
     * @param cubeName not used by this implementation
     */
    public Iterator<ITuple> deserializeResponse(StreamingTupleConverter tupleConverter, RecordsSerializer recordsSerializer, String cubeName, TupleInfo tupleInfo, DataResponse response) throws IOException, DataFormatException {
        byte[] payload = Base64.decodeBase64(response.getData());
        return new StreamingTupleIterator(recordsSerializer.deserialize(payload), tupleConverter, tupleInfo);
    }

    /**
     * Builds the request sent to the receivers: the filter is serialized and Base64-encoded,
     * dimension and group columns are sent as canonical names, and the profiling/behavior flags
     * are read from {@link BackdoorToggles}.
     *
     * @param tupleInfo not used by this implementation
     */
    private DataRequest createDataRequest(String queryId, String cubeName, long minSegmentTime, TupleInfo tupleInfo, TupleFilter tupleFilter, Set<TblColRef> dimensions, Set<TblColRef> groups, Set<FunctionDesc> metrics, int storagePushDownLimit, boolean allowStorageAggregation) {
        DataRequest request = new DataRequest();
        request.setCubeName(cubeName);
        request.setQueryId(queryId);
        request.setMinSegmentTime(minSegmentTime);
        request.setTupleFilter(Base64.encodeBase64String(TupleFilterSerializer.serialize(tupleFilter, StringCodeSystem.INSTANCE)));
        request.setStoragePushDownLimit(storagePushDownLimit);
        request.setAllowStorageAggregation(allowStorageAggregation);
        request.setRequestSendTime(System.currentTimeMillis());
        request.setEnableDetailProfile(BackdoorToggles.isStreamingProfileEnable());
        request.setStorageBehavior(BackdoorToggles.getCoprocessorBehavior());

        Set<String> dimensionSet = Sets.newHashSet();
        for (TblColRef dimension : dimensions) {
            dimensionSet.add(dimension.getCanonicalName());
        }
        request.setDimensions(dimensionSet);

        Set<String> groupSet = Sets.newHashSet();
        for (TblColRef group : groups) {
            groupSet.add(group.getCanonicalName());
        }
        request.setGroups(groupSet);

        request.setMetrics(Lists.newArrayList(metrics));
        return request;
    }

    /**
     * Tuple iterator fed by producer threads: each producer adds one block of tuples, and the
     * consumer drains the blocks one after another until {@code blockNum} blocks were consumed.
     *
     * <p>The consumer-side state ({@code currentBlock}, {@code numConsumeBlocks}) is not
     * synchronized; only the queue and the endpoint exception are shared with producers.
     */
    public static class QueuedStreamingTupleIterator
    implements ITupleIterator {
        private final BlockingQueue<Iterator<ITuple>> queue;
        private Iterator<ITuple> currentBlock = Collections.emptyIterator();
        private final int totalBlockNum;
        private int numConsumeBlocks = 0;
        private final int timeout;
        private final long timeoutTS;
        private volatile Exception endpointException;

        /**
         * @param blockNum number of blocks that will be produced; also the queue capacity
         * @param timeout maximum time in milliseconds, counted from construction, to wait for blocks
         * @throws IllegalArgumentException if {@code blockNum} is not positive (raised by
         *         {@link LinkedBlockingQueue#LinkedBlockingQueue(int)})
         */
        public QueuedStreamingTupleIterator(int blockNum, int timeout) {
            this.queue = new LinkedBlockingQueue<>(blockNum);
            this.totalBlockNum = blockNum;
            // Pad the caller's value by 10%. The previous code multiplied the not-yet-assigned
            // field, so the stored timeout was always 0.
            this.timeout = (int) (timeout * 1.1);
            // The absolute deadline keeps using the caller's value, so wait behavior is unchanged.
            this.timeoutTS = System.currentTimeMillis() + (long) timeout;
        }

        /** Adds one block of tuples, blocking while the queue is full. */
        public void addBlock(Iterator<ITuple> tuples) {
            try {
                this.queue.put(tuples);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
                logger.error("interrupted", e);
                throw new RuntimeException("interrupted", e);
            }
        }

        /** Records a producer failure; the next wait in {@link #hasNext()} rethrows it. */
        public void setEndpointException(Exception e) {
            this.endpointException = e;
        }

        private boolean hasEndpointFail() {
            return this.endpointException != null;
        }

        public void close() {
        }

        /**
         * Returns whether another tuple is available, waiting for the next block if necessary.
         *
         * @throws RuntimeException with message "endpoint fail" (cause attached) when a producer
         *         reported a failure, "timeout when call stream rpc" when the deadline passed
         *         without a block, or "interrupted" when the waiting thread was interrupted
         */
        public boolean hasNext() {
            if (this.currentBlock.hasNext()) {
                return true;
            }
            try {
                while (this.numConsumeBlocks < this.totalBlockNum) {
                    Iterator<ITuple> block = null;
                    // Poll in one-second slices so a failure or the deadline is noticed promptly.
                    while (block == null && !this.hasEndpointFail() && this.timeoutTS > System.currentTimeMillis()) {
                        block = this.queue.poll(1000L, TimeUnit.MILLISECONDS);
                    }
                    if (block == null) {
                        // Distinguish a producer failure from a genuine timeout; previously a
                        // failure that arrived while polling was reported as a timeout and the
                        // cause was lost. currentBlock is left untouched so it never becomes null.
                        Exception failure = this.endpointException;
                        if (failure != null) {
                            throw new RuntimeException("endpoint fail", failure);
                        }
                        throw new RuntimeException("timeout when call stream rpc");
                    }
                    this.currentBlock = block;
                    ++this.numConsumeBlocks;
                    if (block.hasNext()) {
                        return true;
                    }
                    // Empty block: move on to the next one.
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("interrupted", e);
                throw new RuntimeException("interrupted", e);
            }
            return false;
        }

        /** Returns the next tuple of the current block; call {@link #hasNext()} first. */
        public ITuple next() {
            return this.currentBlock.next();
        }

        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}

