/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.worker.shuffle;

import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.io.ByteSource;
import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.batch.parallel.GenericPartitionStat;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.indexing.worker.shuffle.IntermediaryDataManager;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.loading.StorageLocation;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.ReadablePeriod;

@ManageLifecycle
public class LocalIntermediaryDataManager
implements IntermediaryDataManager {
    private static final Logger LOG = new Logger(LocalIntermediaryDataManager.class);
    /** Initial delay and period, in seconds, of the scheduled partition-discovery job (see {@link #start()}). */
    private final long intermediaryPartitionDiscoveryPeriodSec;
    /** Initial delay and period, in seconds, of the scheduled expired-partition cleanup job (see {@link #start()}). */
    private final long intermediaryPartitionCleanupPeriodSec;
    /** Amount of time added to "now" when computing the next check time of a supervisor task. */
    private final Period intermediaryPartitionTimeout;
    private final TaskConfig taskConfig;
    /** Storage locations built from {@code taskConfig.getShuffleDataLocations()} in the constructor. */
    private final List<StorageLocation> shuffleDataLocations;
    /** Client used to ask for the statuses of supervisor tasks whose check time has passed. */
    private final OverlordClient overlordClient;
    /** Supervisor task ID -> time after which the task's partitions are considered for cleanup. */
    private final ConcurrentHashMap<String, DateTime> supervisorTaskCheckTimes = new ConcurrentHashMap<>();
    // NOTE(review): presumably keyed by supervisor task ID and filled by addSegment() with the
    // cyclic iterator built in lambda$addSegment$5; the addSegment body is not available here - confirm.
    private final Map<String, Iterator<StorageLocation>> locationIterators = new HashMap<>();
    /** Single-threaded scheduler created in {@link #start()}; null until then. */
    private @MonotonicNonNull ScheduledExecutorService supervisorTaskChecker;

    /**
     * Builds the manager from injected configuration.
     *
     * @param workerConfig   source of the discovery period, cleanup period and partition timeout
     * @param taskConfig     source of the shuffle data location configs; each one is turned into a
     *                       {@link StorageLocation}
     * @param overlordClient client later used to query supervisor task statuses
     */
    @Inject
    public LocalIntermediaryDataManager(WorkerConfig workerConfig, TaskConfig taskConfig, OverlordClient overlordClient) {
        // Timing settings all come from the worker config.
        this.intermediaryPartitionDiscoveryPeriodSec = workerConfig.getIntermediaryPartitionDiscoveryPeriodSec();
        this.intermediaryPartitionCleanupPeriodSec = workerConfig.getIntermediaryPartitionCleanupPeriodSec();
        this.intermediaryPartitionTimeout = workerConfig.getIntermediaryPartitionTimeout();

        this.taskConfig = taskConfig;
        // One StorageLocation per configured shuffle data location.
        this.shuffleDataLocations = taskConfig.getShuffleDataLocations()
            .stream()
            .map(locationConfig -> new StorageLocation(
                locationConfig.getPath(),
                locationConfig.getMaxSize(),
                locationConfig.getFreeSpacePercent()
            ))
            .collect(Collectors.toList());

        this.overlordClient = overlordClient;
    }

    /**
     * Runs one synchronous partition discovery, then creates the single-threaded scheduler and
     * registers two fixed-rate jobs on it: partition discovery and expired-partition cleanup.
     * Each job uses its configured period both as initial delay and as repeat interval.
     */
    @Override
    @LifecycleStart
    public void start() {
        this.discoverSupervisorTaskPartitions();
        this.supervisorTaskChecker = Execs.scheduledSingleThreaded("intermediary-data-manager-%d");
        this.scheduleGuarded(
            this::discoverSupervisorTaskPartitions,
            this.intermediaryPartitionDiscoveryPeriodSec,
            "Error while discovering supervisorTasks"
        );
        this.scheduleGuarded(
            this::deleteExpiredSupervisorTaskPartitionsIfNotRunning,
            this.intermediaryPartitionCleanupPeriodSec,
            "Error while cleaning up partitions for expired supervisors"
        );
    }

    /**
     * Schedules {@code job} at a fixed rate on {@code supervisorTaskChecker}, logging (instead of
     * propagating) any {@link Exception} so that one failed run does not cancel later runs.
     *
     * @param job            work to execute on every tick
     * @param periodSec      initial delay and period, in seconds
     * @param failureMessage message logged at WARN level together with the caught exception
     */
    private void scheduleGuarded(Runnable job, long periodSec, String failureMessage) {
        this.supervisorTaskChecker.scheduleAtFixedRate(() -> {
            try {
                job.run();
            }
            catch (Exception e) {
                LOG.warn(e, failureMessage);
            }
        }, periodSec, periodSec, TimeUnit.SECONDS);
    }

    /**
     * Shuts the scheduler down (if {@link #start()} created one), waits up to 10 seconds for it to
     * terminate, and then clears the recorded supervisor task check times.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *                          interrupted while waiting; the thread's interrupt flag is restored
     *                          before the exception is thrown
     */
    @Override
    @LifecycleStop
    public void stop() {
        if (this.supervisorTaskChecker != null) {
            this.supervisorTaskChecker.shutdownNow();
            try {
                this.supervisorTaskChecker.awaitTermination(10L, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                // Catching InterruptedException clears the interrupt flag; restore it so that code
                // further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                // Same exception type callers saw from the previous Throwables.propagate(e) call.
                throw new RuntimeException(e);
            }
        }
        this.supervisorTaskCheckTimes.clear();
    }

    /**
     * Scans every shuffle data location for per-supervisor-task directories that are not yet
     * tracked in {@code supervisorTaskCheckTimes}. For each new one, its files are reserved in the
     * location and a fresh check time is recorded. Logs how many new supervisor tasks were found
     * per location when that number is positive.
     */
    private void discoverSupervisorTaskPartitions() {
        for (StorageLocation location : this.shuffleDataLocations) {
            Path absoluteLocationPath = location.getPath().toPath().toAbsolutePath();
            MutableInt newlyDiscovered = new MutableInt(0);
            File[] supervisorTaskDirs = location.getPath().listFiles();
            if (supervisorTaskDirs == null) {
                // listFiles() gave nothing to scan for this location.
                continue;
            }
            for (File supervisorTaskDir : supervisorTaskDirs) {
                // The mapping function only runs for supervisor tasks that are not tracked yet.
                this.supervisorTaskCheckTimes.computeIfAbsent(supervisorTaskDir.getName(), unusedKey -> {
                    this.reserveDiscoveredFiles(location, absoluteLocationPath, supervisorTaskDir);
                    newlyDiscovered.increment();
                    return this.getExpiryTimeFromNow();
                });
            }
            if (newlyDiscovered.intValue() > 0) {
                LOG.info("Discovered partitions for [%s] new supervisor tasks under location[%s]", newlyDiscovered.getValue(), location.getPath());
            }
        }
    }

    /**
     * Reserves every file found recursively under {@code supervisorTaskDir} in {@code location},
     * using the file's path relative to {@code locationPath}; logs a warning for each file whose
     * reservation returns null.
     */
    private void reserveDiscoveredFiles(StorageLocation location, Path locationPath, File supervisorTaskDir) {
        for (File partitionFile : FileUtils.listFiles(supervisorTaskDir, null, true)) {
            String relativeSegmentPath = locationPath.relativize(partitionFile.toPath().toAbsolutePath()).toString();
            File reserved = location.reserve(relativeSegmentPath, partitionFile.getName(), partitionFile.length());
            if (reserved == null) {
                LOG.warn("Can't add a discovered partition[%s]", partitionFile.getAbsolutePath());
            }
        }
    }

    /**
     * Collects the supervisor tasks whose check time is in the past, asks the overlord for their
     * statuses, deletes the partitions of those reported as complete, and pushes the check time
     * of the others forward. A failed deletion is logged and does not stop the loop.
     */
    private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() {
        HashSet<String> expiredTaskIds = new HashSet<>();
        this.supervisorTaskCheckTimes.forEach((taskId, checkTime) -> {
            if (checkTime.isBeforeNow()) {
                expiredTaskIds.add(taskId);
            }
        });
        if (expiredTaskIds.isEmpty()) {
            return;
        }
        LOG.info("Found [%s] expired supervisor tasks", expiredTaskIds.size());

        Map<String, TaskStatus> statusByTaskId = FutureUtils.getUnchecked(this.overlordClient.taskStatuses(expiredTaskIds), true);
        for (Map.Entry<String, TaskStatus> statusEntry : statusByTaskId.entrySet()) {
            String taskId = statusEntry.getKey();
            if (!statusEntry.getValue().getStatusCode().isComplete()) {
                // Not complete yet: keep its data and check again after another timeout.
                this.supervisorTaskCheckTimes.put(taskId, this.getExpiryTimeFromNow());
                continue;
            }
            try {
                this.deletePartitions(taskId);
            }
            catch (IOException e) {
                LOG.warn(e, "Failed to delete partitions for task[%s]", taskId);
            }
        }
    }

    /*
     * NOTE(review): the decompiler (CFR) could not reconstruct this method, so the body below is
     * a placeholder that unconditionally throws IllegalStateException. It does NOT reflect the
     * behavior of the compiled class. The synthetic methods lambda$addSegment$5, lambda$addSegment$6
     * and lambda$addSegment$7 further down in this file are the lambda bodies that the real
     * implementation referenced; recover the actual logic from the original source or bytecode
     * before relying on this method.
     */
    @Override
    public DataSegment addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [7[CATCHBLOCK], 12[FORLOOP]], but top level block is 4[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        // Placeholder emitted by the decompiler; always thrown.
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Looks up the partition file written by {@code subTaskId} for the given supervisor task,
     * interval and bucket.
     *
     * <p>Every shuffle data location is checked in order. Whenever a location contains the
     * partition directory, the supervisor task's check time is pushed forward. The first location
     * whose partition directory also contains a file named {@code subTaskId} wins. A location that
     * has the partition directory but not the sub-task file no longer ends the search, because
     * the file may live under the same partition directory in another location.
     *
     * @param supervisorTaskId ID of the supervisor task; validated with {@code IdUtils.validateId}
     * @param subTaskId        ID of the sub task, also the partition file name; validated likewise
     * @param interval         interval used to build the partition directory path
     * @param bucketId         bucket ID used to build the partition directory path
     * @return a {@link ByteSource} over the partition file, or {@link Optional#empty()} if no
     *         location holds it
     */
    @Override
    public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId) {
        IdUtils.validateId("supervisorTaskId", supervisorTaskId);
        IdUtils.validateId("subTaskId", subTaskId);
        for (StorageLocation location : this.shuffleDataLocations) {
            File partitionDir = new File(location.getPath(), this.getPartitionDirPath(supervisorTaskId, interval, bucketId));
            if (!partitionDir.exists()) {
                continue;
            }
            // The partition is still being read, so postpone the cleanup of this supervisor task.
            this.supervisorTaskCheckTimes.put(supervisorTaskId, this.getExpiryTimeFromNow());
            File segmentFile = new File(partitionDir, subTaskId);
            if (segmentFile.exists()) {
                return Optional.of(Files.asByteSource(segmentFile));
            }
            // Not in this location: keep looking in the remaining ones instead of giving up.
        }
        return Optional.empty();
    }

    /**
     * Builds a {@link GenericPartitionStat} for {@code segment}, addressed at the task executor
     * node of {@code toolbox}. The last two constructor arguments are passed as null.
     *
     * @param toolbox toolbox whose task executor node supplies host, port and the TLS flag
     * @param segment segment supplying the interval and the shard spec, which is cast to
     *                {@link BucketNumberedShardSpec}
     * @return the newly created partition stat
     */
    @Override
    public GenericPartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment) {
        return new GenericPartitionStat(
            toolbox.getTaskExecutorNode().getHost(),
            toolbox.getTaskExecutorNode().getPortToUse(),
            toolbox.getTaskExecutorNode().isEnableTlsPort(),
            segment.getInterval(),
            (BucketNumberedShardSpec) segment.getShardSpec(),
            null,
            null
        );
    }

    /**
     * Returns the current UTC time plus {@code intermediaryPartitionTimeout}.
     */
    private DateTime getExpiryTimeFromNow() {
        DateTime now = DateTimes.nowUtc();
        return now.plus(this.intermediaryPartitionTimeout);
    }

    /**
     * Deletes the directory of the given supervisor task from every shuffle data location that
     * has one, releasing each contained file from the location first, and then stops tracking
     * the supervisor task's check time.
     *
     * @param supervisorTaskId ID of the supervisor task; validated with {@code IdUtils.validateId}
     * @throws IOException if a supervisor task directory cannot be deleted
     */
    @Override
    public void deletePartitions(String supervisorTaskId) throws IOException {
        IdUtils.validateId("supervisorTaskId", supervisorTaskId);
        for (StorageLocation location : this.shuffleDataLocations) {
            File supervisorTaskDir = new File(location.getPath(), supervisorTaskId);
            if (supervisorTaskDir.exists()) {
                LOG.info("Cleaning up [%s]", supervisorTaskDir);
                // Release each file from the location before the directory itself is removed.
                for (File partitionFile : FileUtils.listFiles(supervisorTaskDir, null, true)) {
                    location.removeFile(partitionFile);
                }
                FileUtils.forceDelete(supervisorTaskDir);
            }
        }
        this.supervisorTaskCheckTimes.remove(supervisorTaskId);
    }

    /**
     * Synthetic lambda body belonging to {@code addSegment}: copies the contents of
     * {@code tempZippedFile} to {@code out} and returns the value reported by the copy.
     */
    private static /* synthetic */ Long lambda$addSegment$7(File tempZippedFile, OutputStream out) throws IOException {
        ByteSource zippedSource = Files.asByteSource(tempZippedFile);
        return zippedSource.copyTo(out);
    }

    /**
     * Synthetic lambda body belonging to {@code addSegment}: force-deletes {@code taskTempDir}.
     * A failed deletion is logged at WARN level and not rethrown.
     */
    private static /* synthetic */ void lambda$addSegment$6(File taskTempDir) throws IOException {
        try {
            FileUtils.forceDelete(taskTempDir);
        }
        catch (IOException deleteFailure) {
            // Best-effort cleanup: report the failure and carry on.
            LOG.warn(deleteFailure, "Failed to delete directory[%s]", taskTempDir.getAbsolutePath());
        }
    }

    /**
     * Synthetic lambda body belonging to {@code addSegment}: creates an endlessly cycling iterator
     * over {@code shuffleDataLocations} and advances it by a random number of steps in
     * {@code [0, shuffleDataLocations.size())}, so iteration starts at a random location.
     *
     * @param k map key the iterator is created for (unused)
     */
    private /* synthetic */ Iterator lambda$addSegment$5(String k) {
        Iterator<StorageLocation> cyclingLocations = Iterators.cycle(this.shuffleDataLocations);
        int randomOffset = ThreadLocalRandom.current().nextInt(this.shuffleDataLocations.size());
        // Skip ahead so that the first location handed out is chosen at random.
        for (int skipped = 0; skipped < randomOffset; skipped++) {
            cyclingLocations.next();
        }
        return cyclingLocations;
    }
}

