/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.JobMode;

public class SinkFlowTestUtils {
    public static void runBatchWithCheckpointDisabled(CatalogTable catalogTable, ReadonlyConfig options, TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory, List<SeaTunnelRow> rows) throws IOException {
        JobContext context = new JobContext(Long.valueOf(System.currentTimeMillis()));
        context.setJobMode(JobMode.BATCH);
        context.setEnableCheckpoint(false);
        SinkFlowTestUtils.runWithContext(catalogTable, options, factory, rows, context, 1);
    }

    public static void runBatchWithCheckpointEnabled(CatalogTable catalogTable, ReadonlyConfig options, TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory, List<SeaTunnelRow> rows) throws IOException {
        JobContext context = new JobContext(Long.valueOf(System.currentTimeMillis()));
        context.setJobMode(JobMode.BATCH);
        context.setEnableCheckpoint(true);
        SinkFlowTestUtils.runWithContext(catalogTable, options, factory, rows, context, 1);
    }

    public static void runParallelSubtasksBatchWithCheckpointDisabled(CatalogTable catalogTable, ReadonlyConfig options, TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory, List<SeaTunnelRow> rows, int parallelism) throws IOException {
        JobContext context = new JobContext(Long.valueOf(System.currentTimeMillis()));
        context.setJobMode(JobMode.BATCH);
        context.setEnableCheckpoint(false);
        SinkFlowTestUtils.runWithContext(catalogTable, options, factory, rows, context, parallelism);
    }

    private static void runWithContext(CatalogTable catalogTable, ReadonlyConfig options, TableSinkFactory<SeaTunnelRow, ?, ?, ?> factory, List<SeaTunnelRow> rows, JobContext context, int parallelism) throws IOException {
        SeaTunnelSink sink = factory.createSink(new TableSinkFactoryContext(catalogTable, options, Thread.currentThread().getContextClassLoader())).createSink();
        sink.setJobContext(context);
        ArrayList commitInfos = new ArrayList();
        for (int i = 0; i < parallelism; ++i) {
            SinkWriter sinkWriter = sink.createWriter((SinkWriter.Context)new DefaultSinkWriterContext(i, parallelism));
            for (SeaTunnelRow row : rows) {
                sinkWriter.write((Object)row);
            }
            Optional commitInfo = sinkWriter.prepareCommit(1L);
            sinkWriter.snapshotState(1L);
            sinkWriter.close();
            if (!commitInfo.isPresent()) continue;
            commitInfos.add(commitInfo.get());
        }
        Optional sinkCommitter = sink.createCommitter();
        Optional aggregatedCommitterOptional = sink.createAggregatedCommitter();
        if (!commitInfos.isEmpty()) {
            if (aggregatedCommitterOptional.isPresent()) {
                SinkAggregatedCommitter aggregatedCommitter = (SinkAggregatedCommitter)aggregatedCommitterOptional.get();
                MultiTableResourceManager resourceManager = null;
                if (aggregatedCommitter instanceof SupportMultiTableSinkAggregatedCommitter) {
                    resourceManager = ((SupportMultiTableSinkAggregatedCommitter)aggregatedCommitter).initMultiTableResourceManager(1, 1);
                }
                aggregatedCommitter.init();
                if (resourceManager != null) {
                    ((SupportMultiTableSinkAggregatedCommitter)aggregatedCommitter).setMultiTableResourceManager(resourceManager, 0);
                }
                Object aggregatedCommitInfoT = aggregatedCommitter.combine(commitInfos);
                aggregatedCommitter.commit(Collections.singletonList(aggregatedCommitInfoT));
                aggregatedCommitter.close();
            } else if (sinkCommitter.isPresent()) {
                ((SinkCommitter)sinkCommitter.get()).commit(commitInfos);
            } else {
                throw new RuntimeException("No committer found");
            }
        }
    }
}

