/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake;

import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.parquet.writer.ParquetSchemaConverter;
import io.trino.parquet.writer.ParquetWriterOptions;
import io.trino.plugin.deltalake.DataFileInfo;
import io.trino.plugin.deltalake.DeltaHiveTypeTranslator;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeErrorCode;
import io.trino.plugin.deltalake.DeltaLakeSessionProperties;
import io.trino.plugin.deltalake.DeltaLakeWriter;
import io.trino.plugin.deltalake.DeltaLakeWriterStats;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.hive.FileWriter;
import io.trino.plugin.hive.HdfsEnvironment;
import io.trino.plugin.hive.HiveCompressionCodec;
import io.trino.plugin.hive.HiveStorageFormat;
import io.trino.plugin.hive.HiveType;
import io.trino.plugin.hive.HiveTypeName;
import io.trino.plugin.hive.RecordFileWriter;
import io.trino.plugin.hive.metastore.StorageFormat;
import io.trino.plugin.hive.parquet.ParquetFileWriter;
import io.trino.plugin.hive.util.CompressionConfigUtil;
import io.trino.plugin.hive.util.ConfigurationUtils;
import io.trino.plugin.hive.util.HiveWriteUtils;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.Page;
import io.trino.spi.PageIndexer;
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.TrinoException;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorPageSink;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTimeZone;

