/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.deltalake.transactionlog.checkpoint;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.concurrent.BoundedExecutor;
import io.airlift.json.JsonCodec;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.filesystem.TrinoOutputFile;
import io.trino.plugin.base.metrics.FileFormatDataSourceStats;
import io.trino.plugin.deltalake.DeltaLakeColumnHandle;
import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.deltalake.DeltaLakeErrorCode;
import io.trino.plugin.deltalake.ForDeltaLakeMetadata;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogUtil;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointBuilder;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntries;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointEntryIterator;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointSchemaManager;
import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriter;
import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint;
import io.trino.plugin.hive.NodeVersion;
import io.trino.spi.ErrorCodeSupplier;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;

/**
 * Writes a checkpoint for a Delta Lake table snapshot and then updates the
 * {@code _last_checkpoint} file in the table's transaction log directory.
 */
public class CheckpointWriterManager {
    private final TypeManager typeManager;
    private final CheckpointSchemaManager checkpointSchemaManager;
    private final TrinoFileSystemFactory fileSystemFactory;
    private final String trinoVersion;
    private final TransactionLogAccess transactionLogAccess;
    private final FileFormatDataSourceStats fileFormatDataSourceStats;
    private final JsonCodec<LastCheckpoint> lastCheckpointCodec;
    private final Executor executorService;
    private final int checkpointProcessingParallelism;

