/*
 * Decompiled with CFR 0.152.
 */
package net.pincette.jes.util;

import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.reactivestreams.client.ClientSession;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.json.JsonObject;
import javax.json.JsonValue;
import net.pincette.jes.util.Command;
import net.pincette.jes.util.Event;
import net.pincette.jes.util.Href;
import net.pincette.jes.util.Reducer;
import net.pincette.jes.util.Util;
import net.pincette.json.JsonUtil;
import net.pincette.json.Transform;
import net.pincette.mongo.BsonUtil;
import net.pincette.mongo.Collection;
import net.pincette.mongo.JsonClient;
import net.pincette.mongo.Patch;
import net.pincette.rs.Chain;
import net.pincette.rs.Mapper;
import net.pincette.util.Collections;
import net.pincette.util.Pair;
import net.pincette.util.State;
import net.pincette.util.StreamUtil;
import org.bson.BsonDocument;
import org.bson.BsonElement;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.BsonType;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;

/**
 * Static helpers for storing and reading aggregates, commands and events in MongoDB.
 *
 * <p>Collection names are derived from the <code>_type</code> field of a JSON object (or the
 * type of an {@link Href}), an optional <code>-event</code> or <code>-command</code> infix and an
 * optional <code>-&lt;environment&gt;</code> suffix. Events are stored with a compound
 * <code>_id</code> of the form <code>{id, seq}</code>.
 *
 * <p>The class cannot be instantiated.
 */
public class Mongo {
    // Names of the technical JSON fields that are read or written by this class. They are declared
    // first because the filter and pipeline constants below refer to them.
    private static final String CORR = "_corr";
    private static final String DELETED = "_deleted";
    private static final String ID = "_id";
    private static final String OPS = "_ops";
    private static final String SEQ = "_seq";
    private static final String TIMESTAMP = "_timestamp";
    private static final String TYPE = "_type";

    // Subfields of the compound event key and their dotted paths.
    private static final String FIELD_ID = "id";
    private static final String EVENT_ID = "_id.id";
    private static final String FIELD_SEQ = "seq";
    private static final String EVENT_SEQ = "_id.seq";

    private static final String HREF = "href";
    private static final String RESOLVED = "_resolved";
    private static final String SET = "$set";
    private static final String COMMAND_INFIX = "-command";
    private static final String EVENT_INFIX = "-event";

    /** The sequence number that is assumed when an object has no <code>_seq</code> field. */
    private static final int NO_SEQ = -1;

    /** The number of old events that are converted per batch by {@link #upgradeEventLog}. */
    private static final int UPGRADE_BATCH_SIZE = 1000;

    /** A filter that matches documents for which <code>_deleted</code> is not <code>true</code>. */
    public static final Bson NOT_DELETED = Filters.ne(DELETED, true);

    private static final Bson SORT_EVENTS = Aggregates.sort(Sorts.ascending(EVENT_SEQ));

    // Events in the old format: they carry a "_seq" field and have a string "_id". They are
    // processed from the most recent timestamp to the oldest.
    private static final List<Bson> OLD_EVENTS = Collections.list(Aggregates.match(Filters.and(Filters.exists(SEQ), Filters.type(ID, BsonType.STRING))), Aggregates.sort(Sorts.descending(TIMESTAMP)));

    private Mongo() {
    }

    /**
     * Combines a query with {@link #NOT_DELETED}.
     *
     * @param query the original query.
     * @return the conjunction of <code>query</code> and {@link #NOT_DELETED}.
     */
    public static Bson addNotDeleted(Bson query) {
        return Filters.and(Collections.list(query, NOT_DELETED));
    }

    /**
     * Derives the collection name for a JSON object from its <code>_type</code> field, the
     * event/command infix and the environment suffix.
     */
    static String collection(JsonObject json, String environment) {
        return json.getString(TYPE) + collectionInfix(json) + suffix(environment);
    }

