/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.spark.procedure;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.spark.DynamicOverWrite$;
import org.apache.paimon.spark.SparkUtils;
import org.apache.paimon.spark.commands.WriteIntoPaimonTable;
import org.apache.paimon.spark.procedure.BaseProcedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
import org.apache.paimon.spark.procedure.ProcedureParameter;
import org.apache.paimon.spark.sort.TableSorter;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CompactionTaskSerializer;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class CompactProcedure
extends BaseProcedure {
    private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[]{ProcedureParameter.required("table", DataTypes.StringType), ProcedureParameter.optional("partitions", DataTypes.StringType), ProcedureParameter.optional("order_strategy", DataTypes.StringType), ProcedureParameter.optional("order_by", DataTypes.StringType)};
    private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{new StructField("result", DataTypes.BooleanType, true, Metadata.empty())});

    protected CompactProcedure(TableCatalog tableCatalog) {
        super(tableCatalog);
    }

    @Override
    public ProcedureParameter[] parameters() {
        return PARAMETERS;
    }

    @Override
    public StructType outputType() {
        return OUTPUT_TYPE;
    }

    @Override
    public InternalRow[] call(InternalRow args) {
        List sortColumns;
        Preconditions.checkArgument(args.numFields() >= 1);
        Identifier tableIdent = this.toIdentifier(args.getString(0), PARAMETERS[0].name());
        String partitions = this.blank(args, 1) ? null : args.getString(1);
        String sortType = this.blank(args, 2) ? TableSorter.OrderType.NONE.name() : args.getString(2);
        List<Object> list = sortColumns = this.blank(args, 3) ? Collections.emptyList() : Arrays.asList(args.getString(3).split(","));
        if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
            throw new IllegalArgumentException("order_strategy \"none\" cannot work with order_by columns.");
        }
        return this.modifyPaimonTable(tableIdent, table -> {
            Preconditions.checkArgument(table instanceof FileStoreTable);
            InternalRow internalRow = this.newInternalRow(this.execute((FileStoreTable)table, sortType, sortColumns, partitions));
            return new InternalRow[]{internalRow};
        });
    }

    @Override
    public String description() {
        return "This procedure execute compact action on paimon table.";
    }

    private boolean blank(InternalRow args, int index) {
        return args.isNullAt(index) || StringUtils.isBlank(args.getString(index));
    }

    private boolean execute(FileStoreTable table, String sortType, List<String> sortColumns, @Nullable String partitions) {
        table = table.copy((Map)Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
        BucketMode bucketMode = table.bucketMode();
        TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
        if (orderType.equals((Object)TableSorter.OrderType.NONE)) {
            JavaSparkContext javaSparkContext = new JavaSparkContext(this.spark().sparkContext());
            Predicate filter = StringUtils.isBlank(partitions) ? null : PredicateBuilder.partitions(ParameterUtils.getPartitions(partitions), table.rowType());
            switch (bucketMode) {
                case FIXED: 
                case DYNAMIC: {
                    this.compactAwareBucketTable((FileStoreTable)table, filter, javaSparkContext);
                    break;
                }
                case UNAWARE: {
                    this.compactUnAwareBucketTable((FileStoreTable)table, filter, javaSparkContext);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Spark compact with " + (Object)((Object)bucketMode) + " is not support yet.");
                }
            }
        } else {
            switch (bucketMode) {
                case UNAWARE: {
                    this.sortCompactUnAwareBucketTable((FileStoreTable)table, orderType, sortColumns, partitions);
                    break;
                }
                default: {
                    throw new UnsupportedOperationException("Spark compact with sort_type " + sortType + " only support unaware-bucket append-only table yet.");
                }
            }
        }
        return true;
    }

    private void compactAwareBucketTable(FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
        List partitionBuckets;
        InnerTableScan scan = table.newScan();
        if (filter != null) {
            scan.withFilter(filter);
        }
        if ((partitionBuckets = scan.plan().splits().stream().map(split -> (DataSplit)split).map(dataSplit -> Pair.of(dataSplit.partition(), dataSplit.bucket())).distinct().collect(Collectors.toList())).isEmpty()) {
            return;
        }
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        JavaRDD commitMessageJavaRDD = javaSparkContext.parallelize(partitionBuckets).mapPartitions((FlatMapFunction & Serializable)pairIterator -> {
            IOManager ioManager = SparkUtils.createIOManager();
            BatchTableWrite write = writeBuilder.newWrite();
            write.withIOManager(ioManager);
            try {
                while (pairIterator.hasNext()) {
                    Pair pair = (Pair)pairIterator.next();
                    write.compact((BinaryRow)pair.getLeft(), (Integer)pair.getRight(), true);
                }
                Iterator<CommitMessage> iterator = write.prepareCommit().iterator();
                return iterator;
            }
            finally {
                write.close();
                ioManager.close();
            }
        });
        try (BatchTableCommit commit = writeBuilder.newCommit();){
            commit.commit(commitMessageJavaRDD.collect());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void compactUnAwareBucketTable(FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
        List<AppendOnlyCompactionTask> compactionTasks = new AppendOnlyTableCompactionCoordinator(table, false, filter).run();
        if (compactionTasks.isEmpty()) {
            return;
        }
        CompactionTaskSerializer serializer = new CompactionTaskSerializer();
        ArrayList<byte[]> serializedTasks = new ArrayList<byte[]>();
        try {
            for (AppendOnlyCompactionTask compactionTask : compactionTasks) {
                serializedTasks.add(serializer.serialize(compactionTask));
            }
        }
        catch (IOException e) {
            throw new RuntimeException("serialize compaction task failed");
        }
        String commitUser = UUID.randomUUID().toString();
        JavaRDD commitMessageJavaRDD = javaSparkContext.parallelize(serializedTasks).mapPartitions((FlatMapFunction & Serializable)taskIterator -> {
            AppendOnlyFileStoreWrite write = (AppendOnlyFileStoreWrite)table.store().newWrite(commitUser);
            CompactionTaskSerializer ser = new CompactionTaskSerializer();
            ArrayList<CommitMessage> messages = new ArrayList<CommitMessage>();
            try {
                while (taskIterator.hasNext()) {
                    AppendOnlyCompactionTask task = ser.deserialize(ser.getVersion(), (byte[])taskIterator.next());
                    messages.add(task.doCompact(write));
                }
                Iterator iterator = messages.iterator();
                return iterator;
            }
            finally {
                write.close();
            }
        });
        try (TableCommitImpl commit = table.newCommit(commitUser);){
            commit.commit(commitMessageJavaRDD.collect());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void sortCompactUnAwareBucketTable(FileStoreTable table, TableSorter.OrderType orderType, List<String> sortColumns, @Nullable String partitions) {
        Dataset row = this.spark().read().format("paimon").load(table.coreOptions().path().toString());
        row = StringUtils.isBlank(partitions) ? row : row.where(CompactProcedure.toWhere(partitions));
        new WriteIntoPaimonTable(table, DynamicOverWrite$.MODULE$, TableSorter.getSorter(table, orderType, sortColumns).sort((Dataset<Row>)row), new Options()).run(this.spark());
    }

    @VisibleForTesting
    static String toWhere(String partitions) {
        List<Map<String, String>> maps = ParameterUtils.getPartitions(partitions.split(";"));
        return maps.stream().map(a -> a.entrySet().stream().map(entry -> (String)entry.getKey() + "=" + (String)entry.getValue()).reduce((s0, s1) -> s0 + " AND " + s1)).filter(Optional::isPresent).map(Optional::get).map(a -> "(" + a + ")").reduce((a, b) -> a + " OR " + b).orElse(null);
    }

    public static ProcedureBuilder builder() {
        return new BaseProcedure.Builder<CompactProcedure>(){

            @Override
            public CompactProcedure doBuild() {
                return new CompactProcedure(this.tableCatalog());
            }
        };
    }
}

