/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.service;

import com.stratio.cassandra.lucene.IndexSearcher;
import com.stratio.cassandra.lucene.service.RowKey;
import com.stratio.cassandra.lucene.service.RowKeys;
import com.stratio.cassandra.lucene.service.RowMapper;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.AbstractRangeCommand;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IndexExpression;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RangeSliceCommand;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.SliceQueryFilter;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.locator.LocalStrategy;
import org.apache.cassandra.metrics.ClientRequestMetrics;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.DigestMismatchException;
import org.apache.cassandra.service.IReadCommand;
import org.apache.cassandra.service.IResponseResolver;
import org.apache.cassandra.service.RangeSliceResponseResolver;
import org.apache.cassandra.service.ReadCallback;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LuceneStorageProxy {
    private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
    private static final ClientRequestMetrics rangeMetrics = new ClientRequestMetrics("RangeSlice");
    private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.1;
    private static final Method getLiveSortedEndpoints;
    private static final Method intersection;
    private static final Method calculateResultRowsUsingEstimatedKeys;

    public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, RingPosition<?> pos) throws Exception {
        return (List)getLiveSortedEndpoints.invoke((Object)StorageProxy.instance, keyspace, pos);
    }

    public static List<InetAddress> intersection(List<InetAddress> l1, List<InetAddress> l2) throws Exception {
        return (List)intersection.invoke((Object)StorageProxy.instance, l1, l2);
    }

    private static float calculateResultRowsUsingEstimatedKeys(ColumnFamilyStore cfs) throws Exception {
        return ((Float)calculateResultRowsUsingEstimatedKeys.invoke((Object)StorageProxy.instance, cfs)).floatValue();
    }

    public static float estimateResultRowsPerRange(Keyspace keyspace, String columnFamily, List<IndexExpression> rowFilter, boolean countCQL3Rows) throws Exception {
        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(columnFamily);
        float resultRowsPerRange = Float.POSITIVE_INFINITY;
        if (rowFilter != null && !rowFilter.isEmpty()) {
            List searchers = cfs.indexManager.getIndexSearchersForQuery(rowFilter);
            if (searchers.isEmpty()) {
                resultRowsPerRange = LuceneStorageProxy.calculateResultRowsUsingEstimatedKeys(cfs);
            } else {
                for (SecondaryIndexSearcher searcher : searchers) {
                    SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(rowFilter);
                    resultRowsPerRange = Math.min(resultRowsPerRange, (float)highestSelectivityIndex.estimateResultRows());
                }
            }
        } else {
            resultRowsPerRange = !countCQL3Rows ? (float)cfs.estimateKeys() : LuceneStorageProxy.calculateResultRowsUsingEstimatedKeys(cfs);
        }
        return resultRowsPerRange / (float)DatabaseDescriptor.getNumTokens().intValue() / (float)keyspace.getReplicationStrategy().getReplicationFactor();
    }

    public static boolean ignoredTombstonedPartitions(IDiskAtomFilter predicate) {
        return predicate instanceof SliceQueryFilter && ((SliceQueryFilter)predicate).compositesToGroup == -2;
    }

    private static RowKey rowKey(AbstractBounds<RowPosition> range, RowKeys rowKeys) {
        if (rowKeys == null) {
            return null;
        }
        for (RowKey rowKey : rowKeys) {
            DecoratedKey key = rowKey.getPartitionKey();
            if (!range.contains((RingPosition)key)) continue;
            return rowKey;
        }
        return null;
    }

    public static RowKey last(RowMapper mapper, RowKey rowKey, List<Row> rows, List<Row> processedRows) {
        for (int i = rows.size() - 1; i >= 0; --i) {
            Row row = rows.get(i);
            if (!processedRows.contains(row)) continue;
            return mapper.rowKey(row);
        }
        return rowKey;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static Pair<List<Row>, RowKeys> getRangeSlice(IndexSearcher searcher, String keyspaceName, String columnFamily, long timestamp, IDiskAtomFilter predicate, AbstractBounds<RowPosition> keyRange, List<IndexExpression> expressions, int limit, ConsistencyLevel consistency_level, RowKeys rowKeys, boolean countCQL3Rows) throws Exception {
        ArrayList<Row> rows;
        Tracing.trace((String)"Computing ranges to query");
        long startTime = System.nanoTime();
        Keyspace keyspace = Keyspace.open((String)keyspaceName);
        HashMap<AbstractBounds<RowPosition>, List<Row>> rowsPerRange = new HashMap<AbstractBounds<RowPosition>, List<Row>>();
        try {
            int concurrencyFactor;
            int liveRowCount = 0;
            boolean countLiveRows = countCQL3Rows || LuceneStorageProxy.ignoredTombstonedPartitions(predicate);
            rows = new ArrayList<Row>();
            List ranges = keyspace.getReplicationStrategy() instanceof LocalStrategy ? keyRange.unwrap() : StorageProxy.getRestrictedRanges(keyRange);
            int rowsToBeFetched = limit;
            if (searcher.requiresScanningAllRanges(expressions)) {
                rowsToBeFetched *= ranges.size();
                concurrencyFactor = ranges.size();
                logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}", new Object[]{limit, ranges.size(), concurrencyFactor});
                Tracing.trace((String)"Submitting range requests on {} ranges with a concurrency of {}", (Object[])new Object[]{ranges.size(), concurrencyFactor});
            } else {
                float resultRowsPerRange = LuceneStorageProxy.estimateResultRowsPerRange(keyspace, columnFamily, expressions, countCQL3Rows);
                concurrencyFactor = (double)(resultRowsPerRange = (float)((double)resultRowsPerRange - (double)resultRowsPerRange * 0.1)) == 0.0 ? 1 : Math.max(1, Math.min(ranges.size(), (int)Math.ceil((float)limit / resultRowsPerRange)));
                logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}", new Object[]{Float.valueOf(resultRowsPerRange), limit, ranges.size(), concurrencyFactor});
                Tracing.trace((String)"Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", (Object[])new Object[]{ranges.size(), concurrencyFactor, Float.valueOf(resultRowsPerRange)});
            }
            boolean haveSufficientRows = false;
            int i = 0;
            AbstractBounds nextRange = null;
            List<InetAddress> nextEndpoints = null;
            List nextFilteredEndpoints = null;
            while (i < ranges.size()) {
                float actualRowsPerRange;
                ArrayList<Pair> scanHandlers = new ArrayList<Pair>(concurrencyFactor);
                int concurrentFetchStartingIndex = i;
                int concurrentRequests = 0;
                while (i - concurrentFetchStartingIndex < concurrencyFactor) {
                    AbstractBounds range = nextRange == null ? (AbstractBounds)ranges.get(i) : nextRange;
                    List<InetAddress> liveEndpoints = nextEndpoints == null ? LuceneStorageProxy.getLiveSortedEndpoints(keyspace, range.right) : nextEndpoints;
                    List filteredEndpoints = nextFilteredEndpoints == null ? consistency_level.filterForQuery(keyspace, liveEndpoints) : nextFilteredEndpoints;
                    ++i;
                    ++concurrentRequests;
                    while (i < ranges.size()) {
                        List<InetAddress> merged;
                        nextRange = (AbstractBounds)ranges.get(i);
                        nextEndpoints = LuceneStorageProxy.getLiveSortedEndpoints(keyspace, nextRange.right);
                        nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
                        if (((RowPosition)range.right).isMinimum() || !consistency_level.isSufficientLiveNodes(keyspace, merged = LuceneStorageProxy.intersection(liveEndpoints, nextEndpoints))) break;
                        List filteredMerged = consistency_level.filterForQuery(keyspace, merged);
                        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints)) break;
                        range = range.withNewRight(nextRange.right);
                        liveEndpoints = merged;
                        filteredEndpoints = filteredMerged;
                        ++i;
                    }
                    RowKey after = LuceneStorageProxy.rowKey((AbstractBounds<RowPosition>)range, rowKeys);
                    ArrayList<IndexExpression> decoratedExpressions = new ArrayList<IndexExpression>(expressions);
                    if (after != null) {
                        decoratedExpressions.add(new IndexExpression(IndexSearcher.AFTER, Operator.EQ, searcher.mapper().byteBuffer(after)));
                    }
                    RangeSliceCommand command = new RangeSliceCommand(keyspaceName, columnFamily, timestamp, predicate, range, decoratedExpressions, limit);
                    rowsPerRange.put((AbstractBounds<RowPosition>)range, new ArrayList());
                    AbstractRangeCommand nodeCmd = command.forSubRange(range);
                    RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, timestamp);
                    List minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
                    ReadCallback handler = new ReadCallback((IResponseResolver)resolver, consistency_level, (IReadCommand)nodeCmd, minimalEndpoints);
                    handler.assureSufficientLiveNodes();
                    resolver.setSources(filteredEndpoints);
                    if (filteredEndpoints.size() == 1 && ((InetAddress)filteredEndpoints.get(0)).equals(FBUtilities.getBroadcastAddress())) {
                        StageManager.getStage((Stage)Stage.READ).execute((Runnable)new StorageProxy.LocalRangeSliceRunnable(nodeCmd, handler));
                    } else {
                        MessageOut message = nodeCmd.createMessage();
                        for (InetAddress endpoint : filteredEndpoints) {
                            Tracing.trace((String)"Enqueuing request to {}", (Object)endpoint);
                            MessagingService.instance().sendRR(message, endpoint, (IAsyncCallback)handler);
                        }
                    }
                    scanHandlers.add(Pair.create((Object)nodeCmd, (Object)handler));
                }
                Tracing.trace((String)"Submitted {} concurrent range requests covering {} ranges", (Object)concurrentRequests, (Object)(i - concurrentFetchStartingIndex));
                ArrayList repairResponses = new ArrayList();
                for (Pair cmdPairHandler : scanHandlers) {
                    ReadCallback handler = (ReadCallback)cmdPairHandler.right;
                    RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
                    try {
                        for (Row row : (Iterable)handler.get()) {
                            rows.add(row);
                            ((List)rowsPerRange.get(((AbstractRangeCommand)cmdPairHandler.left).keyRange)).add(row);
                            if (!countLiveRows) continue;
                            liveRowCount += row.getLiveCount(predicate, timestamp);
                        }
                        repairResponses.addAll(resolver.repairResults);
                    }
                    catch (ReadTimeoutException ex) {
                        String gotData;
                        int blockFor = consistency_level.blockFor(keyspace);
                        int responseCount = resolver.responses.size();
                        String string = responseCount > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : (gotData = "");
                        if (Tracing.isTracing()) {
                            Tracing.trace((String)"Timed out; received {} of {} responses{} for range {} of {}", (Object[])new Object[]{responseCount, blockFor, gotData, i, ranges.size()});
                        } else if (logger.isDebugEnabled()) {
                            logger.debug("Range slice timeout; received {} of {} responses{} for range {} of {}", new Object[]{responseCount, blockFor, gotData, i, ranges.size()});
                        }
                        throw ex;
                    }
                    catch (DigestMismatchException e) {
                        throw new AssertionError((Object)e);
                    }
                    int count = countLiveRows ? liveRowCount : rows.size();
                    if (count < rowsToBeFetched) continue;
                    haveSufficientRows = true;
                    break;
                }
                try {
                    FBUtilities.waitOnFutures(repairResponses, (long)DatabaseDescriptor.getWriteRpcTimeout());
                }
                catch (TimeoutException ex) {
                    int blockFor = consistency_level.blockFor(keyspace);
                    if (Tracing.isTracing()) {
                        Tracing.trace((String)"Timed out while read-repairing after receiving all {} data and digest responses", (Object)blockFor);
                    } else {
                        logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", (Object)blockFor);
                    }
                    throw new ReadTimeoutException(consistency_level, blockFor - 1, blockFor, true);
                }
                if (haveSufficientRows) {
                    Pair<List<Row>, RowKeys> ex = LuceneStorageProxy.makeResult(rows, searcher, expressions, limit, rowsPerRange, rowKeys, searcher.mapper());
                    return ex;
                }
                if (i >= ranges.size()) continue;
                float fetchedRows = countLiveRows ? (float)liveRowCount : (float)rows.size();
                float remainingRows = (float)rowsToBeFetched - fetchedRows;
                if ((double)fetchedRows == 0.0) {
                    actualRowsPerRange = 0.0f;
                    concurrencyFactor = ranges.size() - i;
                } else {
                    actualRowsPerRange = fetchedRows / (float)i;
                    concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
                }
                logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}", new Object[]{Float.valueOf(actualRowsPerRange), (int)remainingRows, concurrencyFactor});
            }
        }
        finally {
            long latency = System.nanoTime() - startTime;
            rangeMetrics.addNano(latency);
            Keyspace.open((String)keyspaceName).getColumnFamilyStore((String)columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
        }
        return LuceneStorageProxy.makeResult(rows, searcher, expressions, limit, rowsPerRange, rowKeys, searcher.mapper());
    }

    public static Pair<List<Row>, RowKeys> makeResult(List<Row> rows, IndexSearcher searcher, List<IndexExpression> expressions, int limit, Map<AbstractBounds<RowPosition>, List<Row>> rowsPerRange, RowKeys rowKeys, RowMapper mapper) {
        rows = (rows = searcher.postReconciliationProcessing(expressions, rows)).size() > limit ? rows.subList(0, limit) : rows;
        RowKeys newRowKeys = new RowKeys();
        for (Map.Entry<AbstractBounds<RowPosition>, List<Row>> entry : rowsPerRange.entrySet()) {
            RowKey rowKey = LuceneStorageProxy.rowKey(entry.getKey(), rowKeys);
            RowKey newRowKey = LuceneStorageProxy.last(mapper, rowKey, rows, entry.getValue());
            if (newRowKey == null) continue;
            newRowKeys.add(newRowKey);
        }
        return Pair.create(rows, (Object)newRowKeys);
    }

    static {
        try {
            Class<StorageProxy> clazz = StorageProxy.class;
            getLiveSortedEndpoints = clazz.getDeclaredMethod("getLiveSortedEndpoints", Keyspace.class, RingPosition.class);
            getLiveSortedEndpoints.setAccessible(true);
            intersection = clazz.getDeclaredMethod("intersection", List.class, List.class);
            intersection.setAccessible(true);
            calculateResultRowsUsingEstimatedKeys = clazz.getDeclaredMethod("calculateResultRowsUsingEstimatedKeys", ColumnFamilyStore.class);
            calculateResultRowsUsingEstimatedKeys.setAccessible(true);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

