/*
 * Decompiled with CFR 0.152.
 */
package alluxio.job.plan.replicate;

import alluxio.AlluxioURI;
import alluxio.ClientContext;
import alluxio.client.block.BlockStoreClient;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.block.stream.BlockInStream;
import alluxio.client.block.stream.BlockOutStream;
import alluxio.client.block.stream.TestBlockInStream;
import alluxio.client.block.stream.TestBlockOutStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.URIStatus;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.collections.Pair;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.status.NotFoundException;
import alluxio.job.JobServerContext;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.replicate.Mode;
import alluxio.job.plan.replicate.SetReplicaConfig;
import alluxio.job.plan.replicate.SetReplicaDefinition;
import alluxio.job.plan.replicate.SetReplicaTask;
import alluxio.underfs.UfsManager;
import alluxio.util.io.BufferUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.BlockLocation;
import alluxio.wire.FileBlockInfo;
import alluxio.wire.FileInfo;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

/**
 * Unit tests for {@link SetReplicaDefinition} when running in {@link Mode#REPLICATE}.
 *
 * <p>The file system, file system context, UFS manager and block store client are Mockito
 * mocks; {@code BlockStoreClient.create(...)} is statically mocked for the duration of each
 * test so that the definition under test receives {@link #mMockBlockStore}.
 */
public final class SetReplicaDefinitionReplicateTest {
    private static final long TEST_BLOCK_ID = 1L;
    private static final long TEST_BLOCK_SIZE = 512L;
    private static final int MAX_BYTES = 1000;
    private static final WorkerNetAddress ADDRESS_1 =
        new WorkerNetAddress().setHost("host1").setDataPort(10);
    private static final WorkerNetAddress ADDRESS_2 =
        new WorkerNetAddress().setHost("host2").setDataPort(10);
    private static final WorkerNetAddress ADDRESS_3 =
        new WorkerNetAddress().setHost("host3").setDataPort(10);
    private static final WorkerNetAddress LOCAL_ADDRESS = new WorkerNetAddress()
        .setHost(NetworkAddressUtils.getLocalHostName(
            (int) Configuration.getMs(PropertyKey.NETWORK_HOST_RESOLUTION_TIMEOUT_MS)))
        .setDataPort(10);
    private static final WorkerInfo WORKER_INFO_1 = new WorkerInfo().setAddress(ADDRESS_1);
    private static final WorkerInfo WORKER_INFO_2 = new WorkerInfo().setAddress(ADDRESS_2);
    private static final WorkerInfo WORKER_INFO_3 = new WorkerInfo().setAddress(ADDRESS_3);
    private static final String TEST_PATH = "/test";

    private FileSystemContext mMockFileSystemContext;
    private BlockStoreClient mMockBlockStore;
    private FileSystem mMockFileSystem;
    private JobServerContext mMockJobServerContext;
    private UfsManager mMockUfsManager;
    private BlockInfo mTestBlockInfo;
    private URIStatus mTestStatus;
    private MockedStatic<BlockStoreClient> mMockStaticBlockStore;

    @Rule
    public final ExpectedException mThrown = ExpectedException.none();

    /**
     * Builds the mocks, the job server context wrapping them, and the block/file metadata
     * shared by all tests. Also opens the static mock of {@link BlockStoreClient}, which is
     * closed again in {@link #after()}.
     */
    @Before
    public void before() throws Exception {
        mMockFileSystemContext = Mockito.mock(FileSystemContext.class);
        Mockito.when(mMockFileSystemContext.getClientContext())
            .thenReturn(ClientContext.create(Configuration.global()));
        Mockito.when(mMockFileSystemContext.getClusterConf())
            .thenReturn(Configuration.global());
        mMockBlockStore = Mockito.mock(BlockStoreClient.class);
        mMockFileSystem = Mockito.mock(FileSystem.class);
        mMockUfsManager = Mockito.mock(UfsManager.class);
        mMockJobServerContext =
            new JobServerContext(mMockFileSystem, mMockFileSystemContext, mMockUfsManager);

        // Route BlockStoreClient.create(context) to the mocked block store.
        mMockStaticBlockStore = Mockito.mockStatic(BlockStoreClient.class);
        mMockStaticBlockStore.when(() -> BlockStoreClient.create(mMockFileSystemContext))
            .thenReturn(mMockBlockStore);

        mTestBlockInfo = new BlockInfo().setBlockId(TEST_BLOCK_ID).setLength(TEST_BLOCK_SIZE);
        Mockito.when(mMockBlockStore.getInfo(TEST_BLOCK_ID)).thenReturn(mTestBlockInfo);
        mTestStatus = new URIStatus(new FileInfo()
            .setPath(TEST_PATH)
            .setBlockIds(Lists.newArrayList(TEST_BLOCK_ID))
            .setPersisted(true)
            .setFileBlockInfos(
                Lists.newArrayList(new FileBlockInfo().setBlockInfo(mTestBlockInfo))));
    }