    /** Derives the collection name for a reference from its type and the environment suffix. */
    static String collection(Href href, String environment) {
        return href.type + suffix(environment);
    }

    /**
     * Returns <code>-event</code> for events, <code>-command</code> for commands and the empty
     * string otherwise. The command test is only performed when the object is not an event.
     */
    private static String collectionInfix(JsonObject json) {
        if (Event.isEvent(json)) {
            return EVENT_INFIX;
        }
        return Command.isCommand(json) ? COMMAND_INFIX : "";
    }

    private static String eventCollection(String type, String environment) {
        return type + EVENT_INFIX + suffix(environment);
    }

    /**
     * Converts an event to its MongoDB representation. The <code>_id</code> and <code>_seq</code>
     * fields are replaced with a compound <code>_id</code> of the form <code>{id, seq}</code>,
     * which is the first element of the document. All other fields are copied.
     *
     * @param event the event, which must have the fields <code>_id</code> and <code>_seq</code>.
     * @return the BSON document.
     */
    public static BsonDocument eventToMongo(JsonObject event) {
        Stream<BsonElement> key = Stream.of(new BsonElement(ID, mongoEventKey(event)));
        Stream<BsonElement> fields = event.entrySet().stream()
            .filter(e -> !e.getKey().equals(ID) && !e.getKey().equals(SEQ))
            .map(e -> new BsonElement(e.getKey(), BsonUtil.fromJson(e.getValue())));

        return new BsonDocument(Stream.concat(key, fields).collect(Collectors.toList()));
    }

    /**
     * Publishes all events of an aggregate instance in ascending sequence order.
     *
     * @param id the identifier of the aggregate instance.
     * @param type the aggregate type, which is used to derive the event collection.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param events the database and session for the event collection.
     * @return the events, converted with {@link #mongoToEvent(JsonObject)}.
     */
    public static Publisher<JsonObject> events(String id, String type, String environment, DbContext events) {
        return mongoToEvents(JsonClient.aggregationPublisher(events.database.getCollection(eventCollection(type, environment)), events.session, Collections.list(Aggregates.match(Filters.eq(EVENT_ID, id)), SORT_EVENTS), null));
    }

    /**
     * Publishes the events of an aggregate instance that are more recent than the given snapshot,
     * in ascending sequence order. When the snapshot has no <code>_seq</code> field all events of
     * the instance are selected.
     *
     * @param snapshot the aggregate snapshot, which must have the fields <code>_id</code> and
     *     <code>_type</code>.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param events the database and session for the event collection.
     * @return the events, converted with {@link #mongoToEvent(JsonObject)}.
     */
    public static Publisher<JsonObject> events(JsonObject snapshot, String environment, DbContext events) {
        return mongoToEvents(JsonClient.aggregationPublisher(events.database.getCollection(eventCollection(snapshot.getString(TYPE), environment)), events.session, Collections.list(Aggregates.match(Filters.and(Filters.eq(EVENT_ID, snapshot.getString(ID)), Filters.gt(EVENT_SEQ, snapshot.getInt(SEQ, NO_SEQ)))), SORT_EVENTS), null));
    }

    /** Fetches all references. References that are not found are left out of the result. */
    private static CompletionStage<Map<Href, JsonObject>> fetchHrefs(Set<Href> hrefs, String environment, MongoDatabase database) {
        return StreamUtil.composeAsyncStream(hrefs.stream().map(href -> findHref(href, environment, database).thenApply(j -> Pair.pair(href, j.orElse(null)))))
            .thenApply(stream -> stream.filter(pair -> pair.second != null).collect(Collectors.toMap(pair -> pair.first, pair -> pair.second)));
    }

    /**
     * Looks up the document a reference refers to.
     *
     * @param href the reference. Its type selects the collection and its ID the document.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param database the database.
     * @return the document, if it was found.
     */
    public static CompletionStage<Optional<JsonObject>> findHref(Href href, String environment, MongoDatabase database) {
        return JsonClient.findOne(database.getCollection(collection(href, environment)), Filters.eq(ID, href.id));
    }

