/*
 * Decompiled with CFR 0.152.
 */
package io.nats.bridge.task;

import io.nats.bridge.MessageBridge;
import io.nats.bridge.MessageBridgeTasksManager;
import io.nats.bridge.task.BridgeTaskRunner;
import io.nats.bridge.task.BridgeTaskRunnerBuilder;
import io.nats.bridge.task.NonResumeProcessNotifier;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.slf4j.Logger;

public class MessageBridgeTasksManagerImpl
implements MessageBridgeTasksManager {
    private final String name;
    private final Logger logger;
    private final Function<String, MessageBridge> bridgeBuilder;
    private final int workers;
    private final int tasks;
    private final ExecutorService pool;
    private final Duration pollDuration;
    private final boolean namePerTask;
    private AtomicBoolean stop = new AtomicBoolean();
    private AtomicInteger startedCount = new AtomicInteger();
    private AtomicReference<Exception> lastError = new AtomicReference();

    public MessageBridgeTasksManagerImpl(String name, Logger logger, Function<String, MessageBridge> bridgeFactory, int workers, int tasks, Duration pollDuration, boolean namePerTask) {
        this.name = name;
        this.logger = logger;
        this.bridgeBuilder = bridgeFactory;
        this.workers = workers;
        this.tasks = tasks;
        this.pool = Executors.newWorkStealingPool(workers + 1);
        this.pollDuration = pollDuration;
        this.namePerTask = namePerTask;
    }

    @Override
    public String name() {
        return this.name;
    }

    private String createName(int task, int worker) {
        if (this.namePerTask) {
            return String.format("%s_w%i_t%i", this.name, worker, task);
        }
        return this.name;
    }

    @Override
    public void start() {
        try {
            for (int worker = 0; worker < this.workers; ++worker) {
                ArrayList<MessageBridge> bridges = new ArrayList<MessageBridge>(this.tasks);
                for (int task = 0; task < this.tasks; ++task) {
                    bridges.add(this.bridgeBuilder.apply(this.createName(task, worker)));
                }
                BridgeTaskRunner runner = this.createBridgeTaskRunner(worker, bridges);
                this.pool.submit(() -> {
                    while (!this.stop.get()) {
                        runner.process();
                        try {
                            Thread.sleep(3000L);
                        }
                        catch (InterruptedException interruptedException) {}
                    }
                });
            }
        }
        catch (Exception ex) {
            this.lastError.set(ex);
            this.logger.error("Error starting message bridge task manager", (Throwable)ex);
        }
    }

    private BridgeTaskRunner createBridgeTaskRunner(int worker, List<MessageBridge> bridges) {
        BridgeTaskRunnerBuilder bridgeTaskRunnerBuilder = BridgeTaskRunnerBuilder.builder().withName(this.name()).withPollDuration(this.pollDuration).withMessageBridges(bridges).withWorker(worker).withProcessNotifier(new NonResumeProcessNotifier(this.name, worker, this.stop, this.logger, this.startedCount, this.lastError));
        return bridgeTaskRunnerBuilder.build();
    }

    @Override
    public void close() {
        this.stop.set(true);
    }

    @Override
    public boolean isHealthy() {
        return this.lastError.get() == null;
    }

    @Override
    public Exception lastError() {
        return this.lastError.get();
    }

    @Override
    public boolean wasStarted() {
        return this.workers == this.startedCount.get();
    }

    @Override
    public void clearLastError() {
        this.lastError.set(null);
    }
}

