/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.mongodb.serde;

import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentSerializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.RowDataToBsonConverters;
import org.apache.seatunnel.connectors.seatunnel.mongodb.sink.MongodbWriterOptions;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;

/**
 * {@link DocumentSerializer} that turns a {@link SeaTunnelRow} into a MongoDB
 * {@link WriteModel}.
 *
 * <p>The row is first converted to a {@link BsonDocument}. When upsert is enabled the
 * result is an {@link UpdateOneModel} ({@code $set} with {@code upsert(true)});
 * otherwise it is a plain {@link InsertOneModel}.
 */
public class RowDataDocumentSerializer implements DocumentSerializer<SeaTunnelRow> {
    /** Converts an incoming row into its BSON document form. */
    private final RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter;
    /** Whether rows are written as upserts instead of inserts. */
    private final Boolean isUpsertEnable;
    /** Derives, from a converted document, the fields used as the upsert match criteria. */
    private final Function<BsonDocument, BsonDocument> filterConditions;

    /**
     * Creates a serializer.
     *
     * @param rowDataToBsonConverter converter applied to every row
     * @param options writer options; only {@code isUpsertEnable()} is read here
     * @param filterConditions function applied to the converted document to obtain the
     *     document whose entries become the upsert filter
     */
    public RowDataDocumentSerializer(RowDataToBsonConverters.RowDataToBsonConverter rowDataToBsonConverter, MongodbWriterOptions options, Function<BsonDocument, BsonDocument> filterConditions) {
        this.rowDataToBsonConverter = rowDataToBsonConverter;
        this.isUpsertEnable = options.isUpsertEnable();
        this.filterConditions = filterConditions;
    }

    /**
     * Serializes one row into the write operation to send to MongoDB.
     *
     * @param row the row to write
     * @return an insert model, or an upserting update model when upsert is enabled
     */
    @Override
    public WriteModel<BsonDocument> serializeToWriteModel(SeaTunnelRow row) {
        BsonDocument document = this.rowDataToBsonConverter.convert(row);

        // Insert path: the converted document is written as-is.
        if (!this.isUpsertEnable) {
            return new InsertOneModel<>(document);
        }

        // The filter is derived from the full document, before "_id" is stripped below.
        BsonDocument matchFields = this.filterConditions.apply(document);
        Bson filter = RowDataDocumentSerializer.generateFilter(matchFields);

        // "_id" is kept out of the $set payload.
        // NOTE(review): presumably because MongoDB rejects updates to _id - confirm.
        document.remove("_id");
        BsonDocument setOperation = new BsonDocument("$set", document);
        UpdateOptions upsertOptions = new UpdateOptions().upsert(true);
        return new UpdateOneModel<>(filter, setOperation, upsertOptions);
    }

    /**
     * Builds a filter that requires every entry of the given document to match by equality.
     *
     * @param filterConditions document whose key/value pairs become equality conditions
     * @return the conjunction ({@code Filters.and}) of one {@code Filters.eq} per entry
     */
    public static Bson generateFilter(BsonDocument filterConditions) {
        List<Bson> equalityClauses = filterConditions.entrySet()
                .stream()
                .map(field -> Filters.eq(field.getKey(), field.getValue()))
                .collect(Collectors.toList());
        return Filters.and(equalityClauses);
    }
}