    /** Replaces the <code>_id</code> field of an old event with the ID without sequence suffix. */
    private static JsonObject fixId(JsonObject event) {
        return JsonUtil.createObjectBuilder(event).add(ID, stripSequence(event.getString(ID))).build();
    }

    /** Tests if the object has <code>href</code> as its only field. */
    private static boolean hrefOnly(JsonObject json) {
        return json.containsKey(HREF) && json.size() == 1;
    }

    /** Collects the references of all nested objects that consist of only an <code>href</code>. */
    private static Set<Href> hrefs(Stream<JsonObject> json) {
        return json.flatMap(JsonUtil::nestedObjects)
            .filter(Mongo::hrefOnly)
            .map(j -> new Href(j.getString(HREF)))
            .collect(Collectors.toSet());
    }

    /**
     * Inserts a JSON object in the given collection.
     *
     * @param json the object to insert.
     * @param collection the name of the collection.
     * @param database the database.
     * @return the result reported by <code>JsonClient.insert</code>.
     */
    public static CompletionStage<Boolean> insert(JsonObject json, String collection, MongoDatabase database) {
        return JsonClient.insert(database.getCollection(collection), json);
    }

    /**
     * Inserts a JSON object in the given collection using a client session.
     *
     * @param json the object to insert.
     * @param collection the name of the collection.
     * @param database the database.
     * @param session the client session.
     * @return the result reported by <code>JsonClient.insert</code>.
     */
    public static CompletionStage<Boolean> insert(JsonObject json, String collection, MongoDatabase database, ClientSession session) {
        return JsonClient.insert(database.getCollection(collection), json, session);
    }

    /**
     * Inserts a JSON object in the collection that is derived from the object itself, without a
     * client session.
     *
     * @param json the object to insert. It must have the <code>_type</code> field.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param database the database.
     * @return the result reported by <code>JsonClient.insert</code>.
     */
    public static CompletionStage<Boolean> insertJson(JsonObject json, String environment, MongoDatabase database) {
        return insertJson(json, environment, database, null);
    }

    /**
     * Inserts a JSON object in the collection that is derived from the object itself.
     *
     * @param json the object to insert. It must have the <code>_type</code> field.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param database the database.
     * @param session the client session, which is passed on as is.
     * @return the result reported by <code>JsonClient.insert</code>.
     */
    public static CompletionStage<Boolean> insertJson(JsonObject json, String environment, MongoDatabase database, ClientSession session) {
        return JsonClient.insert(database.getCollection(collection(json, environment)), json, session);
    }

    /**
     * Reconstructs an aggregate instance by applying all of its events to an empty object.
     *
     * @param id the identifier of the aggregate instance.
     * @param type the aggregate type.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param events the database and session for the event collection.
     * @return the reconstructed aggregate instance.
     */
    public static CompletionStage<JsonObject> reconstruct(String id, String type, String environment, DbContext events) {
        return net.pincette.rs.Reducer.reduce(events(id, type, environment, events), JsonUtil::emptyObject, Event::applyEvent);
    }

    /** Creates the compound event key <code>{id, seq}</code> from the event's ID and sequence. */
    private static BsonDocument mongoEventKey(JsonObject event) {
        return new BsonDocument(Collections.list(new BsonElement(FIELD_ID, new BsonString(event.getString(ID))), new BsonElement(FIELD_SEQ, new BsonInt32(event.getInt(SEQ)))));
    }

    /**
     * Converts a stored event back to an event with a flat <code>_id</code> and a
     * <code>_seq</code> field, which are taken from the compound key <code>{id, seq}</code>.
     *
     * @param event the event as it is stored in MongoDB.
     * @return the converted event.
     */
    public static JsonObject mongoToEvent(JsonObject event) {
        return JsonUtil.createObjectBuilder(event)
            .remove(ID)
            .add(ID, event.getValue(JsonUtil.toJsonPointer(EVENT_ID)))
            .add(SEQ, event.getValue(JsonUtil.toJsonPointer(EVENT_SEQ)))
            .build();
    }

