/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.iceberg.flink.source.BoundedTestSource;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

public class BoundedTableFactory
implements DynamicTableSourceFactory {
    private static final AtomicInteger DATA_SET_ID = new AtomicInteger(0);
    private static final Map<String, List<List<Row>>> DATA_SETS = Maps.newHashMap();
    private static final ConfigOption<String> DATA_ID = ConfigOptions.key((String)"data-id").stringType().noDefaultValue();

    public static String registerDataSet(List<List<Row>> dataSet) {
        String dataSetId = String.valueOf(DATA_SET_ID.incrementAndGet());
        DATA_SETS.put(dataSetId, dataSet);
        return dataSetId;
    }

    public static void clearDataSets() {
        DATA_SETS.clear();
    }

    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema((TableSchema)context.getCatalogTable().getSchema());
        Configuration configuration = Configuration.fromMap((Map)context.getCatalogTable().getOptions());
        String dataId = configuration.getString(DATA_ID);
        Preconditions.checkArgument((boolean)DATA_SETS.containsKey(dataId), (String)"data-id %s does not found in registered data set.", (Object)dataId);
        return new BoundedTableSource(DATA_SETS.get(dataId), tableSchema);
    }

    public String factoryIdentifier() {
        return "BoundedSource";
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return ImmutableSet.of();
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return ImmutableSet.of(DATA_ID);
    }

    private static class BoundedTableSource
    implements ScanTableSource {
        private final List<List<Row>> elementsPerCheckpoint;
        private final TableSchema tableSchema;

        private BoundedTableSource(List<List<Row>> elementsPerCheckpoint, TableSchema tableSchema) {
            this.elementsPerCheckpoint = elementsPerCheckpoint;
            this.tableSchema = tableSchema;
        }

        private BoundedTableSource(BoundedTableSource toCopy) {
            this.elementsPerCheckpoint = toCopy.elementsPerCheckpoint;
            this.tableSchema = toCopy.tableSchema;
        }

        public ChangelogMode getChangelogMode() {
            Supplier<Stream> supplier = () -> this.elementsPerCheckpoint.stream().flatMap(Collection::stream);
            ChangelogMode.Builder builder = ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT);
            if (supplier.get().anyMatch(r -> r.getKind() == RowKind.DELETE)) {
                builder.addContainedKind(RowKind.DELETE);
            }
            if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_BEFORE)) {
                builder.addContainedKind(RowKind.UPDATE_BEFORE);
            }
            if (supplier.get().anyMatch(r -> r.getKind() == RowKind.UPDATE_AFTER)) {
                builder.addContainedKind(RowKind.UPDATE_AFTER);
            }
            return builder.build();
        }

        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
            return new DataStreamScanProvider(){

                public DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment env) {
                    boolean checkpointEnabled = env.getCheckpointConfig().isCheckpointingEnabled();
                    BoundedTestSource source = new BoundedTestSource(elementsPerCheckpoint, checkpointEnabled);
                    RowType rowType = (RowType)tableSchema.toRowDataType().getLogicalType();
                    DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(tableSchema.getFieldDataTypes());
                    return env.addSource(source, (TypeInformation)new RowTypeInfo(tableSchema.getFieldTypes())).map(arg_0 -> ((DataFormatConverters.RowConverter)rowConverter).toInternal(arg_0), FlinkCompatibilityUtil.toTypeInfo((RowType)rowType));
                }

                public boolean isBounded() {
                    return true;
                }
            };
        }

        public DynamicTableSource copy() {
            return new BoundedTableSource(this);
        }

        public String asSummaryString() {
            return "Bounded test table source";
        }
    }
}

