/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.persisting.search.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.api.search.BulkUpdateDocuments;
import io.fluxcapacitor.common.api.search.CreateAuditTrail;
import io.fluxcapacitor.common.api.search.DeleteCollection;
import io.fluxcapacitor.common.api.search.DeleteDocumentById;
import io.fluxcapacitor.common.api.search.DeleteDocuments;
import io.fluxcapacitor.common.api.search.DocumentStats;
import io.fluxcapacitor.common.api.search.GetDocument;
import io.fluxcapacitor.common.api.search.GetDocumentResult;
import io.fluxcapacitor.common.api.search.GetDocumentStats;
import io.fluxcapacitor.common.api.search.GetDocumentStatsResult;
import io.fluxcapacitor.common.api.search.GetSearchHistogram;
import io.fluxcapacitor.common.api.search.GetSearchHistogramResult;
import io.fluxcapacitor.common.api.search.IndexDocuments;
import io.fluxcapacitor.common.api.search.SearchDocuments;
import io.fluxcapacitor.common.api.search.SearchDocumentsResult;
import io.fluxcapacitor.common.api.search.SearchHistogram;
import io.fluxcapacitor.common.api.search.SearchQuery;
import io.fluxcapacitor.common.api.search.SerializedDocument;
import io.fluxcapacitor.common.api.search.SerializedDocumentUpdate;
import io.fluxcapacitor.common.search.Document;
import io.fluxcapacitor.javaclient.common.websocket.AbstractWebsocketClient;
import io.fluxcapacitor.javaclient.configuration.client.WebSocketClient;
import io.fluxcapacitor.javaclient.persisting.search.SearchHit;
import io.fluxcapacitor.javaclient.persisting.search.client.SearchClient;
import java.net.URI;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.websocket.ClientEndpoint;

@ClientEndpoint
public class WebSocketSearchClient
extends AbstractWebsocketClient
implements SearchClient {
    public static int maxFetchSize = 10000;

    public WebSocketSearchClient(String endPointUrl, WebSocketClient.ClientConfig clientConfig) {
        this(URI.create(endPointUrl), clientConfig);
    }

    public WebSocketSearchClient(URI endpointUri, WebSocketClient.ClientConfig clientConfig) {
        this(endpointUri, clientConfig, true);
    }

    public WebSocketSearchClient(URI endpointUri, WebSocketClient.ClientConfig clientConfig, boolean sendMetrics) {
        super(endpointUri, clientConfig, sendMetrics, clientConfig.getSearchSessions());
    }

    @Override
    public Awaitable index(List<Document> documents, Guarantee guarantee, boolean ifNotExists) {
        return this.sendCommand(new IndexDocuments(ObjectUtils.deduplicate(documents, Document.identityFunction).stream().map(SerializedDocument::new).collect(Collectors.toList()), ifNotExists, guarantee));
    }

    @Override
    public Awaitable bulkUpdate(Collection<SerializedDocumentUpdate> batch, Guarantee guarantee) {
        return this.sendCommand(new BulkUpdateDocuments(batch, guarantee));
    }

    @Override
    public Stream<SearchHit<Document>> search(SearchDocuments searchDocuments) {
        AtomicInteger count = new AtomicInteger();
        Integer maxSize = searchDocuments.getMaxSize();
        int fetchSize = maxSize == null ? maxFetchSize : Math.min(maxSize, maxFetchSize);
        SearchDocuments request = searchDocuments.toBuilder().maxSize(fetchSize).build();
        Stream<SearchHit> documentStream = ObjectUtils.iterate((SearchDocumentsResult)this.sendAndWait(request), result2 -> (SearchDocumentsResult)this.sendAndWait(request.toBuilder().maxSize(maxSize == null ? fetchSize : Math.min(maxSize - count.get(), fetchSize)).lastHit(result2.lastMatch()).build()), result2 -> result2.size() < fetchSize || maxSize != null && count.addAndGet(result2.size()) >= maxSize).flatMap(r -> r.getMatches().stream());
        if (maxSize != null) {
            documentStream = documentStream.limit(maxSize.intValue());
        }
        return documentStream.map(d -> new SearchHit<Document>(d.getId(), d.getCollection(), d.getTimestamp() == null ? null : Instant.ofEpochMilli(d.getTimestamp()), d.getEnd() == null ? null : Instant.ofEpochMilli(d.getEnd()), d::deserializeDocument));
    }

    @Override
    public Optional<Document> fetch(GetDocument request) {
        return Optional.ofNullable(((GetDocumentResult)this.sendAndWait(request)).getDocument()).map(SerializedDocument::deserializeDocument);
    }

    @Override
    public List<DocumentStats> fetchStatistics(SearchQuery query, List<String> fields2, List<String> groupBy) {
        GetDocumentStatsResult result2 = (GetDocumentStatsResult)this.sendAndWait(new GetDocumentStats(query, fields2, groupBy));
        return result2.getDocumentStats();
    }

    @Override
    public SearchHistogram fetchHistogram(GetSearchHistogram request) {
        GetSearchHistogramResult result2 = (GetSearchHistogramResult)this.sendAndWait(request);
        return result2.getHistogram();
    }

    @Override
    public Awaitable delete(SearchQuery query, Guarantee guarantee) {
        return this.sendCommand(new DeleteDocuments(query, guarantee));
    }

    @Override
    public Awaitable delete(String documentId, String collection, Guarantee guarantee) {
        return this.sendCommand(new DeleteDocumentById(collection, documentId, guarantee));
    }

    @Override
    public Awaitable deleteCollection(String collection, Guarantee guarantee) {
        return this.sendCommand(new DeleteCollection(collection, guarantee));
    }

    @Override
    public Awaitable createAuditTrail(CreateAuditTrail request) {
        return this.sendCommand(request);
    }
}

