/*
 * Decompiled with CFR 0.152.
 */
package org.hibernate.search.backend.elasticsearch.orchestration.impl;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.hibernate.search.backend.elasticsearch.link.impl.ElasticsearchLink;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.AbstractElasticsearchWorkOrchestrator;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchBatchedWork;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchBatchedWorkProcessor;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchDefaultWorkBulker;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchDefaultWorkSequenceBuilder;
import org.hibernate.search.backend.elasticsearch.orchestration.impl.ElasticsearchSerialWorkOrchestrator;
import org.hibernate.search.backend.elasticsearch.resources.impl.BackendThreads;
import org.hibernate.search.backend.elasticsearch.work.impl.BulkableWork;
import org.hibernate.search.backend.elasticsearch.work.impl.ElasticsearchWorkExecutionContext;
import org.hibernate.search.backend.elasticsearch.work.impl.IndexingWork;
import org.hibernate.search.backend.elasticsearch.work.impl.NonBulkableWork;
import org.hibernate.search.engine.backend.orchestration.spi.BatchedWorkProcessor;
import org.hibernate.search.engine.backend.orchestration.spi.BatchingExecutor;
import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy;
import org.hibernate.search.engine.cfg.spi.ConfigurationProperty;
import org.hibernate.search.engine.cfg.spi.ConfigurationPropertySource;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.util.common.data.impl.SimpleHashFunction;
import org.hibernate.search.util.common.impl.Closer;

/**
 * Work orchestrator that distributes {@link ElasticsearchBatchedWork}s over a fixed
 * array of {@link BatchingExecutor}s.
 * <p>
 * The executors are created and started in {@link #doStart(ConfigurationPropertySource)};
 * each one owns its own {@link ElasticsearchBatchedWorkProcessor}. A submitted work is
 * routed to one executor chosen by {@link SimpleHashFunction#pick} from the work's
 * queuing key.
 * <p>
 * NOTE(review): the executor array only exists after {@code doStart} has run; the other
 * lifecycle hooks dereference it without a null check, so they presumably rely on the
 * superclass to enforce call ordering - confirm against
 * {@link AbstractElasticsearchWorkOrchestrator}.
 */
