/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.comet.execution.shuffle;

import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import org.apache.comet.CometConf$;
import org.apache.comet.shaded.guava.io.Closeables;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.internal.config.package$;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;
import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
import org.apache.spark.shuffle.comet.CometShuffleChecksumSupport;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.comet.execution.shuffle.CometBypassMergeSortShuffleHandle;
import org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter;
import org.apache.spark.sql.comet.execution.shuffle.CometShuffleDependency;
import org.apache.spark.sql.comet.execution.shuffle.ShuffleThreadPool;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.FileSegment;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.None$;
import scala.Option;
import scala.Product2;
import scala.Tuple2;
import scala.collection.Iterator;

/**
 * Shuffle writer that routes every incoming record to a dedicated per-partition
 * {@link CometDiskBlockWriter} (one temp shuffle file per partition) and, once all records
 * have been consumed, copies the per-partition files one after another into the
 * {@link ShuffleMapOutputWriter} obtained from the {@link ShuffleExecutorComponents}.
 *
 * <p>The value of each record is cast to {@link UnsafeRow} before it is handed to the
 * partition writer, and the dependency of the handle is cast to
 * {@link CometShuffleDependency} to obtain the row schema.
 *
 * @param <K> key type of the shuffled records
 * @param <V> value type of the shuffled records
 */
