/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.connect.kafka.handler.sink;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.java.analytics.AnalyticsOptions;
import com.couchbase.client.java.analytics.ReactiveAnalyticsResult;
import com.couchbase.client.java.json.JsonArray;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.connect.kafka.config.sink.CouchbaseSinkConfig;
import com.couchbase.connect.kafka.handler.sink.ConcurrencyHint;
import com.couchbase.connect.kafka.handler.sink.SinkAction;
import com.couchbase.connect.kafka.handler.sink.SinkDocument;
import com.couchbase.connect.kafka.handler.sink.SinkHandler;
import com.couchbase.connect.kafka.handler.sink.SinkHandlerContext;
import com.couchbase.connect.kafka.handler.sink.SinkHandlerParams;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.config.ConfigException;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
 * Sink handler that turns each incoming record into a Couchbase Analytics statement.
 *
 * <p>A record that carries a document becomes an {@code UPSERT INTO <keyspace>} statement;
 * a record without a document becomes a {@code DELETE FROM <keyspace> WHERE ...} statement
 * whose conditions are taken from the record's document id, which must then be a JSON object.
 * Records that cannot be converted are logged at WARN level and ignored.
 */
@Stability.Volatile
public class AnalyticsSinkHandler implements SinkHandler {
    private static final Logger log = LoggerFactory.getLogger(AnalyticsSinkHandler.class);

    /** Bucket name read from the sink configuration in {@link #init}; may be null or empty. */
    protected String bucketName;

    /**
     * Reads the bucket name from the connector configuration.
     *
     * @param context handler context supplying the raw configuration properties
     */
    @Override
    public void init(SinkHandlerContext context) {
        CouchbaseSinkConfig config = ConfigHelper.parse(CouchbaseSinkConfig.class, context.configProperties());
        this.bucketName = config.bucket();
    }

    /**
     * Builds the condition list of a DELETE statement from the fields of {@code documentKeys}.
     *
     * @param documentKeys JSON object whose field names become backtick-quoted column names
     * @return the {@code `a`=? AND `b`=?} clause (left) and the matching positional values (right),
     *     both produced in the same iteration order
     */
    private static Pair<String, JsonArray> prepareWhereClauseForDelete(JsonObject documentKeys) {
        ArrayList<Object> values = new ArrayList<>();
        StringBuilder whereClause = new StringBuilder();
        // One pass keeps each placeholder and its value in lock-step.
        for (String key : documentKeys.getNames()) {
            if (whereClause.length() > 0) {
                whereClause.append(" AND ");
            }
            whereClause.append('`').append(key).append("`=?");
            values.add(documentKeys.get(key));
        }
        return Pair.of(whereClause.toString(), JsonArray.from(values));
    }

    /**
     * Builds a parameterized DELETE statement for the given keyspace.
     *
     * @param keySpace already-quoted keyspace, as produced by {@link #keyspace}
     * @param documentKeys JSON object whose fields identify the rows to delete
     * @return the statement text (left) and its positional parameters (right)
     */
    protected static Pair<String, JsonArray> deleteQuery(String keySpace, JsonObject documentKeys) {
        Pair<String, JsonArray> whereClause = prepareWhereClauseForDelete(documentKeys);
        String statement = "DELETE FROM " + keySpace + " WHERE " + whereClause.getLeft() + ";";
        return Pair.of(statement, whereClause.getRight());
    }

    /**
     * Parses a string into a JSON object.
     *
     * @param object text expected to hold a JSON object
     * @return the parsed object, or {@code null} (after logging a warning) when the text
     *     cannot be parsed or the parsed object is empty
     */
    protected static JsonObject getJsonObject(String object) {
        JsonObject node;
        try {
            node = JsonObject.fromJson(object);
        } catch (Exception e) {
            // Best effort by design: an unparsable record is skipped, not fatal.
            log.warn("could not generate analytics statement from node (not json)", e);
            return null;
        }
        if (node != null && node.isEmpty()) {
            log.warn("could not generate analytics statement from empty node");
            return null;
        }
        return node;
    }

    /**
     * Builds an UPSERT statement with the document embedded as a literal.
     *
     * @param keySpace already-quoted keyspace
     * @param values document to upsert; its string form is spliced into the statement
     * @return the statement text
     */
    private String upsertStatement(String keySpace, JsonObject values) {
        return "UPSERT INTO " + keySpace + " ([" + values + "]);";
    }