    /**
     * Releases the static mock registered in {@link #before()}.
     */
    @After
    public void after() {
        mMockStaticBlockStore.close();
    }

    /**
     * Creates a single-element, mutable list of block locations pointing at the given worker.
     *
     * @param address the worker address holding the block
     * @return a new list containing one location for {@code address}
     */
    private static List<BlockLocation> locatedOn(WorkerNetAddress address) {
        return Lists.newArrayList(new BlockLocation().setWorkerAddress(address));
    }

    /**
     * Creates the (worker, task) pair expected when a replicate task is assigned to a worker.
     *
     * @param worker the worker expected to be selected
     * @return a pair of {@code worker} and a {@link Mode#REPLICATE} task
     */
    private static Pair<WorkerInfo, SetReplicaTask> replicateTaskOn(WorkerInfo worker) {
        return new Pair<>(worker, new SetReplicaTask(Mode.REPLICATE));
    }

    /**
     * Invokes {@link SetReplicaDefinition#selectExecutors} for the test block.
     *
     * @param numReplicas the replica count passed to the {@link SetReplicaConfig}
     * @param workerInfoList the job workers offered to the definition
     * @return the (worker, task) pairs chosen by the definition
     */
    private Set<Pair<WorkerInfo, SetReplicaTask>> selectExecutorsTestHelper(int numReplicas,
            List<WorkerInfo> workerInfoList) throws Exception {
        SetReplicaConfig config = new SetReplicaConfig(TEST_PATH, TEST_BLOCK_ID, numReplicas);
        SetReplicaDefinition definition = new SetReplicaDefinition();
        return definition.selectExecutors(config, workerInfoList,
            new SelectExecutorsContext(1L, mMockJobServerContext));
    }

    /**
     * Stubs the mocks so that the test block is readable through {@code mockInStream} and
     * writable (on {@link #LOCAL_ADDRESS} only) through {@code mockOutStream}, then runs a
     * {@link Mode#REPLICATE} task with a replica count of one.
     *
     * @param blockWorkers the workers returned by the context's cached worker list
     * @param mockInStream the stream returned for any read of the block
     * @param mockOutStream the stream returned for a write of the test block to the local worker
     */
    private void runTaskReplicateTestHelper(List<BlockWorkerInfo> blockWorkers,
            BlockInStream mockInStream, BlockOutStream mockOutStream) throws Exception {
        Mockito.when(mMockFileSystem.getStatus(ArgumentMatchers.any(AlluxioURI.class)))
            .thenReturn(mTestStatus);
        Mockito.when(mMockFileSystemContext.getCachedWorkers()).thenReturn(blockWorkers);
        Mockito.when(mMockBlockStore.getInStream(ArgumentMatchers.anyLong(),
            ArgumentMatchers.any(InStreamOptions.class))).thenReturn(mockInStream);
        Mockito.when(mMockBlockStore.getInStream(ArgumentMatchers.any(BlockInfo.class),
            ArgumentMatchers.any(InStreamOptions.class), ArgumentMatchers.any(Map.class)))
            .thenReturn(mockInStream);
        Mockito.when(mMockBlockStore.getOutStream(ArgumentMatchers.eq(TEST_BLOCK_ID),
            ArgumentMatchers.eq(TEST_BLOCK_SIZE), ArgumentMatchers.eq(LOCAL_ADDRESS),
            ArgumentMatchers.any(OutStreamOptions.class))).thenReturn(mockOutStream);
        // The existing copy of the block is reported on ADDRESS_1.
        Mockito.when(mMockBlockStore.getInfo(TEST_BLOCK_ID))
            .thenReturn(mTestBlockInfo.setLocations(locatedOn(ADDRESS_1)));

        SetReplicaConfig config = new SetReplicaConfig(TEST_PATH, TEST_BLOCK_ID, 1);
        SetReplicaDefinition definition = new SetReplicaDefinition();
        definition.runTask(config, new SetReplicaTask(Mode.REPLICATE),
            new RunTaskContext(1L, 1L, mMockJobServerContext));
    }

