/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.worker.shuffle;

import com.fasterxml.jackson.databind.AnnotationIntrospector;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteSource;
import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import com.google.inject.Injector;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import org.apache.druid.client.indexing.NoopOverlordClient;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.LocalIntermediaryDataManager;
import org.apache.druid.indexing.worker.shuffle.ShuffleDataSegmentPusher;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.LoadSpec;
import org.apache.druid.segment.loading.LocalDataSegmentPuller;
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.LocalLoadSpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.utils.CompressionUtils;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;

@RunWith(value=Parameterized.class)
public class ShuffleDataSegmentPusherTest {
    private static final String LOCAL = "local";
    private static final String DEEPSTORE = "deepstore";
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();
    private IntermediaryDataManager intermediaryDataManager;
    private ShuffleDataSegmentPusher segmentPusher;
    private ObjectMapper mapper;
    private final String intermediateDataStore;
    private File localDeepStore;

    @Parameterized.Parameters(name="intermediateDataManager={0}")
    public static Collection<Object[]> data() {
        return ImmutableList.of((Object)new Object[]{LOCAL}, (Object)new Object[]{DEEPSTORE});
    }

    public ShuffleDataSegmentPusherTest(String intermediateDataStore) {
        this.intermediateDataStore = intermediateDataStore;
    }

    @Before
    public void setup() throws IOException {
        WorkerConfig workerConfig = new WorkerConfig();
        TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, (List)ImmutableList.of((Object)new StorageLocationConfig(this.temporaryFolder.newFolder(), null, null)), false, false, TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(), null);
        NoopOverlordClient overlordClient = new NoopOverlordClient();
        if (LOCAL.equals(this.intermediateDataStore)) {
            this.intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, taskConfig, (OverlordClient)overlordClient);
        } else if (DEEPSTORE.equals(this.intermediateDataStore)) {
            this.localDeepStore = this.temporaryFolder.newFolder("localStorage");
            this.intermediaryDataManager = new DeepStorageIntermediaryDataManager((DataSegmentPusher)new LocalDataSegmentPusher(new LocalDataSegmentPusherConfig(){

                public File getStorageDirectory() {
                    return ShuffleDataSegmentPusherTest.this.localDeepStore;
                }
            }));
        }
        this.intermediaryDataManager.start();
        this.segmentPusher = new ShuffleDataSegmentPusher("supervisorTaskId", "subTaskId", this.intermediaryDataManager);
        Injector injector = GuiceInjectors.makeStartupInjectorWithModules((Iterable)ImmutableList.of(binder -> binder.bind(LocalDataSegmentPuller.class)));
        this.mapper = new DefaultObjectMapper();
        this.mapper.registerModule((Module)new SimpleModule("loadSpecTest").registerSubtypes(new Class[]{LocalLoadSpec.class}));
        this.mapper.setInjectableValues((InjectableValues)new GuiceInjectableValues(injector));
        GuiceAnnotationIntrospector guiceIntrospector = new GuiceAnnotationIntrospector();
        this.mapper.setAnnotationIntrospectors((AnnotationIntrospector)new AnnotationIntrospectorPair((AnnotationIntrospector)guiceIntrospector, this.mapper.getSerializationConfig().getAnnotationIntrospector()), (AnnotationIntrospector)new AnnotationIntrospectorPair((AnnotationIntrospector)guiceIntrospector, this.mapper.getDeserializationConfig().getAnnotationIntrospector()));
    }

    @After
    public void teardown() {
        this.intermediaryDataManager.stop();
    }

    @Test
    public void testPush() throws IOException, SegmentLoadingException {
        File segmentDir = this.generateSegmentDir();
        DataSegment segment = this.newSegment(Intervals.of((String)"2018/2019"));
        DataSegment pushed = this.segmentPusher.push(segmentDir, segment, true);
        Assert.assertEquals((long)9L, (long)pushed.getBinaryVersion().intValue());
        Assert.assertEquals((long)14L, (long)pushed.getSize());
        File tempDir = this.temporaryFolder.newFolder();
        if (this.intermediaryDataManager instanceof LocalIntermediaryDataManager) {
            Optional zippedSegment = this.intermediaryDataManager.findPartitionFile("supervisorTaskId", "subTaskId", segment.getInterval(), segment.getShardSpec().getPartitionNum());
            Assert.assertTrue((boolean)zippedSegment.isPresent());
            CompressionUtils.unzip((ByteSource)((ByteSource)zippedSegment.get()), (File)tempDir, (Predicate)FileUtils.IS_EXCEPTION, (boolean)false);
        } else if (this.intermediaryDataManager instanceof DeepStorageIntermediaryDataManager) {
            LoadSpec loadSpec = (LoadSpec)this.mapper.convertValue((Object)pushed.getLoadSpec(), LoadSpec.class);
            Assert.assertTrue((boolean)pushed.getLoadSpec().get("path").toString().startsWith(this.localDeepStore.getAbsolutePath() + "/" + "shuffle-data"));
            loadSpec.loadSegment(tempDir);
        }
        List<File> unzippedFiles = Arrays.asList(tempDir.listFiles());
        unzippedFiles.sort(Comparator.comparing(File::getName));
        File dataFile = unzippedFiles.get(0);
        Assert.assertEquals((Object)"test", (Object)dataFile.getName());
        Assert.assertEquals((Object)"test data.", (Object)Files.readFirstLine((File)dataFile, (Charset)StandardCharsets.UTF_8));
        File versionFile = unzippedFiles.get(1);
        Assert.assertEquals((Object)"version.bin", (Object)versionFile.getName());
        Assert.assertArrayEquals((byte[])Ints.toByteArray((int)9), (byte[])Files.toByteArray((File)versionFile));
    }

    private File generateSegmentDir() throws IOException {
        File segmentDir = this.temporaryFolder.newFolder();
        Files.asByteSink((File)new File(segmentDir, "version.bin"), (FileWriteMode[])new FileWriteMode[0]).write(Ints.toByteArray((int)9));
        org.apache.commons.io.FileUtils.write((File)new File(segmentDir, "test"), (CharSequence)"test data.", (Charset)StandardCharsets.UTF_8);
        return segmentDir;
    }

    private DataSegment newSegment(Interval interval) {
        BucketNumberedShardSpec shardSpec = (BucketNumberedShardSpec)Mockito.mock(BucketNumberedShardSpec.class);
        Mockito.when((Object)shardSpec.getBucketId()).thenReturn((Object)0);
        return new DataSegment("dataSource", interval, "version", null, null, null, (ShardSpec)shardSpec, Integer.valueOf(9), 0L);
    }
}

