/*
 * Decompiled with CFR 0.152.
 */
package alluxio.job.util;

import alluxio.client.Cancelable;
import alluxio.client.block.BlockStoreClient;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.block.policy.BlockLocationPolicy;
import alluxio.client.block.policy.LocalFirstPolicy;
import alluxio.client.block.stream.BlockInStream;
import alluxio.client.block.stream.BlockOutStream;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.URIStatus;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.collections.IndexDefinition;
import alluxio.collections.IndexedSet;
import alluxio.collections.Pair;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.exception.AlluxioException;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.NotFoundException;
import alluxio.grpc.CacheRequest;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;
import alluxio.proto.dataserver.Protocol;
import alluxio.resource.CloseableResource;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.BlockLocation;
import alluxio.wire.FileBlockInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public final class JobUtils {
    private static final byte[] READ_BUF = new byte[0x800000];
    private static final IndexDefinition<BlockWorkerInfo, WorkerNetAddress> WORKER_ADDRESS_INDEX = IndexDefinition.ofUnique(BlockWorkerInfo::getNetAddress);

    public static BlockWorkerInfo getWorkerWithMostBlocks(List<BlockWorkerInfo> workers, List<FileBlockInfo> fileBlockInfos) {
        IndexedSet addressIndexedWorkers = new IndexedSet(WORKER_ADDRESS_INDEX, new IndexDefinition[0]);
        addressIndexedWorkers.addAll(workers);
        ConcurrentMap blocksPerWorker = Maps.newConcurrentMap();
        int maxBlocks = 0;
        BlockWorkerInfo mostBlocksWorker = null;
        for (FileBlockInfo fileBlockInfo : fileBlockInfos) {
            for (BlockLocation location : fileBlockInfo.getBlockInfo().getLocations()) {
                BlockWorkerInfo worker = (BlockWorkerInfo)addressIndexedWorkers.getFirstByField(WORKER_ADDRESS_INDEX, (Object)location.getWorkerAddress());
                if (worker == null) continue;
                blocksPerWorker.putIfAbsent(worker, 0);
                int newBlockCount = (Integer)blocksPerWorker.get(worker) + 1;
                blocksPerWorker.put(worker, newBlockCount);
                if (newBlockCount <= maxBlocks) continue;
                maxBlocks = newBlockCount;
                mostBlocksWorker = worker;
            }
        }
        return mostBlocksWorker;
    }

    public static void loadBlock(URIStatus status, FileSystemContext context, long blockId, WorkerNetAddress address, boolean directCache) throws AlluxioException, IOException {
        AlluxioConfiguration conf = Configuration.global();
        WorkerNetAddress localNetAddress = address;
        String localHostName = NetworkAddressUtils.getConnectHost((NetworkAddressUtils.ServiceAttributeProvider)NetworkAddressUtils.ServiceType.WORKER_RPC, (AlluxioConfiguration)conf);
        List netAddress = context.getCachedWorkers().stream().map(BlockWorkerInfo::getNetAddress).filter(x -> Objects.equals(x.getHost(), localHostName)).collect(Collectors.toList());
        if (localNetAddress == null && !netAddress.isEmpty()) {
            localNetAddress = (WorkerNetAddress)netAddress.get(0);
        }
        if (localNetAddress == null) {
            throw new NotFoundException(ExceptionMessage.NO_LOCAL_BLOCK_WORKER_LOAD_TASK.getMessage(new Object[]{blockId}));
        }
        Set pinnedLocation = status.getPinnedMediumTypes();
        if (pinnedLocation.size() > 1) {
            throw new AlluxioException(MessageFormat.format("File {0} pinned to multiple medium types", status.getPath()));
        }
        if (netAddress.size() <= 1 && !status.getFileInfo().isPinned() && status.isPersisted()) {
            if (directCache) {
                JobUtils.loadThroughCacheRequest(status, context, blockId, conf, localNetAddress);
            } else {
                JobUtils.loadThroughRead(status, context, blockId, conf);
            }
            return;
        }
        String medium = pinnedLocation.isEmpty() ? "" : (String)pinnedLocation.iterator().next();
        OpenFilePOptions openOptions = OpenFilePOptions.newBuilder().setReadType(ReadPType.NO_CACHE).build();
        InStreamOptions inOptions = new InStreamOptions(status, openOptions, conf, context);
        inOptions.setUfsReadLocationPolicy(BlockLocationPolicy.Factory.create(LocalFirstPolicy.class, (AlluxioConfiguration)conf));
        OutStreamOptions outOptions = OutStreamOptions.defaults((FileSystemContext)context);
        outOptions.setMediumType(medium);
        outOptions.setLocationPolicy(BlockLocationPolicy.Factory.create(LocalFirstPolicy.class, (AlluxioConfiguration)conf));
        BlockInfo blockInfo = status.getBlockInfo(blockId);
        Preconditions.checkNotNull((Object)blockInfo, (String)"Can not find block %s in status %s", (long)blockId, (Object)status);
        long blockSize = blockInfo.getLength();
        BlockStoreClient blockStore = BlockStoreClient.create((FileSystemContext)context);
        try (BlockOutStream outputStream = blockStore.getOutStream(blockId, blockSize, localNetAddress, outOptions);){
            try (BlockInStream inputStream = blockStore.getInStream(blockId, inOptions);){
                ByteStreams.copy((InputStream)inputStream, (OutputStream)outputStream);
            }
            catch (Throwable t) {
                try {
                    ((Cancelable)outputStream).cancel();
                }
                catch (Throwable t2) {
                    t.addSuppressed(t2);
                }
                throw t;
            }
        }
    }

    private static void loadThroughCacheRequest(URIStatus status, FileSystemContext context, long blockId, AlluxioConfiguration conf, WorkerNetAddress localNetAddress) throws IOException {
        BlockStoreClient blockStore = BlockStoreClient.create((FileSystemContext)context);
        OpenFilePOptions openOptions = OpenFilePOptions.newBuilder().setReadType(ReadPType.CACHE).build();
        InStreamOptions inOptions = new InStreamOptions(status, openOptions, conf, context);
        BlockLocationPolicy policy = BlockLocationPolicy.Factory.create(LocalFirstPolicy.class, (AlluxioConfiguration)conf);
        inOptions.setUfsReadLocationPolicy(policy);
        Protocol.OpenUfsBlockOptions openUfsBlockOptions = inOptions.getOpenUfsBlockOptions(blockId);
        BlockInfo info = (BlockInfo)Preconditions.checkNotNull((Object)status.getBlockInfo(blockId));
        long blockLength = info.getLength();
        Pair dataSourceAndType = blockStore.getDataSourceAndType(status.getBlockInfo(blockId), status, policy, (Map)ImmutableMap.of());
        WorkerNetAddress dataSource = (WorkerNetAddress)dataSourceAndType.getFirst();
        String host = dataSource.getHost();
        if (!dataSource.getContainerHost().equals("")) {
            host = dataSource.getContainerHost();
        }
        CacheRequest request = CacheRequest.newBuilder().setBlockId(blockId).setLength(blockLength).setOpenUfsBlockOptions(openUfsBlockOptions).setSourceHost(host).setSourcePort(dataSource.getDataPort()).build();
        try (CloseableResource blockWorker = context.acquireBlockWorkerClient(localNetAddress);){
            ((BlockWorkerClient)blockWorker.get()).cache(request);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    private static void loadThroughRead(URIStatus status, FileSystemContext context, long blockId, AlluxioConfiguration conf) throws IOException {
        BlockStoreClient blockStore = BlockStoreClient.create((FileSystemContext)context);
        OpenFilePOptions openOptions = OpenFilePOptions.newBuilder().setReadType(ReadPType.CACHE).build();
        InStreamOptions inOptions = new InStreamOptions(status, openOptions, conf, context);
        inOptions.setUfsReadLocationPolicy(BlockLocationPolicy.Factory.create(LocalFirstPolicy.class, (AlluxioConfiguration)conf));
        BlockInfo info = (BlockInfo)Preconditions.checkNotNull((Object)status.getBlockInfo(blockId));
        try (BlockInStream inputStream = blockStore.getInStream(info, inOptions, (Map)ImmutableMap.of());){
            while (inputStream.read(READ_BUF) != -1) {
            }
        }
    }

    private JobUtils() {
    }
}

