/*
 * Decompiled with CFR 0.152.
 */
package alluxio.stress.cli.worker;

import alluxio.AlluxioURI;
import alluxio.annotation.SuppressFBWarnings;
import alluxio.client.file.FileSystem;
import alluxio.grpc.WritePType;
import alluxio.stress.cli.AbstractStressBench;
import alluxio.stress.cli.client.ClientIOWritePolicy;
import alluxio.stress.worker.WorkerBenchParameters;
import alluxio.stress.worker.WorkerBenchTaskResult;
import alluxio.util.CommonUtils;
import alluxio.util.FormatUtils;
import alluxio.util.executor.ExecutorServiceFactories;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StressWorkerBench
extends AbstractStressBench<WorkerBenchTaskResult, WorkerBenchParameters> {
    /** Logger used by the benchmark and by its bench threads. */
    private static final Logger LOG = LoggerFactory.getLogger(StressWorkerBench.class);
    /** File system handles opened in {@code prepare()}, one per configured client; {@code runLocal()} hands them to bench threads round-robin. */
    private FileSystem[] mCachedFs;
    /** Paths of the data files ({@code data<i>} under the base path), filled in by {@code prepare()}. */
    private Path[] mFilePaths;
    /** Per-file start offset of the read, indexed like {@code mFilePaths}. */
    private Integer[] mOffsets;
    /** Per-file number of bytes planned to be read, indexed like {@code mFilePaths}. */
    private Integer[] mLengths;

    /**
     * Draws a pseudo-random integer from the closed interval {@code [min, max]}.
     *
     * @param rand source of randomness
     * @param min smallest value that may be returned
     * @param max largest value that may be returned
     * @return a value between {@code min} and {@code max}, both inclusive
     */
    private Integer randomNumInRange(Random rand, int min, int max) {
        // Number of distinct values in the interval; Random.nextInt rejects a non-positive bound.
        final int span = max - min + 1;
        final int drawn = rand.nextInt(span);
        return min + drawn;
    }

    /**
     * Creates the benchmark and installs a newly constructed {@link WorkerBenchParameters}
     * instance as its parameter holder.
     */
    public StressWorkerBench() {
        mParameters = new WorkerBenchParameters();
    }

    /**
     * Command-line entry point: hands the arguments and a new benchmark instance to
     * {@code mainInternal}.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        StressWorkerBench bench = new StressWorkerBench();
        mainInternal(args, bench);
    }

    /**
     * Builds the multi-line description and usage example of this benchmark.
     *
     * @return the description lines joined with {@code '\n'}
     */
    @Override
    public String getBenchDescription() {
        final String[] descriptionLines = {
            "A benchmarking tool to measure the read performance of alluxio workers in the cluster",
            "The test will create one file and repeatedly read the created file to test the performance",
            "",
            "Example:",
            "# This would create a 100MB file with block size of 16KB and then read the file for 30s after 10s warmup",
            "$ bin/alluxio runClass alluxio.stress.cli.worker.StressWorkerBench --clients 1 --base alluxio:///stress-worker-base --block-size 16k --file-size 100m --warmup 10s --duration 30s --cluster\n",
        };
        return String.join("\n", descriptionLines);
    }

    /**
     * Prepares the benchmark before any bench thread runs.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Plan the reads: fill {@code mFilePaths}, {@code mOffsets} and {@code mLengths} for
     *       {@code mNumFiles} files named {@code data<i>} under the base path.</li>
     *   <li>When the run is not distributed, obtain a preparation file system and, unless file
     *       creation is skipped, delete the base path and write the data files.</li>
     *   <li>Open {@code mClients} file system handles into {@code mCachedFs}, with the Hadoop
     *       file system cache disabled for the base path's scheme.</li>
     * </ol>
     *
     * @throws IllegalArgumentException if a size option does not fit in an {@code int}, or if the
     *         random read length bounds are inconsistent with each other or with the file size
     * @throws IllegalStateException if {@code --free} is combined with write type MUST_CACHE
     * @throws Exception if a file system operation fails
     */
    @Override
    @SuppressFBWarnings(value={"BC_UNCONFIRMED_CAST"})
    public void prepare() throws Exception {
        // NOTE(review): presumably pins reads and writes to a single worker - confirm against
        // ClientIOWritePolicy.
        ClientIOWritePolicy.setMaxWorkers(1);

        final Path basePath = new Path(mParameters.mBasePath);
        final int fileSize = parseSizeAsInt("file size", mParameters.mFileSize);

        planReads(basePath, fileSize);
        if (!mBaseParameters.mDistributed) {
            prepareDataFiles(basePath, fileSize);
        }
        openClientFileSystems();
    }

    /**
     * Parses a human-readable space size and narrows it to an {@code int}, failing loudly instead
     * of silently truncating values that do not fit.
     */
    private static int parseSizeAsInt(String description, String spaceSize) {
        final long bytes = FormatUtils.parseSpaceSize(spaceSize);
        if (bytes != (int) bytes) {
            throw new IllegalArgumentException(String.format(
                "The %s \"%s\" (%d bytes) does not fit in an int; the maximum supported value is %d bytes",
                description, spaceSize, bytes, Integer.MAX_VALUE));
        }
        return (int) bytes;
    }

    /** Fills the per-file path, offset and length arrays that the bench threads read from. */
    private void planReads(Path basePath, int fileSize) {
        final int numFiles = mParameters.mNumFiles;
        mFilePaths = new Path[numFiles];
        mLengths = new Integer[numFiles];
        mOffsets = new Integer[numFiles];
        for (int i = 0; i < numFiles; i++) {
            mFilePaths[i] = new Path(basePath, "data" + i);
        }

        if (!mParameters.mIsRandom) {
            // Sequential mode: every file is read from the beginning to the end.
            for (int i = 0; i < numFiles; i++) {
                mOffsets[i] = 0;
                mLengths[i] = fileSize;
            }
            return;
        }

        // The bounds are the same for every file, so parse and validate them once up front.
        final int minLength = parseSizeAsInt("random min read length", mParameters.mRandomMinReadLength);
        final int maxLength = parseSizeAsInt("random max read length", mParameters.mRandomMaxReadLength);
        if (minLength < 0 || minLength >= fileSize) {
            // The offset is drawn from [0, fileSize - 1 - minLength], which must not be empty.
            throw new IllegalArgumentException(String.format(
                "The random min read length (%d bytes) must be non-negative and smaller than the file size (%d bytes)",
                minLength, fileSize));
        }
        if (maxLength < minLength) {
            throw new IllegalArgumentException(String.format(
                "The random max read length (%d bytes) must not be smaller than the random min read length (%d bytes)",
                maxLength, minLength));
        }

        // A fixed seed makes the planned offsets and lengths reproducible between runs.
        final Random rand = new Random(mParameters.mRandomSeed.intValue());
        for (int i = 0; i < numFiles; i++) {
            // Draw order (offset first, then length) determines the generated sequence; keep it.
            mOffsets[i] = randomNumInRange(rand, 0, fileSize - 1 - minLength);
            // Never plan a read that runs past the end of the file.
            mLengths[i] = randomNumInRange(rand, minLength, Integer.min(fileSize - mOffsets[i], maxLength));
        }
    }

    /**
     * Obtains the preparation file system and, unless creation is skipped, recreates the base
     * directory and writes every data file (optionally freeing it from Alluxio afterwards).
     */
    private void prepareDataFiles(Path basePath, int fileSize) throws Exception {
        final Configuration hdfsConf = new Configuration();
        hdfsConf.set("alluxio.user.file.delete.unchecked", "true");
        if (mParameters.mFree && WritePType.MUST_CACHE.name().equals(mParameters.mWriteType)) {
            throw new IllegalStateException(String.format("%s cannot be %s when %s option provided", "--write-type", WritePType.MUST_CACHE, "--free"));
        }
        hdfsConf.set("alluxio.user.file.writetype.default", mParameters.mWriteType);
        hdfsConf.set("alluxio.user.block.write.location.policy.class", ClientIOWritePolicy.class.getName());
        hdfsConf.set("alluxio.user.ufs.block.read.location.policy", ClientIOWritePolicy.class.getName());
        // The handle is obtained even when creation is skipped, exactly as before.
        final FileSystem prepareFs = FileSystem.get(new URI(mParameters.mBasePath), hdfsConf);
        if (mParameters.mSkipCreation) {
            return;
        }

        prepareFs.delete(basePath, true);
        prepareFs.mkdirs(basePath);
        final byte[] buffer = new byte[parseSizeAsInt("buffer size", mParameters.mBufferSize)];
        Arrays.fill(buffer, (byte) 'A');
        final long blockSize = FormatUtils.parseSpaceSize(mParameters.mBlockSize);

        for (Path filePath : mFilePaths) {
            try (FSDataOutputStream out = prepareFs.create(filePath, false, buffer.length, (short) 1, blockSize)) {
                int bytesToWrite;
                // Write full buffers until the remaining byte count reaches zero.
                while ((bytesToWrite = (int) Math.min((long) fileSize - out.getPos(), (long) buffer.length)) != 0) {
                    out.write(buffer, 0, bytesToWrite);
                }
            }
            if (mParameters.mFree && "alluxio".equals(filePath.toUri().getScheme())) {
                // Fully qualified: the simple name FileSystem denotes the Hadoop type in this class.
                alluxio.client.file.FileSystem.Factory.get().free(new AlluxioURI(filePath.toString()));
                LOG.info("Freed file before reading: {}", filePath);
            }
        }
    }

    /** Opens one file system handle per configured client and stores them in {@code mCachedFs}. */
    private void openClientFileSystems() throws Exception {
        final URI baseUri = new URI(mParameters.mBasePath);
        final Configuration hdfsConf = new Configuration();
        // With the cache disabled, every FileSystem.get call below yields its own instance.
        hdfsConf.set(String.format("fs.%s.impl.disable.cache", baseUri.getScheme()), "true");
        hdfsConf.set("alluxio.user.block.write.location.policy.class", ClientIOWritePolicy.class.getName());
        hdfsConf.set("alluxio.user.ufs.block.read.location.policy", ClientIOWritePolicy.class.getName());
        // User-supplied entries are applied last so they can override the defaults set above.
        for (Map.Entry<String, String> entry : mParameters.mConf.entrySet()) {
            hdfsConf.set(entry.getKey(), entry.getValue());
        }
        mCachedFs = new FileSystem[mParameters.mClients];
        for (int i = 0; i < mCachedFs.length; i++) {
            mCachedFs[i] = FileSystem.get(baseUri, hdfsConf);
        }
    }

    /**
     * Runs the benchmark on this node: starts {@code mThreads} bench threads, waits for them to
     * finish (bounded by the configured bench timeout) and returns their merged result.
     *
     * <p>When no start time was supplied ({@code mStartMs == -1}), the run starts five seconds
     * from now. The run ends after the warmup plus the duration.
     *
     * @return the result held by the shared context after the threads have merged into it
     * @throws Exception if a parameter cannot be parsed or the calling thread is interrupted
     */
    @Override
    @SuppressFBWarnings(value={"BC_UNCONFIRMED_CAST"})
    public WorkerBenchTaskResult runLocal() throws Exception {
        final long durationMs = FormatUtils.parseTimeSize(mParameters.mDuration);
        final long warmupMs = FormatUtils.parseTimeSize(mParameters.mWarmup);
        final long timeoutMs = FormatUtils.parseTimeSize(mBaseParameters.mBenchTimeout);

        long startMs = mBaseParameters.mStartMs;
        if (startMs == -1L) {
            // No explicit start time: leave a short delay so every thread reaches the barrier.
            startMs = CommonUtils.getCurrentMs() + 5000L;
        }
        final long endMs = startMs + warmupMs + durationMs;
        final BenchContext context = new BenchContext(startMs, endMs);

        final int numThreads = mParameters.mThreads;
        final ExecutorService service =
            ExecutorServiceFactories.fixedThreadPool("bench-thread", numThreads).create();
        try {
            final ArrayList<BenchThread> callables = new ArrayList<BenchThread>(numThreads);
            for (int i = 0; i < numThreads; i++) {
                // Threads share the client file systems round-robin.
                callables.add(new BenchThread(context, mCachedFs[i % mCachedFs.length]));
            }
            service.invokeAll(callables, timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            // Always stop the pool, including when building the tasks or invokeAll throws.
            service.shutdownNow();
        }
        service.awaitTermination(30L, TimeUnit.SECONDS);
        return context.getResult();
    }

    /**
     * Validates the benchmark parameters. The body is empty, so no parameter is checked here.
     *
     * @throws Exception declared by the signature; the empty body never throws
     */
    @Override
    public void validateParams() throws Exception {
    }

    private final class BenchThread
    implements Callable<Void> {
        private final BenchContext mContext;
        private final FileSystem mFs;
        private final byte[] mBuffer;
        private final WorkerBenchTaskResult mResult;
        private final boolean mIsRandomReed;
        private final FSDataInputStream[] mInStreams;

        @SuppressFBWarnings(value={"BC_UNCONFIRMED_CAST"})
        private BenchThread(BenchContext context, FileSystem fs) {
            this.mInStreams = new FSDataInputStream[StressWorkerBench.this.mFilePaths.length];
            this.mContext = context;
            this.mFs = fs;
            this.mBuffer = new byte[(int)FormatUtils.parseSpaceSize((String)((WorkerBenchParameters)((StressWorkerBench)StressWorkerBench.this).mParameters).mBufferSize)];
            this.mResult = new WorkerBenchTaskResult();
            this.mResult.setParameters((WorkerBenchParameters)StressWorkerBench.this.mParameters);
            this.mResult.setBaseParameters(StressWorkerBench.this.mBaseParameters);
            this.mIsRandomReed = ((WorkerBenchParameters)((StressWorkerBench)StressWorkerBench.this).mParameters).mIsRandom;
        }

        @Override
        public Void call() {
            try {
                this.runInternal();
            }
            catch (Exception e) {
                LOG.error(Thread.currentThread().getName() + ": failed", (Throwable)e);
                this.mResult.addErrorMessage(e.getMessage());
            }
            finally {
                for (int i = 0; i < this.mInStreams.length; ++i) {
                    this.closeInStream(i);
                }
            }
            this.mResult.setEndMs(CommonUtils.getCurrentMs());
            this.mContext.mergeThreadResult(this.mResult);
            return null;
        }

        @SuppressFBWarnings(value={"BC_UNCONFIRMED_CAST"})
        private void runInternal() throws Exception {
            long recordMs = this.mContext.getStartMs() + FormatUtils.parseTimeSize((String)((WorkerBenchParameters)((StressWorkerBench)StressWorkerBench.this).mParameters).mWarmup);
            this.mResult.setRecordStartMs(recordMs);
            long waitMs = this.mContext.getStartMs() - CommonUtils.getCurrentMs();
            if (waitMs < 0L) {
                throw new IllegalStateException(String.format("Thread missed barrier. Increase the start delay. start: %d current: %d", this.mContext.getStartMs(), CommonUtils.getCurrentMs()));
            }
            CommonUtils.sleepMs((long)waitMs);
            int i = 0;
            while (!Thread.currentThread().isInterrupted() && CommonUtils.getCurrentMs() < this.mContext.getEndMs() && i < StressWorkerBench.this.mFilePaths.length) {
                int ioBytes = this.applyOperation(i);
                long currentMs = CommonUtils.getCurrentMs();
                if (currentMs > recordMs && ioBytes > 0) {
                    this.mResult.incrementIOBytes((long)ioBytes);
                }
                if (++i < StressWorkerBench.this.mFilePaths.length) continue;
                i = 0;
            }
        }

        private int applyOperation(int i) throws IOException {
            Path filePath = StressWorkerBench.this.mFilePaths[i];
            int offset = StressWorkerBench.this.mOffsets[i];
            int length = StressWorkerBench.this.mLengths[i];
            if (this.mInStreams[i] == null) {
                this.mInStreams[i] = this.mFs.open(filePath);
            }
            int bytesRead = 0;
            if (this.mIsRandomReed) {
                while (length > 0) {
                    int actualReadLength = this.mInStreams[i].read((long)offset, this.mBuffer, 0, this.mBuffer.length);
                    if (actualReadLength < 0) {
                        this.closeInStream(i);
                        break;
                    }
                    bytesRead += actualReadLength;
                    length -= actualReadLength;
                    offset += actualReadLength;
                }
            } else {
                while (true) {
                    int actualReadLength;
                    if ((actualReadLength = this.mInStreams[i].read(this.mBuffer)) < 0) {
                        this.closeInStream(i);
                        this.mInStreams[i] = this.mFs.open(filePath);
                        break;
                    }
                    bytesRead += actualReadLength;
                }
            }
            return bytesRead;
        }

        private void closeInStream(int i) {
            try {
                if (this.mInStreams[i] != null) {
                    this.mInStreams[i].close();
                }
            }
            catch (IOException e) {
                this.mResult.addErrorMessage(e.getMessage());
            }
            finally {
                this.mInStreams[i] = null;
            }
        }
    }

    /**
     * State shared by all bench threads of one run: the start and end timestamps in
     * milliseconds, plus the result into which each thread merges its own result.
     */
    private static final class BenchContext {
        private final long mStartMs;
        private final long mEndMs;
        /** Merged result; stays {@code null} until the first thread has reported. */
        private WorkerBenchTaskResult mResult;

        /**
         * @param startMs time at which the threads start reading
         * @param endMs time at which the threads stop reading
         */
        public BenchContext(long startMs, long endMs) {
            mStartMs = startMs;
            mEndMs = endMs;
        }

        /** @return the start time in milliseconds */
        public long getStartMs() {
            return mStartMs;
        }

        /** @return the end time in milliseconds */
        public long getEndMs() {
            return mEndMs;
        }

        /**
         * Folds one thread's result into the shared result. The first reported result is adopted
         * as the shared one; later results are merged into it, and a merge failure is recorded
         * as an error message on the shared result.
         *
         * @param threadResult result produced by a single bench thread
         */
        public synchronized void mergeThreadResult(WorkerBenchTaskResult threadResult) {
            if (mResult != null) {
                try {
                    mResult.merge(threadResult);
                } catch (Exception mergeFailure) {
                    mResult.addErrorMessage(mergeFailure.getMessage());
                }
            } else {
                mResult = threadResult;
            }
        }

        /** @return the merged result, or {@code null} if no thread has reported yet */
        synchronized WorkerBenchTaskResult getResult() {
            return mResult;
        }
    }
}

