/*
 * Decompiled with CFR 0.152.
 */
package io.nosqlbench.engine.api.activityimpl.marker;

import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResult;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsIntervalSegment;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsSegment;
import io.nosqlbench.engine.api.activityapi.output.Output;
import io.nosqlbench.engine.api.activityimpl.marker.ByteTrackerExtent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * An {@link Output} that records per-cycle results into a chain of
 * {@link ByteTrackerExtent}s and, whenever the head extent reports itself as
 * fully filled, hands that extent's remaining segment to every registered
 * downstream reader before advancing to the next extent.
 *
 * <p>All public mutating methods are {@code synchronized} on this instance. In
 * addition, the code that advances or drains the extent chain runs while
 * holding a single-permit {@link Semaphore}; the permit is always returned in
 * a {@code finally} block so that a failing reader cannot wedge later calls.
 */
public class ContiguousOutputChunker
implements Output {
    private static final Logger logger = LogManager.getLogger(ContiguousOutputChunker.class);

    /** Number of extents pre-allocated when constructed from an {@link Activity}. */
    private static final int DEFAULT_MAX_EXTENTS = 3;
    /** Largest extent size chosen automatically; cycle counts up to this use a single extent size. */
    private static final int MAX_AUTO_EXTENT_SIZE = 2_000_000;
    /** Exclusive lower bound of the automatic extent-size search. */
    private static final int MIN_AUTO_EXTENT_SIZE = 500_000;

    private final int extentSize;
    private final int maxExtents;
    private final List<Output> readers = new ArrayList<Output>();
    private final AtomicLong min;
    private final AtomicLong nextMin;
    /** Head of the extent chain that results are currently marked into. */
    private final AtomicReference<ByteTrackerExtent> markingExtents = new AtomicReference<>();
    private final ReentrantLock lock = new ReentrantLock(false);
    private final Condition nowMarking = this.lock.newCondition();
    /** Single-permit guard around advancing/draining the extent chain. */
    private final Semaphore mutex = new Semaphore(1, false);

    /**
     * Creates a chunker with explicit bounds and extent geometry.
     *
     * @param min          first cycle of the tracked range; the first extent starts here
     * @param nextRangeMin stored as {@code nextMin}; only reported by {@link #toString()} in this class
     * @param extentSize   number of cycles covered by the first extent
     * @param maxExtents   number of times the chain is extended up front
     */
    public ContiguousOutputChunker(long min, long nextRangeMin, int extentSize, int maxExtents) {
        this.min = new AtomicLong(min);
        this.nextMin = new AtomicLong(nextRangeMin);
        this.extentSize = extentSize;
        this.maxExtents = maxExtents;
        this.initExtents();
    }

    /**
     * Creates a chunker sized from an activity's start cycle, end cycle and
     * optional {@code stride} parameter (default 1).
     *
     * @param activity the activity whose definition and parameters are read
     * @throws RuntimeException if the activity's input is not contiguous, if
     *                          {@code stride} does not evenly divide the cycle
     *                          count, or if no suitable extent size exists
     */
    public ContiguousOutputChunker(Activity activity) {
        if (!activity.getInputDispenserDelegate().getInput(0L).isContiguous()) {
            throw new RuntimeException("This type of output may not be used with non-contiguous inputs yet.");
        }
        this.min = new AtomicLong(activity.getActivityDef().getStartCycle());
        this.nextMin = new AtomicLong(activity.getActivityDef().getEndCycle());
        long stride = activity.getParams().getOptionalLong("stride").orElse(1L);
        long cycleCount = this.nextMin.get() - this.min.get();
        if (cycleCount % stride != 0L) {
            throw new RuntimeException("stride must evenly divide into cycles.");
        }
        this.extentSize = this.calculateExtentSize(cycleCount, stride);
        this.maxExtents = DEFAULT_MAX_EXTENTS;
        this.initExtents();
    }

    /**
     * Installs the first extent as the chain head and extends the chain
     * {@code maxExtents} times.
     */
    private synchronized void initExtents() {
        long start = this.min.get();
        ByteTrackerExtent extent = new ByteTrackerExtent(start, start + (long) this.extentSize);
        this.markingExtents.set(extent);
        for (int i = 0; i < this.maxExtents; ++i) {
            extent = extent.extend();
            logger.debug("added tracker extent " + extent.rangeSummary());
        }
        logger.info("using max " + this.maxExtents + " extents with getCount: " + this.extentSize);
    }

    /**
     * Records every cycle result contained in the given segment, in iteration order.
     *
     * @param segment the results to record
     */
    @Override
    public synchronized void onCycleResultSegment(CycleResultsSegment segment) {
        if (logger.isTraceEnabled()) {
            logger.trace("on-cycle-result-segment: (" + segment + ")");
        }
        for (CycleResult cr : segment) {
            this.onCycleResult(cr.getCycle(), cr.getResult());
        }
    }

    /**
     * Records a single cycle result in the current extent chain.
     *
     * <p>The value returned by {@code ByteTrackerExtent.markResult} drives the
     * flow: a positive value returns immediately; zero triggers a sweep that
     * emits and retires every fully filled head extent; a negative value is
     * logged and the mark is retried.
     * NOTE(review): the negative case retries while this monitor is held, so it
     * only terminates if {@code markResult} eventually returns a non-negative
     * value — confirm against {@code ByteTrackerExtent}.
     *
     * @param completedCycle the cycle number that completed
     * @param result         the result code for that cycle
     * @return always {@code true}; if the thread is interrupted while waiting
     *         for the internal permit, the interrupt flag is restored and
     *         {@code true} is still returned
     */
    @Override
    public synchronized boolean onCycleResult(long completedCycle, int result) {
        if (logger.isTraceEnabled()) {
            logger.trace("on-cycle-result: (" + completedCycle + "," + result + ")");
        }
        while (true) {
            ByteTrackerExtent extent = this.markingExtents.get();
            long unmarked = extent.markResult(completedCycle, result);
            if (unmarked > 0L) {
                return true;
            }
            if (unmarked == 0L) {
                this.advancePastFilledExtents();
                return true;
            }
            logger.warn("unable to mark result for cycle " + completedCycle + " (result=" + result
                + ") in extent " + extent + ", markResult returned " + unmarked + "; retrying");
        }
    }

    /**
     * While the head extent is fully filled: extends it, swaps the head to its
     * successor, and emits the retired extent to the readers. Runs under the
     * single-permit mutex, which is released even if a reader throws.
     */
    private void advancePastFilledExtents() {
        try {
            this.mutex.acquire();
        }
        catch (InterruptedException interrupted) {
            // Preserve the interrupt for callers instead of silently dropping it.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            ByteTrackerExtent head = this.markingExtents.get();
            while (head.isFullyFilled()) {
                head.extend();
                if (!this.markingExtents.compareAndSet(head, head.getNextExtent().get())) {
                    throw new RuntimeException("Unable to swap head extent.");
                }
                this.onFullyFilled(head);
                head = this.markingExtents.get();
            }
        }
        finally {
            this.mutex.release();
        }
    }

    /**
     * Emits the remaining segment of every extent still in the chain to the
     * readers, then closes each reader in registration order.
     *
     * @throws Exception any failure raised while draining or while closing a
     *                   reader; it is logged and rethrown unchanged
     */
    @Override
    public synchronized void close() throws Exception {
        try {
            this.mutex.acquire();
            try {
                ByteTrackerExtent e = this.markingExtents.get();
                while (e != null) {
                    this.onFullyFilled(e);
                    e = e.getNextExtent().get();
                }
            }
            finally {
                // Always hand the permit back, even when a reader fails mid-drain.
                this.mutex.release();
            }
            for (Output reader : this.readers) {
                logger.debug("closing downstream reader: " + reader);
                reader.close();
            }
        }
        catch (Throwable t) {
            if (t instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            logger.error("Error while attempting to close " + this + ": " + t, t);
            throw t;
        }
    }

    /**
     * Offers the extent's remaining segment to each reader. The segment is
     * requested once per reader, and readers are skipped when it is {@code null}.
     */
    private void onFullyFilled(ByteTrackerExtent extent) {
        if (logger.isTraceEnabled()) {
            logger.trace("MARKER>: fully filled: " + extent);
        }
        for (Output reader : this.readers) {
            CycleResultsIntervalSegment remainingSegment = extent.getRemainingSegment();
            if (remainingSegment == null) continue;
            reader.onCycleResultSegment(remainingSegment);
        }
    }

    /** Debug hook; only logs the given extent. Not called from within this class. */
    private void onFullyServed(ByteTrackerExtent firstReadable) {
        logger.debug("TRACKER: fully tracked: " + firstReadable);
    }

    /**
     * Registers a downstream reader that will receive filled extents.
     *
     * @param reader the reader to add
     */
    public synchronized void addExtentReader(Output reader) {
        this.readers.add(reader);
    }

    /**
     * Unregisters a previously added downstream reader.
     *
     * @param reader the reader to remove
     */
    public synchronized void removeExtentReader(Output reader) {
        this.readers.remove(reader);
    }

    /**
     * Chooses an extent size. Cycle counts up to 2,000,000 are used directly;
     * otherwise the largest value in (500,000, 2,000,000] that divides
     * {@code cycleCount} and is a multiple of {@code stride} is returned.
     *
     * @throws RuntimeException if no such value exists
     */
    private int calculateExtentSize(long cycleCount, long stride) {
        if (cycleCount <= (long) MAX_AUTO_EXTENT_SIZE) {
            return (int) cycleCount;
        }
        for (int cs = MAX_AUTO_EXTENT_SIZE; cs > MIN_AUTO_EXTENT_SIZE; --cs) {
            if (cycleCount % (long) cs != 0L || (long) cs % stride != 0L) continue;
            return cs;
        }
        throw new RuntimeException("no even divisor of cycleCount and Stride between 500K and 2M, with cycles=" + cycleCount + ",  and stride=" + stride);
    }

    @Override
    public String toString() {
        return ContiguousOutputChunker.class.getSimpleName() + "{extentSize=" + this.extentSize + ", maxExtents=" + this.maxExtents + ", readers=" + this.readers + ", min=" + this.min + ", nextMin=" + this.nextMin + ", markingExtents/Chain=" + this.markingExtents.get().getChainSize() + "}";
    }
}

