/*
 * Decompiled with CFR 0.152.
 */
package io.nosqlbench.engine.api.activityimpl.motor;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.AsyncAction;
import io.nosqlbench.engine.api.activityapi.core.Motor;
import io.nosqlbench.engine.api.activityapi.core.MultiPhaseAction;
import io.nosqlbench.engine.api.activityapi.core.RunState;
import io.nosqlbench.engine.api.activityapi.core.Startable;
import io.nosqlbench.engine.api.activityapi.core.Stoppable;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.OpTracker;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.OpTrackerImpl;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.TrackedOp;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.op_output.StrideOutputConsumer;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultSegmentBuffer;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsSegment;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.output.Output;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.SlotStateTracker;
import io.nosqlbench.engine.api.activityimpl.motor.StrideTracker;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * A {@link Motor} bound to one slot of an {@link Activity}. It repeatedly reads cycle segments
 * from an {@link Input} and feeds the cycles to an {@link Action} until the input yields no more
 * cycles or the slot leaves the {@link RunState#Running} state.
 *
 * <p>{@link #run()} dispatches on the runtime type of the action:
 * <ul>
 *   <li>an {@link AsyncAction} has one {@link TrackedOp} per cycle enqueued to it, with an
 *       {@link OpTracker} used to block while the tracker reports itself full;</li>
 *   <li>a {@link SyncAction} has each cycle executed inline (plus any remaining phases when the
 *       action is also a {@link MultiPhaseAction}), and the per-stride results are handed to the
 *       configured {@link Output}, if any.</li>
 * </ul>
 *
 * @param <D> the type argument of the {@link OpTracker} and {@link TrackedOp}s used on the async path
 */
public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
    private static final Logger logger = LogManager.getLogger(CoreMotor.class);

    /** Longest single {@link Object#wait(long)} (milliseconds) before re-checking a full op tracker. */
    private static final long OP_TRACKER_WAIT_MILLIS = 10000L;

    /**
     * Timeout handed to {@code OpTracker.awaitCompletion} once the input is finished.
     * NOTE(review): presumably milliseconds; the unit is not visible in this file, so confirm.
     */
    private static final long ASYNC_COMPLETION_TIMEOUT = 60000L;

    private final long slotId;
    private Timer inputTimer;
    private RateLimiter strideRateLimiter;
    private Timer stridesServiceTimer;
    private Timer stridesResponseTimer;
    private RateLimiter cycleRateLimiter;
    private Timer cyclesTimer;
    // Not referenced anywhere in this class; retained as-is.
    private Timer cycleResponseTimer;
    private RateLimiter phaseRateLimiter;
    private Timer phasesTimer;
    private Input input;
    private Action action;
    private final Activity activity;
    private Output output;
    private final SlotStateTracker slotStateTracker;
    private final AtomicReference<RunState> slotState;
    private int stride = 1;
    private OpTracker<D> opTracker;
    private Counter optrackerBlockCounter;

    /**
     * Creates a motor with an input but no action or output yet; an action must be supplied via
     * {@link #setAction(Action)} before {@link #run()} is called.
     *
     * @param activity the activity this motor belongs to; its definition is applied immediately
     * @param slotId   the slot identifier, also used to create the {@link SlotStateTracker}
     * @param input    the source of cycle segments
     */
    public CoreMotor(Activity activity, long slotId, Input input) {
        this.activity = activity;
        this.slotId = slotId;
        this.setInput(input);
        this.slotStateTracker = new SlotStateTracker(slotId);
        this.slotState = this.slotStateTracker.getAtomicSlotState();
        this.onActivityDefUpdate(activity.getActivityDef());
    }

    /**
     * Creates a motor with an input and an action.
     *
     * @param activity the activity this motor belongs to
     * @param slotId   the slot identifier
     * @param input    the source of cycle segments
     * @param action   the action to drive
     */
    public CoreMotor(Activity activity, long slotId, Input input, Action action) {
        this(activity, slotId, input);
        this.setAction(action);
    }

    /**
     * Creates a motor with an input, an action and a result output.
     *
     * @param activity the activity this motor belongs to
     * @param slotId   the slot identifier
     * @param input    the source of cycle segments
     * @param action   the action to drive
     * @param output   the receiver of per-stride results; may be {@code null}
     */
    public CoreMotor(Activity activity, long slotId, Input input, Action action, Output output) {
        this(activity, slotId, input, action);
        this.setResultOutput(output);
    }

    /**
     * Replaces the input this motor reads cycle segments from.
     *
     * @param input the new input
     * @return this motor
     */
    @Override
    public Motor<D> setInput(Input input) {
        this.input = input;
        return this;
    }

    /** @return the input this motor reads cycle segments from */
    @Override
    public Input getInput() {
        return this.input;
    }

    /**
     * Replaces the action this motor drives.
     *
     * @param action the new action
     * @return this motor
     */
    @Override
    public Motor<D> setAction(Action action) {
        this.action = action;
        return this;
    }

    /** @return the action this motor drives */
    @Override
    public Action getAction() {
        return this.action;
    }

    /** @return the slot identifier given at construction */
    @Override
    public long getSlotId() {
        return this.slotId;
    }

    /** @return the tracker holding this motor's run state */
    @Override
    public SlotStateTracker getSlotStateTracker() {
        return this.slotStateTracker;
    }

    /**
     * Runs the motor loop on the calling thread.
     *
     * <p>Looks up timers, the blocked-op counter and rate limiters from the activity, enters
     * {@link RunState#Running}, initialises the action, starts the input (when it is
     * {@link Startable}) and the stride rate limiter (when present), then drives the action
     * through the async or sync loop depending on its type.
     *
     * @throws RuntimeException if the action implements neither {@link AsyncAction} nor
     *                          {@link SyncAction}, or if a sync action is used while the activity
     *                          parameters contain {@code async}
     */
    @Override
    public void run() {
        try {
            this.inputTimer = this.activity.getInstrumentation().getOrCreateInputTimer();
            this.stridesServiceTimer = this.activity.getInstrumentation().getOrCreateStridesServiceTimer();
            this.stridesResponseTimer = this.activity.getInstrumentation().getStridesResponseTimerOrNull();
            this.optrackerBlockCounter = this.activity.getInstrumentation().getOrCreateOpTrackerBlockedCounter();
            this.strideRateLimiter = this.activity.getStrideLimiter();
            this.cycleRateLimiter = this.activity.getCycleLimiter();
            this.phaseRateLimiter = this.activity.getPhaseLimiter();

            // NOTE(review): the message says the slot remains finished, yet the next statement
            // still requests the Running state. Behaviour is kept as found; confirm the intent.
            if (this.slotState.get() == RunState.Finished) {
                logger.warn("Input was already exhausted for slot " + this.slotId + ", remaining in finished state.");
            }
            this.slotStateTracker.enterState(RunState.Running);

            this.action.init();
            if (this.input instanceof Startable) {
                ((Startable) this.input).start();
            }
            if (this.strideRateLimiter != null) {
                this.strideRateLimiter.start();
            }

            if (this.action instanceof AsyncAction) {
                this.driveAsyncAction((AsyncAction) this.action);
            } else if (this.action instanceof SyncAction) {
                this.driveSyncAction((SyncAction) this.action);
            } else {
                throw new RuntimeException("Valid Action implementations must implement either the SyncAction or the AsyncAction sub-interface");
            }
        } catch (Throwable t) {
            // Log with the stack trace here, then propagate unchanged to the caller.
            logger.error("Error in core motor loop:" + t, t);
            throw t;
        }
    }

    /**
     * Async loop: for every cycle, creates a tracked op, blocks while the op tracker is full, and
     * enqueues the op to the action. Once the loop ends in the Finished state, waits for the op
     * tracker to report completion; if it ends in Stopping, moves the slot to Stopped.
     */
    // The action and tracker types are used raw here, exactly as the surrounding code does.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void driveAsyncAction(AsyncAction async) {
        this.opTracker = new OpTrackerImpl(this.activity, this.slotId);
        this.opTracker.setCycleOpFunction(async.getOpInitFunction());

        StrideOutputConsumer strideConsumer = null;
        if (this.action instanceof StrideOutputConsumer) {
            strideConsumer = (StrideOutputConsumer) this.action;
        }

        // Delays keep their last value when the corresponding limiter is absent on a later pass.
        long strideDelay = 0L;
        long cycleDelay = 0L;

        while (this.slotState.get() == RunState.Running) {
            CycleSegment segment = this.readInputSegment();
            if (segment == null) {
                this.markInputExhausted("null segment");
                continue;
            }
            if (this.strideRateLimiter != null) {
                strideDelay = this.strideRateLimiter.maybeWaitForOp();
            }
            StrideTracker strideTracker = new StrideTracker(
                this.stridesServiceTimer,
                this.stridesResponseTimer,
                strideDelay,
                segment.peekNextCycle(),
                this.stride,
                this.output,
                strideConsumer
            );
            strideTracker.start();

            while (!segment.isExhausted() && this.slotState.get() == RunState.Running) {
                long cyclenum = segment.nextCycle();
                if (cyclenum < 0L && segment.isExhausted()) {
                    this.markInputExhausted("negative read");
                    continue;
                }
                if (this.slotState.get() != RunState.Running) {
                    logger.trace("motor stopped in cycle " + cyclenum + ", stopping motor thread " + this.slotId);
                    continue;
                }
                if (this.cycleRateLimiter != null) {
                    cycleDelay = this.cycleRateLimiter.maybeWaitForOp();
                }
                try {
                    TrackedOp<D> op = this.opTracker.newOp(cyclenum, strideTracker);
                    op.setWaitTime(cycleDelay);
                    this.blockWhileOpTrackerFull();
                    async.enqueue(op);
                } catch (Exception e) {
                    logger.error("Error while processing async cycle " + cyclenum + ", error:" + e);
                    throw e;
                }
            }
        }

        if (this.slotState.get() == RunState.Finished) {
            boolean finished = this.opTracker.awaitCompletion(ASYNC_COMPLETION_TIMEOUT);
            if (finished) {
                logger.debug("slot " + this.slotId + " completed successfully");
            } else {
                logger.warn("slot " + this.slotId + " was stopped before completing successfully");
            }
        }
        if (this.slotState.get() == RunState.Stopping) {
            this.slotStateTracker.enterState(RunState.Stopped);
        }
    }

    /**
     * Sync loop: runs every cycle of each stride inline, recording phase, cycle and stride timings,
     * buffers the per-cycle results, and passes each stride's results to the output when one is
     * set. If the loop ends in the Stopping state, moves the slot to Stopped.
     */
    private void driveSyncAction(SyncAction sync) {
        this.cyclesTimer = this.activity.getInstrumentation().getOrCreateCyclesServiceTimer();
        this.phasesTimer = this.activity.getInstrumentation().getOrCreatePhasesServiceTimer();
        if (this.activity.getActivityDef().getParams().containsKey("async")) {
            throw new RuntimeException("The async parameter was given for this activity, but it does not seem to know how to do async.");
        }

        MultiPhaseAction multiPhase = null;
        if (this.action instanceof MultiPhaseAction) {
            multiPhase = (MultiPhaseAction) this.action;
        }

        // Delays keep their last value when the corresponding limiter is absent on a later pass.
        long strideDelay = 0L;
        long cycleDelay = 0L;
        long phaseDelay = 0L;

        while (this.slotState.get() == RunState.Running) {
            CycleResultSegmentBuffer results = new CycleResultSegmentBuffer(this.stride);
            CycleSegment segment = this.readInputSegment();
            if (segment == null) {
                this.markInputExhausted("null segment");
                continue;
            }
            if (this.strideRateLimiter != null) {
                strideDelay = this.strideRateLimiter.maybeWaitForOp();
            }

            long strideStart = System.nanoTime();
            try {
                // No state check in the loop condition: once stopped, the remaining cycles of
                // this segment are still read, but skipped by the state check below.
                while (!segment.isExhausted()) {
                    long cyclenum = segment.nextCycle();
                    if (cyclenum < 0L && segment.isExhausted()) {
                        this.markInputExhausted("negative read");
                        continue;
                    }
                    if (this.slotState.get() != RunState.Running) {
                        logger.trace("motor stopped after input (input " + cyclenum + "), stopping motor thread " + this.slotId);
                        continue;
                    }

                    int result = -1;
                    if (this.cycleRateLimiter != null) {
                        cycleDelay = this.cycleRateLimiter.maybeWaitForOp();
                    }
                    long cycleStart = System.nanoTime();
                    try {
                        // Guarded so no message string is built per cycle when trace is off.
                        if (logger.isTraceEnabled()) {
                            logger.trace("cycle " + cyclenum);
                        }
                        long phaseStart = System.nanoTime();
                        if (this.phaseRateLimiter != null) {
                            phaseDelay = this.phaseRateLimiter.maybeWaitForOp();
                        }
                        result = sync.runCycle(cyclenum);
                        this.phasesTimer.update(System.nanoTime() - phaseStart + phaseDelay, TimeUnit.NANOSECONDS);

                        if (multiPhase != null) {
                            while (multiPhase.incomplete()) {
                                phaseStart = System.nanoTime();
                                if (this.phaseRateLimiter != null) {
                                    phaseDelay = this.phaseRateLimiter.maybeWaitForOp();
                                }
                                result = multiPhase.runPhase(cyclenum);
                                this.phasesTimer.update(System.nanoTime() - phaseStart + phaseDelay, TimeUnit.NANOSECONDS);
                            }
                        }
                    } finally {
                        // Recorded even when the cycle throws.
                        this.cyclesTimer.update(System.nanoTime() - cycleStart + cycleDelay, TimeUnit.NANOSECONDS);
                    }
                    results.append(cyclenum, result);
                }
            } finally {
                // Recorded even when a cycle in this stride throws.
                this.stridesServiceTimer.update(System.nanoTime() - strideStart + strideDelay, TimeUnit.NANOSECONDS);
            }

            if (this.output == null) {
                continue;
            }
            CycleResultsSegment outputBuffer = results.toReader();
            try {
                this.output.onCycleResultSegment(outputBuffer);
            } catch (Exception e) {
                logger.error("Error while feeding result segment " + outputBuffer + " to output '" + this.output + "', error:" + e);
                throw e;
            }
        }

        if (this.slotState.get() == RunState.Stopping) {
            this.slotStateTracker.enterState(RunState.Stopped);
        }
    }

    /** Reads the next segment of {@code stride} cycles from the input, timed by the input timer. */
    private CycleSegment readInputSegment() {
        try (Timer.Context ignored = this.inputTimer.time()) {
            return this.input.getInputSegment(this.stride);
        }
    }

    /** Logs that the input ran out (with the given cause) and requests the Finished state. */
    private void markInputExhausted(String cause) {
        logger.trace("input exhausted (input " + this.input + ") via " + cause + ", stopping motor thread " + this.slotId);
        this.slotStateTracker.enterState(RunState.Finished);
    }

    /**
     * Blocks on the op tracker's monitor until it no longer reports itself full, counting each
     * blocked pass. An interrupt does not end the wait, but the thread's interrupt status is
     * restored afterwards so that callers further up can still observe it.
     */
    private void blockWhileOpTrackerFull() {
        final OpTracker<D> tracker = this.opTracker;
        boolean interrupted = false;
        synchronized (tracker) {
            while (tracker.isFull()) {
                try {
                    logger.trace(() -> "Blocking for enqueue with (" + tracker.getPendingOps() + "/" + tracker.getMaxPendingOps() + ") queued ops");
                    this.optrackerBlockCounter.inc();
                    tracker.wait(OP_TRACKER_WAIT_MILLIS);
                } catch (InterruptedException e) {
                    // wait() cleared the flag; remember it and keep waiting for capacity.
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** @return the slot id and current run state, formatted as {@code slot:<id>; state:<state>} */
    @Override
    public String toString() {
        return "slot:" + this.slotId + "; state:" + this.slotState.get();
    }

    /**
     * Forwards the updated definition to the input, op tracker, action and output (each only when
     * it is an {@link ActivityDefObserver}), then refreshes the stride from the {@code stride}
     * parameter (default 1) and re-reads the three rate limiters from the activity.
     *
     * @param activityDef the updated activity definition
     */
    @Override
    public void onActivityDefUpdate(ActivityDef activityDef) {
        Object[] components = {this.input, this.opTracker, this.action, this.output};
        for (Object component : components) {
            // instanceof is false for null, so unset components are skipped.
            if (component instanceof ActivityDefObserver) {
                ((ActivityDefObserver) component).onActivityDefUpdate(activityDef);
            }
        }
        this.stride = activityDef.getParams().getOptionalInteger("stride").orElse(1);
        this.strideRateLimiter = this.activity.getStrideLimiter();
        this.cycleRateLimiter = this.activity.getCycleLimiter();
        this.phaseRateLimiter = this.activity.getPhaseLimiter();
    }

    /**
     * Requests that a running motor stop: forwards the request to the input and the action when
     * they are {@link Stoppable}, then enters {@link RunState#Stopping}. When the motor is not
     * running and is not already stopping or stopped, only a warning is logged.
     */
    @Override
    public synchronized void requestStop() {
        // Read once so the state that is logged is the same one that was tested.
        RunState current = this.slotState.get();
        if (current == RunState.Running) {
            if (this.input instanceof Stoppable) {
                ((Stoppable) this.input).requestStop();
            }
            if (this.action instanceof Stoppable) {
                ((Stoppable) this.action).requestStop();
            }
            this.slotStateTracker.enterState(RunState.Stopping);
        } else if (current != RunState.Stopped && current != RunState.Stopping) {
            logger.warn("attempted to stop motor " + this.getSlotId() + ": from non Running state:" + current);
        }
    }

    /**
     * Sets the output that receives per-stride results.
     *
     * @param resultOutput the output, or {@code null} for none
     */
    public void setResultOutput(Output resultOutput) {
        this.output = resultOutput;
    }
}

