/*
 * Decompiled with CFR 0.152.
 */
package com.digitalpebble.stormcrawler.opensearch;

import com.digitalpebble.stormcrawler.util.ConfUtils;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkProcessor;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.client.Node;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pairs a {@link RestHighLevelClient} with the {@link BulkProcessor} that submits bulk requests
 * through it, and provides static factories that build both from a Storm configuration map.
 *
 * <p>Configuration values are looked up through {@link ConfUtils} using the prefix
 * {@code "opensearch."}, the optional part {@code boltType + "."} and a per-setting suffix
 * (for example {@code "addresses"}).
 *
 * <p>NOTE(review): the closed flag is a plain field with no synchronization; confirm whether
 * instances are ever shared between threads.
 */
public final class OpenSearchConnection {
    private static final Logger LOG = LoggerFactory.getLogger(OpenSearchConnection.class);

    /** Prefix handed to {@link ConfUtils} for every lookup performed by this class. */
    private static final String CONF_PREFIX = "opensearch.";

    /** Port used when a configured address does not specify one. */
    private static final int DEFAULT_PORT = 9200;

    /** Scheme used when a configured address does not specify one. */
    private static final String DEFAULT_SCHEME = "http";

    /** Maximum time, in seconds, {@link #close()} waits for the bulk processor to drain. */
    private static final long CLOSE_TIMEOUT_SECONDS = 60L;

    @NotNull
    private final RestHighLevelClient client;
    @NotNull
    private final BulkProcessor processor;
    private boolean isClosed = false;

    private OpenSearchConnection(@NotNull RestHighLevelClient c, @NotNull BulkProcessor p) {
        this.processor = p;
        this.client = c;
    }

    /**
     * Returns the client wrapped by this connection.
     *
     * @return the underlying high-level REST client
     */
    public RestHighLevelClient getClient() {
        return this.client;
    }

    /**
     * Hands a write request to the bulk processor of this connection.
     *
     * @param request the request to add
     */
    public void addToProcessor(DocWriteRequest<?> request) {
        this.processor.add(request);
    }