    /**
     * Creates the manager from its injected collaborators.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Inject
    public CheckpointWriterManager(TypeManager typeManager, CheckpointSchemaManager checkpointSchemaManager, TrinoFileSystemFactory fileSystemFactory, NodeVersion nodeVersion, TransactionLogAccess transactionLogAccess, FileFormatDataSourceStats fileFormatDataSourceStats, JsonCodec<LastCheckpoint> lastCheckpointCodec, DeltaLakeConfig deltaLakeConfig, @ForDeltaLakeMetadata ExecutorService executorService) {
        this.typeManager = Objects.requireNonNull(typeManager, "typeManager is null");
        this.checkpointSchemaManager = Objects.requireNonNull(checkpointSchemaManager, "checkpointSchemaManager is null");
        this.fileSystemFactory = Objects.requireNonNull(fileSystemFactory, "fileSystemFactory is null");
        // Validate explicitly so a missing dependency fails with a descriptive message,
        // consistent with the other constructor arguments.
        this.trinoVersion = Objects.requireNonNull(nodeVersion, "nodeVersion is null").toString();
        this.transactionLogAccess = Objects.requireNonNull(transactionLogAccess, "transactionLogAccess is null");
        this.fileFormatDataSourceStats = Objects.requireNonNull(fileFormatDataSourceStats, "fileFormatDataSourceStats is null");
        this.lastCheckpointCodec = Objects.requireNonNull(lastCheckpointCodec, "lastCheckpointCodec is null");
        this.executorService = Objects.requireNonNull(executorService, "ExecutorService is null");
        this.checkpointProcessingParallelism = Objects.requireNonNull(deltaLakeConfig, "deltaLakeConfig is null").getCheckpointProcessingParallelism();
    }

    /**
     * Writes a checkpoint at the snapshot's version.
     *
     * <p>The checkpoint content is assembled from the entries of the snapshot's existing
     * checkpoint (when it yields any metadata/protocol entries) followed by the snapshot's
     * JSON transaction log entries. The result is written to
     * {@code <transaction log dir>/<version, zero-padded to 20 digits>.checkpoint.parquet},
     * after which {@code _last_checkpoint} is created or overwritten with the new version
     * and the number of checkpoint entries.
     *
     * @param session session used to create the file system and to read existing log entries
     * @param snapshot table snapshot to checkpoint
     * @throws IllegalArgumentException if the snapshot version is not greater than its last checkpoint version
     * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} if the existing checkpoint entries
     *         contain no metadata entry or no protocol entry
     * @throws UncheckedIOException if reading or writing the transaction log fails with an {@link IOException}
     */
    public void writeCheckpoint(ConnectorSession session, TableSnapshot snapshot) {
        try {
            SchemaTableName table = snapshot.getTable();
            long newCheckpointVersion = snapshot.getVersion();
            snapshot.getLastCheckpointVersion().ifPresent(lastCheckpoint -> Preconditions.checkArgument(
                    newCheckpointVersion > lastCheckpoint,
                    "written checkpoint %s for table %s must be greater than last checkpoint version %s",
                    newCheckpointVersion,
                    table,
                    lastCheckpoint));

            CheckpointBuilder checkpointBuilder = new CheckpointBuilder();
            TrinoFileSystem fileSystem = this.fileSystemFactory.create(session);

            // First pass over the existing checkpoint: only metadata and protocol entries.
            List<DeltaLakeTransactionLogEntry> checkpointLogEntries;
            try (Stream<DeltaLakeTransactionLogEntry> metadataAndProtocolStream = snapshot.getCheckpointTransactionLogEntries(
                    session,
                    ImmutableSet.of(CheckpointEntryIterator.EntryType.METADATA, CheckpointEntryIterator.EntryType.PROTOCOL),
                    this.checkpointSchemaManager,
                    this.typeManager,
                    fileSystem,
                    this.fileFormatDataSourceStats,
                    Optional.empty(),
                    TupleDomain.all(),
                    Optional.empty(),
                    new BoundedExecutor(this.executorService, this.checkpointProcessingParallelism))) {
                checkpointLogEntries = metadataAndProtocolStream
                        .filter(entry -> entry.getMetaData() != null || entry.getProtocol() != null)
                        .collect(ImmutableList.toImmutableList());
            }

            if (!checkpointLogEntries.isEmpty()) {
                // NOTE(review): the result is discarded, so this call is presumably made for a
                // side effect (e.g. populating state on the snapshot) - confirm before removing.
                this.transactionLogAccess.getMetadataEntry(session, snapshot);

                DeltaLakeTransactionLogEntry metadataLogEntry = checkpointLogEntries.stream()
                        .filter(logEntry -> logEntry.getMetaData() != null)
                        .findFirst()
                        .orElseThrow(() -> new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Metadata not found in transaction log for " + snapshot.getTable()));
                DeltaLakeTransactionLogEntry protocolLogEntry = checkpointLogEntries.stream()
                        .filter(logEntry -> logEntry.getProtocol() != null)
                        .findFirst()
                        .orElseThrow(() -> new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Protocol not found in transaction log for " + snapshot.getTable()));
                checkpointBuilder.addLogEntry(metadataLogEntry);
                checkpointBuilder.addLogEntry(protocolLogEntry);

                // Second pass over the existing checkpoint: transaction, add and remove entries,
                // read using the metadata and protocol found above.
                try (Stream<DeltaLakeTransactionLogEntry> remainingEntriesStream = snapshot.getCheckpointTransactionLogEntries(
                        session,
                        ImmutableSet.of(CheckpointEntryIterator.EntryType.TRANSACTION, CheckpointEntryIterator.EntryType.ADD, CheckpointEntryIterator.EntryType.REMOVE),
                        this.checkpointSchemaManager,
                        this.typeManager,
                        fileSystem,
                        this.fileFormatDataSourceStats,
                        Optional.of(new TableSnapshot.MetadataAndProtocolEntry(metadataLogEntry.getMetaData(), protocolLogEntry.getProtocol())),
                        TupleDomain.all(),
                        Optional.of(Predicates.alwaysTrue()),
                        new BoundedExecutor(this.executorService, this.checkpointProcessingParallelism))) {
                    remainingEntriesStream.forEach(checkpointBuilder::addLogEntry);
                }
            }

            // JSON log entries are added after the entries taken from the existing checkpoint.
            snapshot.getJsonTransactionLogEntries(fileSystem).forEach(checkpointBuilder::addLogEntry);

            Location transactionLogDir = Location.of(TransactionLogUtil.getTransactionLogDir(snapshot.getTableLocation()));
            Location targetFile = transactionLogDir.appendPath("%020d.checkpoint.parquet".formatted(newCheckpointVersion));
            CheckpointWriter checkpointWriter = new CheckpointWriter(this.typeManager, this.checkpointSchemaManager, this.trinoVersion);
            CheckpointEntries checkpointEntries = checkpointBuilder.build();
            TrinoOutputFile checkpointFile = this.fileSystemFactory.create(session).newOutputFile(targetFile);
            checkpointWriter.write(checkpointEntries, checkpointFile);

            // Update the last-checkpoint file only after the checkpoint itself has been written.
            LastCheckpoint newLastCheckpoint = new LastCheckpoint(newCheckpointVersion, checkpointEntries.size(), Optional.empty(), Optional.empty());
            Location checkpointPath = transactionLogDir.appendPath("_last_checkpoint");
            TrinoOutputFile outputFile = fileSystem.newOutputFile(checkpointPath);
            outputFile.createOrOverwrite(this.lastCheckpointCodec.toJsonBytes(newLastCheckpoint));
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

