/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode.erasurecode;

import java.util.Collection;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedBlockReconstructor;
import org.apache.hadoop.hdfs.server.datanode.erasurecode.StripedReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;

@InterfaceAudience.Private
public final class ErasureCodingWorker {
    private static final Logger LOG = DataNode.LOG;
    private final DataNode datanode;
    private final Configuration conf;
    private ThreadPoolExecutor stripedReconstructionPool;
    private ThreadPoolExecutor stripedReadPool;

    public ErasureCodingWorker(Configuration conf, DataNode datanode) {
        this.datanode = datanode;
        this.conf = conf;
        this.initializeStripedReadThreadPool(conf.getInt("dfs.datanode.ec.reconstruction.stripedread.threads", 20));
        this.initializeStripedBlkReconstructionThreadPool(conf.getInt("dfs.datanode.ec.reconstruction.stripedblock.threads.size", 8));
    }

    private void initializeStripedReadThreadPool(int num) {
        LOG.debug("Using striped reads; pool threads={}", (Object)num);
        this.stripedReadPool = new ThreadPoolExecutor(1, num, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), (ThreadFactory)new Daemon.DaemonFactory(){
            private final AtomicInteger threadIndex = new AtomicInteger(0);

            public Thread newThread(Runnable r) {
                Thread t = super.newThread(r);
                t.setName("stripedRead-" + this.threadIndex.getAndIncrement());
                return t;
            }
        }, new ThreadPoolExecutor.CallerRunsPolicy(){

            @Override
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
                LOG.info("Execution for striped reading rejected, Executing in current thread");
                super.rejectedExecution(runnable, e);
            }
        });
        this.stripedReadPool.allowCoreThreadTimeOut(true);
    }

    private void initializeStripedBlkReconstructionThreadPool(int numThreads) {
        LOG.debug("Using striped block reconstruction; pool threads={}", (Object)numThreads);
        this.stripedReconstructionPool = DFSUtilClient.getThreadPoolExecutor((int)2, (int)numThreads, (long)60L, (String)"StripedBlockReconstruction-", (boolean)false);
        this.stripedReconstructionPool.allowCoreThreadTimeOut(true);
    }

    public void processErasureCodingTasks(Collection<BlockECReconstructionCommand.BlockECReconstructionInfo> ecTasks) {
        for (BlockECReconstructionCommand.BlockECReconstructionInfo reconInfo : ecTasks) {
            try {
                StripedReconstructionInfo stripedReconInfo = new StripedReconstructionInfo(reconInfo.getExtendedBlock(), reconInfo.getErasureCodingPolicy(), reconInfo.getLiveBlockIndices(), reconInfo.getSourceDnInfos(), reconInfo.getTargetDnInfos(), reconInfo.getTargetStorageTypes());
                StripedBlockReconstructor task = new StripedBlockReconstructor(this, stripedReconInfo);
                if (task.hasValidTargets()) {
                    this.stripedReconstructionPool.submit(task);
                    continue;
                }
                LOG.warn("No missing internal block. Skip reconstruction for task:{}", (Object)reconInfo);
            }
            catch (Throwable e) {
                LOG.warn("Failed to reconstruct striped block {}", (Object)reconInfo.getExtendedBlock().getLocalBlock(), (Object)e);
            }
        }
    }

    DataNode getDatanode() {
        return this.datanode;
    }

    Configuration getConf() {
        return this.conf;
    }

    ThreadPoolExecutor getStripedReadPool() {
        return this.stripedReadPool;
    }
}

