/*
 * Decompiled with CFR 0.152.
 */
package com.lancedb.lance.spark.write;

import com.google.common.collect.ImmutableList;
import com.lancedb.lance.FragmentMetadata;
import com.lancedb.lance.RowAddress;
import com.lancedb.lance.WriteParams;
import com.lancedb.lance.spark.LanceConfig;
import com.lancedb.lance.spark.SparkOptions;
import com.lancedb.lance.spark.internal.LanceDatasetAdapter;
import com.lancedb.lance.spark.write.LanceArrowWriter;
import com.lancedb.lance.spark.write.LanceBatchWrite;
import com.lancedb.lance.spark.write.LanceDataWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.NullOrdering;
import org.apache.spark.sql.connector.expressions.SortDirection;
import org.apache.spark.sql.connector.expressions.SortOrder;
import org.apache.spark.sql.connector.expressions.SortValue;
import org.apache.spark.sql.connector.write.DeltaBatchWrite;
import org.apache.spark.sql.connector.write.DeltaWrite;
import org.apache.spark.sql.connector.write.DeltaWriter;
import org.apache.spark.sql.connector.write.DeltaWriterFactory;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;
import org.roaringbitmap.RoaringBitmap;

public class SparkPositionDeltaWrite
implements DeltaWrite,
RequiresDistributionAndOrdering {
    private final StructType sparkSchema;
    private final LanceConfig config;

    /**
     * Creates a position-delta write bound to a schema and a Lance configuration.
     *
     * @param sparkSchema schema later handed to the per-task writer factory
     * @param config Lance configuration later handed to the writer factory and used when the
     *     batch write commits
     */
    public SparkPositionDeltaWrite(StructType sparkSchema, LanceConfig config) {
        this.config = config;
        this.sparkSchema = sparkSchema;
    }

    /**
     * Asks Spark to cluster the incoming rows by the {@code _fragid} column.
     *
     * @return a clustered distribution whose only clustering expression is {@code _fragid}
     */
    @Override
    public Distribution requiredDistribution() {
        NamedReference[] clusteringColumns = {Expressions.column("_fragid")};
        return Distributions.clustered(clusteringColumns);
    }

    /**
     * Asks Spark to sort the incoming rows by the {@code _rowaddr} column.
     *
     * @return a single-element ordering: {@code _rowaddr} ascending, nulls first
     */
    @Override
    public SortOrder[] requiredOrdering() {
        NamedReference rowAddress = Expressions.column("_rowaddr");
        return new SortValue[] {
            new SortValue(rowAddress, SortDirection.ASCENDING, NullOrdering.NULLS_FIRST)
        };
    }

    /**
     * Builds the batch-level write for this delta operation.
     *
     * @return a new batch write that shares this instance's schema and configuration
     */
    @Override
    public DeltaBatchWrite toBatch() {
        DeltaBatchWrite batchWrite = new PositionDeltaBatchWrite();
        return batchWrite;
    }

    private class PositionDeltaBatchWrite
    implements DeltaBatchWrite {
        private PositionDeltaBatchWrite() {
        }

        public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
            return new PositionDeltaWriteFactory(SparkPositionDeltaWrite.this.sparkSchema, SparkPositionDeltaWrite.this.config);
        }

        public void commit(WriterCommitMessage[] messages) {
            ArrayList<Long> removedFragmentIds = new ArrayList<Long>();
            ArrayList<FragmentMetadata> updatedFragments = new ArrayList<FragmentMetadata>();
            ArrayList<FragmentMetadata> newFragments = new ArrayList<FragmentMetadata>();
            Arrays.stream(messages).map(m -> (DeltaWriteTaskCommit)m).forEach(m -> {
                removedFragmentIds.addAll(m.removedFragmentIds());
                updatedFragments.addAll(m.updatedFragments());
                newFragments.addAll(m.newFragments());
            });
            LanceDatasetAdapter.updateFragments(SparkPositionDeltaWrite.this.config, removedFragmentIds, updatedFragments, newFragments);
        }

        public void abort(WriterCommitMessage[] messages) {
        }
    }

    /**
     * Commit message produced by one task: the fragments it removed, the fragments it rewrote,
     * and the fragments it newly created. Accessors never return {@code null}.
     */
    private static class DeltaWriteTaskCommit implements WriterCommitMessage {
        private List<Long> removedFragmentIds;
        private List<FragmentMetadata> updatedFragments;
        private List<FragmentMetadata> newFragments;

        /**
         * @param removedFragmentIds ids of removed fragments; may be {@code null}
         * @param updatedFragments metadata of updated fragments; may be {@code null}
         * @param newFragments metadata of newly written fragments; may be {@code null}
         */
        DeltaWriteTaskCommit(
                List<Long> removedFragmentIds,
                List<FragmentMetadata> updatedFragments,
                List<FragmentMetadata> newFragments) {
            this.removedFragmentIds = removedFragmentIds;
            this.updatedFragments = updatedFragments;
            this.newFragments = newFragments;
        }

        /** @return the removed fragment ids, or an empty list when none were supplied */
        public List<Long> removedFragmentIds() {
            if (this.removedFragmentIds != null) {
                return this.removedFragmentIds;
            }
            return Collections.emptyList();
        }

        /** @return the updated fragments, or an empty list when none were supplied */
        public List<FragmentMetadata> updatedFragments() {
            if (this.updatedFragments != null) {
                return this.updatedFragments;
            }
            return Collections.emptyList();
        }

        /** @return the new fragments, or an empty list when none were supplied */
        public List<FragmentMetadata> newFragments() {
            if (this.newFragments != null) {
                return this.newFragments;
            }
            return Collections.emptyList();
        }
    }

    private static class LanceDeltaWriter
    implements DeltaWriter<InternalRow> {
        private final LanceConfig config;
        private final LanceDataWriter writer;
        private final Map<Integer, RoaringBitmap> deletedRows;

        private LanceDeltaWriter(LanceConfig config, LanceDataWriter writer) {
            this.config = config;
            this.writer = writer;
            this.deletedRows = new HashMap<Integer, RoaringBitmap>();
        }

        public void delete(InternalRow metadata, InternalRow id) throws IOException {
            int fragmentId = metadata.getInt(0);
            this.deletedRows.compute(fragmentId, (k, v) -> {
                if (v == null) {
                    v = new RoaringBitmap();
                }
                v.add(RowAddress.rowIndex((long)id.getLong(0)));
                return v;
            });
        }

        public void update(InternalRow metadata, InternalRow id, InternalRow row) throws IOException {
            throw new UnsupportedOperationException("Update is not supported");
        }

        public void insert(InternalRow row) throws IOException {
            this.writer.write(row);
        }

        public WriterCommitMessage commit() throws IOException {
            LanceBatchWrite.TaskCommit append = (LanceBatchWrite.TaskCommit)this.writer.commit();
            List<FragmentMetadata> newFragments = append.getFragments();
            ArrayList<Long> removedFragmentIds = new ArrayList<Long>();
            ArrayList<FragmentMetadata> updatedFragments = new ArrayList<FragmentMetadata>();
            this.deletedRows.forEach((fragmentId, rowIndexes) -> {
                FragmentMetadata updatedFragment = LanceDatasetAdapter.deleteRows(this.config, fragmentId, (List<Integer>)ImmutableList.copyOf((Iterable)rowIndexes));
                if (updatedFragment != null) {
                    updatedFragments.add(updatedFragment);
                } else {
                    removedFragmentIds.add(Long.valueOf(fragmentId.intValue()));
                }
            });
            return new DeltaWriteTaskCommit(removedFragmentIds, updatedFragments, newFragments);
        }

        public void abort() throws IOException {
            this.writer.abort();
        }

        public void close() throws IOException {
            this.writer.close();
        }
    }

    private static class PositionDeltaWriteFactory
    implements DeltaWriterFactory {
        private final StructType sparkSchema;
        private final LanceConfig config;

        PositionDeltaWriteFactory(StructType sparkSchema, LanceConfig config) {
            this.sparkSchema = sparkSchema;
            this.config = config;
        }

        public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
            int batch_size = SparkOptions.getBatchSize(this.config);
            LanceArrowWriter arrowWriter = LanceDatasetAdapter.getArrowWriter(this.sparkSchema, batch_size);
            WriteParams params = SparkOptions.genWriteParamsFromConfig(this.config);
            Callable<List> fragmentCreator = () -> LanceDatasetAdapter.createFragment(this.config.getDatasetUri(), arrowWriter, params);
            FutureTask<List<FragmentMetadata>> fragmentCreationTask = new FutureTask<List<FragmentMetadata>>(fragmentCreator);
            Thread fragmentCreationThread = new Thread(fragmentCreationTask);
            fragmentCreationThread.start();
            return new LanceDeltaWriter(this.config, new LanceDataWriter(arrowWriter, fragmentCreationTask, fragmentCreationThread));
        }
    }
}

