/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.sink.FlinkAppenderFactory;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopOutputFile;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.StructLikeSet;
import org.apache.iceberg.util.StructLikeWrapper;
import org.awaitility.Awaitility;
import org.junit.Assert;

public class SimpleDataUtil {
    public static final Schema SCHEMA = new Schema(new Types.NestedField[]{Types.NestedField.optional((int)1, (String)"id", (Type)Types.IntegerType.get()), Types.NestedField.optional((int)2, (String)"data", (Type)Types.StringType.get())});
    public static final TableSchema FLINK_SCHEMA = TableSchema.builder().field("id", DataTypes.INT()).field("data", DataTypes.STRING()).build();
    public static final RowType ROW_TYPE = (RowType)FLINK_SCHEMA.toRowDataType().getLogicalType();
    public static final Record RECORD = GenericRecord.create((Schema)SCHEMA);

    private SimpleDataUtil() {
    }

    public static Table createTable(String path, Map<String, String> properties, boolean partitioned) {
        PartitionSpec spec = partitioned ? PartitionSpec.builderFor((Schema)SCHEMA).identity("data").build() : PartitionSpec.unpartitioned();
        return new HadoopTables().create(SCHEMA, spec, properties, path);
    }

    public static Record createRecord(Integer id, String data) {
        Record record = RECORD.copy();
        record.setField("id", (Object)id);
        record.setField("data", (Object)data);
        return record;
    }

    public static RowData createRowData(Integer id, String data) {
        return GenericRowData.of((Object[])new Object[]{id, StringData.fromString((String)data)});
    }