public class DeltaLakePageSink
implements ConnectorPageSink {
    private static final Logger LOG = Logger.get(DeltaLakePageSink.class);
    private static final int MAX_PAGE_POSITIONS = 4096;
    private final List<DeltaLakeColumnHandle> dataColumnHandles;
    private final int[] dataColumnInputIndex;
    private final List<String> dataColumnNames;
    private final List<Type> dataColumnTypes;
    private final int[] partitionColumnsInputIndex;
    private final List<String> originalPartitionColumnNames;
    private final List<Type> partitionColumnTypes;
    private final PageIndexer pageIndexer;
    private final HdfsEnvironment hdfsEnvironment;
    private final int maxOpenWriters;
    private final JsonCodec<DataFileInfo> dataFileInfoCodec;
    private final List<DeltaLakeWriter> writers = new ArrayList<DeltaLakeWriter>();
    private final String outputPath;
    private final ConnectorSession session;
    private final DeltaLakeWriterStats stats;
    private final JobConf conf;
    private final TypeManager typeManager;
    private final String trinoVersion;
    private long writtenBytes;
    private long memoryUsage;

    /**
     * Creates a page sink writing below {@code outputPath}.
     *
     * <p>The input columns are split by column type. For partition keys the input channel, the
     * original (non-canonicalized) column name and the type are recorded; for regular columns the
     * handle, input channel, name and type are recorded. A page indexer is created over the
     * partition column types, and a {@code JobConf} is derived from the HDFS configuration for
     * {@code outputPath} with the session's compression codec applied.
     *
     * @param inputColumns columns of the incoming pages; the list index is used as the page channel
     * @param originalPartitionColumns partition column names in their original spelling; they are
     *        looked up by their canonicalized form
     * @param pageIndexerFactory factory for the indexer that maps partition values to writer indexes
     * @param hdfsEnvironment source of configuration, file systems and identity switching
     * @param maxOpenWriters upper bound (exclusive) on the writer index the sink may reach
     * @param dataFileInfoCodec codec used to serialize per-file results in {@code finish}
     * @param outputPath location the data files are written under
     * @param session session supplying identity, query id and session properties
     * @param stats statistics object handed to every created writer
     * @param typeManager type manager handed to record file writers
     * @param trinoVersion version string handed to Parquet file writers
     * @throws NullPointerException if {@code inputColumns}, {@code originalPartitionColumns},
     *         {@code pageIndexerFactory}, {@code hdfsEnvironment}, {@code dataFileInfoCodec},
     *         {@code session}, {@code typeManager} or {@code trinoVersion} is null
     * @throws IllegalStateException if an input column is neither a partition key nor a regular column
     */
    public DeltaLakePageSink(List<DeltaLakeColumnHandle> inputColumns, List<String> originalPartitionColumns, PageIndexerFactory pageIndexerFactory, HdfsEnvironment hdfsEnvironment, int maxOpenWriters, JsonCodec<DataFileInfo> dataFileInfoCodec, String outputPath, ConnectorSession session, DeltaLakeWriterStats stats, TypeManager typeManager, String trinoVersion) {
        Objects.requireNonNull(inputColumns, "inputColumns is null");
        Objects.requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
        this.hdfsEnvironment = Objects.requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.maxOpenWriters = maxOpenWriters;
        this.dataFileInfoCodec = Objects.requireNonNull(dataFileInfoCodec, "dataFileInfoCodec is null");
        // Previously a null list failed with a message-less NullPointerException on stream().
        Objects.requireNonNull(originalPartitionColumns, "originalPartitionColumns is null");

        ImmutableList.Builder<Integer> partitionChannels = ImmutableList.builder();
        ImmutableList.Builder<Integer> dataChannels = ImmutableList.builder();
        ImmutableList.Builder<DeltaLakeColumnHandle> partitionColumns = ImmutableList.builder();
        ImmutableList.Builder<DeltaLakeColumnHandle> dataColumns = ImmutableList.builder();
        ImmutableList.Builder<String> originalPartitionNames = ImmutableList.builder();

        // Partition handles carry a name that is matched against the canonicalized form of the
        // original partition column names.
        Map<String, String> canonicalToOriginalPartitionColumns = originalPartitionColumns.stream()
                .collect(ImmutableMap.toImmutableMap(TransactionLogAccess::canonicalizeColumnName, Function.identity()));

        for (int inputIndex = 0; inputIndex < inputColumns.size(); inputIndex++) {
            DeltaLakeColumnHandle column = inputColumns.get(inputIndex);
            switch (column.getColumnType()) {
                case PARTITION_KEY:
                    partitionChannels.add(inputIndex);
                    partitionColumns.add(column);
                    originalPartitionNames.add(canonicalToOriginalPartitionColumns.get(column.getName()));
                    break;
                case REGULAR:
                    dataChannels.add(inputIndex);
                    dataColumns.add(column);
                    break;
                default:
                    throw new IllegalStateException("Unexpected column type: " + column.getColumnType());
            }
        }

        List<DeltaLakeColumnHandle> partitionHandles = partitionColumns.build();
        List<DeltaLakeColumnHandle> dataHandles = dataColumns.build();

        this.partitionColumnsInputIndex = Ints.toArray(partitionChannels.build());
        this.dataColumnInputIndex = Ints.toArray(dataChannels.build());
        this.originalPartitionColumnNames = originalPartitionNames.build();
        this.dataColumnHandles = dataHandles;
        this.partitionColumnTypes = partitionHandles.stream()
                .map(DeltaLakeColumnHandle::getType)
                .collect(ImmutableList.toImmutableList());
        this.dataColumnNames = dataHandles.stream()
                .map(DeltaLakeColumnHandle::getName)
                .collect(ImmutableList.toImmutableList());
        this.dataColumnTypes = dataHandles.stream()
                .map(DeltaLakeColumnHandle::getType)
                .collect(ImmutableList.toImmutableList());

        this.pageIndexer = pageIndexerFactory.createPageIndexer(this.partitionColumnTypes);
        this.outputPath = outputPath;
        this.session = Objects.requireNonNull(session, "session is null");
        this.stats = stats;

        Configuration conf = hdfsEnvironment.getConfiguration(new HdfsEnvironment.HdfsContext(session), new Path(outputPath));
        CompressionConfigUtil.configureCompression(conf, DeltaLakeSessionProperties.getCompressionCodec(session));
        this.conf = ConfigurationUtils.toJobConf(conf);

        this.typeManager = Objects.requireNonNull(typeManager, "typeManager is null");
        this.trinoVersion = Objects.requireNonNull(trinoVersion, "trinoVersion is null");
    }

    /**
     * Returns the number of bytes written so far.
     *
     * <p>The counter is advanced by {@code writePage} after each append and recomputed from all
     * writers in {@code doFinish}.
     *
     * @return the current value of the written-bytes counter
     */
    public long getCompletedBytes() {
        return writtenBytes;
    }

    /**
     * Returns the memory usage tracked by this sink.
     *
     * <p>{@code writePage} adjusts the value by the change in each writer's reported memory
     * usage across an append.
     *
     * @return the current value of the memory-usage counter
     */
    public long getMemoryUsage() {
        return memoryUsage;
    }

    /**
     * Returns the CPU time spent on validation; this sink always reports zero.
     *
     * @return {@code 0}
     */
    public long getValidationCpuNanos() {
        final long noValidationTime = 0L;
        return noValidationTime;
    }

    /**
     * Finishes the write by running {@code doFinish} through {@code hdfsEnvironment.doAs} with the
     * session identity, and adapts the resulting future to a {@link CompletableFuture}.
     *
     * @return a future holding one slice per writer, as produced by {@code doFinish}
     */
    public CompletableFuture<Collection<Slice>> finish() {
        ListenableFuture<Collection<Slice>> finished = hdfsEnvironment.doAs(session.getIdentity(), this::doFinish);
        return MoreFutures.toCompletableFuture(finished);
    }

    /**
     * Commits every writer and serializes each writer's {@code DataFileInfo} to JSON.
     *
     * <p>All writers are committed even when reading the file info of an earlier writer fails.
     * Each failure is logged. The first one becomes the cause of the thrown exception and any
     * later ones are attached to it as suppressed exceptions, so no failure is lost.
     * On success the written-bytes counter is recomputed from all writers.
     *
     * @return an already-completed future holding one JSON slice per writer, in writer order
     * @throws TrinoException with {@code DELTA_LAKE_BAD_WRITE} if obtaining the data file info of
     *         any writer fails with an {@link IOException}
     */
    private ListenableFuture<Collection<Slice>> doFinish() {
        ImmutableList.Builder<Slice> dataFileInfos = ImmutableList.builder();
        IOException commitException = null;
        for (DeltaLakeWriter writer : this.writers) {
            writer.commit();
            try {
                DataFileInfo dataFileInfo = writer.getDataFileInfo();
                dataFileInfos.add(Slices.wrappedBuffer(this.dataFileInfoCodec.toJsonBytes(dataFileInfo)));
            }
            catch (IOException e) {
                LOG.warn("exception '%s' while finishing write on %s", e, writer);
                if (commitException == null) {
                    commitException = e;
                }
                else if (commitException != e) {
                    // Throwable.addSuppressed rejects self-suppression, hence the identity check.
                    commitException.addSuppressed(e);
                }
            }
        }
        if (commitException != null) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE, "Error committing Parquet file to Delta Lake", commitException);
        }

        // Declared as Collection<Slice> so the future's type matches the method's return type.
        Collection<Slice> result = dataFileInfos.build();
        this.writtenBytes = this.writers.stream().mapToLong(DeltaLakeWriter::getWrittenBytes).sum();
        return Futures.immediateFuture(result);
    }

    /**
     * Aborts the write by running {@code doAbort} through {@code hdfsEnvironment.doAs} with the
     * session identity.
     */
    public void abort() {
        hdfsEnvironment.doAs(session.getIdentity(), this::doAbort);
    }

    /**
     * Rolls back every writer that has been created.
     *
     * <p>Rollback is attempted on all writers even if some fail. Each failure is logged. The
     * first one becomes the cause of the thrown exception and any later ones are attached to
     * it as suppressed exceptions, so no failure is lost.
     *
     * @throws TrinoException with {@code DELTA_LAKE_BAD_WRITE} if any rollback throws
     */
    private void doAbort() {
        Exception rollbackException = null;
        for (DeltaLakeWriter writer : this.writers) {
            // getWriterIndexes pre-fills the list with null slots; one may still be empty here.
            if (writer == null) {
                continue;
            }
            try {
                writer.rollback();
            }
            catch (Exception e) {
                LOG.warn("exception '%s' while rollback on %s", e, writer);
                if (rollbackException == null) {
                    rollbackException = e;
                }
                else if (rollbackException != e) {
                    // Throwable.addSuppressed rejects self-suppression, hence the identity check.
                    rollbackException.addSuppressed(e);
                }
            }
        }
        if (rollbackException != null) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE, "Error rolling back write to Delta Lake", rollbackException);
        }
    }

    /**
     * Appends a page to the sink. Pages without positions are ignored; any other page is written
     * through {@code doAppend} under the session identity.
     *
     * @param page the page to append
     * @return {@code NOT_BLOCKED} in every case
     */
    public CompletableFuture<?> appendPage(Page page) {
        boolean hasRows = page.getPositionCount() > 0;
        if (!hasRows) {
            return NOT_BLOCKED;
        }
        hdfsEnvironment.doAs(session.getIdentity(), () -> doAppend(page));
        return NOT_BLOCKED;
    }

    /**
     * Writes the page in chunks of at most 4096 positions.
     *
     * <p>Full chunks are cut off the front while more than one chunk's worth of positions is
     * left; the remainder (or the untouched page, if it was small enough to begin with) is
     * written last.
     *
     * @param page the page to write
     */
    private void doAppend(Page page) {
        // Same value as the class constant MAX_PAGE_POSITIONS.
        final int chunkPositions = 4096;
        int totalPositions = page.getPositionCount();
        int offset = 0;
        while (totalPositions - offset > chunkPositions) {
            writePage(page.getRegion(offset, chunkPositions));
            offset += chunkPositions;
        }
        Page remainder = (offset == 0) ? page : page.getRegion(offset, totalPositions - offset);
        writePage(remainder);
    }

    /**
     * Distributes the rows of a page to their writers and appends them.
     *
     * <p>Steps: resolve a writer index per position, count the rows per writer, collect the
     * positions belonging to each writer, then append the data columns of those positions to
     * the writer. The written-bytes and memory counters are advanced by the change each writer
     * reports across its append.
     *
     * @param page the page to write, including partition columns
     */
    private void writePage(Page page) {
        int[] writerIndexes = getWriterIndexes(page);
        int writerCount = writers.size();

        // Pass 1: how many rows go to each writer.
        int[] rowsPerWriter = new int[writerCount];
        for (int writerIndex : writerIndexes) {
            rowsPerWriter[writerIndex]++;
        }

        // Pass 2: the positions of those rows; an array is allocated on a writer's first row.
        int[][] positionsPerWriter = new int[writerCount][];
        int[] assigned = new int[writerCount];
        for (int position = 0; position < page.getPositionCount(); position++) {
            int writerIndex = writerIndexes[position];
            int slot = assigned[writerIndex];
            if (slot == 0) {
                positionsPerWriter[writerIndex] = new int[rowsPerWriter[writerIndex]];
            }
            positionsPerWriter[writerIndex][slot] = position;
            assigned[writerIndex] = slot + 1;
        }

        Page dataPage = getDataPage(page);
        for (int writerIndex = 0; writerIndex < positionsPerWriter.length; writerIndex++) {
            int[] positions = positionsPerWriter[writerIndex];
            if (positions == null) {
                // This writer received no rows from the page.
                continue;
            }

            Page pageForWriter;
            if (positions.length == dataPage.getPositionCount()) {
                // Every row belongs to this writer, so the page is passed through unfiltered.
                pageForWriter = dataPage;
            }
            else {
                Verify.verify(positions.length == assigned[writerIndex]);
                pageForWriter = dataPage.getPositions(positions, 0, positions.length);
            }

            DeltaLakeWriter writer = writers.get(writerIndex);
            long writtenBefore = writer.getWrittenBytes();
            long memoryBefore = writer.getMemoryUsage();
            writer.appendRows(pageForWriter);
            writtenBytes += writer.getWrittenBytes() - writtenBefore;
            memoryUsage += writer.getMemoryUsage() - memoryBefore;
        }
    }

    /**
     * Maps every position of the page to a writer index, creating missing writers on the way.
     *
     * <p>The partition columns are indexed by the page indexer. The writer list is padded with
     * empty slots up to the highest index, and a writer is created for every index that is
     * seen without one. New files are named {@code <queryId>-<random UUID>} and are placed
     * below the partition directory when the table has partition columns, otherwise directly
     * below the output path.
     *
     * @param page the incoming page, including partition columns
     * @return the writer index for each position of {@code page}
     * @throws TrinoException with {@code DELTA_LAKE_BAD_WRITE} if the indexer's maximum index
     *         reaches {@code maxOpenWriters}, or if creating a writer fails with an
     *         {@link IOException}
     */
    private int[] getWriterIndexes(Page page) {
        Page partitionColumns = DeltaLakePageSink.extractColumns(page, this.partitionColumnsInputIndex);
        int[] writerIndexes = this.pageIndexer.indexPage(partitionColumns);
        if (this.pageIndexer.getMaxIndex() >= this.maxOpenWriters) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE, String.format("Exceeded limit of %s open writers for partitions", this.maxOpenWriters));
        }

        // Reserve a slot for every index handed out so far; the loop below fills the empty ones.
        while (this.writers.size() <= this.pageIndexer.getMaxIndex()) {
            this.writers.add(null);
        }

        for (int position = 0; position < page.getPositionCount(); ++position) {
            int writerIndex = writerIndexes[position];
            if (this.writers.get(writerIndex) != null) {
                continue;
            }

            List<String> partitionValues = DeltaLakePageSink.createPartitionValues(this.partitionColumnTypes, partitionColumns, position);
            Path directory = new Path(this.outputPath);
            // Typed as Optional<String> so the Path(String, String) constructor below resolves.
            Optional<String> partitionName = Optional.empty();
            if (!this.originalPartitionColumnNames.isEmpty()) {
                String partName = DeltaLakePageSink.makePartName(this.originalPartitionColumnNames, partitionValues);
                directory = new Path(this.outputPath, partName);
                partitionName = Optional.of(partName);
            }

            String fileName = this.session.getQueryId() + "-" + UUID.randomUUID();
            Path filePath = new Path(directory, fileName);
            // The path handed to the writer is relative to the table root.
            String relativeFilePath = partitionName
                    .map(partition -> new Path(partition, fileName).toString())
                    .orElse(fileName);

            FileWriter fileWriter = DeltaLakeSessionProperties.isParquetOptimizedWriterEnabled(this.session)
                    ? this.createParquetFileWriter(filePath)
                    : this.createRecordFileWriter(filePath);
            Path rootTableLocation = new Path(this.outputPath);
            try {
                // NOTE(review): if this fails, fileWriter was already created but is not stored in
                // writers, so abort() will not roll it back — confirm whether cleanup is needed.
                DeltaLakeWriter writer = new DeltaLakeWriter(this.hdfsEnvironment.getFileSystem(this.session.getIdentity(), rootTableLocation, this.conf), fileWriter, rootTableLocation, relativeFilePath, partitionValues, this.stats, this.dataColumnHandles);
                this.writers.set(writerIndex, writer);
            }
            catch (IOException e) {
                throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE, "Unable to create writer for location: " + this.outputPath, e);
            }
        }

        Verify.verify(this.writers.size() == this.pageIndexer.getMaxIndex() + 1);
        Verify.verify(!this.writers.contains(null));
        return writerIndexes;
    }

    /**
     * Builds a partition path of the form {@code name1=value1/name2=value2/...}.
     *
     * <p>Names and values are passed through {@code FileUtils.escapePathName} with a
     * {@code null} default. NOTE(review): presumably the helper substitutes a default partition
     * name for null or empty values — confirm against the Hive {@code FileUtils} documentation.
     *
     * @param partitionColumns partition column names; determines the number of segments
     * @param partitionValues partition values, positionally matching {@code partitionColumns}
     * @return the joined path, or an empty string when there are no partition columns
     */
    private static String makePartName(List<String> partitionColumns, List<String> partitionValues) {
        StringBuilder partName = new StringBuilder();
        String separator = "";
        for (int column = 0; column < partitionColumns.size(); column++) {
            String escapedName = FileUtils.escapePathName(partitionColumns.get(column), null);
            String escapedValue = FileUtils.escapePathName(partitionValues.get(column), null);
            partName.append(separator).append(escapedName).append('=').append(escapedValue);
            separator = "/";
        }
        return partName.toString();
    }

    /**
     * Returns the partition values of one row, as produced by
     * {@code HiveWriteUtils.createPartitionValues}, with every value equal to
     * {@code "__HIVE_DEFAULT_PARTITION__"} replaced by {@code null}.
     *
     * @param partitionColumnTypes types of the partition columns
     * @param partitionColumns page holding only the partition columns
     * @param position row within {@code partitionColumns}
     * @return a list with one entry per partition column; entries may be {@code null}
     */
    public static List<String> createPartitionValues(List<Type> partitionColumnTypes, Page partitionColumns, int position) {
        List<String> hiveValues = HiveWriteUtils.createPartitionValues(partitionColumnTypes, partitionColumns, position);
        List<String> partitionValues = new ArrayList<>();
        for (String value : hiveValues) {
            if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
                partitionValues.add(null);
            }
            else {
                partitionValues.add(value);
            }
        }
        return partitionValues;
    }

    /**
     * Creates a {@code ParquetFileWriter} for {@code path}.
     *
     * <p>Block size, page size and compression codec come from the session properties.
     * Timestamp-with-time-zone columns are verified to have precision 3 and are written as
     * {@code TIMESTAMP_MILLIS}; all other column types are used unchanged. The rollback action
     * handed to the writer deletes {@code path} non-recursively.
     *
     * @param path the file to create
     * @return the writer for the new file
     * @throws TrinoException with {@code DELTA_LAKE_BAD_WRITE} if the file system cannot be
     *         obtained or the file cannot be created
     */
    private FileWriter createParquetFileWriter(Path path) {
        ParquetWriterOptions writerOptions = ParquetWriterOptions.builder()
                .setMaxBlockSize(DeltaLakeSessionProperties.getParquetWriterBlockSize(session))
                .setMaxPageSize(DeltaLakeSessionProperties.getParquetWriterPageSize(session))
                .build();
        CompressionCodecName codecName = DeltaLakeSessionProperties.getCompressionCodec(session).getParquetCompressionCodec();

        try {
            FileSystem fileSystem = hdfsEnvironment.getFileSystem(session.getIdentity(), path, conf);
            Callable<Void> rollbackAction = () -> {
                fileSystem.delete(path, false);
                return null;
            };

            ImmutableList.Builder<Type> parquetTypeBuilder = ImmutableList.builder();
            for (Type type : dataColumnTypes) {
                if (type instanceof TimestampWithTimeZoneType) {
                    Verify.verify(((TimestampWithTimeZoneType) type).getPrecision() == 3, "Unsupported type: %s", type);
                    parquetTypeBuilder.add(TimestampType.TIMESTAMP_MILLIS);
                }
                else {
                    parquetTypeBuilder.add(type);
                }
            }
            List<Type> parquetTypes = parquetTypeBuilder.build();

            // Identity mapping: entry k holds k.
            int[] identityMapping = new int[dataColumnTypes.size()];
            for (int channel = 0; channel < identityMapping.length; channel++) {
                identityMapping[channel] = channel;
            }

            ParquetSchemaConverter schemaConverter = new ParquetSchemaConverter(parquetTypes, dataColumnNames);
            return new ParquetFileWriter(
                    fileSystem.create(path),
                    rollbackAction,
                    parquetTypes,
                    schemaConverter.getMessageType(),
                    schemaConverter.getPrimitiveTypes(),
                    writerOptions,
                    identityMapping,
                    codecName,
                    trinoVersion);
        }
        catch (IOException e) {
            throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE, "Error creating Parquet file", e);
        }
    }

    /**
     * Creates a {@code RecordFileWriter} for {@code path} using the Parquet storage format, the
     * schema properties of the data columns and the UTC time zone.
     *
     * @param path the file to create
     * @return the writer for the new file
     */
    private FileWriter createRecordFileWriter(Path path) {
        Properties schema = DeltaLakePageSink.buildSchemaProperties(dataColumnNames, dataColumnTypes);
        StorageFormat parquetFormat = StorageFormat.fromHiveStorageFormat(HiveStorageFormat.PARQUET);
        return new RecordFileWriter(
                path,
                dataColumnNames,
                parquetFormat,
                schema,
                HiveStorageFormat.PARQUET.getEstimatedWriterMemoryUsage(),
                conf,
                typeManager,
                DateTimeZone.UTC,
                session);
    }

    /**
     * Builds schema properties for the given columns.
     *
     * <p>{@code "columns"} holds the column names joined with {@code ","}. {@code "columns.types"}
     * holds the Hive type name of each column type, as translated by
     * {@code DeltaHiveTypeTranslator}, joined with {@code ":"}.
     *
     * @param columnNames column names, in column order
     * @param columnTypes column types, in column order
     * @return a new {@link Properties} object holding the two entries
     */
    static Properties buildSchemaProperties(List<String> columnNames, List<Type> columnTypes) {
        List<String> hiveTypeNames = new ArrayList<>();
        for (Type columnType : columnTypes) {
            hiveTypeNames.add(DeltaHiveTypeTranslator.toHiveType(columnType).getHiveTypeName().toString());
        }

        Properties schema = new Properties();
        schema.setProperty("columns", String.join(",", columnNames));
        schema.setProperty("columns.types", String.join(":", hiveTypeNames));
        return schema;
    }

    /**
     * Projects the page onto the regular (data) columns, in the order recorded in
     * {@code dataColumnInputIndex}.
     *
     * @param page the incoming page
     * @return a page with the same position count that holds only the data column blocks
     */
    private Page getDataPage(Page page) {
        int dataColumnCount = dataColumnInputIndex.length;
        Block[] dataBlocks = new Block[dataColumnCount];
        for (int target = 0; target < dataColumnCount; target++) {
            dataBlocks[target] = page.getBlock(dataColumnInputIndex[target]);
        }
        return new Page(page.getPositionCount(), dataBlocks);
    }

    /**
     * Projects the page onto the given channels.
     *
     * @param page the source page
     * @param columns channels of {@code page} to keep, in output order
     * @return a page with the same position count that holds only the selected blocks
     */
    private static Page extractColumns(Page page, int[] columns) {
        Block[] selected = new Block[columns.length];
        int next = 0;
        for (int channel : columns) {
            selected[next++] = page.getBlock(channel);
        }
        return new Page(page.getPositionCount(), selected);
    }
}