    /**
     * With no existing location, two replicas requested and a single worker offered,
     * that worker is the only one selected.
     */
    @Test
    public void selectExecutorsOnlyOneWorkerAvailable() throws Exception {
        mTestBlockInfo.setLocations(Lists.newArrayList());
        Set<Pair<WorkerInfo, SetReplicaTask>> result =
            selectExecutorsTestHelper(2, Lists.newArrayList(WORKER_INFO_1));
        Set<Pair<WorkerInfo, SetReplicaTask>> expected = Sets.newHashSet();
        expected.add(replicateTaskOn(WORKER_INFO_1));
        Assert.assertEquals(expected, result);
    }

    /**
     * With the block on worker 1 and workers 1 and 2 offered, only worker 2 is selected.
     */
    @Test
    public void selectExecutorsOnlyOneWorkerValid() throws Exception {
        mTestBlockInfo.setLocations(locatedOn(ADDRESS_1));
        Set<Pair<WorkerInfo, SetReplicaTask>> result =
            selectExecutorsTestHelper(2, Lists.newArrayList(WORKER_INFO_1, WORKER_INFO_2));
        Set<Pair<WorkerInfo, SetReplicaTask>> expected = Sets.newHashSet();
        expected.add(replicateTaskOn(WORKER_INFO_2));
        Assert.assertEquals(expected, result);
    }

    /**
     * With the block on worker 1, three replicas requested and workers 1-3 offered,
     * workers 2 and 3 are both selected.
     */
    @Test
    public void selectExecutorsTwoWorkersValid() throws Exception {
        mTestBlockInfo.setLocations(locatedOn(ADDRESS_1));
        Set<Pair<WorkerInfo, SetReplicaTask>> result = selectExecutorsTestHelper(3,
            Lists.newArrayList(WORKER_INFO_1, WORKER_INFO_2, WORKER_INFO_3));
        Set<Pair<WorkerInfo, SetReplicaTask>> expected = Sets.newHashSet();
        expected.add(replicateTaskOn(WORKER_INFO_2));
        expected.add(replicateTaskOn(WORKER_INFO_3));
        Assert.assertEquals(expected, result);
    }

    /**
     * With the block on worker 1, two replicas requested and workers 1-3 offered, exactly
     * one replicate task is produced; which of the remaining workers receives it is not
     * asserted.
     */
    @Test
    public void selectExecutorsOneOutOFTwoWorkersValid() throws Exception {
        mTestBlockInfo.setLocations(locatedOn(ADDRESS_1));
        Set<Pair<WorkerInfo, SetReplicaTask>> result = selectExecutorsTestHelper(2,
            Lists.newArrayList(WORKER_INFO_1, WORKER_INFO_2, WORKER_INFO_3));
        Assert.assertEquals(1, result.size());
        Assert.assertEquals(new SetReplicaTask(Mode.REPLICATE),
            result.iterator().next().getSecond());
    }

    /**
     * With the block on worker 1 and only worker 1 offered, nothing is selected.
     */
    @Test
    public void selectExecutorsNoWorkerValid() throws Exception {
        mTestBlockInfo.setLocations(locatedOn(ADDRESS_1));
        Set<Pair<WorkerInfo, SetReplicaTask>> result =
            selectExecutorsTestHelper(2, Lists.newArrayList(WORKER_INFO_1));
        Set<Pair<WorkerInfo, SetReplicaTask>> expected = ImmutableSet.of();
        Assert.assertEquals(expected, result);
    }

    /**
     * With the block on worker 1, three replicas requested but only workers 1 and 2 offered,
     * the single remaining worker (2) is selected.
     */
    @Test
    public void selectExecutorsInsufficientWorkerValid() throws Exception {
        mTestBlockInfo.setLocations(locatedOn(ADDRESS_1));
        Set<Pair<WorkerInfo, SetReplicaTask>> result =
            selectExecutorsTestHelper(3, Lists.newArrayList(WORKER_INFO_1, WORKER_INFO_2));
        Set<Pair<WorkerInfo, SetReplicaTask>> expected = Sets.newHashSet();
        expected.add(replicateTaskOn(WORKER_INFO_2));
        Assert.assertEquals(expected, result);
    }