public class ElasticsearchBatchingWorkOrchestrator
        extends AbstractElasticsearchWorkOrchestrator<ElasticsearchBatchedWork<?>>
        implements ElasticsearchSerialWorkOrchestrator {

    /** Number of executors created on start; defaults to 10. */
    private static final ConfigurationProperty<Integer> QUEUE_COUNT =
            ConfigurationProperty.forKey("indexing.queue_count")
                    .asInteger()
                    .withDefault(10)
                    .build();

    /**
     * Size handed to each {@link BatchingExecutor} constructor; defaults to 1000.
     * Presumably bounds the executor's work queue - confirm against BatchingExecutor.
     */
    private static final ConfigurationProperty<Integer> QUEUE_SIZE =
            ConfigurationProperty.forKey("indexing.queue_size")
                    .asInteger()
                    .withDefault(1000)
                    .build();

    /** Maximum bulk size handed to each {@link ElasticsearchDefaultWorkBulker}; defaults to 100. */
    private static final ConfigurationProperty<Integer> MAX_BULK_SIZE =
            ConfigurationProperty.forKey("indexing.max_bulk_size")
                    .asInteger()
                    .withDefault(100)
                    .build();

    private final BackendThreads threads;
    private final FailureHandler failureHandler;

    /** Assigned in {@link #doStart(ConfigurationPropertySource)}; {@code null} before that. */
    private BatchingExecutor<ElasticsearchBatchedWorkProcessor>[] executors;

    /**
     * @param name the orchestrator name, forwarded to the superclass and used as the
     *        prefix of each executor's name
     * @param threads provider of the executor service on which the batching executors run
     * @param link the Elasticsearch link, forwarded to the superclass
     * @param failureHandler the failure handler passed to every batching executor
     */
    public ElasticsearchBatchingWorkOrchestrator(String name, BackendThreads threads, ElasticsearchLink link, FailureHandler failureHandler) {
        super(name, link);
        this.threads = threads;
        this.failureHandler = failureHandler;
    }

    /**
     * Wraps the given work and a new future into an {@link ElasticsearchBatchedWork}
     * and submits that wrapper to this orchestrator.
     *
     * @param work the indexing work to submit
     * @param <T> the type of the work's result
     * @return the future that was handed to the batched work together with {@code work}
     */
    @Override
    public <T> CompletableFuture<T> submit(IndexingWork<T> work) {
        CompletableFuture<T> future = new CompletableFuture<>();
        this.submit(new ElasticsearchBatchedWork<T>(work, future));
        return future;
    }

    /**
     * Reads the queue count, queue size and max bulk size from the property source,
     * creates one {@link BatchingExecutor} per queue, then starts all of them on the
     * work executor provided by {@link BackendThreads}.
     * <p>
     * NOTE(review): the queue count is not validated here. A negative value makes the
     * array allocation throw {@link NegativeArraySizeException}; confirm whether the
     * configuration layer rejects non-positive values upstream.
     *
     * @param propertySource the source of the {@code indexing.*} configuration properties
     */
    @Override
    protected void doStart(ConfigurationPropertySource propertySource) {
        int queueCount = QUEUE_COUNT.get(propertySource);
        int queueSize = QUEUE_SIZE.get(propertySource);
        int maxBulkSize = MAX_BULK_SIZE.get(propertySource);
        ElasticsearchWorkExecutionContext executionContext = this.createWorkExecutionContext();

        // Java cannot create an array of a parameterized type directly; the unchecked
        // conversion is confined to this declaration and is safe because only
        // BatchingExecutor<ElasticsearchBatchedWorkProcessor> instances are stored below.
        @SuppressWarnings("unchecked")
        BatchingExecutor<ElasticsearchBatchedWorkProcessor>[] created = new BatchingExecutor[queueCount];
        this.executors = created;

        for (int i = 0; i < created.length; ++i) {
            // A separate processor (with its own sequence builder and bulker) per executor.
            ElasticsearchBatchedWorkProcessor processor = this.createProcessor(executionContext, maxBulkSize);
            // NOTE(review): the meaning of the boolean flag is defined by BatchingExecutor
            // (not visible here); the original value is kept as-is.
            created[i] = new BatchingExecutor<>(this.getName() + " - " + i, processor, queueSize, true, this.failureHandler);
        }

        // Start only once every executor has been constructed.
        for (BatchingExecutor<ElasticsearchBatchedWorkProcessor> executor : created) {
            executor.start(this.threads.getWorkExecutor());
        }
    }

    /**
     * Routes the work to the executor picked from the work's queuing key, so works
     * with equal queuing keys go to the same executor.
     *
     * @param work the batched work to enqueue
     * @throws InterruptedException propagated from {@link BatchingExecutor#submit}
     */
    @Override
    protected void doSubmit(ElasticsearchBatchedWork<?> work) throws InterruptedException {
        SimpleHashFunction.pick(this.executors, work.getQueuingKey()).submit(work);
    }

    /**
     * @return a future combining, via {@link CompletableFuture#allOf}, the completion
     *         futures reported by every executor
     */
    @Override
    protected CompletableFuture<?> getCompletion() {
        CompletableFuture<?>[] completions = new CompletableFuture<?>[this.executors.length];
        for (int i = 0; i < this.executors.length; ++i) {
            completions[i] = this.executors[i].getCompletion();
        }
        return CompletableFuture.allOf(completions);
    }

    /**
     * Stops every executor through a {@link Closer}.
     */
    @Override
    protected void doStop() {
        try (Closer<RuntimeException> closer = new Closer<>()) {
            closer.pushAll(BatchingExecutor::stop, this.executors);
        }
    }

    /**
     * Builds a processor backed by a new sequence builder and a bulker that creates
     * bulk works through the link's work builder factory.
     *
     * @param context the execution context given to the sequence builder
     * @param maxBulkSize the maximum bulk size given to the bulker
     * @return a new processor; callers create one per executor
     */
    private ElasticsearchBatchedWorkProcessor createProcessor(ElasticsearchWorkExecutionContext context, int maxBulkSize) {
        ElasticsearchDefaultWorkSequenceBuilder sequenceBuilder = new ElasticsearchDefaultWorkSequenceBuilder(context);
        ElasticsearchDefaultWorkBulker bulker = new ElasticsearchDefaultWorkBulker(
                sequenceBuilder,
                (worksToBulk, refreshStrategy) -> this.link.getWorkBuilderFactory()
                        .bulk(worksToBulk)
                        .refresh(refreshStrategy)
                        .build(),
                maxBulkSize
        );
        return new ElasticsearchBatchedWorkProcessor(sequenceBuilder, bulker);
    }
}

