/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.action.search;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.search.AbstractSearchAsyncAction;
import org.elasticsearch.action.search.CanMatchPreFilterSearchPhase;
import org.elasticsearch.action.search.SearchDfsQueryThenFetchAsyncAction;
import org.elasticsearch.action.search.SearchPhase;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchResponseMerger;
import org.elasticsearch.action.search.SearchShardIterator;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

public class TransportSearchAction
extends HandledTransportAction<SearchRequest, SearchResponse> {
    /**
     * Dynamic, node-scoped setting registered under the key {@code action.search.shard_count.limit}.
     * NOTE(review): the {@code Long.MAX_VALUE} and {@code 1L} arguments are presumably the default
     * and the minimum accepted value - confirm against {@code Setting.longSetting}.
     */
    public static final Setting<Long> SHARD_COUNT_LIMIT_SETTING = Setting.longSetting("action.search.shard_count.limit", Long.MAX_VALUE, 1L, Setting.Property.Dynamic, Setting.Property.NodeScope);
    // Supplies the "search" executor and is handed to the remote cluster service when creating remote clients.
    private final ThreadPool threadPool;
    // Source of the cluster state snapshot and of the operation routing used to pick local shards.
    private final ClusterService clusterService;
    // Provides pending per-node search counts and node connections.
    private final SearchTransportService searchTransportService;
    // Obtained from the search transport service in the constructor; groups indices per cluster and creates remote clients.
    private final RemoteClusterService remoteClusterService;
    // Stored by the constructor; its use is not visible in this part of the file.
    private final SearchPhaseController searchPhaseController;
    // Builds alias filters, rewrite contexts and reduce contexts, and supplies search defaults.
    private final SearchService searchService;
    // Resolves index expressions, aliases and routing against a cluster state.
    private final IndexNameExpressionResolver indexNameExpressionResolver;

    /**
     * Wires the search transport action under the action name {@code indices:data/read/search}
     * and registers the search request handlers on the given transport service.
     *
     * @param threadPool thread pool kept for executors and remote cluster clients
     * @param transportService transport service passed to the parent action and used for handler registration
     * @param searchService search service kept by this action and passed to handler registration
     * @param searchTransportService search transport service; also the source of the remote cluster service
     * @param searchPhaseController search phase controller kept by this action
     * @param clusterService cluster service kept by this action
     * @param actionFilters action filters passed to the parent action
     * @param indexNameExpressionResolver resolver kept by this action
     */
    @Inject
    public TransportSearchAction(ThreadPool threadPool, TransportService transportService, SearchService searchService, SearchTransportService searchTransportService, SearchPhaseController searchPhaseController, ClusterService clusterService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
        super("indices:data/read/search", transportService, actionFilters, SearchRequest::new);

        // Collaborators that are stored as-is.
        this.threadPool = threadPool;
        this.clusterService = clusterService;
        this.searchService = searchService;
        this.searchPhaseController = searchPhaseController;
        this.indexNameExpressionResolver = indexNameExpressionResolver;

        // The remote cluster service is derived from the search transport service.
        this.searchTransportService = searchTransportService;
        this.remoteClusterService = searchTransportService.getRemoteClusterService();

        SearchTransportService.registerRequestHandler(transportService, searchService);
    }

    /**
     * Builds the alias filter of every local concrete index, keyed by index UUID, and then adds
     * all entries of {@code remoteAliasMap} to the same map.
     * <p>
     * Each index is first checked for a READ-level block via {@code indexBlockedRaiseException},
     * so the call may throw before any filter is built for that index.
     *
     * @param request search request whose index expressions are resolved
     * @param clusterState cluster state used for resolution and block checks
     * @param concreteIndices local concrete indices to build filters for
     * @param remoteAliasMap alias filters of remote indices, copied into the result last
     * @return mutable map from index UUID to alias filter
     */
    private Map<String, AliasFilter> buildPerIndexAliasFilter(SearchRequest request, ClusterState clusterState, Index[] concreteIndices, Map<String, AliasFilter> remoteAliasMap) {
        final Set<String> resolvedExpressions = this.indexNameExpressionResolver.resolveExpressions(clusterState, request.indices());
        final Map<String, AliasFilter> filtersByUuid = new HashMap<>();
        for (int i = 0; i < concreteIndices.length; i++) {
            final Index current = concreteIndices[i];
            final String indexName = current.getName();
            clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, indexName);
            final AliasFilter filter = this.searchService.buildAliasFilter(clusterState, indexName, resolvedExpressions);
            assert filter != null;
            filtersByUuid.put(current.getUUID(), filter);
        }
        // Remote entries are added last, so they replace a local entry with the same key.
        filtersByUuid.putAll(remoteAliasMap);
        return filtersByUuid;
    }

    /**
     * Expands the index boosts of the request source to concrete indices, keyed by index UUID.
     * When several boost entries resolve to the same index, the first one encountered wins.
     *
     * @param searchRequest request whose source may carry index boosts
     * @param clusterState cluster state used to resolve each boost's index expression
     * @return an unmodifiable UUID-to-boost map; empty when there is no source or no boosts
     */
    private Map<String, Float> resolveIndexBoosts(SearchRequest searchRequest, ClusterState clusterState) {
        final SearchSourceBuilder source = searchRequest.source();
        if (source == null || source.indexBoosts() == null) {
            return Collections.emptyMap();
        }
        final Map<String, Float> boostsByUuid = new HashMap<>();
        for (SearchSourceBuilder.IndexBoost boost : source.indexBoosts()) {
            final Index[] resolved = this.indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(), boost.getIndex());
            for (Index index : resolved) {
                // putIfAbsent keeps the boost from the earliest matching entry.
                boostsByUuid.putIfAbsent(index.getUUID(), Float.valueOf(boost.getBoost()));
            }
        }
        return Collections.unmodifiableMap(boostsByUuid);
    }

    /**
     * Entry point of the action: rewrites the request source (when there is one) and then
     * dispatches the search as local-only, cross-cluster with per-cluster reduction, or
     * cross-cluster with remote shards searched directly from this node.
     *
     * @param task the task this search runs under
     * @param searchRequest the request to execute; its source is replaced if the rewrite produced a new one
     * @param listener receives the final response, or the failure of the rewrite or shard collection
     */
    @Override
    protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        // Capture the relative start time first so that the time provider measures from the very beginning.
        long relativeStartNanos = System.nanoTime();
        SearchTimeProvider timeProvider = new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
        ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
            // Only swap the source when the rewrite returned a different instance.
            if (source != searchRequest.source()) {
                searchRequest.source((SearchSourceBuilder)source);
            }
            ClusterState clusterState = this.clusterService.state();
            Map<String, OriginalIndices> remoteClusterIndices = this.remoteClusterService.groupIndices(searchRequest.indicesOptions(), searchRequest.indices(), idx -> this.indexNameExpressionResolver.hasIndexOrAlias((String)idx, clusterState));
            // The entry keyed by the empty string is taken out as the local indices (null when absent);
            // what remains in the map is treated as remote clusters.
            OriginalIndices localIndices = remoteClusterIndices.remove("");
            if (remoteClusterIndices.isEmpty()) {
                // No remote clusters involved.
                this.executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener);
            } else if (TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)) {
                // One sub-search per cluster; the local part is run through the supplied callback.
                TransportSearchAction.ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider, this.searchService::createReduceContext, this.remoteClusterService, this.threadPool, listener, (r, l) -> this.executeLocalSearch(task, timeProvider, (SearchRequest)r, localIndices, clusterState, (ActionListener<SearchResponse>)l));
            } else {
                // Fetch the shard listing of every remote cluster, then search local and remote shards together.
                AtomicInteger skippedClusters = new AtomicInteger(0);
                TransportSearchAction.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(), skippedClusters, remoteClusterIndices, this.remoteClusterService, this.threadPool, ActionListener.wrap(searchShardsResponses -> {
                    ArrayList<SearchShardIterator> remoteShardIterators = new ArrayList<SearchShardIterator>();
                    HashMap<String, AliasFilter> remoteAliasFilters = new HashMap<String, AliasFilter>();
                    BiFunction<String, String, DiscoveryNode> clusterNodeLookup = TransportSearchAction.processRemoteShards(searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                    // The local cluster counts as one cluster when local indices are present.
                    int localClusters = localIndices == null ? 0 : 1;
                    int totalClusters = remoteClusterIndices.size() + localClusters;
                    // Clusters that returned a shard listing, plus the local one.
                    int successfulClusters = searchShardsResponses.size() + localClusters;
                    this.executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
                }, listener::onFailure));
            }
        }, listener::onFailure);
        if (searchRequest.source() == null) {
            // Nothing to rewrite: continue immediately, passing the (null) source through.
            rewriteListener.onResponse(searchRequest.source());
        } else {
            Rewriteable.rewriteAndFetch(searchRequest.source(), this.searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis), rewriteListener);
        }
    }

    /**
     * Decides whether a cross-cluster search may be executed as one sub-search per cluster.
     * <p>
     * Returns {@code false} when the request has not enabled round-trip minimization, is a scroll,
     * or uses {@code DFS_QUERY_THEN_FETCH}. Otherwise returns {@code true} unless the source
     * collapses with a non-empty list of inner hits.
     *
     * @param searchRequest the request to inspect
     * @return whether round trips should be minimized
     */
    static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
        final boolean eligible = searchRequest.isCcsMinimizeRoundtrips()
            && searchRequest.scroll() == null
            && searchRequest.searchType() != SearchType.DFS_QUERY_THEN_FETCH;
        if (!eligible) {
            return false;
        }
        final SearchSourceBuilder source = searchRequest.source();
        if (source == null || source.collapse() == null) {
            return true;
        }
        // Collapsing is only a problem when it actually asks for inner hits.
        return source.collapse().getInnerHits() == null || source.collapse().getInnerHits().isEmpty();
    }

    /**
     * Runs a cross-cluster search as one sub-search per cluster and combines the per-cluster responses.
     * <p>
     * With exactly one remote cluster and no local indices, the remote response is re-wrapped and
     * returned directly. Otherwise every remote cluster (and the local cluster, if it has indices)
     * gets its own sub-request and the responses are combined by a {@code SearchResponseMerger}.
     *
     * @param searchRequest the original request that sub-requests are derived from
     * @param localIndices local indices to search, or {@code null} when there are none
     * @param remoteIndices indices to search per remote cluster alias
     * @param timeProvider supplies the absolute start time and the took time
     * @param reduceContext factory passed on to the response merger
     * @param remoteClusterService source of remote clients and of the skip-unavailable flag
     * @param threadPool thread pool handed to the remote client factory
     * @param listener receives the final response or failure
     * @param localSearchConsumer runs the local sub-search with the given request and listener
     */
    static void ccsRemoteReduce(SearchRequest searchRequest, OriginalIndices localIndices, Map<String, OriginalIndices> remoteIndices, final SearchTimeProvider timeProvider, Function<Boolean, InternalAggregation.ReduceContext> reduceContext, RemoteClusterService remoteClusterService, ThreadPool threadPool, final ActionListener<SearchResponse> listener, BiConsumer<SearchRequest, ActionListener<SearchResponse>> localSearchConsumer) {
        if (localIndices == null && remoteIndices.size() == 1) {
            // Single remote cluster, nothing local: no merging is needed.
            Map.Entry<String, OriginalIndices> entry = remoteIndices.entrySet().iterator().next();
            final String clusterAlias = entry.getKey();
            final boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
            OriginalIndices indices = entry.getValue();
            // NOTE(review): the trailing 'true' (vs. 'false' in the multi-cluster branch) is presumably
            // the "final reduce" flag of subSearchRequest - confirm against SearchRequest.
            SearchRequest ccsSearchRequest = SearchRequest.subSearchRequest(searchRequest, indices.indices(), clusterAlias, timeProvider.getAbsoluteStartMillis(), true);
            Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
            remoteClusterClient.search(ccsSearchRequest, new ActionListener<SearchResponse>(){

                @Override
                public void onResponse(SearchResponse searchResponse) {
                    // Rebuild the response so that it carries the took time measured on this node
                    // and a Clusters section of (1, 1, 0).
                    Map<String, ProfileShardResult> profileResults = searchResponse.getProfileResults();
                    SearchProfileShardResults profile = profileResults == null || profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
                    InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchResponse.getHits(), (InternalAggregations)searchResponse.getAggregations(), searchResponse.getSuggest(), profile, searchResponse.isTimedOut(), searchResponse.isTerminatedEarly(), searchResponse.getNumReducePhases());
                    listener.onResponse(new SearchResponse(internalSearchResponse, searchResponse.getScrollId(), searchResponse.getTotalShards(), searchResponse.getSuccessfulShards(), searchResponse.getSkippedShards(), timeProvider.buildTookInMillis(), searchResponse.getShardFailures(), new SearchResponse.Clusters(1, 1, 0)));
                }

                @Override
                public void onFailure(Exception e) {
                    if (skipUnavailable) {
                        // The only cluster was skipped: answer with an empty response and Clusters(1, 0, 1).
                        listener.onResponse(SearchResponse.empty(timeProvider::buildTookInMillis, new SearchResponse.Clusters(1, 0, 1)));
                    } else {
                        listener.onFailure(TransportSearchAction.wrapRemoteClusterFailure(clusterAlias, e));
                    }
                }
            });
        } else {
            // Several clusters: fan out one sub-search per cluster and merge once all have reported.
            SearchResponseMerger searchResponseMerger = TransportSearchAction.createSearchResponseMerger(searchRequest.source(), timeProvider, reduceContext);
            AtomicInteger skippedClusters = new AtomicInteger(0);
            AtomicReference<Exception> exceptions = new AtomicReference<Exception>();
            int totalClusters = remoteIndices.size() + (localIndices == null ? 0 : 1);
            // One count per cluster, including the local one when present.
            CountDown countDown = new CountDown(totalClusters);
            for (Map.Entry<String, OriginalIndices> entry : remoteIndices.entrySet()) {
                String clusterAlias = entry.getKey();
                boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
                OriginalIndices indices = entry.getValue();
                SearchRequest ccsSearchRequest = SearchRequest.subSearchRequest(searchRequest, indices.indices(), clusterAlias, timeProvider.getAbsoluteStartMillis(), false);
                ActionListener<SearchResponse> ccsListener = TransportSearchAction.createCCSListener(clusterAlias, skipUnavailable, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener);
                Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
                remoteClusterClient.search(ccsSearchRequest, ccsListener);
            }
            if (localIndices != null) {
                // The local cluster uses the empty string as its alias and is never marked skip-unavailable.
                ActionListener<SearchResponse> ccsListener = TransportSearchAction.createCCSListener("", false, countDown, skippedClusters, exceptions, searchResponseMerger, totalClusters, listener);
                SearchRequest ccsLocalSearchRequest = SearchRequest.subSearchRequest(searchRequest, localIndices.indices(), "", timeProvider.getAbsoluteStartMillis(), false);
                localSearchConsumer.accept(ccsLocalSearchRequest, ccsListener);
            }
        }
    }

    /**
     * Creates the merger used to combine several search responses into one.
     * <p>
     * The merger is given the effective {@code from}, {@code size} and {@code trackTotalHitsUpTo};
     * values left unset on the source fall back to 0, 10 and 10000 respectively. When a source is
     * present it is modified in place: {@code from} becomes 0 and {@code size} becomes
     * {@code from + size}, so each sub-search returns enough hits for the merged page.
     *
     * @param source request source, may be {@code null}; mutated when non-null
     * @param timeProvider time provider passed to the merger
     * @param reduceContextFunction reduce context factory passed to the merger
     * @return a new merger configured with the effective paging values
     */
    static SearchResponseMerger createSearchResponseMerger(SearchSourceBuilder source, SearchTimeProvider timeProvider, Function<Boolean, InternalAggregation.ReduceContext> reduceContextFunction) {
        if (source == null) {
            return new SearchResponseMerger(0, 10, 10000, timeProvider, reduceContextFunction);
        }
        final int effectiveFrom = source.from() == -1 ? 0 : source.from();
        final int effectiveSize = source.size() == -1 ? 10 : source.size();
        final int effectiveTrackTotalHits = source.trackTotalHitsUpTo() == null ? 10000 : source.trackTotalHitsUpTo();
        // Widen the window requested from every sub-search; paging is applied by the merger.
        source.from(0);
        source.size(effectiveFrom + effectiveSize);
        return new SearchResponseMerger(effectiveFrom, effectiveSize, effectiveTrackTotalHits, timeProvider, reduceContextFunction);
    }

    /**
     * Asks every remote cluster for the shards that match its indices and reports the collected
     * responses, keyed by cluster alias, to {@code listener}.
     * <p>
     * One search-shards request is sent per entry of {@code remoteIndicesByCluster}; completion,
     * skipping and failure bookkeeping is delegated to {@code CCSActionListener}.
     *
     * @param indicesOptions indices options applied to every search-shards request
     * @param preference shard preference applied to every request
     * @param routing routing applied to every request
     * @param skippedClusters counter handed to the per-cluster listeners
     * @param remoteIndicesByCluster indices to look up per remote cluster alias
     * @param remoteClusterService source of remote clients and of the skip-unavailable flag
     * @param threadPool thread pool handed to the remote client factory
     * @param listener receives the map of responses or a failure
     */
    static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters, Map<String, OriginalIndices> remoteIndicesByCluster, RemoteClusterService remoteClusterService, ThreadPool threadPool, ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
        final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
        // Responses may arrive concurrently from different clusters, hence the concurrent map.
        final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
        final AtomicReference<Exception> exceptions = new AtomicReference<>();
        for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
            final String clusterAlias = entry.getKey();
            final boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
            final Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
            final String[] indices = entry.getValue().indices();
            final ClusterSearchShardsRequest searchShardsRequest = ((ClusterSearchShardsRequest)new ClusterSearchShardsRequest(indices).indicesOptions(indicesOptions).local(true)).preference(preference).routing(routing);
            clusterClient.admin().cluster().searchShards(searchShardsRequest, new CCSActionListener<ClusterSearchShardsResponse, Map<String, ClusterSearchShardsResponse>>(clusterAlias, skipUnavailable, responsesCountDown, skippedClusters, exceptions, listener){

                @Override
                void innerOnResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
                    searchShardsResponses.put(clusterAlias, clusterSearchShardsResponse);
                }

                @Override
                Map<String, ClusterSearchShardsResponse> createFinalResponse() {
                    return searchShardsResponses;
                }
            });
        }
    }

    /**
     * Creates the listener for one cluster's sub-search in a merged cross-cluster search.
     * Each response is added to the shared merger; the final response is the merged result with a
     * clusters section built from the total count, the number of merged responses and the skipped count.
     *
     * @param clusterAlias alias of the cluster this listener belongs to
     * @param skipUnavailable whether the cluster may be skipped on failure
     * @param countDown shared countdown across all clusters
     * @param skippedClusters shared counter of skipped clusters
     * @param exceptions shared holder for failures
     * @param searchResponseMerger shared merger collecting the per-cluster responses
     * @param totalClusters total number of clusters taking part in the search
     * @param originalListener listener that receives the merged response or the failure
     * @return the listener to pass to the cluster's search call
     */
    private static ActionListener<SearchResponse> createCCSListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, final AtomicInteger skippedClusters, AtomicReference<Exception> exceptions, final SearchResponseMerger searchResponseMerger, final int totalClusters, ActionListener<SearchResponse> originalListener) {
        final CCSActionListener<SearchResponse, SearchResponse> mergingListener = new CCSActionListener<SearchResponse, SearchResponse>(clusterAlias, skipUnavailable, countDown, skippedClusters, exceptions, originalListener){

            @Override
            void innerOnResponse(SearchResponse response) {
                searchResponseMerger.add(response);
            }

            @Override
            SearchResponse createFinalResponse() {
                final int successful = searchResponseMerger.numResponses();
                final int skipped = skippedClusters.get();
                return searchResponseMerger.getMergedResponse(new SearchResponse.Clusters(totalClusters, successful, skipped));
            }
        };
        return mergingListener;
    }

    /**
     * Runs a search that involves local indices only: no remote shard iterators, no remote alias
     * filters, a remote node lookup that always yields {@code null}, and an empty clusters section.
     *
     * @param task the task the search runs under; cast to {@code SearchTask}
     * @param timeProvider time provider for this search
     * @param searchRequest the request to execute
     * @param localIndices the local indices to search
     * @param clusterState cluster state snapshot to search against
     * @param listener receives the response or failure
     */
    private void executeLocalSearch(Task task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices, ClusterState clusterState, ActionListener<SearchResponse> listener) {
        final List<SearchShardIterator> noRemoteShards = Collections.emptyList();
        final Map<String, AliasFilter> noRemoteAliasFilters = Collections.emptyMap();
        final BiFunction<String, String, DiscoveryNode> noRemoteNodes = (clusterName, nodeId) -> null;
        this.executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, noRemoteShards, noRemoteNodes, clusterState, noRemoteAliasFilters, listener, SearchResponse.Clusters.EMPTY);
    }

    /**
     * Converts the search-shards responses of the remote clusters into shard iterators and alias
     * filters, and returns a lookup from (cluster alias, node id) to the remote node.
     * <p>
     * For every shard group a {@code SearchShardIterator} is appended to {@code remoteShardIterators}
     * and the group's alias filter is stored in {@code aliasFilterMap} under the index UUID. The
     * original indices recorded on each iterator are the filter's aliases, or the index name when
     * the filter has no aliases.
     *
     * @param searchShardsResponses search-shards response per remote cluster alias
     * @param remoteIndicesByCluster original indices per remote cluster alias
     * @param remoteShardIterators output list receiving one iterator per remote shard group
     * @param aliasFilterMap output map receiving alias filters keyed by index UUID
     * @return lookup that returns the node for a cluster alias and node id, or {@code null} when the
     *         node id is unknown; it throws {@code IllegalArgumentException} for an unknown cluster alias
     */
    static BiFunction<String, String, DiscoveryNode> processRemoteShards(Map<String, ClusterSearchShardsResponse> searchShardsResponses, Map<String, OriginalIndices> remoteIndicesByCluster, List<SearchShardIterator> remoteShardIterators, Map<String, AliasFilter> aliasFilterMap) {
        final Map<String, Map<String, DiscoveryNode>> clusterToNode = new HashMap<>();
        for (Map.Entry<String, ClusterSearchShardsResponse> entry : searchShardsResponses.entrySet()) {
            final String clusterAlias = entry.getKey();
            final ClusterSearchShardsResponse searchShardsResponse = entry.getValue();

            // Index this cluster's nodes by id for the lookup returned at the end.
            final Map<String, DiscoveryNode> idToDiscoveryNode = new HashMap<>();
            clusterToNode.put(clusterAlias, idToDiscoveryNode);
            for (DiscoveryNode remoteNode : searchShardsResponse.getNodes()) {
                idToDiscoveryNode.put(remoteNode.getId(), remoteNode);
            }

            final Map<String, AliasFilter> indicesAndFilters = searchShardsResponse.getIndicesAndFilters();
            for (ClusterSearchShardsGroup clusterSearchShardsGroup : searchShardsResponse.getGroups()) {
                final ShardId shardId = clusterSearchShardsGroup.getShardId();
                final AliasFilter aliasFilter;
                if (indicesAndFilters == null) {
                    // The response carries no filter information at all.
                    aliasFilter = AliasFilter.EMPTY;
                } else {
                    aliasFilter = indicesAndFilters.get(shardId.getIndexName());
                    assert aliasFilter != null : "alias filter must not be null for index: " + shardId.getIndex();
                }
                final String[] aliases = aliasFilter.getAliases();
                final String[] finalIndices = aliases.length == 0 ? new String[] {shardId.getIndexName()} : aliases;
                // Filters are looked up by index UUID by the callers of this method.
                aliasFilterMap.put(shardId.getIndex().getUUID(), aliasFilter);
                final OriginalIndices originalIndices = remoteIndicesByCluster.get(clusterAlias);
                assert originalIndices != null : "original indices are null for clusterAlias: " + clusterAlias;
                final SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId, Arrays.asList(clusterSearchShardsGroup.getShards()), new OriginalIndices(finalIndices, originalIndices.indicesOptions()));
                remoteShardIterators.add(shardIterator);
            }
        }
        return (alias, nodeId) -> {
            final Map<String, DiscoveryNode> clusterNodes = clusterToNode.get(alias);
            if (clusterNodes == null) {
                throw new IllegalArgumentException("unknown remote cluster: " + alias);
            }
            return clusterNodes.get(nodeId);
        };
    }

    /**
     * Resolves the local index expressions to concrete indices.
     *
     * @param localIndices local index expressions, or {@code null} when nothing local is searched
     * @param indicesOptions options controlling the resolution
     * @param clusterState cluster state to resolve against
     * @param timeProvider supplies the absolute start time passed to the resolver
     * @return the concrete indices, or {@code Index.EMPTY_ARRAY} when {@code localIndices} is {@code null}
     */
    private Index[] resolveLocalIndices(OriginalIndices localIndices, IndicesOptions indicesOptions, ClusterState clusterState, SearchTimeProvider timeProvider) {
        final boolean hasLocalIndices = localIndices != null;
        if (hasLocalIndices) {
            final long startMillis = timeProvider.getAbsoluteStartMillis();
            return this.indexNameExpressionResolver.concreteIndices(clusterState, indicesOptions, startMillis, localIndices.indices());
        }
        return Index.EMPTY_ARRAY;
    }

    /**
     * Resolves local indices, alias filters, routing and index boosts, and then starts the search.
     * <p>
     * When {@code shouldSplitIndices} allows it, the local indices are partitioned into those
     * without a WRITE block and those with one. If both groups have to be searched (or the
     * write group is empty but remote shards exist), two sub-searches are started and their
     * responses merged; otherwise a single search is started.
     *
     * @param task the search task
     * @param timeProvider time provider for this search
     * @param searchRequest the request to execute
     * @param localIndices local index expressions, may be {@code null}
     * @param remoteShardIterators shard iterators of remote clusters
     * @param remoteConnections lookup from (cluster alias, node id) to a remote node
     * @param clusterState cluster state snapshot to search against
     * @param remoteAliasMap alias filters of remote indices
     * @param listener receives the final response or failure
     * @param clusters clusters section for the final response
     */
    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators, BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState, Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener, final SearchResponse.Clusters clusters) {
        // Raises an exception when a global READ block is in place, before any resolution work.
        clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
        Index[] indices = this.resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider);
        Map<String, AliasFilter> aliasFilter = this.buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
        Map<String, Set<String>> routingMap = this.indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices());
        // Normalize to a non-null, read-only routing map.
        routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
        Map<String, Float> concreteIndexBoosts = this.resolveIndexBoosts(searchRequest, clusterState);
        if (TransportSearchAction.shouldSplitIndices(searchRequest)) {
            ArrayList<String> writeIndicesList = new ArrayList<String>();
            ArrayList<String> readOnlyIndicesList = new ArrayList<String>();
            TransportSearchAction.splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList);
            String[] writeIndices = writeIndicesList.toArray(new String[0]);
            String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]);
            if (readOnlyIndices.length == 0) {
                // Nothing write-blocked: one search over the writable indices and the remote shards.
                this.executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap, aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
            } else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) {
                // Only write-blocked indices and no remote shards: one search over them.
                this.executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap, aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
            } else {
                // Two sub-searches share one listener; the countdown of 2 matches the two executeSearch calls below.
                CountDown countDown = new CountDown(2);
                AtomicReference exceptions = new AtomicReference();
                final SearchResponseMerger searchResponseMerger = TransportSearchAction.createSearchResponseMerger(searchRequest.source(), timeProvider, this.searchService::createReduceContext);
                CountDownActionListener<SearchResponse, SearchResponse> countDownActionListener = new CountDownActionListener<SearchResponse, SearchResponse>(countDown, exceptions, listener){

                    @Override
                    void innerOnResponse(SearchResponse searchResponse) {
                        searchResponseMerger.add(searchResponse);
                    }

                    @Override
                    SearchResponse createFinalResponse() {
                        // The caller's clusters section is applied to the merged response only.
                        return searchResponseMerger.getMergedResponse(clusters);
                    }
                };
                // The remote shards travel with the writable-indices sub-search ...
                SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices, "", timeProvider.getAbsoluteStartMillis(), false);
                this.executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap, aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, (ActionListener<SearchResponse>)countDownActionListener, SearchResponse.Clusters.EMPTY);
                // ... while the write-blocked sub-search is purely local.
                SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices, "", timeProvider.getAbsoluteStartMillis(), false);
                this.executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap, aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, (ActionListener<SearchResponse>)countDownActionListener, SearchResponse.Clusters.EMPTY);
            }
        } else {
            // No splitting: search all resolved local indices by name in a single search.
            String[] concreteIndices = (String[])Arrays.stream(indices).map(Index::getName).toArray(String[]::new);
            this.executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap, aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
        }
    }

    /**
     * Tells whether the local indices of a request may be searched as two separate sub-searches.
     * Splitting is ruled out for scroll requests, for {@code DFS_QUERY_THEN_FETCH}, and for
     * requests whose source asks for a size of 0.
     *
     * @param searchRequest the request to inspect
     * @return {@code true} when the request may be split
     */
    static boolean shouldSplitIndices(SearchRequest searchRequest) {
        if (searchRequest.scroll() != null) {
            return false;
        }
        if (searchRequest.searchType() == SearchType.DFS_QUERY_THEN_FETCH) {
            return false;
        }
        final SearchSourceBuilder source = searchRequest.source();
        return source == null || source.size() != 0;
    }

    /**
     * Partitions the given indices by whether the cluster state reports a WRITE-level block for them.
     * Names of indices without such a block are appended to {@code writeIndices}, all others to
     * {@code readOnlyIndices}; the input order is preserved within each list.
     *
     * @param indices indices to partition
     * @param clusterState cluster state whose blocks are consulted
     * @param writeIndices output list for names of indices without a WRITE block
     * @param readOnlyIndices output list for names of indices with a WRITE block
     */
    static void splitIndices(Index[] indices, ClusterState clusterState, List<String> writeIndices, List<String> readOnlyIndices) {
        for (Index index : indices) {
            final String indexName = index.getName();
            final boolean writable = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, indexName) == null;
            final List<String> target = writable ? writeIndices : readOnlyIndices;
            target.add(indexName);
        }
    }

    /**
     * Builds the shard iterators for the given concrete local indices plus the remote shards,
     * adjusts the request where needed, and starts the search action.
     * <p>
     * Request adjustments, in order: a search over exactly one shard is switched to
     * {@code QUERY_THEN_FETCH}; an unset partial-results flag takes the service default;
     * suggest-only requests disable the request cache and are downgraded from
     * {@code DFS_QUERY_THEN_FETCH} to {@code QUERY_THEN_FETCH}.
     *
     * @param task the search task
     * @param timeProvider time provider for this search
     * @param searchRequest the request to execute; may be modified as described above
     * @param localIndices original local index expressions recorded on local shard iterators
     * @param concreteIndices names of the local indices to route to
     * @param routingMap resolved routing values
     * @param aliasFilter alias filters by index UUID
     * @param concreteIndexBoosts boosts by index UUID
     * @param remoteShardIterators shard iterators of remote clusters
     * @param remoteConnections lookup from (cluster alias, node id) to a remote node
     * @param clusterState cluster state snapshot to search against
     * @param listener receives the response or failure
     * @param clusters clusters section for the response
     */
    private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices, String[] concreteIndices, Map<String, Set<String>> routingMap, Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts, List<SearchShardIterator> remoteShardIterators, BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState, ActionListener<SearchResponse> listener, SearchResponse.Clusters clusters) {
        final Map<String, Long> pendingSearchesPerNode = this.searchTransportService.getPendingSearchRequests();
        final GroupShardsIterator<ShardIterator> localShards = this.clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, searchRequest.preference(), this.searchService.getResponseCollectorService(), pendingSearchesPerNode);
        final GroupShardsIterator<SearchShardIterator> allShards = TransportSearchAction.mergeShardsIterators(localShards, localIndices, searchRequest.getLocalClusterAlias(), remoteShardIterators);
        TransportSearchAction.failIfOverShardCountLimit(this.clusterService, allShards.size());

        // Request adjustments must happen before the pre-filter decision below, which reads the search type.
        if (allShards.size() == 1) {
            searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
        }
        if (searchRequest.allowPartialSearchResults() == null) {
            searchRequest.allowPartialSearchResults(this.searchService.defaultAllowPartialSearchResults());
        }
        if (searchRequest.isSuggestOnly()) {
            searchRequest.requestCache(false);
            if (searchRequest.searchType() == SearchType.DFS_QUERY_THEN_FETCH) {
                searchRequest.searchType(SearchType.QUERY_THEN_FETCH);
            }
        }

        final DiscoveryNodes localNodes = clusterState.nodes();
        final BiFunction<String, String, Transport.Connection> connections = TransportSearchAction.buildConnectionLookup(searchRequest.getLocalClusterAlias(), localNodes::get, remoteConnections, this.searchTransportService::getConnection);
        final boolean preFilter = TransportSearchAction.shouldPreFilterSearchShards(searchRequest, allShards);
        final Map<String, AliasFilter> readOnlyAliasFilters = Collections.unmodifiableMap(aliasFilter);
        this.searchAsyncAction(task, searchRequest, allShards, timeProvider, connections, clusterState.version(), readOnlyAliasFilters, concreteIndexBoosts, routingMap, listener, preFilter, clusters).start();
    }

    /**
     * Builds the function that maps a (cluster alias, node id) pair to a transport connection.
     * <p>
     * A node is looked up remotely only when the shard's cluster alias is non-null and the request
     * itself has no cluster alias; in every other case it is looked up among the local nodes and
     * the connection is requested with a {@code null} cluster alias.
     *
     * @param requestClusterAlias cluster alias set on the request, or {@code null}
     * @param localNodes lookup of local nodes by id
     * @param remoteNodes lookup of remote nodes by cluster alias and node id
     * @param nodeToConnection obtains the connection for a cluster alias ({@code null} for local) and node
     * @return the connection lookup; it throws {@code IllegalStateException} when no node is found
     */
    static BiFunction<String, String, Transport.Connection> buildConnectionLookup(String requestClusterAlias, Function<String, DiscoveryNode> localNodes, BiFunction<String, String, DiscoveryNode> remoteNodes, BiFunction<String, DiscoveryNode, Transport.Connection> nodeToConnection) {
        return (clusterAlias, nodeId) -> {
            final boolean remoteCluster = clusterAlias != null && requestClusterAlias == null;
            final DiscoveryNode discoveryNode;
            if (remoteCluster) {
                discoveryNode = remoteNodes.apply(clusterAlias, nodeId);
            } else {
                // When the request carries an alias, every shard is expected to carry the same one.
                assert requestClusterAlias == null || requestClusterAlias.equals(clusterAlias);
                discoveryNode = localNodes.apply(nodeId);
            }
            if (discoveryNode == null) {
                throw new IllegalStateException("no node found for id: " + nodeId);
            }
            final String connectionClusterAlias = remoteCluster ? clusterAlias : null;
            return nodeToConnection.apply(connectionClusterAlias, discoveryNode);
        };
    }

    /**
     * Decides whether the pre-filter phase should run before the search. All three conditions must
     * hold: the search type is {@code QUERY_THEN_FETCH}, the source can be rewritten to match none,
     * and the number of shard iterators exceeds the request's pre-filter shard size.
     *
     * @param searchRequest the request to inspect
     * @param shardIterators the shards the search would target
     * @return {@code true} when shards should be pre-filtered
     */
    private static boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator<SearchShardIterator> shardIterators) {
        final SearchSourceBuilder source = searchRequest.source();
        if (searchRequest.searchType() != SearchType.QUERY_THEN_FETCH) {
            return false;
        }
        if (!SearchService.canRewriteToMatchNone(source)) {
            return false;
        }
        return searchRequest.getPreFilterShardSize() < shardIterators.size();
    }

    /**
     * Combines remote and local shard iterators into a single {@link GroupShardsIterator}.
     *
     * <p>The remote iterators are copied first; each local {@link ShardIterator} is then wrapped in a
     * new {@link SearchShardIterator} built from {@code localClusterAlias}, the iterator's shard id
     * and shard routings, and {@code localIndices}, and appended after them.
     *
     * @param localShardsIterator   shard iterators of the local cluster
     * @param localIndices          original indices attached to every local shard iterator
     * @param localClusterAlias     cluster alias attached to every local shard iterator, may be {@code null}
     * @param remoteShardIterators  already-built iterators for remote shards; not modified
     * @return a group iterator over the remote iterators followed by the wrapped local ones
     */
    static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator, OriginalIndices localIndices, @Nullable String localClusterAlias, List<SearchShardIterator> remoteShardIterators) {
        List<SearchShardIterator> merged = new ArrayList<SearchShardIterator>(remoteShardIterators);
        localShardsIterator.forEach(local ->
            merged.add(new SearchShardIterator(localClusterAlias, local.shardId(), local.getShardRoutings(), localIndices)));
        return new GroupShardsIterator<SearchShardIterator>(merged);
    }

    /**
     * Creates the async action that executes the given search request.
     *
     * <p>When {@code preFilter} is {@code true}, a {@link CanMatchPreFilterSearchPhase} is returned.
     * It is handed a factory that, for the shard iterators it receives, builds the regular action by
     * calling this method again with {@code preFilter == false} and wraps it in a {@link SearchPhase}
     * whose {@code run()} starts that action. Otherwise the action is chosen by the request's search
     * type.
     *
     * @return the action to start
     * @throws IllegalStateException if the search type is neither {@code DFS_QUERY_THEN_FETCH} nor
     *                               {@code QUERY_THEN_FETCH}
     */
    private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchRequest searchRequest, GroupShardsIterator<SearchShardIterator> shardIterators, SearchTimeProvider timeProvider, BiFunction<String, String, Transport.Connection> connectionLookup, long clusterStateVersion, Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts, Map<String, Set<String>> indexRoutings, ActionListener<SearchResponse> listener, boolean preFilter, SearchResponse.Clusters clusters) {
        ExecutorService searchExecutor = this.threadPool.executor("search");
        if (preFilter) {
            return new CanMatchPreFilterSearchPhase(
                this.logger, this.searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts, indexRoutings,
                searchExecutor, searchRequest, listener, shardIterators, timeProvider, clusterStateVersion, task,
                filteredShards -> {
                    // Build the real action for the shards handed back by the pre-filter phase.
                    final AbstractSearchAsyncAction nextAction = this.searchAsyncAction(
                        task, searchRequest, (GroupShardsIterator<SearchShardIterator>)filteredShards, timeProvider,
                        connectionLookup, clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener,
                        false, clusters);
                    return new SearchPhase(nextAction.getName()){

                        @Override
                        public void run() {
                            nextAction.start();
                        }
                    };
                },
                clusters);
        }
        switch (searchRequest.searchType()) {
            case DFS_QUERY_THEN_FETCH: {
                return new SearchDfsQueryThenFetchAsyncAction(
                    this.logger, this.searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts,
                    indexRoutings, this.searchPhaseController, searchExecutor, searchRequest, listener, shardIterators,
                    timeProvider, clusterStateVersion, task, clusters);
            }
            case QUERY_THEN_FETCH: {
                return new SearchQueryThenFetchAsyncAction(
                    this.logger, this.searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts,
                    indexRoutings, this.searchPhaseController, searchExecutor, searchRequest, listener, shardIterators,
                    timeProvider, clusterStateVersion, task, clusters);
            }
            default: {
                throw new IllegalStateException("Unknown search type: [" + searchRequest.searchType() + "]");
            }
        }
    }

    /**
     * Rejects a search that would hit more shards than the configured limit.
     *
     * <p>The limit is read from the cluster settings under {@code SHARD_COUNT_LIMIT_SETTING}.
     *
     * @param clusterService source of the cluster settings
     * @param shardCount     number of shards the search would query
     * @throws IllegalArgumentException if {@code shardCount} is greater than the limit
     */
    private static void failIfOverShardCountLimit(ClusterService clusterService, int shardCount) {
        long shardCountLimit = clusterService.getClusterSettings().get(SHARD_COUNT_LIMIT_SETTING);
        if ((long)shardCount <= shardCountLimit) {
            return;
        }
        String message = "Trying to query " + shardCount + " shards, which is over the limit of " + shardCountLimit
            + ". This limit exists because querying many shards at the same time can make the job of the coordinating node very CPU and/or memory intensive."
            + " It is usually a better idea to have a smaller number of larger shards."
            + " Update [" + SHARD_COUNT_LIMIT_SETTING.getKey() + "] to a greater value if you really want to query that many shards at the same time.";
        throw new IllegalArgumentException(message);
    }

    /**
     * Wraps a failure in a {@link RemoteTransportException} whose message names the cluster alias.
     *
     * @param clusterAlias alias inserted into the exception message
     * @param e            the failure, kept as the cause
     * @return the wrapping exception
     */
    private static RemoteTransportException wrapRemoteClusterFailure(String clusterAlias, Exception e) {
        String message = "error while communicating with remote cluster [" + clusterAlias + "]";
        return new RemoteTransportException(message, e);
    }

    /**
     * A {@link CountDownActionListener} bound to one cluster alias that changes how failures are
     * handled depending on {@code skipUnavailable}.
     *
     * @param <Response>      type of the per-cluster response
     * @param <FinalResponse> type of the response delivered to the original listener
     */
    static abstract class CCSActionListener<Response, FinalResponse>
    extends CountDownActionListener<Response, FinalResponse> {
        private final String clusterAlias;
        private final boolean skipUnavailable;
        private final AtomicInteger skippedClusters;

        CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters, AtomicReference<Exception> exceptions, ActionListener<FinalResponse> originalListener) {
            super(countDown, exceptions, originalListener);
            this.skippedClusters = skippedClusters;
            this.skipUnavailable = skipUnavailable;
            this.clusterAlias = clusterAlias;
        }

        /**
         * Handles a failure for this cluster.
         *
         * <p>With {@code skipUnavailable} set, the failure is not recorded: the skipped-clusters
         * counter is incremented and the count-down proceeds. Otherwise the failure is passed to the
         * parent listener, wrapped via {@code wrapRemoteClusterFailure} unless the cluster alias is
         * the empty string.
         */
        @Override
        public final void onFailure(Exception e) {
            if (this.skipUnavailable) {
                this.skippedClusters.incrementAndGet();
                this.maybeFinish();
                return;
            }
            // NOTE(review): the empty alias presumably denotes the local cluster; confirm against the callers.
            boolean emptyAlias = "".equals(this.clusterAlias);
            Exception reported = emptyAlias ? e : TransportSearchAction.wrapRemoteClusterFailure(this.clusterAlias, e);
            super.onFailure(reported);
        }
    }

    /**
     * Listener that counts down on every response or failure and notifies a delegate listener once
     * the count-down reports completion.
     *
     * <p>Failures are collected in a shared {@link AtomicReference}. If it holds an exception when
     * the count-down completes, the delegate is failed with it; otherwise the delegate receives the
     * value of {@link #createFinalResponse()}.
     *
     * @param <Response>      type of each individual response
     * @param <FinalResponse> type of the response delivered to the delegate
     */
    static abstract class CountDownActionListener<Response, FinalResponse>
    implements ActionListener<Response> {
        private final CountDown countDown;
        private final AtomicReference<Exception> exceptions;
        private final ActionListener<FinalResponse> delegateListener;

        CountDownActionListener(CountDown countDown, AtomicReference<Exception> exceptions, ActionListener<FinalResponse> delegateListener) {
            this.delegateListener = delegateListener;
            this.exceptions = exceptions;
            this.countDown = countDown;
        }

        /** Consumes one response; called before the count-down is decremented. */
        abstract void innerOnResponse(Response var1);

        /** Produces the response handed to the delegate when no exception was recorded. */
        abstract FinalResponse createFinalResponse();

        @Override
        public final void onResponse(Response response) {
            this.innerOnResponse(response);
            this.maybeFinish();
        }

        /**
         * Records the failure and counts down. The first failure is stored as-is; each later one
         * replaces the stored exception and carries the previously stored one as suppressed.
         */
        @Override
        public void onFailure(Exception e) {
            boolean storedAsFirst = this.exceptions.compareAndSet(null, e);
            if (!storedAsFirst) {
                // NOTE(review): accumulateAndGet may re-apply this function under contention, and
                // addSuppressed is a side effect; confirm duplicates in the suppressed list are acceptable.
                this.exceptions.accumulateAndGet(e, (older, newer) -> {
                    newer.addSuppressed(older);
                    return newer;
                });
            }
            this.maybeFinish();
        }

        /**
         * Decrements the count-down and, only when that call reports completion, notifies the
         * delegate with either the recorded exception or the final response. An exception thrown
         * while creating the final response is routed to the delegate's {@code onFailure}.
         */
        final void maybeFinish() {
            if (!this.countDown.countDown()) {
                return;
            }
            if (this.exceptions.get() != null) {
                this.delegateListener.onFailure(this.exceptions.get());
                return;
            }
            FinalResponse finalResponse;
            try {
                finalResponse = this.createFinalResponse();
            }
            catch (Exception creationFailure) {
                this.delegateListener.onFailure(creationFailure);
                return;
            }
            this.delegateListener.onResponse(finalResponse);
        }
    }

    /**
     * Holds the start timestamps of a search and computes how long it has taken so far.
     */
    static final class SearchTimeProvider {
        private final long absoluteStartMillis;
        private final long relativeStartNanos;
        private final LongSupplier relativeCurrentNanosProvider;

        /**
         * @param absoluteStartMillis          start time returned unchanged by {@link #getAbsoluteStartMillis()}
         * @param relativeStartNanos           start reading, in nanoseconds, subtracted from the current reading
         * @param relativeCurrentNanosProvider supplies the current reading, in nanoseconds
         */
        SearchTimeProvider(long absoluteStartMillis, long relativeStartNanos, LongSupplier relativeCurrentNanosProvider) {
            this.relativeCurrentNanosProvider = relativeCurrentNanosProvider;
            this.relativeStartNanos = relativeStartNanos;
            this.absoluteStartMillis = absoluteStartMillis;
        }

        /** @return the absolute start time passed to the constructor */
        long getAbsoluteStartMillis() {
            return this.absoluteStartMillis;
        }

        /**
         * @return the difference between the supplier's current value and the relative start,
         *         converted from nanoseconds to milliseconds
         */
        long buildTookInMillis() {
            long nowNanos = this.relativeCurrentNanosProvider.getAsLong();
            long elapsedNanos = nowNanos - this.relativeStartNanos;
            return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
        }
    }
}

