/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.flink.sink;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.flink.serialization.CommitWrapperSerializer;
import org.apache.seatunnel.translation.flink.serialization.FlinkSimpleVersionedSerializer;
import org.apache.seatunnel.translation.flink.serialization.FlinkWriterStateSerializer;
import org.apache.seatunnel.translation.flink.sink.CommitWrapper;
import org.apache.seatunnel.translation.flink.sink.FlinkCommitter;
import org.apache.seatunnel.translation.flink.sink.FlinkGlobalCommitter;
import org.apache.seatunnel.translation.flink.sink.FlinkSinkWriter;
import org.apache.seatunnel.translation.flink.sink.FlinkSinkWriterContext;
import org.apache.seatunnel.translation.flink.sink.FlinkWriterState;

/**
 * Adapter that exposes a {@link SeaTunnelSink} through Flink's {@link Sink} interface.
 *
 * <p>Every factory method delegates to the wrapped SeaTunnel sink and wraps the result in the
 * matching Flink-side adapter ({@link FlinkSinkWriter}, {@link FlinkCommitter},
 * {@link FlinkGlobalCommitter}) or serializer adapter.
 *
 * @param <InputT> element type accepted by the Flink writer
 * @param <CommT> commit-info type of the wrapped sink; exposed to Flink as {@link CommitWrapper}
 * @param <WriterStateT> writer state type of the wrapped sink; exposed to Flink as
 *     {@link FlinkWriterState}
 * @param <GlobalCommT> aggregated commit-info type of the wrapped sink
 */
public class FlinkSink<InputT, CommT, WriterStateT, GlobalCommT>
implements Sink<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>, GlobalCommT> {
    private final SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink;
    // Stored by the constructor; not read anywhere inside this class.
    private final List<CatalogTable> catalogTables;
    private final int parallelism;

    /**
     * Creates the adapter.
     *
     * @param sink the SeaTunnel sink every call is delegated to
     * @param catalogTables catalog tables associated with this sink (only stored)
     * @param parallelism parallelism handed to each {@link FlinkSinkWriterContext}
     */
    public FlinkSink(SeaTunnelSink<SeaTunnelRow, WriterStateT, CommT, GlobalCommT> sink, List<CatalogTable> catalogTables, int parallelism) {
        this.sink = sink;
        this.catalogTables = catalogTables;
        this.parallelism = parallelism;
    }

    /**
     * Creates the Flink writer, either fresh or restored from previously snapshotted state.
     *
     * @param context Flink init context, wrapped into a {@link FlinkSinkWriterContext}
     * @param states writer states to restore from; {@code null} or empty means a fresh writer
     * @return a {@link FlinkSinkWriter} around the SeaTunnel writer
     * @throws IOException if the wrapped sink fails to create or restore its writer
     */
    public SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> createWriter(Sink.InitContext context, List<FlinkWriterState<WriterStateT>> states) throws IOException {
        // The context is passed on as-is, without a cast to the SeaTunnel writer-context
        // interface: this file imports two different types with the simple name SinkWriter,
        // and the return type above already uses that name for the Flink one.
        FlinkSinkWriterContext stContext = new FlinkSinkWriterContext(context, this.parallelism);
        if (states == null || states.isEmpty()) {
            // Nothing to restore: start a fresh writer with checkpoint id 1.
            return new FlinkSinkWriter<>(this.sink.createWriter(stContext), 1L, stContext);
        }
        // Unwrap the Flink-side state holders into the state objects the SeaTunnel sink expects.
        List<WriterStateT> restoredState = states.stream().map(FlinkWriterState::getState).collect(Collectors.toList());
        // Continue with the checkpoint id following the one recorded in the first restored state.
        return new FlinkSinkWriter<>(this.sink.restoreWriter(stContext, restoredState), states.get(0).getCheckpointId() + 1L, stContext);
    }

    /**
     * Wraps the SeaTunnel committer, if the sink provides one, in a {@link FlinkCommitter}.
     *
     * @return the wrapped committer, or empty when the sink has none
     * @throws IOException if the wrapped sink fails to create its committer
     */
    public Optional<Committer<CommitWrapper<CommT>>> createCommitter() throws IOException {
        return this.sink.createCommitter().map(FlinkCommitter::new);
    }

    /**
     * Wraps the SeaTunnel aggregated committer, if the sink provides one, in a
     * {@link FlinkGlobalCommitter}.
     *
     * @return the wrapped global committer, or empty when the sink has none
     * @throws IOException if the wrapped sink fails to create its aggregated committer
     */
    public Optional<GlobalCommitter<CommitWrapper<CommT>, GlobalCommT>> createGlobalCommitter() throws IOException {
        return this.sink.createAggregatedCommitter().map(FlinkGlobalCommitter::new);
    }

    /**
     * Returns a serializer for {@link CommitWrapper} committables, but only when the sink has a
     * committer or an aggregated committer.
     *
     * @return the wrapped commit-info serializer, or empty when the sink has no committer of
     *     either kind or supplies no commit-info serializer
     * @throws RuntimeException wrapping the {@link IOException} raised while probing for a
     *     committer
     */
    public Optional<SimpleVersionedSerializer<CommitWrapper<CommT>>> getCommittableSerializer() {
        try {
            // NOTE(review): the committers created here are used only as a presence check and
            // are then discarded — confirm that creating them has no lasting side effects.
            if (this.sink.createCommitter().isPresent() || this.sink.createAggregatedCommitter().isPresent()) {
                return this.sink.getCommitInfoSerializer().map(CommitWrapperSerializer::new);
            }
            return Optional.empty();
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to create Committer or AggregatedCommitter", e);
        }
    }

    /**
     * Returns a serializer for aggregated commit info, but only when the sink has an aggregated
     * committer.
     *
     * @return the wrapped aggregated-commit-info serializer, or empty when the sink has no
     *     aggregated committer or supplies no such serializer
     * @throws RuntimeException wrapping the {@link IOException} raised while probing for the
     *     aggregated committer
     */
    public Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer() {
        try {
            // NOTE(review): the aggregated committer is created only as a presence check and
            // is then discarded — confirm that creating it has no lasting side effects.
            if (this.sink.createAggregatedCommitter().isPresent()) {
                return this.sink.getAggregatedCommitInfoSerializer().map(FlinkSimpleVersionedSerializer::new);
            }
            return Optional.empty();
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to create AggregatedCommitter", e);
        }
    }

    /**
     * Wraps the sink's writer-state serializer, if any, in a {@link FlinkWriterStateSerializer}.
     *
     * @return the wrapped writer-state serializer, or empty when the sink supplies none
     */
    public Optional<SimpleVersionedSerializer<FlinkWriterState<WriterStateT>>> getWriterStateSerializer() {
        return this.sink.getWriterStateSerializer().map(FlinkWriterStateSerializer::new);
    }
}

