/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.elasticsearch;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.elasticsearch.ElasticSearchClient;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
import org.apache.pulsar.io.elasticsearch.JsonConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="elastic_search", type=IOType.SINK, help="A sink connector that sends pulsar messages to elastic search", configClass=ElasticSearchConfig.class)
public class ElasticSearchSink
implements Sink<GenericObject> {
    private static final Logger log = LoggerFactory.getLogger(ElasticSearchSink.class);
    private ElasticSearchConfig elasticSearchConfig;
    private ElasticSearchClient elasticsearchClient;
    private ObjectMapper objectMapper = new ObjectMapper();
    private List<String> primaryFields = null;

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.elasticSearchConfig = ElasticSearchConfig.load(config);
        this.elasticSearchConfig.validate();
        this.elasticsearchClient = new ElasticSearchClient(this.elasticSearchConfig);
        if (!Strings.isNullOrEmpty((String)this.elasticSearchConfig.getPrimaryFields())) {
            this.primaryFields = Arrays.asList(this.elasticSearchConfig.getPrimaryFields().split(","));
        }
    }

    public void close() {
        if (this.elasticsearchClient != null) {
            this.elasticsearchClient.close();
            this.elasticsearchClient = null;
        }
    }

    public void write(Record<GenericObject> record) throws Exception {
        if (!this.elasticsearchClient.isFailed()) {
            Pair<String, String> idAndDoc = this.extractIdAndDocument(record);
            try {
                if (log.isDebugEnabled()) {
                    log.debug("index doc {} {}", idAndDoc.getLeft(), idAndDoc.getRight());
                }
                if (idAndDoc.getRight() == null) {
                    switch (this.elasticSearchConfig.getNullValueAction()) {
                        case DELETE: {
                            if (idAndDoc.getLeft() == null) break;
                            if (this.elasticSearchConfig.isBulkEnabled()) {
                                this.elasticsearchClient.bulkDelete(record, (String)idAndDoc.getLeft());
                                break;
                            }
                            this.elasticsearchClient.deleteDocument(record, (String)idAndDoc.getLeft());
                            break;
                        }
                        case IGNORE: {
                            break;
                        }
                        case FAIL: {
                            this.elasticsearchClient.failed((Exception)new PulsarClientException.InvalidMessageException("Unexpected null message value"));
                            throw this.elasticsearchClient.irrecoverableError.get();
                        }
                    }
                }
                if (this.elasticSearchConfig.isBulkEnabled()) {
                    this.elasticsearchClient.bulkIndex(record, idAndDoc);
                }
                this.elasticsearchClient.indexDocument(record, idAndDoc);
            }
            catch (JsonProcessingException jsonProcessingException) {
                switch (this.elasticSearchConfig.getMalformedDocAction()) {
                    case IGNORE: {
                        break;
                    }
                    case WARN: {
                        log.warn("Ignoring malformed document messageId={}", record.getMessage().map(Message::getMessageId).orElse(null), (Object)jsonProcessingException);
                        this.elasticsearchClient.failed((Exception)((Object)jsonProcessingException));
                        throw jsonProcessingException;
                    }
                    case FAIL: {
                        log.error("Malformed document messageId={}", record.getMessage().map(Message::getMessageId).orElse(null), (Object)jsonProcessingException);
                        this.elasticsearchClient.failed((Exception)((Object)jsonProcessingException));
                        throw jsonProcessingException;
                    }
                }
            }
            catch (Exception e) {
                log.error("write error for {} {}:", new Object[]{idAndDoc.getLeft(), idAndDoc.getRight(), e});
                throw e;
            }
        } else {
            throw new IllegalStateException("Elasticsearch client is in FAILED status");
        }
    }

    @VisibleForTesting
    ElasticSearchClient getElasticsearchClient() {
        return this.elasticsearchClient;
    }

    public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) throws JsonProcessingException {
        if (this.elasticSearchConfig.isSchemaEnable()) {
            Object key = null;
            GenericObject value = null;
            Schema keySchema = null;
            Schema valueSchema = null;
            if (record.getSchema() != null && record.getSchema() instanceof KeyValueSchema) {
                KeyValueSchema keyValueSchema = (KeyValueSchema)record.getSchema();
                keySchema = keyValueSchema.getKeySchema();
                valueSchema = keyValueSchema.getValueSchema();
                KeyValue keyValue = (KeyValue)((GenericObject)record.getValue()).getNativeObject();
                key = keyValue.getKey();
                value = (GenericObject)keyValue.getValue();
            } else {
                key = record.getKey().orElse(null);
                valueSchema = record.getSchema();
                value = (GenericObject)record.getValue();
            }
            String id = null;
            if (!this.elasticSearchConfig.isKeyIgnore() && key != null && keySchema != null) {
                id = this.stringifyKey(keySchema, key);
            }
            String doc = null;
            if (value != null) {
                doc = valueSchema != null ? this.stringifyValue(valueSchema, value) : (value.getNativeObject() instanceof byte[] ? new String((byte[])value.getNativeObject(), StandardCharsets.UTF_8) : value.getNativeObject().toString());
            }
            if (doc != null && this.primaryFields != null) {
                try {
                    JsonNode jsonNode = this.objectMapper.readTree(doc);
                    id = this.stringifyKey(jsonNode, this.primaryFields);
                }
                catch (JsonProcessingException e) {
                    log.error("Failed to read JSON", (Throwable)e);
                    throw e;
                }
            }
            if (log.isDebugEnabled()) {
                SchemaType schemaType = null;
                if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
                    schemaType = record.getSchema().getSchemaInfo().getType();
                }
                log.debug("recordType={} schemaType={} id={} doc={}", new Object[]{record.getClass().getName(), schemaType, id, doc});
            }
            return Pair.of((Object)id, (Object)doc);
        }
        return Pair.of(null, (Object)new String(((Message)record.getMessage().orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"))).getData(), StandardCharsets.UTF_8));
    }

    public String stringifyKey(Schema<?> schema, Object val) throws JsonProcessingException {
        switch (schema.getSchemaInfo().getType()) {
            case INT8: {
                return Byte.toString((Byte)val);
            }
            case INT16: {
                return Short.toString((Short)val);
            }
            case INT32: {
                return Integer.toString((Integer)val);
            }
            case INT64: {
                return Long.toString((Long)val);
            }
            case STRING: {
                return (String)val;
            }
            case JSON: 
            case AVRO: {
                return this.stringifyKey(ElasticSearchSink.extractJsonNode(schema, val));
            }
        }
        throw new UnsupportedOperationException("Unsupported key schemaType=" + schema.getSchemaInfo().getType());
    }

    public String stringifyKey(JsonNode jsonNode) throws JsonProcessingException {
        ArrayList<String> fields = new ArrayList<String>();
        jsonNode.fieldNames().forEachRemaining(fields::add);
        return this.stringifyKey(jsonNode, fields);
    }

    public String stringifyKey(JsonNode jsonNode, List<String> fields) throws JsonProcessingException {
        if (fields.size() == 1) {
            JsonNode singleNode = jsonNode.get(fields.get(0));
            String id = this.objectMapper.writeValueAsString((Object)singleNode);
            return id.startsWith("\"") && id.endsWith("\"") ? id.substring(1, id.length() - 1) : id;
        }
        return JsonConverter.toJsonArray(jsonNode, fields).toString();
    }

    public String stringifyValue(Schema<?> schema, Object val) throws JsonProcessingException {
        JsonNode jsonNode = ElasticSearchSink.extractJsonNode(schema, val);
        return this.elasticSearchConfig.isStripNulls() ? this.objectMapper.writeValueAsString((Object)ElasticSearchSink.stripNullNodes(jsonNode)) : this.objectMapper.writeValueAsString((Object)jsonNode);
    }

    public static JsonNode stripNullNodes(JsonNode node) {
        Iterator it = node.iterator();
        while (it.hasNext()) {
            JsonNode child = (JsonNode)it.next();
            if (child.isNull()) {
                it.remove();
                continue;
            }
            ElasticSearchSink.stripNullNodes(child);
        }
        return node;
    }

    public static JsonNode extractJsonNode(Schema<?> schema, Object val) {
        switch (schema.getSchemaInfo().getType()) {
            case JSON: {
                return (JsonNode)((org.apache.pulsar.client.api.schema.GenericRecord)val).getNativeObject();
            }
            case AVRO: {
                GenericRecord node = (GenericRecord)((org.apache.pulsar.client.api.schema.GenericRecord)val).getNativeObject();
                return JsonConverter.toJson(node);
            }
        }
        throw new UnsupportedOperationException("Unsupported value schemaType=" + schema.getSchemaInfo().getType());
    }
}