final class CometBypassMergeSortShuffleWriter<K, V>
extends ShuffleWriter<K, V>
implements CometShuffleChecksumSupport {
    private static final Logger logger = LoggerFactory.getLogger(CometBypassMergeSortShuffleWriter.class);
    /** Value of the shuffle file buffer size setting multiplied by 1024. */
    private final int fileBufferSize;
    /** Value of {@code spark.file.transferTo}; enables the channel based copy path. */
    private final boolean transferToEnabled;
    private final int numPartitions;
    private final BlockManager blockManager;
    private final TaskMemoryManager memoryManager;
    private final TaskContext taskContext;
    private final SerializerInstance serializer;
    private final Partitioner partitioner;
    private final ShuffleWriteMetricsReporter writeMetrics;
    private final int shuffleId;
    private final long mapId;
    private final ShuffleExecutorComponents shuffleExecutorComponents;
    private final StructType schema;
    /** One writer per partition; {@code null} before {@link #write} and after the data was merged. */
    private CometDiskBlockWriter[] partitionWriters;
    /** File segment returned by closing each partition writer, indexed by partition id. */
    private FileSegment[] partitionWriterSegments;
    /** Result of a successful {@link #write}; {@code null} until then. */
    private MapStatus mapStatus;
    private long[] partitionLengths;
    private final long[] partitionChecksums;
    private final boolean isAsync;
    private final int asyncThreadNum;
    /** Thread pool handed to the partition writers; {@code null} when async mode is disabled. */
    private final ExecutorService threadPool;
    /** Set by the first call to {@link #stop}; later calls return immediately. */
    private boolean stopping = false;
    private final SparkConf conf;

    /**
     * Creates a writer for a single map task.
     *
     * @param blockManager used to create temp shuffle blocks and to query encryption settings
     * @param memoryManager passed through to every partition writer
     * @param taskContext passed through to every partition writer
     * @param handle shuffle handle whose dependency supplies serializer, partitioner, shuffle id
     *     and schema; the dependency must be a {@link CometShuffleDependency} with a schema set
     * @param mapId id of the map task
     * @param conf Spark configuration
     * @param writeMetrics reporter that receives write time metrics
     * @param shuffleExecutorComponents factory for the final {@link ShuffleMapOutputWriter}
     */
    CometBypassMergeSortShuffleWriter(BlockManager blockManager, TaskMemoryManager memoryManager, TaskContext taskContext, CometBypassMergeSortShuffleHandle<K, V> handle, long mapId, SparkConf conf, ShuffleWriteMetricsReporter writeMetrics, ShuffleExecutorComponents shuffleExecutorComponents) {
        this.fileBufferSize = (int)((Long)conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE())).longValue() * 1024;
        this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
        this.conf = conf;
        this.blockManager = blockManager;
        this.memoryManager = memoryManager;
        this.taskContext = taskContext;
        ShuffleDependency dep = handle.dependency();
        this.mapId = mapId;
        this.serializer = dep.serializer().newInstance();
        this.shuffleId = dep.shuffleId();
        this.partitioner = dep.partitioner();
        this.numPartitions = this.partitioner.numPartitions();
        this.writeMetrics = writeMetrics;
        this.shuffleExecutorComponents = shuffleExecutorComponents;
        this.schema = (StructType)((CometShuffleDependency)dep).schema().get();
        this.partitionChecksums = this.createPartitionChecksums(this.numPartitions, conf);
        this.isAsync = (Boolean)CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED().get();
        this.asyncThreadNum = (Integer)CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_ASYNC_THREAD_NUM().get();
        if (this.isAsync) {
            logger.info("Async shuffle writer enabled");
            this.threadPool = ShuffleThreadPool.getThreadPool();
        } else {
            logger.info("Async shuffle writer disabled");
            this.threadPool = null;
        }
    }

    /**
     * Writes all records of this map task.
     *
     * <p>An empty iterator commits an empty output immediately. Otherwise one
     * {@link CometDiskBlockWriter} per partition is created, every record is inserted into the
     * writer of its partition, all writers are closed, and the resulting files are copied into
     * the map output writer. On success {@code mapStatus} and {@code partitionLengths} are set.
     *
     * @param records records to shuffle; each value must be an {@link UnsafeRow}
     * @throws IOException if writing fails; the map output writer is aborted before rethrowing
     * @throws RuntimeException if the number of consumed records differs from the number of
     *     records reported by the partition writers
     */
    public void write(Iterator<Product2<K, V>> records) throws IOException {
        assert (this.partitionWriters == null);
        ShuffleMapOutputWriter mapOutputWriter = this.shuffleExecutorComponents.createMapOutputWriter(this.shuffleId, this.mapId, this.numPartitions);
        try {
            if (!records.hasNext()) {
                this.partitionLengths = mapOutputWriter.commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
                this.mapStatus = MapStatus$.MODULE$.apply(this.blockManager.shuffleServerId(), this.partitionLengths, this.mapId);
                return;
            }
            long openStartTime = System.nanoTime();
            this.partitionWriters = new CometDiskBlockWriter[this.numPartitions];
            this.partitionWriterSegments = new FileSegment[this.numPartitions];
            String checksumAlgorithm = this.getChecksumAlgorithm(this.conf);
            for (int i = 0; i < this.numPartitions; ++i) {
                Tuple2 tempShuffleBlockIdPlusFile = this.blockManager.diskBlockManager().createTempShuffleBlock();
                File file = (File)tempShuffleBlockIdPlusFile._2();
                CometDiskBlockWriter writer = new CometDiskBlockWriter(file, this.memoryManager, this.taskContext, this.serializer, this.schema, this.writeMetrics, this.conf, this.isAsync, this.asyncThreadNum, this.threadPool);
                // An empty checksum array means no per-partition checksum is tracked.
                if (this.partitionChecksums.length > 0) {
                    writer.setChecksum(this.partitionChecksums[i]);
                    writer.setChecksumAlgo(checksumAlgorithm);
                }
                this.partitionWriters[i] = writer;
            }
            // Creating the writers counts towards the shuffle write time.
            this.writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
            long outputRows = 0L;
            while (records.hasNext()) {
                ++outputRows;
                Product2 record = (Product2)records.next();
                Object key = record._1();
                // Resolve the partition once so the selected writer and the id passed to it
                // always agree, and the partitioner is not evaluated twice per row.
                int partitionId = this.partitioner.getPartition(key);
                this.partitionWriters[partitionId].insertRow((UnsafeRow)record._2(), partitionId);
            }
            long spillRecords = 0L;
            for (int i = 0; i < this.numPartitions; ++i) {
                CometDiskBlockWriter writer = this.partitionWriters[i];
                this.partitionWriterSegments[i] = writer.close();
                spillRecords += writer.getOutputRecords();
            }
            // Sanity check: every consumed row must have been accounted for by some writer.
            if (outputRows != spillRecords) {
                throw new RuntimeException("outputRows(" + outputRows + ") != spillRecords(" + spillRecords + "). Please file a bug report.");
            }
            this.partitionLengths = this.writePartitionedData(mapOutputWriter);
            this.mapStatus = MapStatus$.MODULE$.apply(this.blockManager.shuffleServerId(), this.partitionLengths, this.mapId);
        }
        catch (Exception e) {
            try {
                mapOutputWriter.abort((Throwable)e);
            }
            catch (Exception e2) {
                // Keep the original failure as the primary exception; attach the abort failure.
                logger.error("Failed to abort the writer after failing to write map output.", (Throwable)e2);
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    /**
     * Returns the partition lengths recorded by the last {@link #write} call.
     *
     * @return the partition lengths, or {@code null} if {@link #write} has not completed
     */
    public long[] getPartitionLengths() {
        return this.partitionLengths;
    }

    /**
     * Copies every existing per-partition file into the map output writer, deletes the
     * file afterwards and finally commits all partitions.
     *
     * @param mapOutputWriter destination of the merged map output
     * @return the partition lengths reported by the commit
     * @throws IOException if copying or committing fails
     */
    private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
        if (this.partitionWriters != null) {
            int i;
            long writeStartTime = System.nanoTime();
            boolean encryptionEnabled = this.blockManager.serializerManager().encryptionEnabled();
            // Collect the final checksum of each partition before the files are merged.
            for (i = 0; i < this.partitionChecksums.length; ++i) {
                this.partitionChecksums[i] = this.partitionWriters[i].getChecksum();
            }
            try {
                for (i = 0; i < this.numPartitions; ++i) {
                    File file = this.partitionWriterSegments[i].file();
                    // The partition writer is requested for every partition, even when there
                    // is no file to copy for it.
                    ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
                    if (!file.exists()) continue;
                    if (this.transferToEnabled && !encryptionEnabled) {
                        // Channel based copy is only attempted without encryption; fall back
                        // to the stream copy when the writer offers no channel.
                        Optional maybeOutputChannel = writer.openChannelWrapper();
                        if (maybeOutputChannel.isPresent()) {
                            this.writePartitionedDataWithChannel(file, (WritableByteChannelWrapper)maybeOutputChannel.get());
                        } else {
                            this.writePartitionedDataWithStream(file, writer);
                        }
                    } else {
                        this.writePartitionedDataWithStream(file, writer);
                    }
                    if (file.delete()) continue;
                    logger.error("Unable to delete file for partition {}", (Object)i);
                }
            }
            finally {
                this.writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
            }
            this.partitionWriters = null;
        }
        return mapOutputWriter.commitAllPartitions(this.partitionChecksums).getPartitionLengths();
    }

    /**
     * Copies {@code file} into {@code outputChannel} using NIO channels and closes both ends.
     *
     * @param file per-partition temp file to copy
     * @param outputChannel destination channel wrapper; closed by this method
     * @throws IOException if opening, copying or closing fails
     */
    private void writePartitionedDataWithChannel(File file, WritableByteChannelWrapper outputChannel) throws IOException {
        // Stays true until the copy finished, so that close failures after a failed copy
        // are swallowed by Closeables.close instead of masking the copy failure.
        boolean copyThrewException = true;
        try {
            FileInputStream in = new FileInputStream(file);
            try (FileChannel inputChannel = in.getChannel();){
                Utils.copyFileStreamNIO((FileChannel)inputChannel, (WritableByteChannel)outputChannel.channel(), (long)0L, (long)inputChannel.size());
                copyThrewException = false;
            }
            finally {
                Closeables.close(in, copyThrewException);
            }
        }
        finally {
            Closeables.close((Closeable)outputChannel, copyThrewException);
        }
    }

    /**
     * Copies {@code file} into the stream opened from {@code writer}, wrapped for encryption
     * by the serializer manager, and closes both streams.
     *
     * @param file per-partition temp file to copy
     * @param writer partition writer that provides the destination stream
     * @throws IOException if opening, copying or closing fails
     */
    private void writePartitionedDataWithStream(File file, ShufflePartitionWriter writer) throws IOException {
        // Stays true until the copy finished, so that close failures after a failed copy
        // are swallowed by Closeables.close instead of masking the copy failure.
        boolean copyThrewException = true;
        FileInputStream in = new FileInputStream(file);
        try {
            OutputStream outputStream = this.blockManager.serializerManager().wrapForEncryption(writer.openStream());
            try {
                Utils.copyStream((InputStream)in, (OutputStream)outputStream, (boolean)false, (boolean)false);
                copyThrewException = false;
            }
            finally {
                Closeables.close(outputStream, copyThrewException);
            }
        }
        finally {
            Closeables.close(in, copyThrewException);
        }
    }

    /**
     * Stops this writer. Only the first call has an effect.
     *
     * <p>On success the map status produced by {@link #write} is returned. On failure the
     * memory of every created partition writer is freed and its temp file is deleted.
     *
     * @param success whether the map task completed successfully
     * @return the map status on success; an empty option on failure or on repeated calls
     * @throws IllegalStateException if {@code success} is true but {@link #write} never
     *     produced a map status
     */
    public Option<MapStatus> stop(boolean success) {
        if (this.stopping) {
            return None$.empty();
        }
        this.stopping = true;
        if (success) {
            if (this.mapStatus == null) {
                throw new IllegalStateException("Cannot call stop(true) without having called write()");
            }
            return Option.apply((Object)this.mapStatus);
        }
        if (this.partitionWriters != null) {
            try {
                for (CometDiskBlockWriter writer : this.partitionWriters) {
                    // write() may have failed while it was still creating the writers, which
                    // leaves the remaining slots unpopulated; there is nothing to clean there.
                    if (writer == null) continue;
                    writer.freeMemory();
                    File file = writer.getFile();
                    if (file.delete()) continue;
                    logger.error("Error while deleting file {}", (Object)file.getAbsolutePath());
                }
            }
            finally {
                this.partitionWriters = null;
            }
        }
        return None$.empty();
    }
}