    /**
     * Builds a {@link RestHighLevelClient} from the configuration.
     *
     * <p>Settings read (suffixes): {@code addresses}, {@code user}, {@code password},
     * {@code proxy.host}, {@code proxy.port} (default {@code -1}), {@code proxy.scheme} (default
     * {@code "http"}), {@code connect.timeout} (default {@code 1000}), {@code socket.timeout}
     * (default {@code 30000}) and {@code compression} (default {@code false}). Credentials are
     * only installed when both user and password are non-blank; the proxy is only installed when
     * the proxy host is non-blank and the proxy port is not {@code -1}.
     *
     * @param stormConf the Storm configuration map
     * @param boltType the component type used as the optional part of the configuration keys
     * @return a newly created client; the caller is responsible for closing it
     * @throws RuntimeException if a configured address has no host component
     * @throws IllegalArgumentException if a configured address is not a valid URI
     */
    public static RestHighLevelClient getClient(Map<String, Object> stormConf, String boltType) {
        final String dottedType = boltType + ".";

        final List<String> addresses =
                ConfUtils.loadListFromConf(CONF_PREFIX, dottedType, "addresses", stormConf);
        final List<HttpHost> hosts = new ArrayList<>();
        for (String address : addresses) {
            hosts.add(toHttpHost(address));
        }
        final RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0]));

        final String user = ConfUtils.getString(stormConf, CONF_PREFIX, dottedType, "user");
        final String password = ConfUtils.getString(stormConf, CONF_PREFIX, dottedType, "password");
        final String proxyhost = ConfUtils.getString(stormConf, CONF_PREFIX, dottedType, "proxy.host");
        final int proxyport = ConfUtils.getInt(stormConf, CONF_PREFIX, dottedType, "proxy.port", -1);
        final String proxyscheme =
                ConfUtils.getString(stormConf, CONF_PREFIX, dottedType, "proxy.scheme", "http");

        final boolean needsUser = StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password);
        final boolean needsProxy = StringUtils.isNotBlank(proxyhost) && proxyport != -1;

        // Only register the callback when there is something to customise.
        if (needsUser || needsProxy) {
            builder.setHttpClientConfigCallback(httpClientBuilder -> {
                if (needsUser) {
                    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(
                            AuthScope.ANY, new UsernamePasswordCredentials(user, password));
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
                if (needsProxy) {
                    httpClientBuilder.setProxy(new HttpHost(proxyhost, proxyport, proxyscheme));
                }
                return httpClientBuilder;
            });
        }

        final int connectTimeout =
                ConfUtils.getInt(stormConf, CONF_PREFIX, dottedType, "connect.timeout", 1000);
        final int socketTimeout =
                ConfUtils.getInt(stormConf, CONF_PREFIX, dottedType, "socket.timeout", 30000);
        builder.setRequestConfigCallback(requestConfigBuilder ->
                requestConfigBuilder.setConnectTimeout(connectTimeout).setSocketTimeout(socketTimeout));

        // This selector filters nothing out; it only logs the nodes it is handed.
        builder.setNodeSelector(nodes -> {
            for (Node node : nodes) {
                LOG.debug("Connected to ES node {} [{}] for {}", node.getName(), node.getHost(), boltType);
            }
        });

        final boolean compression =
                ConfUtils.getBoolean(stormConf, CONF_PREFIX, dottedType, "compression", false);
        builder.setCompressionEnabled(compression);
        return new RestHighLevelClient(builder);
    }

    /**
     * Converts one configured address into an {@link HttpHost}.
     *
     * <p>An address that does not start with {@code http://} or {@code https://} gets
     * {@code http://} prepended before parsing. Testing for the full scheme prefix (rather than
     * the bare text {@code "http"}) keeps scheme-less host names such as {@code httpserver:9200}
     * working. A missing port falls back to {@code 9200}.
     *
     * @param address the address as found in the configuration
     * @return the parsed host
     * @throws RuntimeException if the parsed URI has no host component
     * @throws IllegalArgumentException if the address is not a valid URI
     */
    private static HttpHost toHttpHost(String address) {
        String normalized = address;
        if (!address.startsWith("http://") && !address.startsWith("https://")) {
            normalized = DEFAULT_SCHEME + "://" + address;
        }
        final URI uri = URI.create(normalized);
        if (uri.getHost() == null) {
            throw new RuntimeException("host undefined " + normalized);
        }
        final int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_PORT;
        final String scheme = uri.getScheme() != null ? uri.getScheme() : DEFAULT_SCHEME;
        return new HttpHost(uri.getHost(), port, scheme);
    }

    /**
     * Creates a connection whose bulk processor uses a listener that does nothing.
     *
     * @param stormConf the Storm configuration map
     * @param boltType the component type used as the optional part of the configuration keys
     * @return a new connection
     */
    public static OpenSearchConnection getConnection(Map<String, Object> stormConf, String boltType) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void afterBulk(long arg0, BulkRequest arg1, BulkResponse arg2) {
                // intentionally empty
            }

            @Override
            public void afterBulk(long arg0, BulkRequest arg1, Throwable arg2) {
                // intentionally empty
            }

            @Override
            public void beforeBulk(long arg0, BulkRequest arg1) {
                // intentionally empty
            }
        };
        return OpenSearchConnection.getConnection(stormConf, boltType, listener);
    }

    /**
     * Creates a connection with a freshly built client and a bulk processor that reports to the
     * given listener.
     *
     * <p>Settings read (suffixes): {@code flushInterval} (default {@code "5s"}),
     * {@code bulkActions} (default {@code 50}) and {@code concurrentRequests} (default
     * {@code 1}), in addition to everything read by {@link #getClient(Map, String)}.
     *
     * @param stormConf the Storm configuration map
     * @param boltType the component type used as the optional part of the configuration keys
     * @param listener the listener passed to the bulk processor
     * @return a new connection
     */
    public static OpenSearchConnection getConnection(Map<String, Object> stormConf, String boltType, BulkProcessor.Listener listener) {
        final RestHighLevelClient client = OpenSearchConnection.getClient(stormConf, boltType);
        try {
            final String dottedType = boltType + ".";
            final String flushIntervalString =
                    ConfUtils.getString(stormConf, CONF_PREFIX, dottedType, "flushInterval", "5s");
            final TimeValue flushInterval = TimeValue.parseTimeValue(
                    flushIntervalString, TimeValue.timeValueSeconds(5L), "flushInterval");
            final int bulkActions =
                    ConfUtils.getInt(stormConf, CONF_PREFIX, dottedType, "bulkActions", 50);
            final int concurrentRequests =
                    ConfUtils.getInt(stormConf, CONF_PREFIX, dottedType, "concurrentRequests", 1);

            final BulkProcessor bulkProcessor = BulkProcessor.builder(
                            (request, bulkListener) ->
                                    client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                            listener)
                    .setFlushInterval(flushInterval)
                    .setBulkActions(bulkActions)
                    .setConcurrentRequests(concurrentRequests)
                    .build();
            return new OpenSearchConnection(client, bulkProcessor);
        } catch (RuntimeException e) {
            // The client already exists at this point; release it so a bad setting
            // (e.g. an unparsable flushInterval) does not leak it.
            try {
                client.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /**
     * Flushes the bulk processor (waiting up to 60 seconds) and closes the client.
     *
     * <p>The client is closed and the connection is marked as closed even when flushing fails or
     * is interrupted. Calling this method again afterwards only logs a warning.
     *
     * @throws RuntimeException if the pending actions could not be flushed within the timeout, or
     *     if the waiting thread was interrupted (the interrupt flag is restored in that case)
     */
    public void close() {
        if (this.isClosed) {
            LOG.warn("Tried to close an already closed connection!");
            return;
        }
        LOG.debug("Start closing the OpenSearch connection");
        boolean interrupted = false;
        try {
            boolean success = this.processor.awaitClose(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!success) {
                throw new RuntimeException("Failed to flush pending actions when closing BulkProcessor");
            }
        } catch (InterruptedException e) {
            interrupted = true;
            throw new RuntimeException(e);
        } finally {
            // Always release the client, whatever happened while flushing.
            try {
                this.client.close();
            } catch (IOException e) {
                LOG.warn("Client threw IO exception.", e);
            }
            this.isClosed = true;
            // Restore the flag only after the client is closed so the close itself
            // is not cut short by a pending interrupt.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

