/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.elasticsearch;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.KeyValue;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;

@Connector(name="elastic_search", type=IOType.SINK, help="A sink connector that sends pulsar messages to elastic search", configClass=ElasticSearchConfig.class)
public class ElasticSearchSink
implements Sink<byte[]> {
    private URL url;
    private RestHighLevelClient client;
    private CredentialsProvider credentialsProvider;
    private ElasticSearchConfig elasticSearchConfig;

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        this.elasticSearchConfig = ElasticSearchConfig.load(config);
        this.elasticSearchConfig.validate();
        this.createIndexIfNeeded();
    }

    public void close() throws Exception {
        this.client.close();
    }

    public void write(Record<byte[]> record) {
        KeyValue<String, byte[]> keyValue = this.extractKeyValue(record);
        IndexRequest indexRequest = Requests.indexRequest((String)this.elasticSearchConfig.getIndexName());
        indexRequest.type(this.elasticSearchConfig.getTypeName());
        indexRequest.source((byte[])keyValue.getValue(), XContentType.JSON);
        try {
            IndexResponse indexResponse = this.getClient().index(indexRequest, RequestOptions.DEFAULT);
            if (indexResponse.getResult().equals((Object)DocWriteResponse.Result.CREATED)) {
                record.ack();
            } else {
                record.fail();
            }
        }
        catch (IOException ex) {
            record.fail();
        }
    }

    public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
        String key = record.getKey().orElse("");
        return new KeyValue((Object)key, record.getValue());
    }

    private void createIndexIfNeeded() throws IOException {
        GetIndexRequest request = new GetIndexRequest();
        request.indices(new String[]{this.elasticSearchConfig.getIndexName()});
        boolean exists = this.getClient().indices().exists(request, RequestOptions.DEFAULT);
        if (!exists) {
            CreateIndexRequest cireq = new CreateIndexRequest(this.elasticSearchConfig.getIndexName());
            cireq.settings(Settings.builder().put("index.number_of_shards", this.elasticSearchConfig.getIndexNumberOfShards()).put("index.number_of_replicas", this.elasticSearchConfig.getIndexNumberOfReplicas()));
            CreateIndexResponse ciresp = this.getClient().indices().create(cireq, RequestOptions.DEFAULT);
            if (!ciresp.isAcknowledged() || !ciresp.isShardsAcknowledged()) {
                throw new RuntimeException("Unable to create index.");
            }
        }
    }

    private URL getUrl() throws MalformedURLException {
        if (this.url == null) {
            this.url = new URL(this.elasticSearchConfig.getElasticSearchUrl());
        }
        return this.url;
    }

    private CredentialsProvider getCredentialsProvider() {
        if (StringUtils.isEmpty((CharSequence)this.elasticSearchConfig.getUsername()) || StringUtils.isEmpty((CharSequence)this.elasticSearchConfig.getPassword())) {
            return null;
        }
        this.credentialsProvider = new BasicCredentialsProvider();
        this.credentialsProvider.setCredentials(AuthScope.ANY, (Credentials)new UsernamePasswordCredentials(this.elasticSearchConfig.getUsername(), this.elasticSearchConfig.getPassword()));
        return this.credentialsProvider;
    }

    private RestHighLevelClient getClient() throws MalformedURLException {
        if (this.client == null) {
            CredentialsProvider cp = this.getCredentialsProvider();
            RestClientBuilder builder = RestClient.builder((HttpHost[])new HttpHost[]{new HttpHost(this.getUrl().getHost(), this.getUrl().getPort(), this.getUrl().getProtocol())});
            if (cp != null) {
                builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(cp));
            }
            this.client = new RestHighLevelClient(builder);
        }
        return this.client;
    }
}

