/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.persisting.search.client;

import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.Metadata;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.search.CreateAuditTrail;
import io.fluxcapacitor.common.api.search.DocumentStats;
import io.fluxcapacitor.common.api.search.DocumentUpdate;
import io.fluxcapacitor.common.api.search.FacetEntry;
import io.fluxcapacitor.common.api.search.FacetStats;
import io.fluxcapacitor.common.api.search.GetDocument;
import io.fluxcapacitor.common.api.search.GetSearchHistogram;
import io.fluxcapacitor.common.api.search.HasDocument;
import io.fluxcapacitor.common.api.search.SearchDocuments;
import io.fluxcapacitor.common.api.search.SearchHistogram;
import io.fluxcapacitor.common.api.search.SearchQuery;
import io.fluxcapacitor.common.api.search.SerializedDocument;
import io.fluxcapacitor.common.search.Document;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.persisting.search.SearchHit;
import io.fluxcapacitor.javaclient.persisting.search.client.SearchClient;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import lombok.Generated;

/**
 * {@link SearchClient} implementation that keeps all documents in a map in memory.
 *
 * <p>Documents are keyed by {@code collection + "/" + id}. While at least one monitor is registered, every indexed
 * document is additionally converted to a {@link SerializedMessage} and appended to a per-collection message log
 * that can be read back via {@link #openStream(String, Long, int)}.
 */
