/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.conductor.es6.dao.index;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.MapType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.metadata.events.EventExecution;
import com.netflix.conductor.common.metadata.tasks.TaskExecLog;
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.run.WorkflowSummary;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.es6.config.ElasticSearchProperties;
import com.netflix.conductor.es6.dao.index.BulkRequestBuilderWrapper;
import com.netflix.conductor.es6.dao.index.ElasticSearchBaseDAO;
import com.netflix.conductor.es6.dao.query.parser.internal.ParserException;
import com.netflix.conductor.metrics.Monitors;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.retry.support.RetryTemplate;

@Trace
public class ElasticSearchDAOV6
extends ElasticSearchBaseDAO
implements IndexDAO {
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchDAOV6.class);
    // Logical document types. They are used to derive index names, template names
    // ("template_" + type), bulk-buffer keys and metric tags throughout this class.
    private static final String WORKFLOW_DOC_TYPE = "workflow";
    private static final String TASK_DOC_TYPE = "task";
    private static final String LOG_DOC_TYPE = "task_log";
    private static final String EVENT_DOC_TYPE = "event";
    private static final String MSG_DOC_TYPE = "message";
    // NOTE(review): the constructor in this decompiled view uses the literals 6 and 1L for the
    // main pool's core size and keep-alive (minutes); presumably these constants were inlined
    // by the compiler - confirm against the original source.
    private static final int CORE_POOL_SIZE = 6;
    private static final long KEEP_ALIVE_TIME = 1L;
    // NOTE(review): not referenced in the visible part of the file; presumably the
    // retry-on-conflict count applied when update requests are built - confirm.
    private static final int UPDATE_REQUEST_RETRY_COUNT = 5;
    // Simple class name, passed to Monitors.error(...) as the error source.
    private static final String CLASS_NAME = ElasticSearchDAOV6.class.getSimpleName();
    // Fixed index names, computed once in the constructor via getIndexName(...).
    private final String workflowIndexName;
    private final String taskIndexName;
    // The *Prefix fields are used with a trailing "*" wildcard when searching; the matching
    // non-final *IndexName fields hold the current write index and are reassigned by
    // updateIndexesNames().
    private final String eventIndexPrefix;
    private String eventIndexName;
    private final String messageIndexPrefix;
    private String messageIndexName;
    private String logIndexName;
    private final String logIndexPrefix;
    // Empty string unless auto index management is disabled and an override is configured.
    private final String docTypeOverride;
    private final ObjectMapper objectMapper;
    private final Client elasticSearchClient;
    // NOTE(review): GMT is not referenced in the visible part of the file; presumably applied
    // to SIMPLE_DATE_FORMAT in a static initializer - confirm.
    private static final TimeZone GMT = TimeZone.getTimeZone("GMT");
    // Produces the date suffix of the rolling log/event/message index names.
    // SimpleDateFormat is not thread-safe; this instance is shared by all callers.
    private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat("yyyyMMWW");
    // Pool running the async workflow/task/message operations.
    private final ExecutorService executorService;
    // Smaller pool running the async task-log and event-execution operations.
    private final ExecutorService logExecutorService;
    // Pending bulk requests, keyed by logical document type.
    private final ConcurrentHashMap<String, BulkRequests> bulkRequests;
    // Number of buffered actions at which indexObject(...) triggers a bulk flush.
    private final int indexBatchSize;
    // Read from the properties in milliseconds; its use is outside the visible part of the file.
    private final long asyncBufferFlushTimeout;
    private final ElasticSearchProperties properties;
    // Wraps bulk executions in updateWithRetry(...).
    private final RetryTemplate retryTemplate;

    /**
     * Wires the DAO: derives index names and prefixes from the configured index prefix, builds
     * the two bounded thread pools used by the async methods, and schedules a periodic flush of
     * the buffered bulk requests (first run after 60 seconds, then every 30 seconds).
     *
     * <p>Tasks rejected by either pool are not executed; they are logged and counted through
     * {@code Monitors.recordDiscardedIndexingCount}.
     *
     * @param elasticSearchClient client used for all Elasticsearch calls
     * @param retryTemplate retry policy applied to bulk executions
     * @param properties source of prefixes, pool sizes, batch size and flush timeout
     * @param objectMapper serializer for indexed documents
     */
    public ElasticSearchDAOV6(Client elasticSearchClient, RetryTemplate retryTemplate, ElasticSearchProperties properties, ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        this.elasticSearchClient = elasticSearchClient;

        // Index names and prefixes.
        this.indexPrefix = properties.getIndexPrefix();
        this.workflowIndexName = getIndexName(WORKFLOW_DOC_TYPE);
        this.taskIndexName = getIndexName(TASK_DOC_TYPE);
        this.logIndexPrefix = this.indexPrefix + "_task_log";
        this.messageIndexPrefix = this.indexPrefix + "_message";
        this.eventIndexPrefix = this.indexPrefix + "_event";

        int workerQueueSize = properties.getAsyncWorkerQueueSize();
        int asyncMaxPoolSize = properties.getAsyncMaxPoolSize();

        // Bulk buffering settings.
        this.bulkRequests = new ConcurrentHashMap<>();
        this.indexBatchSize = properties.getIndexBatchSize();
        this.asyncBufferFlushTimeout = properties.getAsyncBufferFlushTimeout().toMillis();
        this.properties = properties;

        // The override only applies when index management is left to the operator.
        boolean overrideConfigured = !properties.isAutoIndexManagementEnabled()
                && StringUtils.isNotBlank(properties.getDocumentTypeOverride());
        this.docTypeOverride = overrideConfigured ? properties.getDocumentTypeOverride() : "";

        // Main pool for workflow, task and message indexing.
        this.executorService = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                asyncMaxPoolSize,
                KEEP_ALIVE_TIME,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(workerQueueSize),
                (runnable, executor) -> {
                    LOGGER.warn("Request  {} to async dao discarded in executor {}", runnable, executor);
                    Monitors.recordDiscardedIndexingCount("indexQueue");
                });

        // Small pool for task logs and event executions: 1 core thread, 2 max, 30 s keep-alive.
        this.logExecutorService = new ThreadPoolExecutor(
                1,
                2,
                30L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(workerQueueSize),
                (runnable, executor) -> {
                    LOGGER.warn("Request {} to async log dao discarded in executor {}", runnable, executor);
                    Monitors.recordDiscardedIndexingCount("logQueue");
                });

        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(this::flushBulkRequests, 60L, 30L, TimeUnit.SECONDS);
        this.retryTemplate = retryTemplate;
    }

    /**
     * Shuts down both indexing pools on bean destruction, the log pool first.
     */
    @PreDestroy
    private void shutdown() {
        LOGGER.info("Starting graceful shutdown of executor service");
        ExecutorService[] poolsInShutdownOrder = {logExecutorService, executorService};
        for (ExecutorService pool : poolsInShutdownOrder) {
            shutdownExecutorService(pool);
        }
    }

    /**
     * Requests an orderly shutdown of the given pool and waits up to 30 seconds for queued
     * work to finish; if it does not finish in time, or the wait is interrupted, the pool is
     * stopped with {@code shutdownNow()}.
     *
     * @param execService the pool to stop
     */
    private void shutdownExecutorService(ExecutorService execService) {
        try {
            execService.shutdown();
            boolean drained = execService.awaitTermination(30L, TimeUnit.SECONDS);
            if (!drained) {
                LOGGER.warn("Forcing shutdown after waiting for 30 seconds");
                execService.shutdownNow();
                return;
            }
            LOGGER.debug("tasks completed, shutting down");
        } catch (InterruptedException ie) {
            LOGGER.warn("Shutdown interrupted, invoking shutdownNow on scheduledThreadPoolExecutor for delay queue");
            execService.shutdownNow();
            // Restore the interrupt flag for whoever is driving the shutdown.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Post-construction initialization: waits for the cluster to report green health and,
     * when auto index management is enabled, creates the index templates and the workflow
     * and task indexes.
     *
     * @throws Exception if waiting for cluster health fails
     */
    @PostConstruct
    public void setup() throws Exception {
        waitForHealthyCluster();
        if (!properties.isAutoIndexManagementEnabled()) {
            return;
        }
        createIndexesTemplates();
        createWorkflowIndex();
        createTaskIndex();
    }

    /**
     * Blocks until the cluster health request, configured to wait for green status, completes.
     *
     * @throws Exception if the health request fails or the wait is interrupted
     */
    private void waitForHealthyCluster() throws Exception {
        elasticSearchClient
                .admin()
                .cluster()
                .prepareHealth()
                .setWaitForGreenStatus()
                .execute()
                .get();
    }

    /**
     * Installs the index templates, creates the current rolling indexes, and schedules an
     * hourly refresh of the rolling index names. Any failure is logged and not propagated.
     */
    private void createIndexesTemplates() {
        try {
            initIndexesTemplates();
            updateIndexesNames();
            Runnable refreshIndexNames = this::updateIndexesNames;
            Executors.newScheduledThreadPool(1)
                    .scheduleAtFixedRate(refreshIndexNames, 0L, 1L, TimeUnit.HOURS);
        } catch (Exception e) {
            LOGGER.error("Error creating index templates", e);
        }
    }

    /**
     * Ensures an index template exists for the task-log, event and message document types,
     * in that order.
     */
    private void initIndexesTemplates() {
        String[] templatedTypes = {LOG_DOC_TYPE, EVENT_DOC_TYPE, MSG_DOC_TYPE};
        for (String templatedType : templatedTypes) {
            initIndexTemplate(templatedType);
        }
    }

    /**
     * Creates the index template {@code template_<type>} if the cluster does not have it yet.
     * The template body is loaded from the resource {@code /template_<type>.json}. Failures
     * while loading or installing the template are logged and not propagated.
     *
     * @param type logical document type the template is named after
     */
    private void initIndexTemplate(String type) {
        String template = "template_" + type;
        GetIndexTemplatesResponse result = this.elasticSearchClient.admin().indices().prepareGetTemplates(template).execute().actionGet();
        if (!result.getIndexTemplates().isEmpty()) {
            return;
        }
        LOGGER.info("Creating the index template '{}'", template);
        try {
            String templateSource = this.loadTypeMappingSource("/" + template + ".json");
            // JSON is UTF-8 on the wire; do not depend on the JVM's default charset.
            byte[] templateBytes = templateSource.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            this.elasticSearchClient.admin().indices().preparePutTemplate(template).setSource(templateBytes, XContentType.JSON).execute().actionGet();
        } catch (Exception e) {
            LOGGER.error("Failed to init " + template, e);
        }
    }

    /**
     * Recomputes the current write index for task logs, events and messages, creating each
     * index if needed, and stores the new names.
     */
    private void updateIndexesNames() {
        logIndexName = updateIndexName(LOG_DOC_TYPE);
        eventIndexName = updateIndexName(EVENT_DOC_TYPE);
        messageIndexName = updateIndexName(MSG_DOC_TYPE);
    }

    /**
     * Builds the date-suffixed index name {@code <prefix>_<type>_<date>} for the current
     * time and makes sure that index exists.
     *
     * @param type logical document type
     * @return the index name for the current period
     */
    private String updateIndexName(String type) {
        String dateSuffix = SIMPLE_DATE_FORMAT.format(new Date());
        String currentIndex = indexPrefix + "_" + type + "_" + dateSuffix;
        createIndex(currentIndex);
        return currentIndex;
    }

    /**
     * Creates the workflow index if missing and installs the workflow type mapping from
     * {@code /mappings_docType_workflow.json}.
     */
    private void createWorkflowIndex() {
        String index = workflowIndexName;
        createIndex(index);
        addTypeMapping(index, WORKFLOW_DOC_TYPE, "/mappings_docType_workflow.json");
    }

    /**
     * Creates the task index if missing and installs the task type mapping from
     * {@code /mappings_docType_task.json}.
     */
    private void createTaskIndex() {
        String index = taskIndexName;
        createIndex(index);
        addTypeMapping(index, TASK_DOC_TYPE, "/mappings_docType_task.json");
    }

    /**
     * Creates the index {@code indexName} with the configured shard and replica counts if it
     * does not exist yet.
     *
     * <p>Existence is probed with a get-index call; only an {@link IndexNotFoundException}
     * triggers creation. If the index appears between the probe and the create call, the
     * resulting {@link ResourceAlreadyExistsException} is treated as success.
     *
     * @param indexName name of the index to create
     */
    private void createIndex(String indexName) {
        try {
            this.elasticSearchClient.admin().indices().prepareGetIndex().addIndices(indexName).execute().actionGet();
        } catch (IndexNotFoundException infe) {
            try {
                CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
                createIndexRequest.settings(Settings.builder()
                        .put("index.number_of_shards", this.properties.getIndexShardCount())
                        .put("index.number_of_replicas", this.properties.getIndexReplicasCount()));
                this.elasticSearchClient.admin().indices().create(createIndexRequest).actionGet();
            } catch (ResourceAlreadyExistsException alreadyExists) {
                // The index now exists, which is the desired outcome, so this is not an error.
                LOGGER.info("Index '{}' already exists, skipping creation", indexName);
            }
        }
    }

    /**
     * Installs the mapping for {@code type} on {@code indexName} when the index has no
     * mapping for that type yet. Failures while loading or putting the mapping are logged
     * and not propagated.
     *
     * @param indexName index to add the mapping to
     * @param type document type the mapping is for
     * @param sourcePath resource path of the mapping JSON
     */
    private void addTypeMapping(String indexName, String type, String sourcePath) {
        GetMappingsResponse existing = elasticSearchClient.admin().indices()
                .prepareGetMappings(indexName)
                .addTypes(type)
                .execute()
                .actionGet();
        if (!existing.mappings().isEmpty()) {
            return;
        }
        LOGGER.info("Adding the {} type mappings", indexName);
        try {
            String mappingJson = loadTypeMappingSource(sourcePath);
            elasticSearchClient.admin().indices()
                    .preparePutMapping(indexName)
                    .setType(type)
                    .setSource(mappingJson, XContentType.JSON)
                    .execute()
                    .actionGet();
        } catch (Exception e) {
            LOGGER.error("Failed to init index " + indexName + " mappings", e);
        }
    }

    /**
     * Synchronously upserts the workflow summary into the workflow index and records the
     * elapsed time and the current size of the main worker queue. Failures are counted and
     * logged, not propagated.
     *
     * @param workflow summary to index; its workflow id is the document id
     */
    public void indexWorkflow(WorkflowSummary workflow) {
        try {
            long startTime = Instant.now().toEpochMilli();
            String id = workflow.getWorkflowId();
            byte[] doc = objectMapper.writeValueAsBytes(workflow);
            String docType = WORKFLOW_DOC_TYPE;
            if (!StringUtils.isBlank(docTypeOverride)) {
                docType = docTypeOverride;
            }
            UpdateRequest req = buildUpdateRequest(id, doc, workflowIndexName, docType);
            elasticSearchClient.update(req).actionGet();
            long endTime = Instant.now().toEpochMilli();
            LOGGER.debug("Time taken {} for indexing workflow: {}", endTime - startTime, workflow.getWorkflowId());
            Monitors.recordESIndexTime("index_workflow", WORKFLOW_DOC_TYPE, endTime - startTime);
            int queued = ((ThreadPoolExecutor) executorService).getQueue().size();
            Monitors.recordWorkerQueueSize("indexQueue", queued);
        } catch (Exception e) {
            Monitors.error(CLASS_NAME, "indexWorkflow");
            LOGGER.error("Failed to index workflow: {}", workflow.getWorkflowId(), e);
        }
    }

    /**
     * Runs {@link #indexWorkflow(WorkflowSummary)} on the main indexing pool.
     *
     * @param workflow summary to index
     * @return future that completes when the indexing attempt has finished
     */
    public CompletableFuture<Void> asyncIndexWorkflow(WorkflowSummary workflow) {
        Runnable indexing = () -> indexWorkflow(workflow);
        return CompletableFuture.runAsync(indexing, executorService);
    }

    /**
     * Queues an upsert of the task summary on the task bulk buffer and records the elapsed
     * time and the current size of the main worker queue. Failures are logged, not propagated.
     *
     * @param task summary to index; its task id is the document id
     */
    public void indexTask(TaskSummary task) {
        try {
            long startTime = Instant.now().toEpochMilli();
            String id = task.getTaskId();
            byte[] doc = objectMapper.writeValueAsBytes(task);
            String docType = TASK_DOC_TYPE;
            if (!StringUtils.isBlank(docTypeOverride)) {
                docType = docTypeOverride;
            }
            UpdateRequest req = new UpdateRequest(taskIndexName, docType, id);
            req.doc(doc, XContentType.JSON);
            req.upsert(doc, XContentType.JSON);
            // Buffer key and metric tag stay the logical type even when the doc type is overridden.
            indexObject(req, TASK_DOC_TYPE);
            long endTime = Instant.now().toEpochMilli();
            LOGGER.debug("Time taken {} for  indexing task:{} in workflow: {}", endTime - startTime, task.getTaskId(), task.getWorkflowId());
            Monitors.recordESIndexTime("index_task", TASK_DOC_TYPE, endTime - startTime);
            int queued = ((ThreadPoolExecutor) executorService).getQueue().size();
            Monitors.recordWorkerQueueSize("indexQueue", queued);
        } catch (Exception e) {
            LOGGER.error("Failed to index task: {}", task.getTaskId(), e);
        }
    }

    /**
     * Runs {@link #indexTask(TaskSummary)} on the main indexing pool.
     *
     * @param task summary to index
     * @return future that completes when the indexing attempt has finished
     */
    public CompletableFuture<Void> asyncIndexTask(TaskSummary task) {
        Runnable indexing = () -> indexTask(task);
        return CompletableFuture.runAsync(indexing, executorService);
    }

    /**
     * Appends {@code req} to the bulk buffer for {@code docType} and flushes that buffer once
     * it holds at least {@code indexBatchSize} actions.
     *
     * <p>The buffer is created atomically with {@code computeIfAbsent}, so two threads that
     * index the first document of a type at the same time cannot replace each other's buffer
     * and lose the request already added to it.
     *
     * @param req update request to buffer
     * @param docType logical document type used as the buffer key
     */
    private void indexObject(UpdateRequest req, String docType) {
        BulkRequests pending = this.bulkRequests.computeIfAbsent(
                docType,
                key -> new BulkRequests(System.currentTimeMillis(), this.elasticSearchClient.prepareBulk()));
        // Add and size-check against the same buffer instance.
        pending.getBulkRequestBuilder().add(req);
        if (pending.getBulkRequestBuilder().numberOfActions() >= this.indexBatchSize) {
            this.indexBulkRequest(docType);
        }
    }

    /**
     * Sends the buffered bulk request for {@code docType}, if it contains any actions, and
     * replaces the buffer with a fresh, empty one stamped with the current time.
     *
     * @param docType logical document type whose buffer is flushed
     */
    private synchronized void indexBulkRequest(String docType) {
        BulkRequestBuilderWrapper buffered = bulkRequests.get(docType).getBulkRequestBuilder();
        if (buffered == null || buffered.numberOfActions() <= 0) {
            return;
        }
        updateWithRetry(buffered, docType);
        BulkRequests emptyBuffer = new BulkRequests(System.currentTimeMillis(), elasticSearchClient.prepareBulk());
        bulkRequests.put(docType, emptyBuffer);
    }

    /**
     * Indexes the given task execution logs with a single bulk request into the current log
     * index, waiting at most 5 seconds for the response. An empty list is a no-op. Failures
     * are logged together with the affected task ids, not propagated.
     *
     * @param taskExecLogs logs to index
     */
    public void addTaskExecutionLogs(List<TaskExecLog> taskExecLogs) {
        if (taskExecLogs.isEmpty()) {
            return;
        }
        try {
            long startTime = Instant.now().toEpochMilli();
            BulkRequestBuilderWrapper bulk = new BulkRequestBuilderWrapper(elasticSearchClient.prepareBulk());
            String docType = StringUtils.isBlank(docTypeOverride) ? LOG_DOC_TYPE : docTypeOverride;
            for (TaskExecLog entry : taskExecLogs) {
                IndexRequest request = new IndexRequest(logIndexName, docType);
                request.source(objectMapper.writeValueAsBytes(entry), XContentType.JSON);
                bulk.add(request);
            }
            bulk.execute().actionGet(5L, TimeUnit.SECONDS);
            long endTime = Instant.now().toEpochMilli();
            LOGGER.debug("Time taken {} for indexing taskExecutionLogs", endTime - startTime);
            Monitors.recordESIndexTime("index_task_execution_logs", LOG_DOC_TYPE, endTime - startTime);
            int queued = ((ThreadPoolExecutor) logExecutorService).getQueue().size();
            Monitors.recordWorkerQueueSize("logQueue", queued);
        } catch (Exception e) {
            List<String> taskIds = taskExecLogs.stream().map(TaskExecLog::getTaskId).collect(Collectors.toList());
            LOGGER.error("Failed to index task execution logs for tasks: {}", taskIds, e);
        }
    }

    /**
     * Runs {@link #addTaskExecutionLogs(List)} on the log indexing pool.
     *
     * @param logs logs to index
     * @return future that completes when the indexing attempt has finished
     */
    public CompletableFuture<Void> asyncAddTaskExecutionLogs(List<TaskExecLog> logs) {
        Runnable indexing = () -> addTaskExecutionLogs(logs);
        return CompletableFuture.runAsync(indexing, logExecutorService);
    }

    /**
     * Fetches the execution logs of a task from all log indexes ({@code <logIndexPrefix>*}),
     * ordered by {@code createdTime} ascending and capped at the configured result limit.
     *
     * @param taskId id of the task whose logs are requested
     * @return the matching logs, or {@code null} if the lookup fails
     */
    public List<TaskExecLog> getTaskExecutionLogs(String taskId) {
        try {
            BoolQueryBuilder query = boolQueryBuilder("taskId='" + taskId + "'", "*");
            String docType = StringUtils.isBlank(docTypeOverride) ? LOG_DOC_TYPE : docTypeOverride;
            SearchRequestBuilder srb = elasticSearchClient.prepareSearch(logIndexPrefix + "*");
            srb.setQuery(query);
            srb.setTypes(docType);
            srb.setSize(properties.getTaskLogResultLimit());
            srb.addSort(SortBuilders.fieldSort("createdTime").order(SortOrder.ASC));
            SearchResponse response = srb.execute().actionGet();
            return mapTaskExecLogsResponse(response);
        } catch (Exception e) {
            LOGGER.error("Failed to get task execution logs for task: {}", taskId, e);
            return null;
        }
    }

    /**
     * Deserializes every hit of the response into a {@link TaskExecLog}, preserving hit order.
     *
     * @param response search response to convert
     * @return one log per hit
     * @throws IOException if a hit's source cannot be deserialized
     */
    private List<TaskExecLog> mapTaskExecLogsResponse(SearchResponse response) throws IOException {
        SearchHit[] hits = response.getHits().getHits();
        List<TaskExecLog> parsed = new ArrayList<>(hits.length);
        for (int i = 0; i < hits.length; i++) {
            parsed.add(objectMapper.readValue(hits[i].getSourceAsString(), TaskExecLog.class));
        }
        return parsed;
    }

    /**
     * Queues an upsert of the message (id, payload, queue name and creation timestamp) on the
     * message bulk buffer and records the elapsed time. Failures are logged, not propagated.
     *
     * @param queue name of the queue the message belongs to
     * @param message message to index; its id is the document id
     */
    public void addMessage(String queue, Message message) {
        try {
            long startTime = Instant.now().toEpochMilli();
            Map<String, Object> doc = new HashMap<>();
            doc.put("messageId", message.getId());
            doc.put("payload", message.getPayload());
            doc.put("queue", queue);
            doc.put("created", System.currentTimeMillis());
            String docType = MSG_DOC_TYPE;
            if (!StringUtils.isBlank(docTypeOverride)) {
                docType = docTypeOverride;
            }
            UpdateRequest req = new UpdateRequest(messageIndexName, docType, message.getId());
            req.doc(doc, XContentType.JSON);
            req.upsert(doc, XContentType.JSON);
            indexObject(req, MSG_DOC_TYPE);
            long endTime = Instant.now().toEpochMilli();
            LOGGER.debug("Time taken {} for  indexing message: {}", endTime - startTime, message.getId());
            Monitors.recordESIndexTime("add_message", MSG_DOC_TYPE, endTime - startTime);
        } catch (Exception e) {
            LOGGER.error("Failed to index message: {}", message.getId(), e);
        }
    }

    /**
     * Runs {@link #addMessage(String, Message)} on the main indexing pool.
     *
     * @param queue name of the queue the message belongs to
     * @param message message to index
     * @return future that completes when the indexing attempt has finished
     */
    public CompletableFuture<Void> asyncAddMessage(String queue, Message message) {
        Runnable indexing = () -> addMessage(queue, message);
        return CompletableFuture.runAsync(indexing, executorService);
    }

    /**
     * Fetches the indexed messages of a queue from all message indexes
     * ({@code <messageIndexPrefix>*}), ordered by {@code created} ascending.
     *
     * @param queue name of the queue
     * @return the matching messages, or {@code null} if the lookup fails
     */
    public List<Message> getMessages(String queue) {
        try {
            BoolQueryBuilder fq = boolQueryBuilder("queue='" + queue + "'", "*");
            String docType = StringUtils.isBlank(docTypeOverride) ? MSG_DOC_TYPE : docTypeOverride;
            SearchRequestBuilder srb = elasticSearchClient.prepareSearch(messageIndexPrefix + "*");
            srb.setQuery(fq);
            srb.setTypes(docType);
            srb.addSort(SortBuilders.fieldSort("created").order(SortOrder.ASC));
            SearchResponse response = srb.execute().actionGet();
            return mapGetMessagesResponse(response);
        } catch (Exception e) {
            LOGGER.error("Failed to get messages for queue: {}", queue, e);
            return null;
        }
    }

    /**
     * Converts every hit of the response into a {@link Message} built from the hit's
     * {@code messageId} and {@code payload} fields; the third constructor argument is
     * always {@code null}.
     *
     * @param response search response to convert
     * @return one message per hit, in hit order
     * @throws IOException if a hit's source cannot be deserialized
     */
    private List<Message> mapGetMessagesResponse(SearchResponse response) throws IOException {
        SearchHit[] hits = response.getHits().getHits();
        JavaType stringMapType = TypeFactory.defaultInstance().constructMapType(HashMap.class, String.class, String.class);
        List<Message> messages = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
            Map<String, String> fields = objectMapper.readValue(hit.getSourceAsString(), stringMapType);
            messages.add(new Message(fields.get("messageId"), fields.get("payload"), null));
        }
        return messages;
    }

    /**
     * Queues an upsert of the event execution on the event bulk buffer. The document id is
     * {@code <name>.<event>.<messageId>.<id>}. Records the elapsed time and the current size
     * of the log worker queue. Failures are logged, not propagated.
     *
     * @param eventExecution event execution to index
     */
    public void addEventExecution(EventExecution eventExecution) {
        try {
            long startTime = Instant.now().toEpochMilli();
            byte[] doc = objectMapper.writeValueAsBytes(eventExecution);
            String id = eventExecution.getName()
                    + "." + eventExecution.getEvent()
                    + "." + eventExecution.getMessageId()
                    + "." + eventExecution.getId();
            String docType = EVENT_DOC_TYPE;
            if (!StringUtils.isBlank(docTypeOverride)) {
                docType = docTypeOverride;
            }
            UpdateRequest req = buildUpdateRequest(id, doc, eventIndexName, docType);
            indexObject(req, EVENT_DOC_TYPE);
            long endTime = Instant.now().toEpochMilli();
            LOGGER.debug("Time taken {} for indexing event execution: {}", endTime - startTime, eventExecution.getId());
            Monitors.recordESIndexTime("add_event_execution", EVENT_DOC_TYPE, endTime - startTime);
            int queued = ((ThreadPoolExecutor) logExecutorService).getQueue().size();
            Monitors.recordWorkerQueueSize("logQueue", queued);
        } catch (Exception e) {
            LOGGER.error("Failed to index event execution: {}", eventExecution.getId(), e);
        }
    }

    /**
     * Runs {@link #addEventExecution(EventExecution)} on the log indexing pool.
     *
     * @param eventExecution event execution to index
     * @return future that completes when the indexing attempt has finished
     */
    public CompletableFuture<Void> asyncAddEventExecution(EventExecution eventExecution) {
        Runnable indexing = () -> addEventExecution(eventExecution);
        return CompletableFuture.runAsync(indexing, logExecutorService);
    }

    /**
     * Fetches the executions recorded for an event from all event indexes
     * ({@code <eventIndexPrefix>*}), ordered by {@code created} ascending.
     *
     * @param event event name to look up
     * @return the matching executions, or {@code null} if the lookup fails
     */
    public List<EventExecution> getEventExecutions(String event) {
        try {
            BoolQueryBuilder fq = boolQueryBuilder("event='" + event + "'", "*");
            String docType = StringUtils.isBlank(docTypeOverride) ? EVENT_DOC_TYPE : docTypeOverride;
            SearchRequestBuilder srb = elasticSearchClient.prepareSearch(eventIndexPrefix + "*");
            srb.setQuery(fq);
            srb.setTypes(docType);
            srb.addSort(SortBuilders.fieldSort("created").order(SortOrder.ASC));
            SearchResponse response = srb.execute().actionGet();
            return mapEventExecutionsResponse(response);
        } catch (Exception e) {
            LOGGER.error("Failed to get executions for event: {}", event, e);
            return null;
        }
    }

    /**
     * Deserializes every hit of the response into an {@link EventExecution}, preserving hit
     * order.
     *
     * @param response search response to convert
     * @return one execution per hit
     * @throws IOException if a hit's source cannot be deserialized
     */
    private List<EventExecution> mapEventExecutionsResponse(SearchResponse response) throws IOException {
        SearchHit[] hits = response.getHits().getHits();
        List<EventExecution> parsed = new ArrayList<>(hits.length);
        for (int i = 0; i < hits.length; i++) {
            parsed.add(objectMapper.readValue(hits[i].getSourceAsString(), EventExecution.class));
        }
        return parsed;
    }

    /**
     * Executes the bulk request through the retry template, waiting up to 5 seconds per
     * attempt, and records the elapsed time. Failures are counted and logged with the number
     * of actions in the request, not propagated.
     *
     * @param request bulk request to execute
     * @param docType logical document type, used for logging and metrics
     */
    private void updateWithRetry(BulkRequestBuilderWrapper request, String docType) {
        try {
            long startTime = Instant.now().toEpochMilli();
            retryTemplate.execute(retryContext -> request.execute().actionGet(5L, TimeUnit.SECONDS));
            long endTime = Instant.now().toEpochMilli();
            long elapsed = endTime - startTime;
            LOGGER.debug("Time taken {} for indexing object of type: {}", elapsed, docType);
            Monitors.recordESIndexTime("index_object", docType, elapsed);
        } catch (Exception e) {
            Monitors.error(CLASS_NAME, "index");
            LOGGER.error("Failed to index {} for requests", request.numberOfActions(), e);
        }
    }

    /**
     * Searches the workflow index and returns the ids of the matching documents.
     *
     * @param query structured query
     * @param freeText free-text query
     * @param start offset of the first result
     * @param count maximum number of results
     * @param sort sort options in {@code field} or {@code field:ORDER} form; may be null
     * @return total hit count and the ids of the requested page
     */
    public SearchResult<String> searchWorkflows(String query, String freeText, int start, int count, List<String> sort) {
        SearchResult<String> page = search(query, start, count, sort, freeText, WORKFLOW_DOC_TYPE);
        return page;
    }

    /**
     * Counts the workflow documents matching the given queries.
     *
     * @param query structured query
     * @param freeText free-text query
     * @return total number of matching workflow documents
     */
    public long getWorkflowCount(String query, String freeText) {
        long matches = count(query, freeText, WORKFLOW_DOC_TYPE);
        return matches;
    }

    /**
     * Searches the task index and returns the ids of the matching documents.
     *
     * @param query structured query
     * @param freeText free-text query
     * @param start offset of the first result
     * @param count maximum number of results
     * @param sort sort options in {@code field} or {@code field:ORDER} form; may be null
     * @return total hit count and the ids of the requested page
     */
    public SearchResult<String> searchTasks(String query, String freeText, int start, int count, List<String> sort) {
        SearchResult<String> page = search(query, start, count, sort, freeText, TASK_DOC_TYPE);
        return page;
    }

    /**
     * Synchronously deletes the workflow document with the given id from the workflow index
     * and records the elapsed time and the current size of the main worker queue.
     *
     * <p>A delete that finds no document is logged as an error. Any failure is logged and
     * counted, not propagated.
     *
     * @param workflowId id of the workflow document to delete
     */
    public void removeWorkflow(String workflowId) {
        try {
            long startTime = Instant.now().toEpochMilli();
            // Use the same effective document type that indexWorkflow(...) and get(...) use.
            String docType = StringUtils.isBlank(this.docTypeOverride) ? WORKFLOW_DOC_TYPE : this.docTypeOverride;
            DeleteRequest request = new DeleteRequest(this.workflowIndexName, docType, workflowId);
            DeleteResponse response = this.elasticSearchClient.delete(request).actionGet();
            // NOT_FOUND, not DELETED, is the result that means nothing was removed.
            if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
                LOGGER.error("Index removal failed - document not found by id: {}", workflowId);
            }
            long endTime = Instant.now().toEpochMilli();
            LOGGER.debug("Time taken {} for removing workflow: {}", endTime - startTime, workflowId);
            Monitors.recordESIndexTime("remove_workflow", WORKFLOW_DOC_TYPE, endTime - startTime);
            Monitors.recordWorkerQueueSize("indexQueue", ((ThreadPoolExecutor) this.executorService).getQueue().size());
        } catch (Throwable e) {
            LOGGER.error("Failed to remove workflow {} from index", workflowId, e);
            Monitors.error(CLASS_NAME, "remove");
        }
    }

    /**
     * Runs {@link #removeWorkflow(String)} on the main indexing pool.
     *
     * @param workflowId id of the workflow document to delete
     * @return future that completes when the removal attempt has finished
     */
    public CompletableFuture<Void> asyncRemoveWorkflow(String workflowId) {
        Runnable removal = () -> removeWorkflow(workflowId);
        return CompletableFuture.runAsync(removal, executorService);
    }

    /**
     * Synchronously applies a partial update to the workflow document with the given id,
     * setting {@code keys[i]} to {@code values[i]}, and records the elapsed time and the
     * current size of the main worker queue.
     *
     * <p>Null values are accepted and sent as nulls. If a key appears more than once, the
     * last value wins.
     *
     * @param workflowInstanceId id of the workflow document to update
     * @param keys field names to set
     * @param values field values, parallel to {@code keys}
     * @throws ApplicationException with code {@code INVALID_INPUT} if the arrays differ in
     *     length
     */
    public void updateWorkflow(String workflowInstanceId, String[] keys, Object[] values) {
        if (keys.length != values.length) {
            throw new ApplicationException(ApplicationException.Code.INVALID_INPUT, "Number of keys and values do not match");
        }
        long startTime = Instant.now().toEpochMilli();
        // Use the same effective document type that indexWorkflow(...) and get(...) use.
        String docType = StringUtils.isBlank(this.docTypeOverride) ? WORKFLOW_DOC_TYPE : this.docTypeOverride;
        UpdateRequest request = new UpdateRequest(this.workflowIndexName, docType, workflowInstanceId);
        // A plain map is used because Collectors.toMap throws NullPointerException on null values.
        Map<String, Object> source = new HashMap<>(keys.length);
        for (int i = 0; i < keys.length; i++) {
            source.put(keys[i], values[i]);
        }
        request.doc(source);
        LOGGER.debug("Updating workflow {} in elasticsearch index: {}", workflowInstanceId, this.workflowIndexName);
        this.elasticSearchClient.update(request).actionGet();
        long endTime = Instant.now().toEpochMilli();
        LOGGER.debug("Time taken {} for updating workflow: {}", endTime - startTime, workflowInstanceId);
        Monitors.recordESIndexTime("update_workflow", WORKFLOW_DOC_TYPE, endTime - startTime);
        Monitors.recordWorkerQueueSize("indexQueue", ((ThreadPoolExecutor) this.executorService).getQueue().size());
    }

    /**
     * Runs {@link #updateWorkflow(String, String[], Object[])} on the main indexing pool.
     *
     * @param workflowInstanceId id of the workflow document to update
     * @param keys field names to set
     * @param values field values, parallel to {@code keys}
     * @return future that completes when the update has finished, exceptionally if it threw
     */
    public CompletableFuture<Void> asyncUpdateWorkflow(String workflowInstanceId, String[] keys, Object[] values) {
        Runnable update = () -> updateWorkflow(workflowInstanceId, keys, values);
        return CompletableFuture.runAsync(update, executorService);
    }

    /**
     * Reads a single field of a workflow document.
     *
     * @param workflowInstanceId id of the workflow document
     * @param fieldToGet name of the field to fetch
     * @return the field's {@code toString()} value, or {@code null} when the document does
     *     not exist or the field is absent or null
     */
    public String get(String workflowInstanceId, String fieldToGet) {
        String docType = StringUtils.isBlank(docTypeOverride) ? WORKFLOW_DOC_TYPE : docTypeOverride;
        // Only the requested field is fetched from the document source.
        FetchSourceContext onlyRequestedField = new FetchSourceContext(true, new String[] {fieldToGet}, Strings.EMPTY_ARRAY);
        GetRequest request = new GetRequest(workflowIndexName, docType, workflowInstanceId)
                .fetchSourceContext(onlyRequestedField);
        GetResponse response = elasticSearchClient.get(request).actionGet();
        if (response.isExists()) {
            Object fieldValue = response.getSourceAsMap().get(fieldToGet);
            if (fieldValue != null) {
                return fieldValue.toString();
            }
        }
        LOGGER.debug("Unable to find Workflow: {} in ElasticSearch index: {}.", workflowInstanceId, workflowIndexName);
        return null;
    }

    /**
     * Counts the documents of the given logical type that match the queries, using a search
     * request with size 0.
     *
     * <p>The index name is always derived from the logical type, the same way the constructor
     * derives the workflow and task index names; the document type override only changes the
     * type filter. Deriving the index from the overridden type would query a different index
     * than the one documents are written to.
     *
     * @param structuredQuery structured query
     * @param freeTextQuery free-text query
     * @param docType logical document type
     * @return total number of matching documents
     * @throws ApplicationException with code {@code BACKEND_ERROR} if the query cannot be
     *     parsed
     */
    private long count(String structuredQuery, String freeTextQuery, String docType) {
        try {
            String indexName = this.getIndexName(docType);
            String effectiveDocType = StringUtils.isBlank(this.docTypeOverride) ? docType : this.docTypeOverride;
            BoolQueryBuilder fq = this.boolQueryBuilder(structuredQuery, freeTextQuery);
            SearchRequestBuilder srb = this.elasticSearchClient.prepareSearch(indexName)
                    .setQuery(fq)
                    .setTypes(effectiveDocType)
                    .storedFields("_id")
                    .setSize(0);
            SearchResponse response = srb.get();
            return response.getHits().getTotalHits();
        } catch (ParserException e) {
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
        }
    }

    /**
     * Searches the index of the given logical type and returns the ids of one page of
     * matching documents together with the total hit count.
     *
     * <p>The index name is always derived from the logical type, the same way the constructor
     * derives the workflow and task index names; the document type override only changes the
     * type filter. Deriving the index from the overridden type would query a different index
     * than the one documents are written to.
     *
     * @param structuredQuery structured query
     * @param start offset of the first result
     * @param size maximum number of results
     * @param sortOptions sort options in {@code field} or {@code field:ORDER} form; may be null
     * @param freeTextQuery free-text query
     * @param docType logical document type
     * @return total hit count and the ids of the requested page
     * @throws ApplicationException with code {@code BACKEND_ERROR} if the query cannot be
     *     parsed
     */
    private SearchResult<String> search(String structuredQuery, int start, int size, List<String> sortOptions, String freeTextQuery, String docType) {
        try {
            String indexName = this.getIndexName(docType);
            String effectiveDocType = StringUtils.isBlank(this.docTypeOverride) ? docType : this.docTypeOverride;
            BoolQueryBuilder fq = this.boolQueryBuilder(structuredQuery, freeTextQuery);
            SearchRequestBuilder srb = this.elasticSearchClient.prepareSearch(indexName)
                    .setQuery(fq)
                    .setTypes(effectiveDocType)
                    .storedFields("_id")
                    .setFrom(start)
                    .setSize(size);
            this.addSortOptions(srb, sortOptions);
            return this.mapSearchResult(srb.get());
        } catch (ParserException e) {
            throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, e.getMessage(), e);
        }
    }

    /**
     * Applies the given sort specifications to a search request, in list order.
     *
     * <p>Each entry is either a bare field name (sorted ascending) or {@code field:ORDER},
     * where {@code ORDER} is resolved with {@link SortOrder#valueOf(String)} and must therefore
     * match an enum constant name exactly. A colon in the first position is not treated as a
     * separator: the whole entry is then used as the field name.
     *
     * @param srb search request builder to add the sorts to
     * @param sortOptions sort specifications; {@code null} means no sorting is added
     */
    private void addSortOptions(SearchRequestBuilder srb, List<String> sortOptions) {
        if (sortOptions == null) {
            return;
        }
        for (String sortOption : sortOptions) {
            int separator = sortOption.indexOf(':');
            if (separator > 0) {
                String field = sortOption.substring(0, separator);
                SortOrder order = SortOrder.valueOf(sortOption.substring(separator + 1));
                srb.addSort(field, order);
            } else {
                // No usable "field:ORDER" split, so sort ascending on the entry as given.
                srb.addSort(sortOption, SortOrder.ASC);
            }
        }
    }

    /**
     * Converts a search response into a {@link SearchResult} of document ids.
     *
     * @param response search response to convert
     * @return the response's total hit count paired with the ids of the hits it contains, in
     *     iteration order
     */
    private SearchResult<String> mapSearchResult(SearchResponse response) {
        SearchHits hits = response.getHits();
        List<String> ids = new LinkedList<>();
        for (SearchHit hit : hits) {
            ids.add(hit.getId());
        }
        return new SearchResult<>(hits.getTotalHits(), ids);
    }

    /**
     * Finds the ids of workflows in the given index that are eligible for archiving.
     *
     * <p>A workflow matches when all of the following hold:
     * <ul>
     *   <li>its {@code endTime} lies in the one-day window
     *       {@code [today - archiveTtlDays - 1, today - archiveTtlDays)};</li>
     *   <li>its {@code status} is one of {@code COMPLETED}, {@code FAILED}, {@code TIMED_OUT}
     *       or {@code TERMINATED};</li>
     *   <li>it has no {@code archived} field.</li>
     * </ul>
     *
     * <p>The cutoff date is computed once so that both bounds of the window derive from the
     * same day, even when the call straddles midnight.
     *
     * @param indexName name of the index to search
     * @param archiveTtlDays age in days, measured from today, at which workflows become
     *     archivable
     * @return ids of the matching documents; at most 1000 are requested
     */
    public List<String> searchArchivableWorkflows(String indexName, long archiveTtlDays) {
        // Single snapshot of "today": two separate now() calls could disagree across midnight.
        LocalDate windowEnd = LocalDate.now().minusDays(archiveTtlDays);
        LocalDate windowStart = windowEnd.minusDays(1L);

        BoolQueryBuilder q = QueryBuilders.boolQuery();
        q.must(QueryBuilders.rangeQuery("endTime").lt(windowEnd.toString()).gte(windowStart.toString()));
        for (String terminalStatus : new String[]{"COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED"}) {
            q.should(QueryBuilders.termQuery("status", terminalStatus));
        }
        q.mustNot(QueryBuilders.existsQuery("archived"));
        // At least one of the status clauses above has to match.
        q.minimumShouldMatch(1);

        String docType = StringUtils.isBlank(this.docTypeOverride) ? WORKFLOW_DOC_TYPE : this.docTypeOverride;
        SearchRequestBuilder s = this.elasticSearchClient.prepareSearch(new String[]{indexName}).setTypes(new String[]{docType}).setQuery(q).setSize(1000);
        return this.extractSearchIds(s);
    }

    /**
     * Builds an update request that writes the given JSON document, inserting it when no
     * document with that id exists yet.
     *
     * @param id id of the document to update
     * @param doc JSON bytes used both as the partial update and as the upsert document
     * @param indexName index the document belongs to
     * @param docType document type
     * @return the configured request, set to retry up to 5 times on version conflicts
     */
    private UpdateRequest buildUpdateRequest(String id, byte[] doc, String indexName, String docType) {
        return new UpdateRequest(indexName, docType, id)
                .doc(doc, XContentType.JSON)
                .upsert(doc, XContentType.JSON)
                .retryOnConflict(5);
    }

    /**
     * Executes the given search and collects the ids of the returned hits.
     *
     * @param s fully configured search request to execute
     * @return ids of the hits in the response, in the order they were returned
     */
    private List<String> extractSearchIds(SearchRequestBuilder s) {
        SearchResponse response = s.execute().actionGet();
        SearchHit[] found = response.getHits().getHits();
        List<String> ids = new LinkedList<>();
        for (int i = 0; i < found.length; i++) {
            ids.add(found[i].getId());
        }
        return ids;
    }

    /**
     * Flushes every buffered bulk request that is due.
     *
     * <p>An entry of {@code bulkRequests} is flushed via {@code indexBulkRequest} when at least
     * {@code asyncBufferFlushTimeout} has elapsed since its last flush time (measured against
     * {@link System#currentTimeMillis()}) and its builder is present and holds at least one
     * action.
     */
    private void flushBulkRequests() {
        for (Map.Entry<String, BulkRequests> entry : this.bulkRequests.entrySet()) {
            BulkRequests buffered = entry.getValue();

            // Not due yet: flushed too recently.
            if (System.currentTimeMillis() - buffered.getLastFlushTime() < this.asyncBufferFlushTimeout) {
                continue;
            }
            // Nothing to send.
            if (buffered.getBulkRequestBuilder() == null || buffered.getBulkRequestBuilder().numberOfActions() <= 0) {
                continue;
            }

            LOGGER.debug("Flushing bulk request buffer for type {}, size: {}", entry.getKey(), buffered.getBulkRequestBuilder().numberOfActions());
            this.indexBulkRequest(entry.getKey());
        }
    }

    static {
        // Set the time zone of the class-wide SIMPLE_DATE_FORMAT to the GMT constant once, at
        // class initialization.
        // NOTE(review): both fields are declared outside this section; presumably a
        // java.text.SimpleDateFormat and a java.util.TimeZone. If so, the shared formatter is
        // not thread-safe; confirm how callers use it.
        SIMPLE_DATE_FORMAT.setTimeZone(GMT);
    }

    /**
     * Mutable holder pairing a buffered bulk request with the time it was last flushed.
     *
     * <p>The builder is always stored wrapped in a {@link BulkRequestBuilderWrapper}. The flush
     * time is a {@code long} that the enclosing class compares against
     * {@link System#currentTimeMillis()}.
     */
    private static class BulkRequests {
        /** Time of the last flush of this buffer. */
        private long lastFlushTime;
        /** Wrapper around the builder that accumulates the buffered actions. */
        private BulkRequestBuilderWrapper bulkRequestBuilder;

        /**
         * Creates a holder with the given flush time and wraps the given builder.
         *
         * @param lastFlushTime initial last-flush time
         * @param bulkRequestBuilder builder to wrap and store
         */
        BulkRequests(long lastFlushTime, BulkRequestBuilder bulkRequestBuilder) {
            this.lastFlushTime = lastFlushTime;
            this.bulkRequestBuilder = new BulkRequestBuilderWrapper(bulkRequestBuilder);
        }

        /** @return the time recorded for the last flush */
        public long getLastFlushTime() {
            return lastFlushTime;
        }

        /** @param lastFlushTime new last-flush time to record */
        public void setLastFlushTime(long lastFlushTime) {
            this.lastFlushTime = lastFlushTime;
        }

        /** @return the wrapped builder currently held */
        public BulkRequestBuilderWrapper getBulkRequestBuilder() {
            return bulkRequestBuilder;
        }

        /**
         * Replaces the held builder with a fresh wrapper around the given one.
         *
         * @param bulkRequestBuilder builder to wrap and store
         */
        public void setBulkRequestBuilder(BulkRequestBuilder bulkRequestBuilder) {
            this.bulkRequestBuilder = new BulkRequestBuilderWrapper(bulkRequestBuilder);
        }
    }
}