    private static Publisher<JsonObject> mongoToEvents(Publisher<JsonObject> mongoEvents) {
        return Chain.with(mongoEvents).map(Mongo::mongoToEvent).get();
    }

    /**
     * Reconstructs an aggregate instance by applying the events that are more recent than the
     * snapshot to that snapshot.
     *
     * @param snapshot the aggregate snapshot to start from.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param events the database and session for the event collection.
     * @return the reconstructed aggregate instance.
     */
    public static CompletionStage<JsonObject> reconstruct(JsonObject snapshot, String environment, DbContext events) {
        return net.pincette.rs.Reducer.reduce(events(snapshot, environment, events), () -> snapshot, Event::applyEvent);
    }

    /**
     * Delegates to {@link #reconstructionPublisher(Publisher, JsonObject)} with a
     * <code>null</code> snapshot.
     *
     * @param events the events to apply.
     * @return the publisher of the mapped events.
     */
    public static Publisher<JsonObject> reconstructionPublisher(Publisher<JsonObject> events) {
        return reconstructionPublisher(events, null);
    }

    /**
     * Maps each event with the function <code>Event.applyEvent(snapshot)</code>.
     *
     * @param events the events to apply.
     * @param snapshot the snapshot that is handed to <code>Event.applyEvent</code>.
     * @return the publisher of the mapped events.
     */
    public static Publisher<JsonObject> reconstructionPublisher(Publisher<JsonObject> events, JsonObject snapshot) {
        return net.pincette.rs.Util.subscribe(events, new Mapper<JsonObject, JsonObject>(Event.applyEvent(snapshot)));
    }

    /**
     * Deletes an old event by its string <code>_id</code> and then inserts it again in the
     * compound key format. Both steps must be acknowledged.
     */
    private static CompletionStage<Boolean> replaceEvent(MongoCollection<BsonDocument> collection, JsonObject event) {
        return Collection.deleteOne(collection, Filters.eq(ID, event.getString(ID)))
            .thenApply(DeleteResult::wasAcknowledged)
            .thenApply(result -> net.pincette.util.Util.must(result, r -> r))
            .thenComposeAsync(result -> Collection.insertOne(collection, eventToMongo(fixId(event))))
            .thenApply(InsertOneResult::wasAcknowledged)
            .thenApply(result -> net.pincette.util.Util.must(result, r -> r));
    }

    /**
     * Resolves the references in one JSON object. See
     * {@link #resolve(List, String, MongoDatabase)}.
     *
     * @param json the object in which the references are resolved.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param database the database.
     * @return the resolved object.
     */
    public static CompletionStage<JsonObject> resolve(JsonObject json, String environment, MongoDatabase database) {
        return resolve(Collections.list(json), environment, database).thenApply(list -> list.get(0));
    }

    /**
     * Resolves the references in a list of JSON objects. A reference is a nested object that has
     * <code>href</code> as its only field. When the referred document is found its fields are
     * added to the reference, together with the field <code>_resolved</code> set to
     * <code>true</code>. References that are not found are left untouched.
     *
     * @param json the objects in which the references are resolved.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param database the database.
     * @return the resolved objects in the same order. When there are no references the given list
     *     itself is returned.
     */
    public static CompletionStage<List<JsonObject>> resolve(List<JsonObject> json, String environment, MongoDatabase database) {
        Set<Href> references = hrefs(json.stream());

        if (references.isEmpty()) {
            return CompletableFuture.completedFuture(json);
        }

        return fetchHrefs(references, environment, database)
            .thenApply(map -> json.stream().map(j -> resolve(j, map)).collect(Collectors.toList()));
    }