public class InMemorySearchStore
implements SearchClient {
    /** Computes the key under which a document is stored: see {@link #asIdentifier(String, String)}. */
    protected static final Function<SerializedDocument, String> identifier = d -> asIdentifier(d.getCollection(), d.getId());
    private final Map<String, SerializedDocument> documents = new ConcurrentHashMap<>();
    private final AtomicLong nextIndex = new AtomicLong();
    private final Map<String, ConcurrentSkipListMap<Long, SerializedMessage>> messageLogs = new ConcurrentHashMap<>();
    private final List<BiConsumer<String, List<SerializedMessage>>> monitors = new CopyOnWriteArrayList<>();
    private Duration retentionTime;

    /**
     * Builds the storage key for a document.
     *
     * @param collection the collection the document belongs to
     * @param documentId the id of the document
     * @return {@code collection + "/" + documentId}
     */
    protected static String asIdentifier(String collection, String documentId) {
        return collection + "/" + documentId;
    }

    /**
     * Stores the given documents. When the list contains several documents with the same identifier, the last one
     * wins.
     *
     * @param documents   the documents to store
     * @param guarantee   not used by this implementation
     * @param ifNotExists when {@code true}, documents whose identifier is already present are skipped
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> index(List<SerializedDocument> documents, Guarantee guarantee, boolean ifNotExists) {
        // LinkedHashMap keeps the order in which the documents were supplied.
        Map<String, SerializedDocument> updates = documents.stream()
                .collect(Collectors.toMap(identifier, Function.identity(), (a, b) -> b, LinkedHashMap::new));
        if (ifNotExists) {
            updates.keySet().removeAll(this.documents.keySet());
        }
        this.documents.putAll(updates);
        storeMessages(updates);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Returns the stored documents matching the query of the request, sorted with the comparator created from the
     * request. Path filters, skip, last hit and max size of the request are applied in that order.
     *
     * @param searchDocuments the search request
     * @param fetchSize       not used by this implementation
     * @return a stream of hits
     */
    @Override
    public Stream<SearchHit<SerializedDocument>> search(SearchDocuments searchDocuments, int fetchSize) {
        SearchQuery query = searchDocuments.getQuery();
        Stream<SerializedDocument> documentStream = this.documents.values().stream().filter(query::matches);
        documentStream = documentStream.sorted(Comparator.comparing(SerializedDocument::deserializeDocument, Document.createComparator(searchDocuments)));
        if (!searchDocuments.getPathFilters().isEmpty()) {
            Predicate<Document.Path> pathFilter = searchDocuments.computePathFilter();
            documentStream = documentStream.map(d -> d.deserializeDocument().filterPaths(pathFilter)).map(SerializedDocument::new);
        }
        if (searchDocuments.getSkip() > 0) {
            documentStream = documentStream.skip(searchDocuments.getSkip());
        }
        if (searchDocuments.getLastHit() != null) {
            // Drop everything up to and including the document whose id equals that of the last hit.
            documentStream = documentStream.dropWhile(d -> !d.getId().equals(searchDocuments.getLastHit().getId())).skip(1L);
        }
        if (searchDocuments.getMaxSize() != null) {
            documentStream = documentStream.limit(searchDocuments.getMaxSize().intValue());
        }
        return documentStream.map(SearchHit::fromDocument);
    }

    /**
     * @param r request holding the collection and id to look up
     * @return {@code true} if a document is stored under that collection and id
     */
    @Override
    public boolean documentExists(HasDocument r) {
        return this.documents.containsKey(asIdentifier(r.getCollection(), r.getId()));
    }

    /**
     * @param r request holding the collection and id to look up
     * @return the stored document, or an empty optional if there is none
     */
    @Override
    public Optional<SerializedDocument> fetch(GetDocument r) {
        return Optional.ofNullable(this.documents.get(asIdentifier(r.getCollection(), r.getId())));
    }

    /**
     * Removes every stored document that matches the query.
     *
     * @param query     the query selecting the documents to remove
     * @param guarantee not used by this implementation
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> delete(SearchQuery query, Guarantee guarantee) {
        this.documents.values().removeIf(query::matches);
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Removes the document stored under the given collection and id, if any.
     *
     * @param documentId the id of the document
     * @param collection the collection of the document
     * @param guarantee  not used by this implementation
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> delete(String documentId, String collection, Guarantee guarantee) {
        this.documents.remove(asIdentifier(collection, documentId));
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Does nothing in this implementation.
     *
     * @param request ignored
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> createAuditTrail(CreateAuditTrail request) {
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Removes every stored document whose collection equals the given collection.
     *
     * @param collection the collection to clear
     * @param guarantee  not used by this implementation
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> deleteCollection(String collection, Guarantee guarantee) {
        this.documents.values().removeIf(d -> Objects.equals(collection, d.getCollection()));
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Delegates to {@link DocumentStats#compute} with the deserialized documents that match the query.
     *
     * @param query   the query selecting the documents
     * @param fields2 passed through to {@code DocumentStats.compute}
     * @param groupBy passed through to {@code DocumentStats.compute}
     * @return the computed statistics
     */
    @Override
    public List<DocumentStats> fetchStatistics(SearchQuery query, List<String> fields2, List<String> groupBy) {
        Stream<Document> matches = this.documents.values().stream().filter(query::matches).map(SerializedDocument::deserializeDocument);
        return DocumentStats.compute(matches, fields2, groupBy);
    }

    /**
     * Counts matching documents per time bucket. The interval from the query's {@code since} to its {@code before}
     * (defaulting to the current time) is divided into {@code request.getResolution()} buckets based on the document
     * timestamp.
     *
     * @param request the histogram request
     * @return a histogram with one count per bucket; all counts are zero if the query has no {@code since}
     */
    @Override
    public SearchHistogram fetchHistogram(GetSearchHistogram request) {
        SearchQuery requested = request.getQuery();
        int resolution = request.getResolution();
        List<Long> results = IntStream.range(0, resolution).mapToObj(i -> 0L).collect(Collectors.toList());
        if (requested.getSince() == null) {
            return new SearchHistogram(null, requested.getBefore(), results);
        }
        SearchQuery query = requested.getBefore() == null
                ? requested.toBuilder().before(Instant.now()).build() : requested;
        if (results.isEmpty()) {
            // No buckets requested: nothing to count and no bucket width to compute.
            return new SearchHistogram(query.getSince(), query.getBefore(), results);
        }
        long min = query.getSince().toEpochMilli();
        long delta = query.getBefore().toEpochMilli() - min;
        // The bucket width must be at least 1 ms; a width of 0 would cause a division by zero below.
        long step = Math.max(1L, delta / resolution);
        long lastBucket = resolution - 1L;
        search(SearchDocuments.builder().query(query).build(), -1)
                .map(hit -> hit.getValue().deserializeDocument())
                // Integer division may leave a remainder at the end of the interval; clamp so that the computed
                // bucket is always a valid index into results.
                .collect(Collectors.groupingBy(
                        d -> Math.min(lastBucket, Math.max(0L, (d.getTimestamp().toEpochMilli() - min) / step)),
                        Collectors.counting()))
                .forEach((bucket, count) -> results.set(bucket.intValue(), count));
        return new SearchHistogram(query.getSince(), query.getBefore(), results);
    }

    /**
     * Groups the facets of all matching documents and counts the members of each group.
     *
     * @param query the query selecting the documents
     * @return one entry per group of equal facets, ordered by descending count
     */
    @Override
    public List<FacetStats> fetchFacetStats(SearchQuery query) {
        return this.documents.values().stream().filter(query::matches)
                .flatMap(d -> d.getFacets().stream())
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.toList()))
                .values().stream()
                .map(group -> {
                    FacetEntry first = group.getFirst();
                    return new FacetStats(first.getName(), first.getValue(), group.size());
                })
                .sorted(Comparator.comparing(FacetStats::getCount).reversed()).toList();
    }

    /**
     * Applies the given updates. Updates are grouped by type before being applied, so the relative order of updates
     * of different types is not preserved.
     *
     * @param updates   the updates to apply
     * @param guarantee passed on to the delete and index operations
     * @return an already completed future
     */
    @Override
    public CompletableFuture<Void> bulkUpdate(Collection<DocumentUpdate> updates, Guarantee guarantee) {
        updates.stream().collect(Collectors.groupingBy(DocumentUpdate::getType)).forEach((type, list) -> {
            switch (type) {
                case delete -> list.forEach(u -> delete(u.getId(), u.getCollection(), guarantee));
                case index -> index(list.stream().map(DocumentUpdate::getObject).toList(), guarantee, false);
                case indexIfNotExists -> index(list.stream().map(DocumentUpdate::getObject).toList(), guarantee, true);
            }
        });
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Reads messages from the message log of a collection.
     *
     * @param collection the collection whose log to read
     * @param lastIndex  only messages with an index greater than this value are returned; {@code null} is treated
     *                   as {@code -1}
     * @param maxSize    the maximum number of messages to return
     * @return the messages in ascending index order, or an empty stream if the collection has no log
     */
    public Stream<SerializedMessage> openStream(String collection, Long lastIndex, int maxSize) {
        ConcurrentSkipListMap<Long, SerializedMessage> messageLog = this.messageLogs.get(collection);
        if (messageLog == null) {
            return Stream.empty();
        }
        long exclusiveStart = lastIndex == null ? -1L : lastIndex;
        return messageLog.tailMap(exclusiveStart, false).values().stream().limit(maxSize);
    }

    /**
     * Appends the given documents as messages to the log of their collection, purges expired messages if a retention
     * time is set, and notifies the monitors. Does nothing when no monitor is registered.
     *
     * @param updates the documents that were stored, keyed by identifier
     */
    protected synchronized void storeMessages(Map<String, SerializedDocument> updates) {
        if (this.monitors.isEmpty()) {
            return;
        }
        Map<String, List<SerializedMessage>> byCollection = updates.values().stream()
                .collect(Collectors.groupingBy(SerializedDocument::getCollection,
                                               Collectors.mapping(this::asSerializedMessage, Collectors.toList())));
        try {
            byCollection.forEach((collection, messages) -> {
                ConcurrentSkipListMap<Long, SerializedMessage> messageLog =
                        this.messageLogs.computeIfAbsent(collection, c -> new ConcurrentSkipListMap<>());
                messages.forEach(m -> messageLog.put(m.getIndex(), m));
            });
            if (this.retentionTime != null) {
                purgeExpiredMessages(this.retentionTime);
            }
        } finally {
            // Monitors are notified even if storing or purging failed.
            byCollection.forEach(this::notifyMonitors);
        }
    }

    /**
     * Converts a document to a message carrying the document's payload and id, with the document's timestamp and
     * end stored in the metadata under {@code $start} and {@code $end}. Each call assigns the next index.
     *
     * @param document the document to convert
     * @return the message, with its index set
     */
    protected SerializedMessage asSerializedMessage(SerializedDocument document) {
        long index = this.nextIndex.updateAndGet(IndexUtils::nextIndex);
        Metadata metadata = Metadata.of("$start", document.getTimestamp(), "$end", document.getEnd());
        SerializedMessage result = new SerializedMessage(document.getDocument(), metadata, document.getId(), IndexUtils.millisFromIndex(index));
        result.setIndex(index);
        return result;
    }

    /**
     * Removes from every message log the messages whose index is at most
     * {@code IndexUtils.maxIndexFromMillis(now - messageExpiration)}.
     *
     * @param messageExpiration how long messages are retained
     */
    protected void purgeExpiredMessages(Duration messageExpiration) {
        long threshold = FluxCapacitor.currentTime().minus(messageExpiration).toEpochMilli();
        long maxExpiredIndex = IndexUtils.maxIndexFromMillis(threshold);
        this.messageLogs.values().forEach(messageLog -> messageLog.headMap(maxExpiredIndex, true).clear());
    }

    /**
     * Wakes up threads waiting on this instance and passes the messages to every registered monitor.
     *
     * <p>Calls {@link Object#notifyAll()} on this instance, so the caller must hold this instance's monitor (as
     * {@link #storeMessages(Map)} does).
     *
     * @param collection the collection the messages belong to
     * @param messages   the new messages
     */
    protected void notifyMonitors(String collection, List<SerializedMessage> messages) {
        this.notifyAll();
        this.monitors.forEach(m -> m.accept(collection, messages));
    }

    /**
     * Registers a monitor that is invoked with the collection name and the new messages whenever documents are
     * stored.
     *
     * @param monitor the monitor to register
     * @return a registration that removes the monitor again
     */
    public synchronized Registration registerMonitor(BiConsumer<String, List<SerializedMessage>> monitor) {
        this.monitors.add(monitor);
        return () -> this.monitors.remove(monitor);
    }

    /**
     * Registers a monitor that is only invoked for messages of the given collection.
     *
     * @param collection the collection to monitor
     * @param monitor    the monitor to register
     * @return a registration that removes the monitor again
     */
    public Registration registerMonitor(String collection, Consumer<List<SerializedMessage>> monitor) {
        return registerMonitor((c, messages) -> {
            if (Objects.equals(collection, c)) {
                monitor.accept(messages);
            }
        });
    }

    /** Does nothing in this implementation. */
    @Override
    public void close() {
    }

    /**
     * @param retentionTime how long logged messages are retained; {@code null} disables purging
     */
    @ConstructorProperties(value={"retentionTime"})
    @Generated
    public InMemorySearchStore(Duration retentionTime) {
        this.retentionTime = retentionTime;
    }

    /**
     * @return how long logged messages are retained, or {@code null} if they are never purged
     */
    @Generated
    public Duration getRetentionTime() {
        return this.retentionTime;
    }

    /**
     * @param retentionTime how long logged messages are retained; {@code null} disables purging
     */
    @Generated
    public void setRetentionTime(Duration retentionTime) {
        this.retentionTime = retentionTime;
    }
}