    /**
     * Converts one sink record into an upsert (document present) or a delete (document absent).
     *
     * @param params the record, its target scope and collection, and the cluster to run against
     * @return the deferred Analytics query, or {@link SinkAction#ignore()} when the record
     *     cannot be turned into a statement
     * @throws ConfigException if the record's scope or collection is missing
     */
    @Override
    public SinkAction handle(SinkHandlerParams params) {
        String documentKeys = getDocumentId(params);
        SinkDocument doc = params.document().orElse(null);
        String keySpace = keyspace(
                this.bucketName,
                params.getScopeAndCollection().getScope(),
                params.getScopeAndCollection().getCollection());

        return doc != null
                ? upsertAction(params, keySpace, documentKeys, doc)
                : deleteAction(params, keySpace, documentKeys);
    }

    /** Builds the UPSERT action for a record that carries a document. */
    private SinkAction upsertAction(SinkHandlerParams params, String keySpace, String documentKeys, SinkDocument doc) {
        String docContent = new String(doc.content(), StandardCharsets.UTF_8);
        // The document is spliced into the statement text, so content with backticks is rejected.
        if (docContent.contains("`")) {
            log.warn("Could not generate Analytics N1QL UPSERT statement with backtick (`) in document content");
            return SinkAction.ignore();
        }
        JsonObject node = getJsonObject(docContent);
        if (node == null) {
            return SinkAction.ignore();
        }
        String statement = upsertStatement(keySpace, node);
        // NOTE(review): the statement embeds the document and has no visible placeholders, yet the
        // document is also sent as named parameters. Kept as-is; confirm whether that is required.
        // NOTE(review): presumably metaData() is itself reactive, in which case map(...) does not
        // subscribe to it — confirm whether flatMap(...) was intended.
        Mono<?> action = Mono.defer(() -> params.cluster()
                .analyticsQuery(statement, AnalyticsOptions.analyticsOptions().parameters(node))
                .map(ReactiveAnalyticsResult::metaData));
        return new SinkAction(action, ConcurrencyHint.of(documentKeys));
    }

    /** Builds the DELETE action for a record without a document; the document id supplies the keys. */
    private SinkAction deleteAction(SinkHandlerParams params, String keySpace, String documentKeys) {
        // Field names are wrapped in backticks in the WHERE clause, so a backtick would break quoting.
        if (documentKeys.contains("`")) {
            log.warn("Could not generate Analytics N1QL DELETE statement with backtick (`) in field name");
            return SinkAction.ignore();
        }
        JsonObject documentKeysJson = getJsonObject(documentKeys);
        if (documentKeysJson == null) {
            return SinkAction.ignore();
        }
        Pair<String, JsonArray> deleteQuery = deleteQuery(keySpace, documentKeysJson);
        // NOTE(review): same map(...) vs flatMap(...) question as in the upsert path.
        Mono<?> action = Mono.defer(() -> params.cluster()
                .analyticsQuery(deleteQuery.getLeft(), AnalyticsOptions.analyticsOptions().parameters(deleteQuery.getRight()))
                .map(ReactiveAnalyticsResult::metaData));
        return new SinkAction(action, ConcurrencyHint.of(documentKeys));
    }

    @Override
    public String toString() {
        return "AnalyticsSinkHandler{bucketName='" + this.bucketName + '\'' + '}';
    }

    /**
     * Builds the backtick-quoted keyspace {@code `bucket`.`scope`.`collection`}; the bucket
     * segment is omitted when {@code bucketName} is null or empty.
     *
     * @param bucketName optional bucket name
     * @param scope required scope name
     * @param collection required collection name
     * @return the quoted keyspace
     * @throws ConfigException if {@code scope} or {@code collection} is null or empty
     */
    protected static String keyspace(String bucketName, String scope, String collection) {
        // A null name is treated like an empty one so the caller gets a ConfigException, not an NPE.
        if (scope == null || scope.isEmpty() || collection == null || collection.isEmpty()) {
            throw new ConfigException("Missing required configuration for scope and collection.");
        }
        StringBuilder keySpace = new StringBuilder();
        if (bucketName != null && !bucketName.isEmpty()) {
            keySpace.append('`').append(bucketName).append("`.");
        }
        keySpace.append('`').append(scope).append("`.`").append(collection).append('`');
        return keySpace.toString();
    }
}

