/*
 * Decompiled with CFR 0.152.
 */
package alluxio.job.plan.replicate;

import alluxio.AlluxioURI;
import alluxio.client.block.BlockStoreClient;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.URIStatus;
import alluxio.collections.Pair;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.exception.status.NotFoundException;
import alluxio.grpc.RemoveBlockRequest;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.AbstractVoidPlanDefinition;
import alluxio.job.plan.replicate.Mode;
import alluxio.job.plan.replicate.SetReplicaConfig;
import alluxio.job.plan.replicate.SetReplicaTask;
import alluxio.job.util.JobUtils;
import alluxio.job.util.SerializableVoid;
import alluxio.resource.CloseableResource;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.BlockLocation;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public final class SetReplicaDefinition
extends AbstractVoidPlanDefinition<SetReplicaConfig, SetReplicaTask> {
    private static final Logger LOG = LoggerFactory.getLogger(SetReplicaDefinition.class);

    @Override
    public Class<SetReplicaConfig> getJobConfigClass() {
        return SetReplicaConfig.class;
    }

    @Override
    public Set<Pair<WorkerInfo, SetReplicaTask>> selectExecutors(SetReplicaConfig config, List<WorkerInfo> jobWorkerInfoList, SelectExecutorsContext context) throws Exception {
        Mode mode;
        Preconditions.checkArgument((!jobWorkerInfoList.isEmpty() ? 1 : 0) != 0, (Object)"No worker is available");
        long blockId = config.getBlockId();
        int numReplicas = config.getReplicas();
        Preconditions.checkArgument((numReplicas >= 0 ? 1 : 0) != 0);
        BlockStoreClient blockStore = BlockStoreClient.create((FileSystemContext)context.getFsContext());
        BlockInfo blockInfo = blockStore.getInfo(blockId);
        int currentNumReplicas = blockInfo.getLocations().size();
        HashSet result = Sets.newHashSet();
        if (numReplicas == currentNumReplicas) {
            LOG.warn("Evict target has already been satisfied for job:{}", (Object)config);
            return result;
        }
        int numToOperate = currentNumReplicas - numReplicas;
        if (numToOperate > 0) {
            mode = Mode.EVICT;
        } else {
            numToOperate = Math.abs(numToOperate);
            mode = Mode.REPLICATE;
        }
        Set hosts = blockInfo.getLocations().stream().map(BlockLocation::getWorkerAddress).map(WorkerNetAddress::getHost).collect(Collectors.toSet());
        Collections.shuffle(jobWorkerInfoList);
        for (WorkerInfo workerInfo : jobWorkerInfoList) {
            boolean condition = hosts.contains(workerInfo.getAddress().getHost());
            if (mode == Mode.REPLICATE) {
                boolean bl = condition = !condition;
            }
            if (!condition) continue;
            result.add(new Pair((Object)workerInfo, (Object)new SetReplicaTask(mode)));
            if (result.size() < numToOperate) continue;
            break;
        }
        return result;
    }

    @Override
    public SerializableVoid runTask(SetReplicaConfig config, SetReplicaTask task, RunTaskContext context) throws Exception {
        switch (task.getMode()) {
            case EVICT: {
                this.evict(config, context);
                break;
            }
            case REPLICATE: {
                this.replicate(config, context);
                break;
            }
            default: {
                throw new IllegalArgumentException(String.format("Unexpected replication mode {}.", new Object[]{task.getMode()}));
            }
        }
        return null;
    }

    private void evict(SetReplicaConfig config, RunTaskContext context) throws Exception {
        long blockId = config.getBlockId();
        String localHostName = NetworkAddressUtils.getConnectHost((NetworkAddressUtils.ServiceAttributeProvider)NetworkAddressUtils.ServiceType.WORKER_RPC, (AlluxioConfiguration)Configuration.global());
        List workerInfoList = context.getFsContext().getCachedWorkers();
        WorkerNetAddress localNetAddress = null;
        for (BlockWorkerInfo workerInfo : workerInfoList) {
            if (!workerInfo.getNetAddress().getHost().equals(localHostName)) continue;
            localNetAddress = workerInfo.getNetAddress();
            break;
        }
        if (localNetAddress == null) {
            String message = String.format("Cannot find a local block worker to evict block %d", blockId);
            throw new NotFoundException(message);
        }
        RemoveBlockRequest request = RemoveBlockRequest.newBuilder().setBlockId(blockId).build();
        try (CloseableResource blockWorker = context.getFsContext().acquireBlockWorkerClient(localNetAddress);){
            ((BlockWorkerClient)blockWorker.get()).removeBlock(request);
        }
        catch (NotFoundException e) {
            LOG.warn("Failed to delete block {} on {}: block does not exist", (Object)blockId, (Object)localNetAddress);
        }
    }

    private void replicate(SetReplicaConfig config, RunTaskContext context) throws Exception {
        URIStatus status = context.getFileSystem().getStatus(new AlluxioURI(config.getPath()));
        JobUtils.loadBlock(status, context.getFsContext(), config.getBlockId(), null, false);
        LOG.info("Replicated file " + config.getPath() + " block " + config.getBlockId());
    }
}