    private static JsonObject resolve(JsonObject json, Map<Href, JsonObject> fetchedHrefs) {
        return Transform.transform(json, new Transform.Transformer(entry -> JsonUtil.isObject(entry.value) && hrefOnly(entry.value.asJsonObject()), entry -> Optional.of(resolve(entry, fetchedHrefs))));
    }

    private static Transform.JsonEntry resolve(Transform.JsonEntry entry, Map<Href, JsonObject> fetchedHrefs) {
        JsonObject reference = entry.value.asJsonObject();
        JsonObject fetched = fetchedHrefs.get(new Href(reference.getString(HREF)));
        // A reference that couldn't be fetched is kept as it is.
        JsonObject resolved = fetched != null
            ? JsonUtil.add(JsonUtil.createObjectBuilder(reference).add(RESOLVED, true), fetched).build()
            : reference;

        return new Transform.JsonEntry(entry.path, resolved);
    }

    /**
     * Reconstructs an aggregate instance from all of its events and writes the result to the
     * aggregate collection.
     *
     * @param id the identifier of the aggregate instance.
     * @param type the aggregate type.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param aggregates the database and session for the aggregate collection.
     * @param events the database and session for the event collection.
     * @return the reconstructed instance, or an empty object when the reconstruction didn't
     *     yield a managed object.
     */
    public static CompletionStage<JsonObject> restore(String id, String type, String environment, DbContext aggregates, DbContext events) {
        return reconstruct(id, type, environment, events)
            .thenComposeAsync(json -> restoreReconstructed(json, environment, aggregates.database, aggregates.session));
    }

    /**
     * Brings a snapshot up to date with the events that are more recent and writes the result to
     * the aggregate collection. Nothing is written when the reconstruction doesn't have a higher
     * sequence number than the snapshot. A missing <code>_seq</code> field is treated as
     * <code>-1</code>, which is how {@link #events(JsonObject, String, DbContext)} reads the
     * snapshot too.
     *
     * @param snapshot the aggregate snapshot to start from.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param aggregates the database and session for the aggregate collection.
     * @param events the database and session for the event collection.
     * @return the restored instance, or the snapshot itself when there was nothing to restore.
     */
    public static CompletionStage<JsonObject> restore(JsonObject snapshot, String environment, DbContext aggregates, DbContext events) {
        return reconstruct(snapshot, environment, events)
            .thenComposeAsync(json -> json.getInt(SEQ, NO_SEQ) > snapshot.getInt(SEQ, NO_SEQ) ? restoreReconstructed(json, environment, aggregates.database, aggregates.session) : CompletableFuture.completedFuture(snapshot));
    }

    /**
     * Writes a reconstructed instance when it is a managed object. The update must succeed.
     * Otherwise an empty object is returned without writing anything.
     */
    private static CompletionStage<JsonObject> restoreReconstructed(JsonObject json, String environment, MongoDatabase database, ClientSession session) {
        if (!Util.isManagedObject(json)) {
            return CompletableFuture.completedFuture(JsonUtil.emptyObject());
        }

        return update(json, environment, database, session)
            .thenApply(result -> net.pincette.util.Util.must(result, r -> r))
            .thenApply(result -> json);
    }

    /**
     * Removes everything from the last dash onwards, but only when what remains is a UUID. In
     * all other cases the ID is returned unchanged.
     */
    private static String stripSequence(String id) {
        int dash = id.lastIndexOf('-');

        if (dash == -1) {
            return id;
        }

        String prefix = id.substring(0, dash);

        return net.pincette.util.Util.isUUID(prefix) ? prefix : id;
    }

    private static String suffix(String environment) {
        return environment != null ? "-" + environment : "";
    }

