/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.IcebergFilesCommitter;
import org.apache.iceberg.flink.sink.IcebergStreamWriter;
import org.apache.iceberg.flink.sink.PartitionKeySelector;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSink {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSink.class);
    private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
    private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();

    private FlinkSink() {
    }

    public static <T> Builder builderFor(DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType) {
        return new Builder().forMapperOutputType(input, mapper, (TypeInformation<RowData>)outputType);
    }

    public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
        RowType rowType = (RowType)tableSchema.toRowDataType().getLogicalType();
        DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
        DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
        return FlinkSink.builderFor(input, arg_0 -> ((DataFormatConverters.RowConverter)rowConverter).toInternal(arg_0), FlinkCompatibilityUtil.toTypeInfo(rowType)).tableSchema(tableSchema);
    }

    public static Builder forRowData(DataStream<RowData> input) {
        return new Builder().forRowData((DataStream<RowData>)input);
    }

    static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
        if (requestedSchema != null) {
            Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), schema);
            TypeUtil.validateWriteSchema(schema, writeSchema, true, true);
            return (RowType)requestedSchema.toRowDataType().getLogicalType();
        }
        return FlinkSchemaUtil.convert(schema);
    }

    static IcebergStreamWriter<RowData> createStreamWriter(Table table, RowType flinkRowType, List<Integer> equalityFieldIds) {
        Map<String, String> props = table.properties();
        long targetFileSize = FlinkSink.getTargetFileSizeBytes(props);
        FileFormat fileFormat = FlinkSink.getFileFormat(props);
        Table serializableTable = SerializableTable.copyOf(table);
        RowDataTaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(serializableTable, flinkRowType, targetFileSize, fileFormat, equalityFieldIds);
        return new IcebergStreamWriter<RowData>(table.name(), taskWriterFactory);
    }

    private static FileFormat getFileFormat(Map<String, String> properties) {
        String formatString = properties.getOrDefault("write.format.default", "parquet");
        return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
    }

    private static long getTargetFileSizeBytes(Map<String, String> properties) {
        return PropertyUtil.propertyAsLong(properties, "write.target-file-size-bytes", 0x20000000L);
    }

    public static class Builder {
        private Function<String, DataStream<RowData>> inputCreator = null;
        private TableLoader tableLoader;
        private Table table;
        private TableSchema tableSchema;
        private boolean overwrite = false;
        private DistributionMode distributionMode = null;
        private Integer writeParallelism = null;
        private List<String> equalityFieldColumns = null;
        private String uidPrefix = null;

        private Builder() {
        }

        private Builder forRowData(DataStream<RowData> newRowDataInput) {
            this.inputCreator = ignored -> newRowDataInput;
            return this;
        }

        private <T> Builder forMapperOutputType(DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType) {
            this.inputCreator = newUidPrefix -> {
                if (newUidPrefix != null) {
                    return input.map(mapper, outputType).name(this.operatorName((String)newUidPrefix)).uid(newUidPrefix + "-mapper");
                }
                return input.map(mapper, outputType);
            };
            return this;
        }

        public Builder table(Table newTable) {
            this.table = newTable;
            return this;
        }

        public Builder tableLoader(TableLoader newTableLoader) {
            this.tableLoader = newTableLoader;
            return this;
        }

        public Builder tableSchema(TableSchema newTableSchema) {
            this.tableSchema = newTableSchema;
            return this;
        }

        public Builder overwrite(boolean newOverwrite) {
            this.overwrite = newOverwrite;
            return this;
        }

        public Builder distributionMode(DistributionMode mode) {
            Preconditions.checkArgument(!DistributionMode.RANGE.equals((Object)mode), "Flink does not support 'range' write distribution mode now.");
            this.distributionMode = mode;
            return this;
        }

        public Builder writeParallelism(int newWriteParallelism) {
            this.writeParallelism = newWriteParallelism;
            return this;
        }

        public Builder equalityFieldColumns(List<String> columns) {
            this.equalityFieldColumns = columns;
            return this;
        }

        public Builder uidPrefix(String newPrefix) {
            this.uidPrefix = newPrefix;
            return this;
        }

        public DataStreamSink<RowData> build() {
            Preconditions.checkArgument(this.inputCreator != null, "Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
            Preconditions.checkNotNull(this.tableLoader, "Table loader shouldn't be null");
            DataStream<RowData> rowDataInput = this.inputCreator.apply(this.uidPrefix);
            if (this.table == null) {
                this.tableLoader.open();
                try (TableLoader loader = this.tableLoader;){
                    this.table = loader.loadTable();
                }
                catch (IOException e) {
                    throw new UncheckedIOException("Failed to load iceberg table from table loader: " + this.tableLoader, e);
                }
            }
            RowType flinkRowType = FlinkSink.toFlinkRowType(this.table.schema(), this.tableSchema);
            DataStream<RowData> distributeStream = this.distributeDataStream(rowDataInput, this.table.properties(), this.table.spec(), this.table.schema(), flinkRowType);
            SingleOutputStreamOperator<WriteResult> writerStream = this.appendWriter(distributeStream, flinkRowType);
            SingleOutputStreamOperator<Void> committerStream = this.appendCommitter(writerStream);
            return this.appendDummySink(committerStream);
        }

        private String operatorName(String suffix) {
            return this.uidPrefix != null ? this.uidPrefix + "-" + suffix : suffix;
        }

        private DataStreamSink<RowData> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
            DataStreamSink resultStream = committerStream.addSink((SinkFunction)new DiscardingSink()).name(this.operatorName(String.format("IcebergSink %s", this.table.name()))).setParallelism(1);
            if (this.uidPrefix != null) {
                resultStream = resultStream.uid(this.uidPrefix + "-dummysink");
            }
            return resultStream;
        }

        private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
            IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);
            SingleOutputStreamOperator committerStream = writerStream.transform(this.operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, (OneInputStreamOperator)filesCommitter).setParallelism(1).setMaxParallelism(1);
            if (this.uidPrefix != null) {
                committerStream = committerStream.uid(this.uidPrefix + "-committer");
            }
            return committerStream;
        }

        private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
            ArrayList<Integer> equalityFieldIds = Lists.newArrayList();
            if (this.equalityFieldColumns != null && this.equalityFieldColumns.size() > 0) {
                for (String column : this.equalityFieldColumns) {
                    Types.NestedField field = this.table.schema().findField(column);
                    Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s", (Object)column, (Object)this.table.schema());
                    equalityFieldIds.add(field.fieldId());
                }
            }
            IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, flinkRowType, equalityFieldIds);
            int parallelism = this.writeParallelism == null ? input.getParallelism() : this.writeParallelism.intValue();
            SingleOutputStreamOperator writerStream = input.transform(this.operatorName(ICEBERG_STREAM_WRITER_NAME), TypeInformation.of(WriteResult.class), streamWriter).setParallelism(parallelism);
            if (this.uidPrefix != null) {
                writerStream = writerStream.uid(this.uidPrefix + "-writer");
            }
            return writerStream;
        }

        private DataStream<RowData> distributeDataStream(DataStream<RowData> input, Map<String, String> properties, PartitionSpec partitionSpec, Schema iSchema, RowType flinkRowType) {
            DistributionMode writeMode;
            if (this.distributionMode == null) {
                String modeName = PropertyUtil.propertyAsString(properties, "write.distribution-mode", "none");
                writeMode = DistributionMode.fromName(modeName);
            } else {
                writeMode = this.distributionMode;
            }
            switch (writeMode) {
                case NONE: {
                    return input;
                }
                case HASH: {
                    if (partitionSpec.isUnpartitioned()) {
                        return input;
                    }
                    return input.keyBy((KeySelector)new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
                }
                case RANGE: {
                    LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now", (Object)"write.distribution-mode", (Object)DistributionMode.RANGE.modeName());
                    return input;
                }
            }
            throw new RuntimeException("Unrecognized write.distribution-mode: " + (Object)((Object)writeMode));
        }
    }
}

