/*
 * Decompiled with CFR 0.152.
 */
package org.graylog2.indexer.messages;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.github.joschi.jadconfig.util.Duration;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
import org.graylog2.configuration.ElasticsearchConfiguration;
import org.graylog2.indexer.Deflector;
import org.graylog2.indexer.IndexFailure;
import org.graylog2.indexer.IndexFailureImpl;
import org.graylog2.indexer.messages.DocumentNotFoundException;
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.plugin.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class Messages {
    private static final Logger LOG = LoggerFactory.getLogger(Messages.class);
    private static final Duration MAX_WAIT_TIME = Duration.seconds((long)30L);
    private static final Retryer<BulkResponse> BULK_REQUEST_RETRYER = RetryerBuilder.newBuilder().retryIfException(t -> t instanceof ElasticsearchTimeoutException).withWaitStrategy(WaitStrategies.exponentialWait((long)MAX_WAIT_TIME.getQuantity(), (TimeUnit)MAX_WAIT_TIME.getUnit())).build();
    private final Client c;
    private final String deflectorName;
    private final String analyzer;
    private final Meter invalidTimestampMeter;
    private final LinkedBlockingQueue<List<IndexFailure>> indexFailureQueue;

    @Inject
    public Messages(Client client, ElasticsearchConfiguration configuration, MetricRegistry metricRegistry) {
        this.c = client;
        this.deflectorName = Deflector.buildName(configuration.getIndexPrefix());
        this.analyzer = configuration.getAnalyzer();
        this.invalidTimestampMeter = metricRegistry.meter(MetricRegistry.name(Messages.class, (String[])new String[]{"invalid-timestamps"}));
        this.indexFailureQueue = new LinkedBlockingQueue(1000);
    }

    public ResultMessage get(String messageId, String index) throws DocumentNotFoundException {
        GetRequest request = (GetRequest)this.c.prepareGet(index, "message", messageId).request();
        GetResponse r = (GetResponse)this.c.get(request).actionGet();
        if (!r.isExists()) {
            throw new DocumentNotFoundException(index, messageId);
        }
        return ResultMessage.parseFromSource(r);
    }

    public List<String> analyze(String string, String index) {
        AnalyzeResponse response = (AnalyzeResponse)this.c.admin().indices().prepareAnalyze(index, string).setAnalyzer(this.analyzer).get();
        List tokens = response.getTokens();
        ArrayList<String> terms = new ArrayList<String>(tokens.size());
        for (AnalyzeResponse.AnalyzeToken token : tokens) {
            terms.add(token.getTerm());
        }
        return terms;
    }

    public boolean bulkIndex(List<Message> messages) {
        return this.bulkIndex(this.deflectorName, messages);
    }

    public boolean bulkIndex(String indexName, List<Message> messages) {
        if (messages.isEmpty()) {
            return true;
        }
        BulkRequestBuilder requestBuilder = this.c.prepareBulk().setConsistencyLevel(WriteConsistencyLevel.ONE);
        for (Message msg : messages) {
            requestBuilder.add(this.buildIndexRequest(indexName, msg.toElasticSearchObject(this.invalidTimestampMeter), msg.getId()));
        }
        BulkResponse response = this.runBulkRequest((BulkRequest)requestBuilder.request());
        LOG.debug("Index {}: Bulk indexed {} messages, took {} ms, failures: {}", new Object[]{indexName, response.getItems().length, response.getTookInMillis(), response.hasFailures()});
        if (response.hasFailures()) {
            this.propagateFailure(response.getItems(), messages, response.buildFailureMessage());
        }
        return !response.hasFailures();
    }

    private BulkResponse runBulkRequest(BulkRequest request) {
        try {
            return (BulkResponse)this.c.bulk(request).actionGet();
        }
        catch (ElasticsearchTimeoutException timeoutException) {
            LOG.debug("Bulk indexing request timed out. Retrying.", (Throwable)timeoutException);
            try {
                return (BulkResponse)BULK_REQUEST_RETRYER.call((Callable)new BulkRequestCallable(this.c, request));
            }
            catch (RetryException | ExecutionException e) {
                LOG.error("Couldn't bulk index " + request.numberOfActions() + " messages.", e);
                throw Throwables.propagate((Throwable)e);
            }
        }
    }

    private void propagateFailure(BulkItemResponse[] items, List<Message> messages, String errorMessage) {
        LinkedList<IndexFailureImpl> indexFailures = new LinkedList<IndexFailureImpl>();
        for (BulkItemResponse item : items) {
            if (!item.isFailed()) continue;
            LOG.trace("Failed to index message: {}", (Object)item.getFailureMessage());
            BulkItemResponse.Failure f = item.getFailure();
            Message message = messages.get(item.getItemId());
            ImmutableMap doc = ImmutableMap.builder().put((Object)"letter_id", (Object)item.getId()).put((Object)"index", (Object)f.getIndex()).put((Object)"type", (Object)f.getType()).put((Object)"message", (Object)f.getMessage()).put((Object)"timestamp", (Object)message.getTimestamp()).build();
            indexFailures.add(new IndexFailureImpl((Map<String, Object>)doc));
        }
        LOG.error("Failed to index [{}] messages. Please check the index error log in your web interface for the reason. Error: {}", (Object)indexFailures.size(), (Object)errorMessage);
        try {
            this.indexFailureQueue.offer(indexFailures, 25L, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            LOG.warn("Couldn't save index failures.", (Throwable)e);
        }
    }

    public IndexRequest buildIndexRequest(String index, Map<String, Object> source, String id) {
        source.remove("_id");
        return (IndexRequest)((IndexRequestBuilder)this.c.prepareIndex(index, "message", id).setSource(source).setConsistencyLevel(WriteConsistencyLevel.ONE)).request();
    }

    public LinkedBlockingQueue<List<IndexFailure>> getIndexFailureQueue() {
        return this.indexFailureQueue;
    }

    private static class BulkRequestCallable
    implements Callable<BulkResponse> {
        private final Client client;
        private final BulkRequest request;

        public BulkRequestCallable(Client client, BulkRequest request) {
            this.client = (Client)Preconditions.checkNotNull((Object)client);
            this.request = (BulkRequest)Preconditions.checkNotNull((Object)request);
        }

        @Override
        public BulkResponse call() throws Exception {
            return (BulkResponse)this.client.bulk(this.request).actionGet();
        }
    }
}