    /**
     * Creates the <code>$set</code> operator that copies the fields <code>_seq</code>,
     * <code>_corr</code> and <code>_timestamp</code> from the event.
     */
    private static JsonObject technicalUpdateOperator(JsonObject event) {
        return JsonUtil.createObjectBuilder()
            .add(SET, JsonUtil.createObjectBuilder()
                .add(SEQ, event.getInt(SEQ))
                .add(CORR, event.getString(CORR))
                .add(TIMESTAMP, event.getJsonNumber(TIMESTAMP)))
            .build();
    }

    /**
     * Reduces every nested object that has the field <code>_resolved</code> to an object with
     * only its <code>href</code> field.
     *
     * @param aggregate the object with resolved references.
     * @return the object with plain references.
     */
    public static JsonObject unresolve(JsonObject aggregate) {
        return Transform.transform(aggregate, new Transform.Transformer(entry -> JsonUtil.isObject(entry.value) && entry.value.asJsonObject().containsKey(RESOLVED), entry -> Optional.of(new Transform.JsonEntry(entry.path, JsonUtil.copy(entry.value.asJsonObject(), JsonUtil.createObjectBuilder(), key -> key.equals(HREF)).build()))));
    }

    /**
     * Updates a JSON object in the collection that is derived from the object itself, without a
     * client session.
     *
     * @param json the object, which must have the fields <code>_id</code> and <code>_type</code>.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param database the database.
     * @return the result reported by <code>JsonClient.update</code>.
     */
    public static CompletionStage<Boolean> update(JsonObject json, String environment, MongoDatabase database) {
        return update(json, environment, database, null);
    }

    /**
     * Updates a JSON object in the collection that is derived from the object itself. The
     * <code>_id</code> field of the object is used as the identifier.
     *
     * @param json the object, which must have the fields <code>_id</code> and <code>_type</code>.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param database the database.
     * @param session the client session, which is passed on as is.
     * @return the result reported by <code>JsonClient.update</code>.
     */
    public static CompletionStage<Boolean> update(JsonObject json, String environment, MongoDatabase database, ClientSession session) {
        return JsonClient.update(database.getCollection(collection(json, environment)), json, json.getString(ID), session);
    }

    /**
     * Updates a JSON object with the given identifier in the given collection.
     *
     * @param json the object.
     * @param id the identifier of the document.
     * @param collection the name of the collection.
     * @param database the database.
     * @return the result reported by <code>JsonClient.update</code>.
     */
    public static CompletionStage<Boolean> update(JsonObject json, String id, String collection, MongoDatabase database) {
        return JsonClient.update(database.getCollection(collection), json, id);
    }

    /**
     * Updates a JSON object with the given identifier in the given collection using a client
     * session.
     *
     * @param json the object.
     * @param id the identifier of the document.
     * @param collection the name of the collection.
     * @param database the database.
     * @param session the client session.
     * @return the result reported by <code>JsonClient.update</code>.
     */
    public static CompletionStage<Boolean> update(JsonObject json, String id, String collection, MongoDatabase database, ClientSession session) {
        return JsonClient.update(database.getCollection(collection), json, id, session);
    }

    /**
     * Applies an event to a stored aggregate instance without a client session. See
     * {@link #updateAggregate(MongoCollection, JsonObject, JsonObject, ClientSession)}.
     *
     * @param collection the aggregate collection.
     * @param currentState the current state of the aggregate instance.
     * @param event the event to apply.
     * @return <code>true</code> when the write was acknowledged.
     */
    public static CompletionStage<Boolean> updateAggregate(MongoCollection<Document> collection, JsonObject currentState, JsonObject event) {
        return updateAggregate(collection, currentState, event, null);
    }

