/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.dao.es6.index;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.type.MapType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.common.utils.RetryUtil;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.execution.ApplicationException;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.dao.es6.index.BulkRequestWrapper;
import com.netflix.conductor.dao.es6.index.ElasticSearchBaseDAO;
import com.netflix.conductor.dao.es6.index.ElasticSearchDAOV6;
import com.netflix.conductor.elasticsearch.ElasticSearchConfiguration;
import com.netflix.conductor.elasticsearch.query.parser.ParserException;
import com.netflix.conductor.metrics.Monitors;
import conductor.com.fasterxml.jackson.core.JsonProcessingException;
import conductor.org.apache.http.Header;
import conductor.org.apache.http.HttpEntity;
import conductor.org.apache.http.entity.ContentType;
import conductor.org.apache.http.nio.entity.NByteArrayEntity;
import conductor.org.apache.http.nio.entity.NStringEntity;
import conductor.org.apache.http.util.EntityUtils;
import conductor.org.elasticsearch.action.DocWriteResponse;
import conductor.org.elasticsearch.action.bulk.BulkRequest;
import conductor.org.elasticsearch.action.bulk.BulkResponse;
import conductor.org.elasticsearch.action.delete.DeleteRequest;
import conductor.org.elasticsearch.action.delete.DeleteResponse;
import conductor.org.elasticsearch.action.get.GetRequest;
import conductor.org.elasticsearch.action.get.GetResponse;
import conductor.org.elasticsearch.action.index.IndexRequest;
import conductor.org.elasticsearch.action.search.SearchRequest;
import conductor.org.elasticsearch.action.search.SearchResponse;
import conductor.org.elasticsearch.action.update.UpdateRequest;
import conductor.org.elasticsearch.client.Response;
import conductor.org.elasticsearch.client.ResponseException;
import conductor.org.elasticsearch.client.RestClient;
import conductor.org.elasticsearch.client.RestClientBuilder;
import conductor.org.elasticsearch.client.RestHighLevelClient;
import conductor.org.elasticsearch.common.xcontent.XContentType;
import conductor.org.elasticsearch.index.query.BoolQueryBuilder;
import conductor.org.elasticsearch.index.query.QueryBuilder;
import conductor.org.elasticsearch.index.query.QueryBuilders;
import conductor.org.elasticsearch.search.SearchHit;
import conductor.org.elasticsearch.search.builder.SearchSourceBuilder;
import conductor.org.elasticsearch.search.sort.FieldSortBuilder;
import conductor.org.elasticsearch.search.sort.SortBuilder;
import conductor.org.elasticsearch.search.sort.SortOrder;
import conductor.org.joda.time.DateTime;
import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Trace
@Singleton
public class ElasticSearchRestDAOV6
extends ElasticSearchBaseDAO
implements IndexDAO {
    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDAOV6.class);
    private static final int RETRY_COUNT = 3;
    private static final int CORE_POOL_SIZE = 6;
    private static final long KEEP_ALIVE_TIME = 1L;
    // Document type names; the log/event/message ones are also appended to the index prefix.
    private static final String WORKFLOW_DOC_TYPE = "workflow";
    private static final String TASK_DOC_TYPE = "task";
    private static final String LOG_DOC_TYPE = "task_log";
    private static final String EVENT_DOC_TYPE = "event";
    private static final String MSG_DOC_TYPE = "message";
    private static final TimeZone GMT = TimeZone.getTimeZone("GMT");
    // Date suffix appended to the time-based (log/event/message) index names.
    // NOTE(review): SimpleDateFormat is not thread-safe - confirm every caller is serialized.
    private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMWW");
    private static final String className = ElasticSearchRestDAOV6.class.getSimpleName();
    private final String workflowIndexName;
    private final String taskIndexName;
    private final String eventIndexPrefix;
    // The three dated index names below are reassigned by updateIndexesNames(), which is also
    // run from a scheduler thread, while indexing threads read them; volatile guarantees that
    // readers observe the most recently published name.
    private volatile String eventIndexName;
    private final String messageIndexPrefix;
    private volatile String messageIndexName;
    private volatile String logIndexName;
    private final String logIndexPrefix;
    // Empty string when no document type override applies.
    private final String docTypeOverride;
    private final String clusterHealthColor;
    private final ObjectMapper objectMapper;
    private final RestHighLevelClient elasticSearchClient;
    private final RestClient elasticSearchAdminClient;
    // Pool used by the async workflow/task/message operations.
    private final ExecutorService executorService;
    // Separate small pool used by the async task-log and event-execution operations.
    private final ExecutorService logExecutorService;
    private final ConcurrentHashMap<String, BulkRequests> bulkRequests;
    private final int indexBatchSize;
    private final int asyncBufferFlushTimeout;
    private final ElasticSearchConfiguration config;

    /**
     * Builds the DAO: creates the low-level and high-level REST clients from the supplied
     * builder, derives index names from the configured prefix, creates the two worker pools
     * used by the async operations, and schedules the periodic bulk-request flush.
     *
     * @param restClientBuilder builder used for both Elasticsearch clients
     * @param config            Elasticsearch configuration
     * @param objectMapper      mapper used to (de)serialize documents
     */
    @Inject
    public ElasticSearchRestDAOV6(RestClientBuilder restClientBuilder, ElasticSearchConfiguration config, ObjectMapper objectMapper) {
        this.config = config;
        this.objectMapper = objectMapper;
        this.elasticSearchAdminClient = restClientBuilder.build();
        this.elasticSearchClient = new RestHighLevelClient(restClientBuilder);
        this.clusterHealthColor = config.getClusterHealthColor();
        this.bulkRequests = new ConcurrentHashMap<>();
        this.indexBatchSize = config.getIndexBatchSize();
        this.asyncBufferFlushTimeout = config.getAsyncBufferFlushTimeout();
        this.indexPrefix = config.getIndexName();

        // The doc type override only applies when automatic index management is off.
        String configuredOverride = "";
        if (!config.isElasticSearchAutoIndexManagementEnabled()
                && StringUtils.isNotBlank(config.getElasticSearchDocumentTypeOverride())) {
            configuredOverride = config.getElasticSearchDocumentTypeOverride();
        }
        this.docTypeOverride = configuredOverride;

        this.workflowIndexName = this.getIndexName(WORKFLOW_DOC_TYPE);
        this.taskIndexName = this.getIndexName(TASK_DOC_TYPE);
        this.logIndexPrefix = this.indexPrefix + "_" + LOG_DOC_TYPE;
        this.messageIndexPrefix = this.indexPrefix + "_" + MSG_DOC_TYPE;
        this.eventIndexPrefix = this.indexPrefix + "_" + EVENT_DOC_TYPE;

        int queueCapacity = config.getAsyncWorkerQueueSize();

        // Main indexing pool; rejected work is logged and counted rather than thrown.
        this.executorService = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                config.getAsyncMaxPoolSize(),
                KEEP_ALIVE_TIME,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                (rejected, pool) -> {
                    logger.warn("Request  {} to async dao discarded in executor {}", rejected, pool);
                    Monitors.recordDiscardedIndexingCount("indexQueue");
                });

        // Smaller pool for task logs and event executions.
        int logCorePoolSize = 1;
        int logMaxPoolSize = 2;
        long logKeepAliveSeconds = 30L;
        this.logExecutorService = new ThreadPoolExecutor(
                logCorePoolSize,
                logMaxPoolSize,
                logKeepAliveSeconds,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                (rejected, pool) -> {
                    logger.warn("Request {} to async log dao discarded in executor {}", rejected, pool);
                    Monitors.recordDiscardedIndexingCount("logQueue");
                });

        // First flush after 60 seconds, then every 30 seconds.
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(this::flushBulkRequests, 60L, 30L, TimeUnit.SECONDS);
    }

    /**
     * Shuts down both worker pools, the log pool first and then the main indexing pool.
     */
    @PreDestroy
    private void shutdown() {
        logger.info("Gracefully shutdown executor service");
        ExecutorService[] pools = {this.logExecutorService, this.executorService};
        for (ExecutorService pool : pools) {
            this.shutdownExecutorService(pool);
        }
    }

    /**
     * Requests an orderly shutdown of the given pool and waits up to 30 seconds for it to
     * finish; if it does not finish in time, or the wait is interrupted, the pool is forced
     * down with {@code shutdownNow()}.
     *
     * @param execService the pool to stop
     */
    private void shutdownExecutorService(ExecutorService execService) {
        try {
            execService.shutdown();
            boolean terminated = execService.awaitTermination(30L, TimeUnit.SECONDS);
            if (!terminated) {
                logger.warn("Forcing shutdown after waiting for 30 seconds");
                execService.shutdownNow();
                return;
            }
            logger.debug("tasks completed, shutting down");
        }
        catch (InterruptedException interrupted) {
            logger.warn("Shutdown interrupted, invoking shutdownNow on scheduledThreadPoolExecutor for delay queue");
            execService.shutdownNow();
            // Restore the interrupt flag for whoever called us.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Waits for the cluster to reach the configured health status and, when automatic index
     * management is enabled, creates the index templates and the workflow and task indexes.
     *
     * @throws Exception if the cluster health request fails
     */
    public void setup() throws Exception {
        this.waitForHealthyCluster();
        if (!this.config.isElasticSearchAutoIndexManagementEnabled()) {
            return;
        }
        this.createIndexesTemplates();
        this.createWorkflowIndex();
        this.createTaskIndex();
    }

    /**
     * Installs the index templates, resolves the current dated index names, and schedules an
     * hourly refresh of those names.
     *
     * <p>The scheduled refresh catches its own failures: {@code scheduleAtFixedRate} cancels
     * all subsequent runs once a run throws, and {@code updateIndexesNames()} throws when an
     * index cannot be created, so one transient Elasticsearch error would otherwise stop the
     * index names from ever rolling over again.
     */
    private void createIndexesTemplates() {
        try {
            this.initIndexesTemplates();
            this.updateIndexesNames();
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> {
                try {
                    this.updateIndexesNames();
                }
                catch (Exception refreshFailure) {
                    // Swallow after logging so the next scheduled run still happens.
                    logger.error("Failed to refresh index names, will retry on next scheduled run", (Throwable)refreshFailure);
                }
            }, 0L, 1L, TimeUnit.HOURS);
        }
        catch (Exception e) {
            logger.error("Error creating index templates!", (Throwable)e);
        }
    }

    /**
     * Initializes the index template for each time-based document type, in the order
     * task log, event, message.
     */
    private void initIndexesTemplates() {
        String[] templatedTypes = {LOG_DOC_TYPE, EVENT_DOC_TYPE, MSG_DOC_TYPE};
        for (String templatedType : templatedTypes) {
            this.initIndexTemplate(templatedType);
        }
    }

    /**
     * Uploads the index template {@code template_<type>} from the classpath resource
     * {@code /template_<type>.json} unless Elasticsearch already has a template of that name.
     * Any failure is logged and not propagated.
     *
     * @param type document type whose template should be installed
     */
    private void initIndexTemplate(String type) {
        String template = "template_" + type;
        String templatePath = "/_template/" + template;
        try {
            if (this.doesResourceNotExist(templatePath)) {
                logger.info("Creating the index template '{}'", (Object)template);
                String resourceName = "/" + template + ".json";
                byte[] templateSource;
                // try-with-resources so the classpath stream is always closed.
                try (InputStream stream = ElasticSearchDAOV6.class.getResourceAsStream(resourceName)) {
                    if (stream == null) {
                        // Report the real cause instead of a NullPointerException from IOUtils.
                        logger.error("Failed to init {}: classpath resource '{}' not found", (Object)template, (Object)resourceName);
                        return;
                    }
                    templateSource = IOUtils.toByteArray(stream);
                }
                NByteArrayEntity entity = new NByteArrayEntity(templateSource, ContentType.APPLICATION_JSON);
                this.elasticSearchAdminClient.performRequest("PUT", templatePath, Collections.emptyMap(), (HttpEntity)entity, new Header[0]);
            }
        }
        catch (Exception e) {
            logger.error("Failed to init " + template, (Throwable)e);
        }
    }

    /**
     * Recomputes the current dated index name for task logs, events and messages (in that
     * order) and stores each one in its field as soon as it has been resolved.
     */
    private void updateIndexesNames() {
        logIndexName = updateIndexName(LOG_DOC_TYPE);
        eventIndexName = updateIndexName(EVENT_DOC_TYPE);
        messageIndexName = updateIndexName(MSG_DOC_TYPE);
    }

    /**
     * Builds the dated index name {@code <prefix>_<type>_<date>} for the current time and
     * makes sure that index exists.
     *
     * @param type document type the index is for
     * @return the dated index name
     * @throws ApplicationException if the index could not be created
     */
    private String updateIndexName(String type) {
        String datedIndexName = this.indexPrefix + "_" + type + "_" + SIMPLE_DATE_FORMAT.format(new Date());
        try {
            this.addIndex(datedIndexName);
        }
        catch (IOException ioe) {
            logger.error("Failed to update log index name: {}", datedIndexName, ioe);
            throw new ApplicationException(ioe.getMessage(), ioe);
        }
        return datedIndexName;
    }

    /**
     * Creates the workflow index and then adds the workflow mapping to it. Each step logs its
     * own failure (including the exception) and does not prevent the other from running.
     */
    private void createWorkflowIndex() {
        String indexName = this.getIndexName(WORKFLOW_DOC_TYPE);
        try {
            this.addIndex(indexName);
        }
        catch (IOException e) {
            logger.error("Failed to initialize index '{}'", (Object)indexName, (Object)e);
        }
        try {
            this.addMappingToIndex(indexName, WORKFLOW_DOC_TYPE, "/mappings_docType_workflow.json");
        }
        catch (IOException e) {
            // Pass the exception so the cause of the mapping failure is not lost.
            logger.error("Failed to add {} mapping", (Object)WORKFLOW_DOC_TYPE, (Object)e);
        }
    }

    /**
     * Creates the task index and then adds the task mapping to it. Each step logs its own
     * failure (including the exception) and does not prevent the other from running.
     */
    private void createTaskIndex() {
        String indexName = this.getIndexName(TASK_DOC_TYPE);
        try {
            this.addIndex(indexName);
        }
        catch (IOException e) {
            logger.error("Failed to initialize index '{}'", (Object)indexName, (Object)e);
        }
        try {
            this.addMappingToIndex(indexName, TASK_DOC_TYPE, "/mappings_docType_task.json");
        }
        catch (IOException e) {
            // Pass the exception so the cause of the mapping failure is not lost.
            logger.error("Failed to add {} mapping", (Object)TASK_DOC_TYPE, (Object)e);
        }
    }

    /**
     * Issues a cluster health request that waits up to 30 seconds for the cluster to reach
     * the configured health status.
     *
     * @throws Exception if the health request fails
     */
    private void waitForHealthyCluster() throws Exception {
        Map<String, String> healthParams = new HashMap<>();
        healthParams.put("wait_for_status", this.clusterHealthColor);
        healthParams.put("timeout", "30s");
        this.elasticSearchAdminClient.performRequest("GET", "/_cluster/health", healthParams, new Header[0]);
    }

    /**
     * Creates the given index with the configured shard and replica counts unless it already
     * exists.
     *
     * <p>A 400 response whose error type says the index already exists (for example because
     * another node created it between the existence check and the PUT) is treated as success.
     * Both the pre-6.0 name {@code index_already_exists_exception} and the name
     * {@code resource_already_exists_exception} are accepted.
     *
     * @param index name of the index to create
     * @throws IOException if the request fails for any other reason
     */
    private void addIndex(String index) throws IOException {
        logger.info("Adding index '{}'...", (Object)index);
        String resourcePath = "/" + index;
        if (!this.doesResourceNotExist(resourcePath)) {
            logger.info("Index '{}' already exists", (Object)index);
            return;
        }
        try {
            ObjectNode setting = this.objectMapper.createObjectNode();
            ObjectNode indexSetting = this.objectMapper.createObjectNode();
            indexSetting.put("number_of_shards", this.config.getElasticSearchIndexShardCount());
            indexSetting.put("number_of_replicas", this.config.getElasticSearchIndexReplicationCount());
            setting.set("index", (JsonNode)indexSetting);
            this.elasticSearchAdminClient.performRequest("PUT", resourcePath, Collections.emptyMap(), (HttpEntity)new NStringEntity(setting.toString(), ContentType.APPLICATION_JSON), new Header[0]);
            logger.info("Added '{}' index", (Object)index);
        }
        catch (ResponseException e) {
            Response errorResponse = e.getResponse();
            if (errorResponse.getStatusLine().getStatusCode() == 400) {
                // Walk the error body defensively: an unexpected shape must not mask the
                // original ResponseException with a NullPointerException.
                JsonNode root = this.objectMapper.readTree(EntityUtils.toString(errorResponse.getEntity()));
                JsonNode errorNode = root == null ? null : root.get("error");
                JsonNode typeNode = errorNode == null ? null : errorNode.get("type");
                String errorCode = typeNode == null ? null : typeNode.asText();
                if ("index_already_exists_exception".equals(errorCode) || "resource_already_exists_exception".equals(errorCode)) {
                    logger.info("Index '{}' already exists", (Object)index);
                    return;
                }
            }
            throw e;
        }
    }

    /**
     * Adds a type mapping to an index unless the mapping already exists.
     *
     * <p>The mapping source is sent through {@link NStringEntity}, which encodes the text
     * with the charset of {@code ContentType.APPLICATION_JSON}, instead of
     * {@code String.getBytes()}, which would use the JVM's platform default charset and could
     * disagree with the declared content type.
     *
     * @param index           index to add the mapping to
     * @param mappingType     document type the mapping is for
     * @param mappingFilename resource handed to {@code loadTypeMappingSource}
     * @throws IOException if loading the mapping or the request fails
     */
    private void addMappingToIndex(String index, String mappingType, String mappingFilename) throws IOException {
        logger.info("Adding '{}' mapping to index '{}'...", (Object)mappingType, (Object)index);
        String resourcePath = "/" + index + "/_mapping/" + mappingType;
        if (this.doesResourceNotExist(resourcePath)) {
            HttpEntity entity = new NStringEntity(this.loadTypeMappingSource(mappingFilename), ContentType.APPLICATION_JSON);
            this.elasticSearchAdminClient.performRequest("PUT", resourcePath, Collections.emptyMap(), entity, new Header[0]);
            logger.info("Added '{}' mapping", (Object)mappingType);
        } else {
            logger.info("Mapping '{}' already exists", (Object)mappingType);
        }
    }

    /**
     * Checks whether a resource exists by sending a HEAD request for it.
     *
     * @param resourcePath path of the resource to probe
     * @return {@code true} only when the response status is 200
     * @throws IOException if the request fails
     */
    public boolean doesResourceExist(String resourcePath) throws IOException {
        int statusCode = this.elasticSearchAdminClient
                .performRequest("HEAD", resourcePath, new Header[0])
                .getStatusLine()
                .getStatusCode();
        return statusCode == 200;
    }

    /**
     * Negation of {@link #doesResourceExist(String)}.
     *
     * @param resourcePath path of the resource to probe
     * @return {@code true} when the resource was not reported as existing
     * @throws IOException if the request fails
     */
    public boolean doesResourceNotExist(String resourcePath) throws IOException {
        boolean exists = this.doesResourceExist(resourcePath);
        return !exists;
    }

    /**
     * Serializes a {@link WorkflowSummary} of the workflow and writes it to the workflow
     * index under the workflow id, retrying the write via {@link RetryUtil}. Exceptions
     * raised while indexing are counted through {@code Monitors.error} and logged.
     *
     * @param workflow the workflow to index
     */
    public void indexWorkflow(Workflow workflow) {
        try {
            long startedAt = Instant.now().toEpochMilli();
            String workflowId = workflow.getWorkflowId();
            byte[] summaryJson = this.objectMapper.writeValueAsBytes(new WorkflowSummary(workflow));

            String docType = WORKFLOW_DOC_TYPE;
            if (!StringUtils.isBlank(this.docTypeOverride)) {
                docType = this.docTypeOverride;
            }
            IndexRequest indexRequest = new IndexRequest(this.workflowIndexName, docType, workflowId);
            indexRequest.source(summaryJson, XContentType.JSON);

            new RetryUtil().retryOnException(() -> {
                try {
                    return this.elasticSearchClient.index(indexRequest, new Header[0]);
                }
                catch (IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }, null, null, RETRY_COUNT, "Indexing workflow document: " + workflow.getWorkflowId(), "indexWorkflow");

            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for indexing workflow: {}", elapsedMillis, workflowId);
            Monitors.recordESIndexTime("index_workflow", WORKFLOW_DOC_TYPE, elapsedMillis);
            int queuedJobs = ((ThreadPoolExecutor)this.executorService).getQueue().size();
            Monitors.recordWorkerQueueSize("indexQueue", queuedJobs);
        }
        catch (Exception failure) {
            Monitors.error(className, "indexWorkflow");
            logger.error("Failed to index workflow: {}", workflow.getWorkflowId(), failure);
        }
    }

    /**
     * Runs {@link #indexWorkflow(Workflow)} on the main indexing pool.
     *
     * @param workflow the workflow to index
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncIndexWorkflow(Workflow workflow) {
        Runnable indexJob = () -> this.indexWorkflow(workflow);
        return CompletableFuture.runAsync(indexJob, this.executorService);
    }

    /**
     * Writes a {@link TaskSummary} of the task to the task index under the task id and
     * records timing and queue-size metrics. Exceptions raised while indexing are logged.
     *
     * @param task the task to index
     */
    public void indexTask(Task task) {
        try {
            long startedAt = Instant.now().toEpochMilli();
            String taskId = task.getTaskId();
            TaskSummary taskSummary = new TaskSummary(task);

            String docType = TASK_DOC_TYPE;
            if (!StringUtils.isBlank(this.docTypeOverride)) {
                docType = this.docTypeOverride;
            }
            this.indexObject(this.taskIndexName, docType, taskId, taskSummary);

            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for  indexing task:{} in workflow: {}", elapsedMillis, taskId, task.getWorkflowInstanceId());
            Monitors.recordESIndexTime("index_task", TASK_DOC_TYPE, elapsedMillis);
            int queuedJobs = ((ThreadPoolExecutor)this.executorService).getQueue().size();
            Monitors.recordWorkerQueueSize("indexQueue", queuedJobs);
        }
        catch (Exception failure) {
            logger.error("Failed to index task: {}", task.getTaskId(), failure);
        }
    }

    /**
     * Runs {@link #indexTask(Task)} on the main indexing pool.
     *
     * @param task the task to index
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncIndexTask(Task task) {
        Runnable indexJob = () -> this.indexTask(task);
        return CompletableFuture.runAsync(indexJob, this.executorService);
    }

    /**
     * Indexes the given task execution logs into the current log index with a single bulk
     * request, retried via {@link RetryUtil} when the call throws or the bulk response
     * reports failures.
     *
     * <p>Logs that cannot be serialized are skipped (and the serialization error is logged).
     * If nothing is left to send, no bulk request is issued, because a bulk request without
     * any action is rejected by Elasticsearch. Failures of the bulk call are logged and not
     * propagated.
     *
     * @param taskExecLogs logs to index; {@code null} or empty is a no-op
     */
    public void addTaskExecutionLogs(List<TaskExecLog> taskExecLogs) {
        if (taskExecLogs == null || taskExecLogs.isEmpty()) {
            return;
        }
        long startTime = Instant.now().toEpochMilli();
        // Loop-invariant, so resolved once up front.
        String docType = StringUtils.isBlank((CharSequence)this.docTypeOverride) ? LOG_DOC_TYPE : this.docTypeOverride;
        BulkRequest bulkRequest = new BulkRequest();
        for (TaskExecLog log : taskExecLogs) {
            byte[] docBytes;
            try {
                docBytes = this.objectMapper.writeValueAsBytes((Object)log);
            }
            catch (JsonProcessingException e) {
                logger.error("Failed to convert task log to JSON for task {}", (Object)log.getTaskId(), (Object)e);
                continue;
            }
            IndexRequest request = new IndexRequest(this.logIndexName, docType);
            request.source(docBytes, XContentType.JSON);
            bulkRequest.add(request);
        }
        if (bulkRequest.numberOfActions() == 0) {
            // Every log failed to serialize; sending an empty bulk request would only fail.
            logger.warn("No task execution logs could be serialized, skipping bulk indexing");
            return;
        }
        try {
            new RetryUtil<BulkResponse>().retryOnException(() -> {
                try {
                    return this.elasticSearchClient.bulk(bulkRequest, new Header[0]);
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }, null, BulkResponse::hasFailures, 3, "Indexing task execution logs", "addTaskExecutionLogs");
            long endTime = Instant.now().toEpochMilli();
            logger.debug("Time taken {} for indexing taskExecutionLogs", (Object)(endTime - startTime));
            Monitors.recordESIndexTime((String)"index_task_execution_logs", (String)LOG_DOC_TYPE, (long)(endTime - startTime));
            Monitors.recordWorkerQueueSize((String)"logQueue", (int)((ThreadPoolExecutor)this.logExecutorService).getQueue().size());
        }
        catch (Exception e) {
            List<String> taskIds = taskExecLogs.stream().map(TaskExecLog::getTaskId).collect(Collectors.toList());
            logger.error("Failed to index task execution logs for tasks: {}", (Object)taskIds, (Object)e);
        }
    }

    /**
     * Runs {@link #addTaskExecutionLogs(List)} on the log pool.
     *
     * @param logs logs to index
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncAddTaskExecutionLogs(List<TaskExecLog> logs) {
        Runnable logJob = () -> this.addTaskExecutionLogs(logs);
        return CompletableFuture.runAsync(logJob, this.logExecutorService);
    }

    /**
     * Searches all log indexes for the execution logs of a task, ordered by ascending
     * {@code createdTime} and capped at the configured task-log limit.
     *
     * @param taskId id of the task whose logs are wanted
     * @return the matching logs, or {@code null} when the search fails
     */
    public List<TaskExecLog> getTaskExecutionLogs(String taskId) {
        try {
            BoolQueryBuilder byTaskId = this.boolQueryBuilder("taskId='" + taskId + "'", "*");

            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.query(byTaskId);
            sourceBuilder.sort(new FieldSortBuilder("createdTime").order(SortOrder.ASC));
            sourceBuilder.size(this.config.getElasticSearchTasklogLimit());

            String docType = LOG_DOC_TYPE;
            if (!StringUtils.isBlank(this.docTypeOverride)) {
                docType = this.docTypeOverride;
            }

            SearchRequest logSearch = new SearchRequest(this.logIndexPrefix + "*");
            logSearch.types(docType);
            logSearch.source(sourceBuilder);

            return this.mapTaskExecLogsResponse(this.elasticSearchClient.search(logSearch, new Header[0]));
        }
        catch (Exception failure) {
            logger.error("Failed to get task execution logs for task: {}", taskId, failure);
            return null;
        }
    }

    /**
     * Deserializes the source of every search hit into a {@link TaskExecLog}, keeping the
     * order of the hits.
     *
     * @param response search response to convert
     * @return one log per hit
     * @throws IOException if a hit cannot be deserialized
     */
    private List<TaskExecLog> mapTaskExecLogsResponse(SearchResponse response) throws IOException {
        SearchHit[] searchHits = response.getHits().getHits();
        List<TaskExecLog> parsedLogs = new ArrayList<>(searchHits.length);
        for (int i = 0; i < searchHits.length; i++) {
            String json = searchHits[i].getSourceAsString();
            parsedLogs.add(this.objectMapper.readValue(json, TaskExecLog.class));
        }
        return parsedLogs;
    }

    /**
     * Searches all message indexes for the messages of a queue, ordered by ascending
     * {@code created}.
     *
     * @param queue name of the queue
     * @return the matching messages, or {@code null} when the search fails
     */
    public List<Message> getMessages(String queue) {
        try {
            BoolQueryBuilder byQueue = this.boolQueryBuilder("queue='" + queue + "'", "*");

            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.query(byQueue);
            sourceBuilder.sort(new FieldSortBuilder("created").order(SortOrder.ASC));

            String docType = MSG_DOC_TYPE;
            if (!StringUtils.isBlank(this.docTypeOverride)) {
                docType = this.docTypeOverride;
            }

            SearchRequest messageSearch = new SearchRequest(this.messageIndexPrefix + "*");
            messageSearch.types(docType);
            messageSearch.source(sourceBuilder);

            return this.mapGetMessagesResponse(this.elasticSearchClient.search(messageSearch, new Header[0]));
        }
        catch (Exception failure) {
            logger.error("Failed to get messages for queue: {}", queue, failure);
            return null;
        }
    }

    /**
     * Converts every search hit into a {@link Message} built from the hit's
     * {@code messageId} and {@code payload} fields (the third constructor argument is
     * {@code null}), keeping the order of the hits.
     *
     * @param response search response to convert
     * @return one message per hit
     * @throws IOException if a hit cannot be deserialized
     */
    private List<Message> mapGetMessagesResponse(SearchResponse response) throws IOException {
        SearchHit[] searchHits = response.getHits().getHits();
        // Each hit is read as a HashMap<String, String>.
        MapType stringMapType = TypeFactory.defaultInstance().constructMapType(HashMap.class, String.class, String.class);
        List<Message> result = new ArrayList<>(searchHits.length);
        for (int i = 0; i < searchHits.length; i++) {
            Map<?, ?> fields = (Map<?, ?>)this.objectMapper.readValue(searchHits[i].getSourceAsString(), (JavaType)stringMapType);
            String messageId = (String)fields.get("messageId");
            String payload = (String)fields.get("payload");
            result.add(new Message(messageId, payload, null));
        }
        return result;
    }

    /**
     * Searches all event indexes for the executions of an event, ordered by ascending
     * {@code created}.
     *
     * @param event the event to look up
     * @return the matching executions, or {@code null} when the search fails
     */
    public List<EventExecution> getEventExecutions(String event) {
        try {
            BoolQueryBuilder byEvent = this.boolQueryBuilder("event='" + event + "'", "*");

            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.query(byEvent);
            sourceBuilder.sort(new FieldSortBuilder("created").order(SortOrder.ASC));

            String docType = EVENT_DOC_TYPE;
            if (!StringUtils.isBlank(this.docTypeOverride)) {
                docType = this.docTypeOverride;
            }

            SearchRequest eventSearch = new SearchRequest(this.eventIndexPrefix + "*");
            eventSearch.types(docType);
            eventSearch.source(sourceBuilder);

            return this.mapEventExecutionsResponse(this.elasticSearchClient.search(eventSearch, new Header[0]));
        }
        catch (Exception failure) {
            logger.error("Failed to get executions for event: {}", event, failure);
            return null;
        }
    }

    /**
     * Deserializes the source of every search hit into an {@link EventExecution}, keeping
     * the order of the hits.
     *
     * @param response search response to convert
     * @return one execution per hit
     * @throws IOException if a hit cannot be deserialized
     */
    private List<EventExecution> mapEventExecutionsResponse(SearchResponse response) throws IOException {
        SearchHit[] searchHits = response.getHits().getHits();
        List<EventExecution> parsedExecutions = new ArrayList<>(searchHits.length);
        for (int i = 0; i < searchHits.length; i++) {
            String json = searchHits[i].getSourceAsString();
            parsedExecutions.add(this.objectMapper.readValue(json, EventExecution.class));
        }
        return parsedExecutions;
    }

    /**
     * Indexes a queue message into the current message index as a document holding the
     * message id, payload, queue name and the current time as {@code created}. Exceptions
     * raised while indexing are logged.
     *
     * @param queue   name of the queue the message belongs to
     * @param message the message to index
     */
    public void addMessage(String queue, Message message) {
        try {
            long startedAt = Instant.now().toEpochMilli();

            Map<String, Object> document = new HashMap<>();
            document.put("messageId", message.getId());
            document.put("payload", message.getPayload());
            document.put("queue", queue);
            document.put("created", System.currentTimeMillis());

            String docType = MSG_DOC_TYPE;
            if (!StringUtils.isBlank(this.docTypeOverride)) {
                docType = this.docTypeOverride;
            }
            this.indexObject(this.messageIndexName, docType, document);

            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for  indexing message: {}", elapsedMillis, message.getId());
            Monitors.recordESIndexTime("add_message", MSG_DOC_TYPE, elapsedMillis);
        }
        catch (Exception failure) {
            logger.error("Failed to index message: {}", message.getId(), failure);
        }
    }

    /**
     * Runs {@link #addMessage(String, Message)} on the main indexing pool.
     *
     * @param queue   name of the queue the message belongs to
     * @param message the message to index
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncAddMessage(String queue, Message message) {
        Runnable messageJob = () -> this.addMessage(queue, message);
        return CompletableFuture.runAsync(messageJob, this.executorService);
    }

    /**
     * Indexes an event execution into the current event index. The document id is
     * {@code <name>.<event>.<messageId>.<id>}. Exceptions raised while indexing are logged.
     *
     * @param eventExecution the execution to index
     */
    public void addEventExecution(EventExecution eventExecution) {
        try {
            long startedAt = Instant.now().toEpochMilli();

            String documentId = new StringBuilder()
                    .append(eventExecution.getName()).append(".")
                    .append(eventExecution.getEvent()).append(".")
                    .append(eventExecution.getMessageId()).append(".")
                    .append(eventExecution.getId())
                    .toString();

            String docType = EVENT_DOC_TYPE;
            if (!StringUtils.isBlank(this.docTypeOverride)) {
                docType = this.docTypeOverride;
            }
            this.indexObject(this.eventIndexName, docType, documentId, eventExecution);

            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for indexing event execution: {}", elapsedMillis, eventExecution.getId());
            Monitors.recordESIndexTime("add_event_execution", EVENT_DOC_TYPE, elapsedMillis);
            int queuedJobs = ((ThreadPoolExecutor)this.logExecutorService).getQueue().size();
            Monitors.recordWorkerQueueSize("logQueue", queuedJobs);
        }
        catch (Exception failure) {
            logger.error("Failed to index event execution: {}", eventExecution.getId(), failure);
        }
    }

    /**
     * Runs {@link #addEventExecution(EventExecution)} on the log pool.
     *
     * @param eventExecution the execution to index
     * @return a future that completes when the indexing call returns
     */
    public CompletableFuture<Void> asyncAddEventExecution(EventExecution eventExecution) {
        Runnable eventJob = () -> this.addEventExecution(eventExecution);
        return CompletableFuture.runAsync(eventJob, this.logExecutorService);
    }

    /**
     * Searches workflow documents and returns the ids of the matches.
     *
     * @param query    structured query expression
     * @param freeText free-text query
     * @param start    offset of the first result
     * @param count    maximum number of results
     * @param sort     sort options
     * @return the matching workflow ids
     * @throws ApplicationException with code {@code BACKEND_ERROR} if the search fails
     */
    public SearchResult<String> searchWorkflows(String query, String freeText, int start, int count, List<String> sort) {
        SearchResult<String> workflowIds;
        try {
            workflowIds = this.searchObjectIdsViaExpression(query, start, count, sort, freeText, WORKFLOW_DOC_TYPE);
        }
        catch (Exception cause) {
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, cause.getMessage(), cause);
        }
        return workflowIds;
    }

    /**
     * Searches task documents and returns the ids of the matches.
     *
     * @param query    structured query expression
     * @param freeText free-text query
     * @param start    offset of the first result
     * @param count    maximum number of results
     * @param sort     sort options
     * @return the matching task ids
     * @throws ApplicationException with code {@code BACKEND_ERROR} if the search fails
     */
    public SearchResult<String> searchTasks(String query, String freeText, int start, int count, List<String> sort) {
        SearchResult<String> taskIds;
        try {
            taskIds = this.searchObjectIdsViaExpression(query, start, count, sort, freeText, TASK_DOC_TYPE);
        }
        catch (Exception cause) {
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, cause.getMessage(), cause);
        }
        return taskIds;
    }

    /**
     * Deletes the workflow document with the given id from the workflow index. A missing
     * document and I/O failures are logged; I/O failures are also counted through
     * {@code Monitors.error}.
     *
     * @param workflowId id of the workflow document to delete
     */
    public void removeWorkflow(String workflowId) {
        long startedAt = Instant.now().toEpochMilli();

        String docType = WORKFLOW_DOC_TYPE;
        if (!StringUtils.isBlank(this.docTypeOverride)) {
            docType = this.docTypeOverride;
        }
        DeleteRequest deleteRequest = new DeleteRequest(this.workflowIndexName, docType, workflowId);

        try {
            DeleteResponse outcome = this.elasticSearchClient.delete(deleteRequest, new Header[0]);
            if (DocWriteResponse.Result.NOT_FOUND == outcome.getResult()) {
                logger.error("Index removal failed - document not found by id: {}", workflowId);
            }
            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for removing workflow: {}", elapsedMillis, workflowId);
            Monitors.recordESIndexTime("remove_workflow", WORKFLOW_DOC_TYPE, elapsedMillis);
            int queuedJobs = ((ThreadPoolExecutor)this.executorService).getQueue().size();
            Monitors.recordWorkerQueueSize("indexQueue", queuedJobs);
        }
        catch (IOException ioe) {
            logger.error("Failed to remove workflow {} from index", workflowId, ioe);
            Monitors.error(className, "remove");
        }
    }

    /**
     * Runs {@link #removeWorkflow(String)} on the main indexing pool.
     *
     * @param workflowId id of the workflow document to delete
     * @return a future that completes when the removal call returns
     */
    public CompletableFuture<Void> asyncRemoveWorkflow(String workflowId) {
        Runnable removalJob = () -> this.removeWorkflow(workflowId);
        return CompletableFuture.runAsync(removalJob, this.executorService);
    }

    /**
     * Partially updates a workflow document: {@code keys[i]} is set to {@code values[i]}.
     * The update is retried via {@link RetryUtil}.
     *
     * <p>The field map is built with an explicit loop rather than {@code Collectors.toMap},
     * because {@code toMap} throws a {@code NullPointerException} as soon as one value is
     * {@code null}. Duplicate keys are still rejected with an
     * {@code IllegalStateException}, as before.
     *
     * @param workflowInstanceId id of the workflow document to update
     * @param keys               names of the fields to update
     * @param values             new field values, parallel to {@code keys}; may contain
     *                           {@code null}
     * @throws ApplicationException with code {@code INVALID_INPUT} if the two arrays differ
     *                              in length
     */
    public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) {
        if (keys.length != values.length) {
            throw new ApplicationException(ApplicationException.Code.INVALID_INPUT, "Number of keys and values do not match");
        }
        long startTime = Instant.now().toEpochMilli();
        String docType = StringUtils.isBlank((CharSequence)this.docTypeOverride) ? WORKFLOW_DOC_TYPE : this.docTypeOverride;
        UpdateRequest request = new UpdateRequest(this.workflowIndexName, docType, workflowInstanceId);
        Map<String, Object> source = new HashMap<String, Object>();
        for (int i = 0; i < keys.length; ++i) {
            if (source.containsKey(keys[i])) {
                throw new IllegalStateException("Duplicate key " + keys[i]);
            }
            source.put(keys[i], values[i]);
        }
        request.doc(source);
        logger.debug("Updating workflow {} with {}", (Object)workflowInstanceId, source);
        new RetryUtil().retryOnException(() -> {
            try {
                return this.elasticSearchClient.update(request, new Header[0]);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, null, null, 3, "Updating workflow document: " + workflowInstanceId, "updateWorkflow");
        long endTime = Instant.now().toEpochMilli();
        logger.debug("Time taken {} for updating workflow: {}", (Object)(endTime - startTime), (Object)workflowInstanceId);
        Monitors.recordESIndexTime((String)"update_workflow", (String)WORKFLOW_DOC_TYPE, (long)(endTime - startTime));
        Monitors.recordWorkerQueueSize((String)"indexQueue", (int)((ThreadPoolExecutor)this.executorService).getQueue().size());
    }

    /**
     * Schedules {@code updateWorkflow(workflowInstanceId, keys, values)} on this DAO's executor service.
     *
     * @param workflowInstanceId id of the workflow document to update
     * @param keys field names to set
     * @param values field values, positionally matching {@code keys}
     * @return a future that completes once the scheduled update has run
     */
    public CompletableFuture<Void> asyncUpdateWorkflow(String workflowInstanceId, String[] keys, Object[] values) {
        Runnable update = () -> this.updateWorkflow(workflowInstanceId, keys, values);
        return CompletableFuture.runAsync(update, this.executorService);
    }

    /**
     * Reads a single field of a workflow document from the workflow index.
     *
     * @param workflowInstanceId id of the workflow document
     * @param fieldToGet name of the source field to read
     * @return the field's {@code toString()} value, or {@code null} when the request fails with an
     *         {@link IOException}, the document does not exist, or the field is absent/null
     */
    public String get(String workflowInstanceId, String fieldToGet) {
        String documentType = this.docTypeOverride;
        if (StringUtils.isBlank((CharSequence)documentType)) {
            documentType = WORKFLOW_DOC_TYPE;
        }
        GetRequest lookup = new GetRequest(this.workflowIndexName, documentType, workflowInstanceId);
        GetResponse found;
        try {
            found = this.elasticSearchClient.get(lookup, new Header[0]);
        }
        catch (IOException ioFailure) {
            logger.error("Unable to get Workflow: {} from ElasticSearch index: {}", new Object[]{workflowInstanceId, this.workflowIndexName, ioFailure});
            return null;
        }
        if (found.isExists()) {
            Map<String, Object> fields = found.getSourceAsMap();
            Object value = fields.get(fieldToGet);
            if (value != null) {
                return value.toString();
            }
        }
        // Reached both when the document is missing and when the requested field is not set.
        logger.debug("Unable to find Workflow: {} in ElasticSearch index: {}.", (Object)workflowInstanceId, (Object)this.workflowIndexName);
        return null;
    }

    /**
     * Builds a bool query from the structured and free-text expressions and returns the ids of
     * the matching documents in the index that {@code getIndexName(docType)} resolves to.
     *
     * @param structuredQuery structured query expression handed to {@code boolQueryBuilder}
     * @param start offset of the first hit to return
     * @param size maximum number of hits to return
     * @param sortOptions sort specifications, each {@code field} or {@code field:ORDER}; may be null
     * @param freeTextQuery free-text expression handed to {@code boolQueryBuilder}
     * @param docType document type used to pick the index and restrict the search
     * @return total hit count together with the ids of the returned page
     * @throws ParserException propagated from {@code boolQueryBuilder}
     * @throws IOException propagated from the search call
     */
    private SearchResult<String> searchObjectIdsViaExpression(String structuredQuery, int start, int size, List<String> sortOptions, String freeTextQuery, String docType) throws ParserException, IOException {
        // Build the query before resolving the index name, as the original statement order did.
        BoolQueryBuilder parsedQuery = this.boolQueryBuilder(structuredQuery, freeTextQuery);
        String targetIndex = this.getIndexName(docType);
        return this.searchObjectIds(targetIndex, parsedQuery, start, size, sortOptions, docType);
    }

    /**
     * Convenience overload of the id search that applies no explicit sort options.
     *
     * @param indexName index to search
     * @param queryBuilder query to execute
     * @param start offset of the first hit to return
     * @param size maximum number of hits to return
     * @param docType document type to restrict the search to
     * @return total hit count together with the ids of the returned page
     * @throws IOException propagated from the search call
     */
    private SearchResult<String> searchObjectIds(String indexName, QueryBuilder queryBuilder, int start, int size, String docType) throws IOException {
        List<String> noSortOptions = null;
        return this.searchObjectIds(indexName, queryBuilder, start, size, noSortOptions, docType);
    }

    /**
     * Runs a paged search against {@code indexName} and collects the ids of the returned hits.
     * <p>
     * Each sort option is either a bare field name (sorted ascending) or {@code field:ORDER}.
     * The order part is matched against {@code SortOrder} case-insensitively, so both
     * {@code updateTime:DESC} and {@code updateTime:desc} are accepted. When
     * {@code docTypeOverride} is not blank it replaces the {@code docType} argument.
     *
     * @param indexName index to search
     * @param queryBuilder query to execute
     * @param start offset of the first hit to return
     * @param size maximum number of hits to return
     * @param sortOptions sort specifications; null or empty means no explicit sort
     * @param docType document type to restrict the search to
     * @return total hit count together with the ids of the returned page
     * @throws IOException propagated from the search call
     * @throws IllegalArgumentException when the order part of a sort option is not a {@code SortOrder} name
     */
    private SearchResult<String> searchObjectIds(String indexName, QueryBuilder queryBuilder, int start, int size, List<String> sortOptions, String docType) throws IOException {
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(queryBuilder);
        searchSourceBuilder.from(start);
        searchSourceBuilder.size(size);
        if (sortOptions != null) {
            for (String sortOption : sortOptions) {
                SortOrder order = SortOrder.ASC;
                String field = sortOption;
                int index = sortOption.indexOf(":");
                // index > 0 (not >= 0): an option starting with ':' has no field name to split off
                // and is passed through unchanged as the field.
                if (index > 0) {
                    field = sortOption.substring(0, index);
                    // Enum.valueOf is case-sensitive; normalise so "desc" does not throw.
                    order = SortOrder.valueOf(sortOption.substring(index + 1).toUpperCase(java.util.Locale.ROOT));
                }
                searchSourceBuilder.sort((SortBuilder<?>)new FieldSortBuilder(field).order(order));
            }
        }
        docType = StringUtils.isBlank((CharSequence)this.docTypeOverride) ? docType : this.docTypeOverride;
        SearchRequest searchRequest = new SearchRequest(indexName);
        searchRequest.types(docType);
        searchRequest.source(searchSourceBuilder);
        SearchResponse response = this.elasticSearchClient.search(searchRequest, new Header[0]);
        List<String> result = new LinkedList<>();
        response.getHits().forEach(hit -> result.add(hit.getId()));
        long count = response.getHits().getTotalHits();
        return new SearchResult<>(count, result);
    }

    /**
     * Finds ids of workflows that are candidates for archiving.
     * <p>
     * A document matches when its {@code endTime} falls in the one-day window
     * {@code [today - archiveTtlDays - 1, today - archiveTtlDays)}, its {@code status} is one of
     * COMPLETED, FAILED, TIMED_OUT or TERMINATED, and it has no {@code archived} field. At most
     * 1000 ids are requested.
     *
     * @param indexName index to search
     * @param archiveTtlDays age in days after which a finished workflow becomes archivable
     * @return matching workflow ids, or an empty list when the search fails with an {@link IOException}
     */
    public List<String> searchArchivableWorkflows(String indexName, long archiveTtlDays) {
        // Read the clock once: two separate LocalDate.now() calls can straddle midnight, which
        // would collapse both bounds onto the same day and make the range empty.
        LocalDate today = LocalDate.now();
        String endTimeBefore = today.minusDays(archiveTtlDays).toString();
        String endTimeFrom = today.minusDays(archiveTtlDays).minusDays(1L).toString();
        BoolQueryBuilder q = QueryBuilders.boolQuery()
            .must(QueryBuilders.rangeQuery("endTime").lt(endTimeBefore).gte(endTimeFrom))
            .should(QueryBuilders.termQuery("status", "COMPLETED"))
            .should(QueryBuilders.termQuery("status", "FAILED"))
            .should(QueryBuilders.termQuery("status", "TIMED_OUT"))
            .should(QueryBuilders.termQuery("status", "TERMINATED"))
            .mustNot(QueryBuilders.existsQuery("archived"))
            .minimumShouldMatch(1);
        SearchResult<String> workflowIds;
        try {
            workflowIds = this.searchObjectIds(indexName, q, 0, 1000, WORKFLOW_DOC_TYPE);
        }
        catch (IOException e) {
            logger.error("Unable to communicate with ES to find archivable workflows", (Throwable)e);
            return Collections.emptyList();
        }
        return workflowIds.getResults();
    }

    /**
     * Finds ids of RUNNING workflows whose {@code updateTime} lies strictly between
     * {@code now - lastModifiedHoursAgoFrom} and {@code now - lastModifiedHoursAgoTo}.
     * Results are requested sorted by {@code updateTime} ascending, at most 5000 ids.
     *
     * @param lastModifiedHoursAgoFrom hours subtracted from now for the exclusive lower bound
     * @param lastModifiedHoursAgoTo hours subtracted from now for the exclusive upper bound
     * @return matching workflow ids, or an empty list when the search fails with an {@link IOException}
     */
    public List<String> searchRecentRunningWorkflows(int lastModifiedHoursAgoFrom, int lastModifiedHoursAgoTo) {
        DateTime now = new DateTime();
        BoolQueryBuilder runningInWindow = QueryBuilders.boolQuery();
        runningInWindow.must(QueryBuilders.rangeQuery("updateTime").gt(now.minusHours(lastModifiedHoursAgoFrom)));
        runningInWindow.must(QueryBuilders.rangeQuery("updateTime").lt(now.minusHours(lastModifiedHoursAgoTo)));
        runningInWindow.must(QueryBuilders.termQuery("status", "RUNNING"));
        List<String> oldestFirst = Collections.singletonList("updateTime:ASC");
        SearchResult<String> matches;
        try {
            matches = this.searchObjectIds(this.workflowIndexName, runningInWindow, 0, 5000, oldestFirst, WORKFLOW_DOC_TYPE);
        }
        catch (IOException ioFailure) {
            logger.error("Unable to communicate with ES to find recent running workflows", (Throwable)ioFailure);
            return Collections.emptyList();
        }
        return matches.getResults();
    }

    /**
     * Buffers {@code doc} for bulk indexing without supplying a document id.
     *
     * @param index target index name
     * @param docType document type, also used as the key of the bulk buffer
     * @param doc object to serialize and index
     */
    private void indexObject(String index, String docType, Object doc) {
        String noDocId = null;
        this.indexObject(index, docType, noDocId, doc);
    }

    /**
     * Serializes {@code doc} to JSON and appends it to the bulk buffer kept for {@code docType}.
     * <p>
     * The buffer is created on first use. Once it holds at least {@code indexBatchSize} actions,
     * {@code indexBulkRequest(docType)} is invoked. If serialization fails, the failure is logged
     * together with its cause and the document is dropped.
     *
     * @param index target index name
     * @param docType document type, also used as the key of the bulk buffer
     * @param docId document id; may be null
     * @param doc object to serialize and index
     */
    private void indexObject(String index, String docType, String docId, Object doc) {
        byte[] docBytes;
        try {
            docBytes = this.objectMapper.writeValueAsBytes(doc);
        }
        catch (JsonProcessingException e) {
            // Pass the exception as the trailing argument so the cause is logged, matching the
            // other error logs in this class.
            logger.error("Failed to convert {} '{}' to byte string", new Object[]{docType, docId, e});
            return;
        }
        IndexRequest request = new IndexRequest(index, docType, docId);
        request.source(docBytes, XContentType.JSON);
        // Create-if-missing in one map operation, then work on that single buffer reference
        // instead of looking the entry up again for every step.
        BulkRequestWrapper buffer = this.bulkRequests.computeIfAbsent(docType, type -> new BulkRequests(System.currentTimeMillis(), new BulkRequest())).getBulkRequest();
        buffer.add(request);
        if (buffer.numberOfActions() >= this.indexBatchSize) {
            this.indexBulkRequest(docType);
        }
    }

    /**
     * Sends the buffered bulk request for {@code docType} and then replaces the buffer with a
     * fresh, empty one stamped with the current time.
     * <p>
     * Does nothing when the buffer holds no actions. The method holds this instance's monitor
     * (it is {@code synchronized}) and additionally locks the current wrapper while sending.
     * The buffer is replaced after {@code indexWithRetry} returns; that method catches and logs
     * its own failures, so the buffered actions are discarded here even when indexing failed.
     * <p>
     * Assumes an entry for {@code docType} already exists in {@code bulkRequests}; both visible
     * callers guarantee this.
     * <p>
     * Decompiler note (CFR): WARNING - Removed try catching itself - possible behaviour change.
     * The structure below may therefore differ slightly from the original source.
     *
     * @param docType document type whose buffer should be flushed
     */
    private synchronized void indexBulkRequest(String docType) {
        if (this.bulkRequests.get(docType).getBulkRequest() != null && this.bulkRequests.get(docType).getBulkRequest().numberOfActions() > 0) {
            BulkRequestWrapper bulkRequestWrapper = this.bulkRequests.get(docType).getBulkRequest();
            synchronized (bulkRequestWrapper) {
                // Send what is buffered, then swap in an empty buffer with a new timestamp.
                this.indexWithRetry(this.bulkRequests.get(docType).getBulkRequest().get(), "Bulk Indexing " + docType, docType);
                this.bulkRequests.put(docType, new BulkRequests(System.currentTimeMillis(), new BulkRequest()));
            }
        }
    }

    /**
     * Executes a bulk request through {@code RetryUtil.retryOnException} and records timing and
     * queue-size metrics on success.
     * <p>
     * Any exception is caught here: an error metric is recorded and the failure is logged, but
     * nothing is rethrown to the caller.
     *
     * @param request bulk request to send
     * @param operationDescription description passed to the retry helper
     * @param docType document type, used for metrics and logging
     */
    private void indexWithRetry(BulkRequest request, String operationDescription, String docType) {
        try {
            long startedAt = Instant.now().toEpochMilli();
            new RetryUtil().retryOnException(() -> {
                try {
                    return this.elasticSearchClient.bulk(request, new Header[0]);
                }
                catch (IOException ioFailure) {
                    // The supplier cannot throw checked exceptions; wrap so the retry helper sees it.
                    throw new RuntimeException(ioFailure);
                }
            }, null, null, 3, operationDescription, "indexWithRetry");
            long elapsedMillis = Instant.now().toEpochMilli() - startedAt;
            logger.debug("Time taken {} for indexing object of type: {}", (Object)elapsedMillis, (Object)docType);
            Monitors.recordESIndexTime((String)"index_object", (String)docType, (long)elapsedMillis);
            int indexQueueDepth = ((ThreadPoolExecutor)this.executorService).getQueue().size();
            Monitors.recordWorkerQueueSize((String)"indexQueue", (int)indexQueueDepth);
            int logQueueDepth = ((ThreadPoolExecutor)this.logExecutorService).getQueue().size();
            Monitors.recordWorkerQueueSize((String)"logQueue", (int)logQueueDepth);
        }
        catch (Exception failure) {
            Monitors.error((String)className, (String)"index");
            logger.error("Failed to index {} for request type: {}", new Object[]{request, docType, failure});
        }
    }

    /**
     * Flushes every bulk buffer that is non-empty and whose timestamp is at least
     * {@code asyncBufferFlushTimeout} seconds old.
     * <p>
     * Each qualifying buffer is sent through {@code indexBulkRequest}.
     */
    private void flushBulkRequests() {
        // Widen before multiplying: the previous (long)(timeout * 1000) multiplied in int
        // arithmetic and could overflow for large timeouts.
        long flushIntervalMillis = (long)this.asyncBufferFlushTimeout * 1000L;
        this.bulkRequests.entrySet().stream()
            .filter(entry -> System.currentTimeMillis() - ((BulkRequests)entry.getValue()).getLastFlushTime() >= flushIntervalMillis)
            .filter(entry -> ((BulkRequests)entry.getValue()).getBulkRequest() != null && ((BulkRequests)entry.getValue()).getBulkRequest().numberOfActions() > 0)
            .forEach(entry -> {
                logger.debug("Flushing bulk request buffer for type {}, size: {}", entry.getKey(), (Object)((BulkRequests)entry.getValue()).getBulkRequest().numberOfActions());
                this.indexBulkRequest((String)entry.getKey());
            });
    }

    // Class initializer: sets the time zone of the shared SIMPLE_DATE_FORMAT to the GMT constant.
    static {
        SIMPLE_DATE_FORMAT.setTimeZone(GMT);
    }

    /**
     * Immutable pairing of a bulk-request buffer with a timestamp.
     * <p>
     * The timestamp is supplied by the creator; the visible call sites pass
     * {@code System.currentTimeMillis()} whenever a new, empty buffer is installed.
     */
    private static class BulkRequests {
        private final BulkRequestWrapper bulkRequest;
        private final long lastFlushTime;

        /**
         * @param lastFlushTime timestamp in epoch milliseconds to associate with the buffer
         * @param bulkRequest request that is wrapped in a {@code BulkRequestWrapper}
         */
        BulkRequests(long lastFlushTime, BulkRequest bulkRequest) {
            this.bulkRequest = new BulkRequestWrapper(bulkRequest);
            this.lastFlushTime = lastFlushTime;
        }

        /** @return the wrapper around the buffered bulk request */
        BulkRequestWrapper getBulkRequest() {
            return this.bulkRequest;
        }

        /** @return the timestamp given at construction */
        long getLastFlushTime() {
            return this.lastFlushTime;
        }
    }

    /**
     * Holder for the HTTP verb names used by this class.
     * Members of an annotation type are implicitly {@code public static final}.
     */
    private static @interface HttpMethod {
        String GET = "GET";
        String POST = "POST";
        String PUT = "PUT";
        String HEAD = "HEAD";
    }
}

