/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.search.dispatch.rpc;

import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yahoo.collections.ListMap;
import com.yahoo.collections.Pair;
import com.yahoo.compress.Compressor;
import com.yahoo.container.protect.Error;
import com.yahoo.data.access.slime.SlimeAdapter;
import com.yahoo.prelude.fastsearch.DocumentDatabase;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.TimeoutException;
import com.yahoo.processing.IllegalInputException;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client;
import com.yahoo.search.dispatch.rpc.CompressPayload;
import com.yahoo.search.dispatch.rpc.ProtobufSerialization;
import com.yahoo.search.dispatch.rpc.RpcConnectionPool;
import com.yahoo.search.dispatch.rpc.TimeoutHelper;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.slime.BinaryFormat;
import com.yahoo.slime.BinaryView;
import com.yahoo.slime.Inspector;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class RpcProtobufFillInvoker
extends FillInvoker {
    private static final String RPC_METHOD = "vespa.searchprotocol.getDocsums";
    private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName());
    private final DocumentDatabase documentDb;
    private final RpcConnectionPool resourcePool;
    private final boolean summaryNeedsQuery;
    private final String serverId;
    private final CompressPayload compressor;
    private final DecodePolicy decodePolicy;
    private BlockingQueue<Pair<Client.ResponseOrError<Client.ProtobufResponse>, List<FastHit>>> responses;
    private boolean hasReportedError = false;
    private int outstandingResponses;

    RpcProtobufFillInvoker(RpcConnectionPool resourcePool, CompressPayload compressor, DocumentDatabase documentDb, String serverId, DecodePolicy decodePolicy, boolean summaryNeedsQuery) {
        this.documentDb = documentDb;
        this.resourcePool = resourcePool;
        this.serverId = serverId;
        this.summaryNeedsQuery = summaryNeedsQuery;
        this.compressor = compressor;
        this.decodePolicy = decodePolicy;
    }

    @Override
    protected void sendFillRequest(Result result, String summaryClass) {
        if (summaryClass != null) {
            if (summaryClass.equals("")) {
                summaryClass = null;
            } else if (!this.documentDb.getDocsumDefinitionSet().hasDocsum(summaryClass)) {
                throw new IllegalInputException("invalid presentation.summary=" + summaryClass);
            }
        }
        ListMap<Integer, FastHit> hitsByNode = RpcProtobufFillInvoker.hitsByNode(result);
        result.getQuery().trace(false, 5, "Sending ", hitsByNode.size(), " summary fetch requests with jrt/protobuf");
        this.outstandingResponses = hitsByNode.size();
        this.responses = new LinkedBlockingQueue<Pair<Client.ResponseOrError<Client.ProtobufResponse>, List<FastHit>>>(this.outstandingResponses);
        TimeoutHelper.Timeout timeout = TimeoutHelper.calculateTimeout(result.getQuery());
        if (timeout.timedOut()) {
            hitsByNode.forEach((nodeId, hits) -> this.receive(Client.ResponseOrError.fromTimeoutError("Timed out prior to sending docsum request to " + nodeId), (List<FastHit>)hits));
            return;
        }
        SearchProtocol.DocsumRequest.Builder builder = ProtobufSerialization.createDocsumRequestBuilder(result.getQuery(), this.serverId, summaryClass, result.getQuery().getPresentation().getSummaryFields(), this.summaryNeedsQuery, timeout.request());
        hitsByNode.forEach((nodeId, hits) -> {
            byte[] payload = ProtobufSerialization.serializeDocsumRequest(builder, hits);
            this.sendDocsumsRequest((int)nodeId, (List<FastHit>)hits, payload, result, timeout.client());
        });
    }

    @Override
    protected void getFillResults(Result result, String summaryClass) {
        try {
            this.processResponses(result, summaryClass);
            result.hits().setSorted(false);
            result.analyzeHits();
        }
        catch (TimeoutException e) {
            result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage()));
        }
    }

    @Override
    protected void release() {
    }

    public void receive(Client.ResponseOrError<Client.ProtobufResponse> response, List<FastHit> hitsContext) {
        this.responses.add((Pair<Client.ResponseOrError<Client.ProtobufResponse>, List<FastHit>>)new Pair(response, hitsContext));
    }

    private static ListMap<Integer, FastHit> hitsByNode(Result result) {
        ListMap hitsByNode = new ListMap();
        Iterator<Hit> i = result.hits().unorderedDeepIterator();
        while (i.hasNext()) {
            Hit h = i.next();
            if (!(h instanceof FastHit)) continue;
            FastHit hit = (FastHit)h;
            hitsByNode.put((Object)hit.getDistributionKey(), (Object)hit);
        }
        return hitsByNode;
    }

    private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, Result result, double clientTimeout) {
        Client.NodeConnection node = this.resourcePool.getConnection(nodeId);
        if (node == null) {
            String error = "Could not fill hits from unknown node " + nodeId;
            this.receive(Client.ResponseOrError.fromError(error), hits);
            result.hits().addError(ErrorMessage.createEmptyDocsums(error));
            log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config");
            return;
        }
        Query query = result.getQuery();
        Compressor.Compression compressionResult = this.compressor.compress(query, payload);
        node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), roe -> this.receive(roe, hits), clientTimeout);
    }

    private void processResponses(Result result, String summaryClass) throws TimeoutException {
        try {
            int skippedHits = 0;
            while (this.outstandingResponses > 0) {
                Client.ResponseOrError response;
                Pair<Client.ResponseOrError<Client.ProtobufResponse>, List<FastHit>> responseAndHits;
                long timeLeftMs = result.getQuery().getTimeLeft();
                if (timeLeftMs <= 0L) {
                    this.throwTimeout();
                }
                if ((responseAndHits = this.responses.poll(timeLeftMs, TimeUnit.MILLISECONDS)) == null) {
                    this.throwTimeout();
                }
                if ((response = (Client.ResponseOrError)responseAndHits.getFirst()).timeout()) {
                    this.throwTimeout();
                }
                List hitsContext = (List)responseAndHits.getSecond();
                skippedHits += this.processResponse(result, response, hitsContext, summaryClass);
                --this.outstandingResponses;
            }
            if (skippedHits != 0) {
                result.hits().addError(ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " + summaryClass + " for " + skippedHits + " hits"));
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private int processResponse(Result result, Client.ResponseOrError<Client.ProtobufResponse> responseOrError, List<FastHit> hitsContext, String summaryClass) {
        if (responseOrError.error().isPresent()) {
            if (this.hasReportedError) {
                return 0;
            }
        } else {
            Client.ProtobufResponse response = responseOrError.response().get();
            byte[] responseBytes = this.compressor.decompress(response);
            return this.fill(result, hitsContext, summaryClass, responseBytes);
        }
        String error = responseOrError.error().get();
        result.hits().addError(ErrorMessage.createBackendCommunicationError(error));
        log.log(Level.WARNING, "Error fetching summary data: " + error);
        this.hasReportedError = true;
        return 0;
    }

    private void addErrors(Result result, Inspector errors) {
        errors.traverse((index, value) -> {
            int errorCode = "timeout".equalsIgnoreCase(value.field("type").asString()) ? Error.TIMEOUT.code : Error.UNSPECIFIED.code;
            result.hits().addError(new ErrorMessage(errorCode, value.field("message").asString(), value.field("details").asString()));
        });
    }

    private void convertErrorsFromDocsumReply(Result target, List<SearchProtocol.Error> errors) {
        for (SearchProtocol.Error error : errors) {
            target.hits().addError(ErrorMessage.createDocsumReplyError(error.getMessage()));
        }
    }

    private int fill(Result result, List<FastHit> hits, String summaryClass, byte[] payload) {
        try {
            boolean hasErrors;
            SearchProtocol.DocsumReply protobuf = SearchProtocol.DocsumReply.parseFrom((byte[])payload);
            Inspector root = this.decodePolicy == DecodePolicy.ONDEMAND ? BinaryView.inspect((byte[])protobuf.getSlimeSummaries().toByteArray()) : BinaryFormat.decode((byte[])protobuf.getSlimeSummaries().toByteArray()).get();
            Inspector errors = root.field("errors");
            boolean bl = hasErrors = errors.valid() && errors.entries() > 0;
            if (hasErrors) {
                this.addErrors(result, errors);
            }
            this.convertErrorsFromDocsumReply(result, protobuf.getErrorsList());
            SlimeAdapter summaries = new SlimeAdapter(root.field("docsums"));
            if (!summaries.valid()) {
                return 0;
            }
            int skippedHits = 0;
            for (int i = 0; i < hits.size(); ++i) {
                com.yahoo.data.access.Inspector summary = summaries.entry(i).field("docsum");
                if (summary.valid()) {
                    hits.get(i).setField("sddocname", this.documentDb.schema().name());
                    hits.get(i).addSummary(this.documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary);
                    hits.get(i).setFilled(summaryClass);
                    continue;
                }
                ++skippedHits;
            }
            return skippedHits;
        }
        catch (InvalidProtocolBufferException ex) {
            log.log(Level.WARNING, "Invalid response to docsum request", ex);
            result.hits().addError(ErrorMessage.createInternalServerError("Invalid response to docsum request from backend"));
            return 0;
        }
    }

    private void throwTimeout() throws TimeoutException {
        throw new TimeoutException("Timed out waiting for summary data. " + this.outstandingResponses + " responses outstanding.");
    }

    static enum DecodePolicy {
        EAGER,
        ONDEMAND;

    }
}