    /**
     * Applies an event to a stored aggregate instance with one ordered bulk write. The first
     * operation sets the fields <code>_seq</code>, <code>_corr</code> and
     * <code>_timestamp</code>. The other operations are derived from the object entries in the
     * <code>_ops</code> array of the event. All operations select the document with the
     * <code>_id</code> of the current state.
     *
     * @param collection the aggregate collection.
     * @param currentState the current state of the aggregate instance. It must have the
     *     <code>_id</code> field.
     * @param event the event to apply. It must have the fields <code>_seq</code>,
     *     <code>_corr</code>, <code>_timestamp</code> and <code>_ops</code>.
     * @param session the client session. When it is <code>null</code> the write is done without
     *     a session.
     * @return <code>true</code> when the write was acknowledged. Otherwise the stage fails.
     */
    public static CompletionStage<Boolean> updateAggregate(MongoCollection<Document> collection, JsonObject currentState, JsonObject event, ClientSession session) {
        Bson filter = Filters.eq(ID, currentState.getString(ID));
        List<UpdateOneModel<Document>> operators = Stream.concat(Stream.of(technicalUpdateOperator(event)), Patch.updateOperators(currentState, event.getJsonArray(OPS).stream().filter(JsonUtil::isObject).map(JsonValue::asJsonObject)))
            .map(op -> new UpdateOneModel<Document>(filter, BsonUtil.fromJson(op)))
            .collect(Collectors.toList());
        BulkWriteOptions options = new BulkWriteOptions().ordered(true);

        return Collection.exec(collection, c -> session != null ? c.bulkWrite(session, operators, options) : c.bulkWrite(operators, options))
            .thenApply(BulkWriteResult::wasAcknowledged)
            .thenApply(result -> net.pincette.util.Util.must(result, r -> r));
    }

    /**
     * Converts the events in the old format, which have a string <code>_id</code> and a
     * <code>_seq</code> field, to the compound key format. The events are processed in batches.
     * A batch that fails is logged to the global logger and counted as unsuccessful.
     *
     * @param type the aggregate type, which is used to derive the event collection.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param database the database.
     * @param progress receives the running total of events that were picked up so far. It may be
     *     <code>null</code>.
     * @return <code>true</code> when all batches succeeded or when there were no old events.
     */
    public static CompletionStage<Boolean> upgradeEventLog(String type, String environment, MongoDatabase database, LongConsumer progress) {
        MongoCollection<BsonDocument> collection = database.getCollection(eventCollection(type, environment), BsonDocument.class);
        State<Long> count = new State<Long>(0L);

        return net.pincette.rs.Reducer.reduce(Chain.with(JsonClient.aggregationPublisher(database.getCollection(eventCollection(type, environment)), OLD_EVENTS)).per(UPGRADE_BATCH_SIZE).map(events -> {
            if (progress != null) {
                progress.accept(count.set(count.get() + events.size()));
            }
            return events;
        }).mapAsync(events -> StreamUtil.composeAsyncStream(events.stream().map(ev -> replaceEvent(collection, ev))).thenApply(results -> results.reduce(true, (r1, r2) -> r1 && r2)).exceptionally(e -> {
            Logger.getGlobal().log(Level.SEVERE, e.getMessage(), e);
            return false;
        })).get(), (r1, r2) -> r1 && r2).thenApply(result -> result.orElse(true));
    }

    /**
     * Wraps a reducer so that the references in the command and the state are resolved before
     * the reducer is called and the result of the reducer is unresolved again.
     *
     * @param reducer the reducer to wrap.
     * @param environment the environment suffix. It may be <code>null</code>.
     * @param database the database in which the references are looked up.
     * @return the wrapped reducer.
     */
    public static Reducer withResolver(Reducer reducer, String environment, MongoDatabase database) {
        return (command, state) -> resolve(Collections.list(command, state), environment, database)
            .thenComposeAsync(resolved -> reducer.apply(resolved.get(0), resolved.get(1)))
            .thenApply(Mongo::unresolve);
    }

    /** Bundles a database with the client session that should be used with it. */
    public static class DbContext {
        final MongoDatabase database;
        final ClientSession session;

        /**
         * Creates a context.
         *
         * @param database the database.
         * @param session the client session, which is handed to the database operations as is.
         */
        public DbContext(MongoDatabase database, ClientSession session) {
            this.database = database;
            this.session = session;
        }
    }
}

