/*
 * Decompiled with CFR 0.152.
 */
package com.lancedb.lance.spark.write;

import com.lancedb.lance.FragmentMetadata;
import com.lancedb.lance.WriteParams;
import com.lancedb.lance.spark.LanceConfig;
import com.lancedb.lance.spark.SparkOptions;
import com.lancedb.lance.spark.internal.LanceDatasetAdapter;
import com.lancedb.lance.spark.write.LanceArrowWriter;
import com.lancedb.lance.spark.write.LanceBatchWrite;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

public class LanceDataWriter
implements DataWriter<InternalRow> {
    private LanceArrowWriter arrowWriter;
    private FutureTask<List<FragmentMetadata>> fragmentCreationTask;
    private Thread fragmentCreationThread;

    public LanceDataWriter(LanceArrowWriter arrowWriter, FutureTask<List<FragmentMetadata>> fragmentCreationTask, Thread fragmentCreationThread) {
        this.arrowWriter = arrowWriter;
        this.fragmentCreationThread = fragmentCreationThread;
        this.fragmentCreationTask = fragmentCreationTask;
    }

    public void write(InternalRow record) throws IOException {
        this.arrowWriter.write(record);
    }

    public WriterCommitMessage commit() throws IOException {
        this.arrowWriter.setFinished();
        try {
            List<FragmentMetadata> fragmentMetadata = this.fragmentCreationTask.get();
            return new LanceBatchWrite.TaskCommit(fragmentMetadata);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for reader thread to finish", e);
        }
        catch (ExecutionException e) {
            throw new IOException("Exception in reader thread", e);
        }
    }

    public void abort() throws IOException {
        this.fragmentCreationThread.interrupt();
        try {
            this.fragmentCreationTask.get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IOException("Failed to abort the reader thread", e);
        }
        this.close();
    }

    public void close() throws IOException {
        this.arrowWriter.close();
    }

    public static class WriterFactory
    implements DataWriterFactory {
        private final LanceConfig config;
        private final StructType schema;

        protected WriterFactory(StructType schema, LanceConfig config) {
            this.schema = schema;
            this.config = config;
        }

        public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
            int batch_size = SparkOptions.getBatchSize(this.config);
            LanceArrowWriter arrowWriter = LanceDatasetAdapter.getArrowWriter(this.schema, batch_size);
            WriteParams params = SparkOptions.genWriteParamsFromConfig(this.config);
            Callable<List> fragmentCreator = () -> LanceDatasetAdapter.createFragment(this.config.getDatasetUri(), arrowWriter, params);
            FutureTask<List<FragmentMetadata>> fragmentCreationTask = new FutureTask<List<FragmentMetadata>>(fragmentCreator);
            Thread fragmentCreationThread = new Thread(fragmentCreationTask);
            fragmentCreationThread.start();
            return new LanceDataWriter(arrowWriter, fragmentCreationTask, fragmentCreationThread);
        }
    }
}