    public static RowData createInsert(Integer id, String data) {
        return GenericRowData.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{id, StringData.fromString((String)data)});
    }

    public static RowData createDelete(Integer id, String data) {
        return GenericRowData.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{id, StringData.fromString((String)data)});
    }

    public static RowData createUpdateBefore(Integer id, String data) {
        return GenericRowData.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{id, StringData.fromString((String)data)});
    }

    public static RowData createUpdateAfter(Integer id, String data) {
        return GenericRowData.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{id, StringData.fromString((String)data)});
    }

    public static DataFile writeFile(Table table, Schema schema, PartitionSpec spec, Configuration conf, String location, String filename, List<RowData> rows) throws IOException {
        return SimpleDataUtil.writeFile(table, schema, spec, conf, location, filename, rows, null);
    }

    public static DataFile writeFile(Table table, Schema schema, PartitionSpec spec, Configuration conf, String location, String filename, List<RowData> rows, StructLike partition) throws IOException {
        FileAppender appender;
        Path path = new Path(location, filename);
        FileFormat fileFormat = FileFormat.fromFileName((CharSequence)filename);
        Preconditions.checkNotNull((Object)fileFormat, (String)"Cannot determine format for file: %s", (Object)filename);
        RowType flinkSchema = FlinkSchemaUtil.convert((Schema)schema);
        FlinkAppenderFactory appenderFactory = new FlinkAppenderFactory(table, schema, flinkSchema, (Map)ImmutableMap.of(), spec, null, null, null);
        try (FileAppender closeableAppender = appender = appenderFactory.newAppender(HadoopOutputFile.fromPath((Path)path, (Configuration)conf), fileFormat);){
            closeableAppender.addAll(rows);
        }
        DataFiles.Builder builder = DataFiles.builder((PartitionSpec)spec).withInputFile((InputFile)HadoopInputFile.fromPath((Path)path, (Configuration)conf)).withMetrics(appender.metrics());
        if (partition != null) {
            builder = builder.withPartition(partition);
        }
        return builder.build();
    }

    public static DeleteFile writeEqDeleteFile(Table table, FileFormat format, String filename, FileAppenderFactory<RowData> appenderFactory, List<RowData> deletes) throws IOException {
        EqualityDeleteWriter eqWriter;
        EncryptedOutputFile outputFile = table.encryption().encrypt(HadoopOutputFile.fromPath((Path)new Path(table.location(), filename), (Configuration)new Configuration()));
        try (EqualityDeleteWriter writer = eqWriter = appenderFactory.newEqDeleteWriter(outputFile, format, null);){
            writer.write(deletes);
        }
        return eqWriter.toDeleteFile();
    }

    public static DeleteFile writePosDeleteFile(Table table, FileFormat format, String filename, FileAppenderFactory<RowData> appenderFactory, List<Pair<CharSequence, Long>> positions) throws IOException {
        EncryptedOutputFile outputFile = table.encryption().encrypt(HadoopOutputFile.fromPath((Path)new Path(table.location(), filename), (Configuration)new Configuration()));
        PositionDeleteWriter posWriter = appenderFactory.newPosDeleteWriter(outputFile, format, null);
        PositionDelete posDelete = PositionDelete.create();
        try (PositionDeleteWriter writer = posWriter;){
            for (Pair<CharSequence, Long> p : positions) {
                writer.write(posDelete.set((CharSequence)p.first(), ((Long)p.second()).longValue(), null));
            }
        }
        return posWriter.toDeleteFile();
    }

    private static List<Record> convertToRecords(List<RowData> rows) {
        ArrayList records = Lists.newArrayList();
        for (RowData row : rows) {
            Integer id = row.isNullAt(0) ? null : Integer.valueOf(row.getInt(0));
            String data = row.isNullAt(1) ? null : row.getString(1).toString();
            records.add(SimpleDataUtil.createRecord(id, data));
        }
        return records;
    }

    public static void assertTableRows(String tablePath, List<RowData> expected, String branch) throws IOException {
        SimpleDataUtil.assertTableRecords(tablePath, SimpleDataUtil.convertToRecords(expected), branch);
    }

    public static void assertTableRows(Table table, List<RowData> expected) throws IOException {
        SimpleDataUtil.assertTableRecords(table, SimpleDataUtil.convertToRecords(expected), "main");
    }

    public static void assertTableRows(Table table, List<RowData> expected, String branch) throws IOException {
        SimpleDataUtil.assertTableRecords(table, SimpleDataUtil.convertToRecords(expected), branch);
    }

    public static List<Record> tableRecords(Table table) throws IOException {
        table.refresh();
        ArrayList records = Lists.newArrayList();
        try (CloseableIterable iterable = IcebergGenerics.read((Table)table).build();){
            for (Record record : iterable) {
                records.add(record);
            }
        }
        return records;
    }

    public static boolean equalsRecords(List<Record> expected, List<Record> actual, Schema schema) {
        if (expected.size() != actual.size()) {
            return false;
        }
        Types.StructType type = schema.asStruct();
        StructLikeSet expectedSet = StructLikeSet.create((Types.StructType)type);
        expectedSet.addAll(expected);
        StructLikeSet actualSet = StructLikeSet.create((Types.StructType)type);
        actualSet.addAll(actual);
        return expectedSet.equals((Object)actualSet);
    }

    public static void assertRecordsEqual(List<Record> expected, List<Record> actual, Schema schema) {
        Assert.assertEquals((long)expected.size(), (long)actual.size());
        Types.StructType type = schema.asStruct();
        StructLikeSet expectedSet = StructLikeSet.create((Types.StructType)type);
        expectedSet.addAll(expected);
        StructLikeSet actualSet = StructLikeSet.create((Types.StructType)type);
        actualSet.addAll(actual);
        Assert.assertEquals((Object)expectedSet, (Object)actualSet);
    }

    public static void assertTableRecords(Table table, List<Record> expected, Duration timeout) {
        Awaitility.await((String)"expected list of records should be produced").atMost(timeout).untilAsserted(() -> {
            SimpleDataUtil.equalsRecords(expected, SimpleDataUtil.tableRecords(table), table.schema());
            SimpleDataUtil.assertRecordsEqual(expected, SimpleDataUtil.tableRecords(table), table.schema());
        });
    }

    public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
        SimpleDataUtil.assertTableRecords(table, expected, "main");
    }

    public static void assertTableRecords(Table table, List<Record> expected, String branch) throws IOException {
        table.refresh();
        Snapshot snapshot = SimpleDataUtil.latestSnapshot(table, branch);
        if (snapshot == null) {
            Assert.assertEquals(expected, (Object)ImmutableList.of());
            return;
        }
        Types.StructType type = table.schema().asStruct();
        StructLikeSet expectedSet = StructLikeSet.create((Types.StructType)type);
        expectedSet.addAll(expected);
        try (CloseableIterable iterable = IcebergGenerics.read((Table)table).useSnapshot(snapshot.snapshotId()).build();){
            StructLikeSet actualSet = StructLikeSet.create((Types.StructType)type);
            for (Record record : iterable) {
                actualSet.add((StructLike)record);
            }
            Assert.assertEquals((String)"Should produce the expected record", (Object)expectedSet, (Object)actualSet);
        }
    }

    public static Snapshot latestSnapshot(Table table, String branch) {
        if (branch.equals("main")) {
            return table.currentSnapshot();
        }
        return table.snapshot(branch);
    }

    public static void assertTableRecords(String tablePath, List<Record> expected) throws IOException {
        Preconditions.checkArgument((expected != null ? 1 : 0) != 0, (Object)"expected records shouldn't be null");
        SimpleDataUtil.assertTableRecords(new HadoopTables().load(tablePath), expected, "main");
    }

    public static void assertTableRecords(String tablePath, List<Record> expected, String branch) throws IOException {
        Preconditions.checkArgument((expected != null ? 1 : 0) != 0, (Object)"expected records shouldn't be null");
        SimpleDataUtil.assertTableRecords(new HadoopTables().load(tablePath), expected, branch);
    }

    public static StructLikeSet expectedRowSet(Table table, Record ... records) {
        StructLikeSet set = StructLikeSet.create((Types.StructType)table.schema().asStruct());
        InternalRecordWrapper wrapper = new InternalRecordWrapper(table.schema().asStruct());
        for (Record record : records) {
            set.add((StructLike)wrapper.copyFor((StructLike)record));
        }
        return set;
    }

    public static StructLikeSet actualRowSet(Table table, String ... columns) throws IOException {
        return SimpleDataUtil.actualRowSet(table, null, columns);
    }

    public static StructLikeSet actualRowSet(Table table, Long snapshotId, String ... columns) throws IOException {
        table.refresh();
        StructLikeSet set = StructLikeSet.create((Types.StructType)table.schema().asStruct());
        InternalRecordWrapper wrapper = new InternalRecordWrapper(table.schema().asStruct());
        try (CloseableIterable reader = IcebergGenerics.read((Table)table).useSnapshot(snapshotId == null ? table.currentSnapshot().snapshotId() : snapshotId.longValue()).select(columns).build();){
            reader.forEach(record -> set.add((StructLike)wrapper.copyFor((StructLike)record)));
        }
        return set;
    }

    public static List<DataFile> partitionDataFiles(Table table, Map<String, Object> partitionValues) throws IOException {
        table.refresh();
        Types.StructType partitionType = table.spec().partitionType();
        GenericRecord partitionRecord = GenericRecord.create((Types.StructType)partitionType).copy(partitionValues);
        StructLikeWrapper expectedWrapper = StructLikeWrapper.forType((Types.StructType)partitionType).set((StructLike)partitionRecord);
        ArrayList dataFiles = Lists.newArrayList();
        try (CloseableIterable fileScanTasks = table.newScan().planFiles();){
            for (FileScanTask scanTask : fileScanTasks) {
                StructLikeWrapper wrapper = StructLikeWrapper.forType((Types.StructType)partitionType).set(((DataFile)scanTask.file()).partition());
                if (!expectedWrapper.equals((Object)wrapper)) continue;
                dataFiles.add((DataFile)scanTask.file());
            }
        }
        return dataFiles;
    }

    public static Map<Long, List<DataFile>> snapshotToDataFiles(Table table) throws IOException {
        table.refresh();
        HashMap result = Maps.newHashMap();
        Snapshot current = table.currentSnapshot();
        while (current != null) {
            TableScan tableScan = table.newScan();
            tableScan = current.parentId() != null ? tableScan.appendsBetween(current.parentId().longValue(), current.snapshotId()) : tableScan.useSnapshot(current.snapshotId());
            try (CloseableIterable scanTasks = tableScan.planFiles();){
                result.put(current.snapshotId(), ImmutableList.copyOf((Iterable)Iterables.transform((Iterable)scanTasks, ContentScanTask::file)));
            }
            if (current.parentId() == null) break;
            current = table.snapshot(current.parentId().longValue());
        }
        return result;
    }

    public static List<DataFile> matchingPartitions(List<DataFile> dataFiles, PartitionSpec partitionSpec, Map<String, Object> partitionValues) {
        Types.StructType partitionType = partitionSpec.partitionType();
        GenericRecord partitionRecord = GenericRecord.create((Types.StructType)partitionType).copy(partitionValues);
        StructLikeWrapper expected = StructLikeWrapper.forType((Types.StructType)partitionType).set((StructLike)partitionRecord);
        return dataFiles.stream().filter(df -> {
            StructLikeWrapper wrapper = StructLikeWrapper.forType((Types.StructType)partitionType).set(df.partition());
            return wrapper.equals((Object)expected);
        }).collect(Collectors.toList());
    }
}