    /**
     * Running the task with an empty cached worker list is expected to fail with a
     * {@link NotFoundException} carrying the "no local block worker" message.
     */
    @Test
    public void runTaskNoBlockWorker() throws Exception {
        byte[] input = BufferUtils.getIncreasingByteArray(0, (int) TEST_BLOCK_SIZE);
        TestBlockInStream mockInStream = new TestBlockInStream(input, TEST_BLOCK_ID,
            input.length, BlockInStream.BlockInStreamSource.NODE_LOCAL);
        TestBlockOutStream mockOutStream =
            new TestBlockOutStream(ByteBuffer.allocate(MAX_BYTES), TEST_BLOCK_SIZE);
        mThrown.expect(NotFoundException.class);
        mThrown.expectMessage(
            ExceptionMessage.NO_LOCAL_BLOCK_WORKER_LOAD_TASK.getMessage(TEST_BLOCK_ID));
        runTaskReplicateTestHelper(Lists.newArrayList(), mockInStream, mockOutStream);
    }

    /**
     * Runs the task against a local block worker for every combination of the file's
     * persisted and pinned flags. In all cases the whole block must be read from the input
     * stream; the written output is compared with the input for every combination except
     * persisted-and-not-pinned, where the output stream content is not asserted.
     */
    @Test
    public void runTaskLocalBlockWorkerDifferentFileStatus() throws Exception {
        for (boolean persisted : new boolean[] {true, false}) {
            for (boolean pinned : new boolean[] {true, false}) {
                mTestStatus.getFileInfo()
                    .setPersisted(persisted)
                    .setPinned(pinned)
                    .setMediumTypes(pinned
                        ? Sets.newHashSet("MEM")
                        : Collections.<String>emptySet());
                byte[] input = BufferUtils.getIncreasingByteArray(0, (int) TEST_BLOCK_SIZE);
                TestBlockInStream mockInStream = new TestBlockInStream(input, TEST_BLOCK_ID,
                    input.length, BlockInStream.BlockInStreamSource.NODE_LOCAL);
                TestBlockOutStream mockOutStream =
                    new TestBlockOutStream(ByteBuffer.allocate(MAX_BYTES), TEST_BLOCK_SIZE);
                BlockWorkerInfo localBlockWorker =
                    new BlockWorkerInfo(LOCAL_ADDRESS, TEST_BLOCK_SIZE, 0L);
                // Static mock is scoped to one iteration so each combination starts clean.
                try (MockedStatic<BlockInStream> mockBlockInStream =
                        Mockito.mockStatic(BlockInStream.class)) {
                    mockBlockInStream.when(() -> BlockInStream.create(
                        ArgumentMatchers.any(FileSystemContext.class),
                        ArgumentMatchers.any(BlockInfo.class),
                        ArgumentMatchers.any(WorkerNetAddress.class),
                        ArgumentMatchers.any(BlockInStream.BlockInStreamSource.class),
                        ArgumentMatchers.any(InStreamOptions.class)))
                        .thenReturn(mockInStream);
                    runTaskReplicateTestHelper(Lists.newArrayList(localBlockWorker),
                        mockInStream, mockOutStream);
                    Assert.assertEquals(TEST_BLOCK_SIZE, mockInStream.getBytesRead());
                    if (!persisted || pinned) {
                        Assert.assertArrayEquals(
                            String.format("input-output mismatched: pinned=%s, persisted=%s",
                                pinned, persisted),
                            input, mockOutStream.getWrittenData());
                    }
                }
            }
        }
    }

    /**
     * When reading from the input stream throws an {@link IOException}, the task must
     * propagate that exception and cancel the output stream.
     */
    @Test
    public void runTaskInputIOException() throws Exception {
        mTestStatus.getFileInfo().setPinned(true);
        mTestStatus.getFileInfo().setMediumTypes(Sets.newHashSet("MEM"));
        BlockInStream mockInStream = Mockito.mock(BlockInStream.class);
        BlockOutStream mockOutStream = Mockito.mock(BlockOutStream.class);
        BlockWorkerInfo localBlockWorker =
            new BlockWorkerInfo(LOCAL_ADDRESS, TEST_BLOCK_SIZE, 0L);
        // Fail both read overloads so the test does not depend on which one the task uses.
        Mockito.doThrow(new IOException("test")).when(mockInStream)
            .read(ArgumentMatchers.any(byte[].class), ArgumentMatchers.anyInt(),
                ArgumentMatchers.anyInt());
        Mockito.doThrow(new IOException("test")).when(mockInStream)
            .read(ArgumentMatchers.any(byte[].class));
        try {
            runTaskReplicateTestHelper(Lists.newArrayList(localBlockWorker), mockInStream,
                mockOutStream);
            Assert.fail("Expected the task to throw an IOException");
        } catch (IOException e) {
            Assert.assertEquals("test", e.getMessage());
        }
        Mockito.verify(mockOutStream).cancel();
    }
}

