/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.mongodb.source;

import com.google.auto.service.AutoService;
import java.util.ArrayList;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbClientProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.internal.MongodbCollectionProvider;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentDeserializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.serde.DocumentRowDataDeserializer;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.config.MongodbReadOptions;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.enumerator.MongodbSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.reader.MongodbReader;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplit;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.MongoSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.mongodb.source.split.SamplingSplitStrategy;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.bson.BsonDocument;

/**
 * SeaTunnel source connector registered under the plugin name {@code "MongoDB"}.
 *
 * <p>{@link #prepare(Config)} turns the plugin configuration into the collaborators
 * (client provider, deserializer, split strategy and read options) that are later handed
 * to the reader and the split enumerator created by this source.
 */
@AutoService(SeaTunnelSource.class)
public class MongodbSource
        implements SeaTunnelSource<SeaTunnelRow, MongoSplit, ArrayList<MongoSplit>>,
                SupportColumnProjection {
    private static final long serialVersionUID = 1L;

    /** Built from the URI / database / collection options in {@link #prepare(Config)}. */
    private MongodbClientProvider clientProvider;
    /** Converts documents into {@link SeaTunnelRow} instances matching {@link #rowType}. */
    private DocumentDeserializer<SeaTunnelRow> deserializer;
    /** Strategy handed to the split enumerator. */
    private MongoSplitStrategy splitStrategy;
    /** Row type produced by this source. */
    private SeaTunnelRowType rowType;
    /** Read options handed to every reader. */
    private MongodbReadOptions mongodbReadOptions;

    /**
     * Returns the identifier of this plugin.
     *
     * @return the constant {@code "MongoDB"}
     */
    @Override
    public String getPluginName() {
        return "MongoDB";
    }

    /**
     * Initializes this source from the plugin configuration.
     *
     * <p>The steps run in a fixed order: client provider, row type, deserializer, split
     * strategy, read options.
     *
     * @param pluginConfig configuration of this source plugin
     * @throws PrepareFailException declared by the overridden contract; not thrown directly here
     * @throws IllegalArgumentException if the URI, database or collection option is missing
     */
    @Override
    public void prepare(Config pluginConfig) throws PrepareFailException {
        this.clientProvider = buildClientProvider(pluginConfig);
        this.rowType = resolveRowType(pluginConfig);
        this.deserializer = buildDeserializer(pluginConfig, this.rowType);
        this.splitStrategy = buildSplitStrategy(pluginConfig, this.clientProvider);
        this.mongodbReadOptions = buildReadOptions(pluginConfig);
    }

    /**
     * Reports the boundedness of this source.
     *
     * @return always {@link Boundedness#BOUNDED}
     */
    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED;
    }

    /**
     * Returns the row type resolved during {@link #prepare(Config)}.
     *
     * @return the produced row type, or {@code null} if {@code prepare} has not run yet
     */
    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        return this.rowType;
    }

    /**
     * Creates a reader wired with the client provider, deserializer and read options.
     *
     * @param readerContext context supplied by the framework
     * @return a new {@link MongodbReader}
     * @throws Exception declared by the overridden contract
     */
    @Override
    public SourceReader<SeaTunnelRow, MongoSplit> createReader(SourceReader.Context readerContext)
            throws Exception {
        return new MongodbReader(
                readerContext, this.clientProvider, this.deserializer, this.mongodbReadOptions);
    }

    /**
     * Creates a fresh split enumerator.
     *
     * @param enumeratorContext context supplied by the framework
     * @return a new {@link MongodbSplitEnumerator}
     * @throws Exception declared by the overridden contract
     */
    @Override
    public SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>> createEnumerator(
            SourceSplitEnumerator.Context<MongoSplit> enumeratorContext) throws Exception {
        return new MongodbSplitEnumerator(
                enumeratorContext, this.clientProvider, this.splitStrategy);
    }

    /**
     * Creates a split enumerator seeded with previously checkpointed state.
     *
     * @param enumeratorContext context supplied by the framework
     * @param checkpointState splits captured by an earlier checkpoint
     * @return a new {@link MongodbSplitEnumerator} that is given {@code checkpointState}
     * @throws Exception declared by the overridden contract
     */
    @Override
    public SourceSplitEnumerator<MongoSplit, ArrayList<MongoSplit>> restoreEnumerator(
            SourceSplitEnumerator.Context<MongoSplit> enumeratorContext,
            ArrayList<MongoSplit> checkpointState)
            throws Exception {
        return new MongodbSplitEnumerator(
                enumeratorContext, this.clientProvider, this.splitStrategy, checkpointState);
    }

    /**
     * Builds the client provider from the URI, database and collection options.
     *
     * <p>All three options are mandatory. Without them the provider would stay {@code null}
     * and that {@code null} would be passed on to the split strategy, the reader and the
     * enumerator, so the misconfiguration is rejected here with an explicit message.
     */
    private static MongodbClientProvider buildClientProvider(Config pluginConfig) {
        String uriKey = MongodbConfig.URI.key();
        String databaseKey = MongodbConfig.DATABASE.key();
        String collectionKey = MongodbConfig.COLLECTION.key();
        boolean complete =
                pluginConfig.hasPath(uriKey)
                        && pluginConfig.hasPath(databaseKey)
                        && pluginConfig.hasPath(collectionKey);
        if (!complete) {
            throw new IllegalArgumentException(
                    "MongoDB source requires the options '"
                            + uriKey
                            + "', '"
                            + databaseKey
                            + "' and '"
                            + collectionKey
                            + "' to be configured");
        }
        return MongodbCollectionProvider.builder()
                .connectionString(pluginConfig.getString(uriKey))
                .database(pluginConfig.getString(databaseKey))
                .collection(pluginConfig.getString(collectionKey))
                .build();
    }

    /** Uses the configured schema when present, otherwise the simple text schema. */
    private static SeaTunnelRowType resolveRowType(Config pluginConfig) {
        if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
            return CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
        }
        return CatalogTableUtil.buildSimpleTextSchema();
    }

    /** Builds the deserializer, falling back to the option's default for the flat-sync flag. */
    private static DocumentDeserializer<SeaTunnelRow> buildDeserializer(
            Config pluginConfig, SeaTunnelRowType rowType) {
        String flatSyncKey = MongodbConfig.FLAT_SYNC_STRING.key();
        // Kept as a boxed Boolean (no ternary) so the default value is passed through
        // exactly as the option declares it, without an intermediate unboxing step.
        Boolean flatSyncString;
        if (pluginConfig.hasPath(flatSyncKey)) {
            flatSyncString = pluginConfig.getBoolean(flatSyncKey);
        } else {
            flatSyncString = (Boolean) MongodbConfig.FLAT_SYNC_STRING.defaultValue();
        }
        return new DocumentRowDataDeserializer(
                rowType.getFieldNames(), (SeaTunnelDataType<?>) rowType, flatSyncString);
    }

    /** Builds the sampling split strategy, applying only the options that are configured. */
    private static MongoSplitStrategy buildSplitStrategy(
            Config pluginConfig, MongodbClientProvider clientProvider) {
        SamplingSplitStrategy.Builder splitStrategyBuilder = SamplingSplitStrategy.builder();
        if (pluginConfig.hasPath(MongodbConfig.MATCH_QUERY.key())) {
            splitStrategyBuilder.setMatchQuery(
                    BsonDocument.parse(pluginConfig.getString(MongodbConfig.MATCH_QUERY.key())));
        }
        if (pluginConfig.hasPath(MongodbConfig.SPLIT_KEY.key())) {
            splitStrategyBuilder.setSplitKey(
                    pluginConfig.getString(MongodbConfig.SPLIT_KEY.key()));
        }
        if (pluginConfig.hasPath(MongodbConfig.SPLIT_SIZE.key())) {
            splitStrategyBuilder.setSizePerSplit(
                    pluginConfig.getLong(MongodbConfig.SPLIT_SIZE.key()));
        }
        if (pluginConfig.hasPath(MongodbConfig.PROJECTION.key())) {
            splitStrategyBuilder.setProjection(
                    BsonDocument.parse(pluginConfig.getString(MongodbConfig.PROJECTION.key())));
        }
        return splitStrategyBuilder.setClientProvider(clientProvider).build();
    }

    /** Builds the read options, applying only the options that are configured. */
    private static MongodbReadOptions buildReadOptions(Config pluginConfig) {
        MongodbReadOptions.MongoReadOptionsBuilder mongoReadOptionsBuilder =
                MongodbReadOptions.builder();
        if (pluginConfig.hasPath(MongodbConfig.MAX_TIME_MIN.key())) {
            // NOTE(review): the option is named MAX_TIME_MIN but is passed to setMaxTimeMS;
            // confirm which time unit the reader applies to this value.
            mongoReadOptionsBuilder.setMaxTimeMS(
                    pluginConfig.getLong(MongodbConfig.MAX_TIME_MIN.key()));
        }
        if (pluginConfig.hasPath(MongodbConfig.FETCH_SIZE.key())) {
            mongoReadOptionsBuilder.setFetchSize(
                    pluginConfig.getInt(MongodbConfig.FETCH_SIZE.key()));
        }
        if (pluginConfig.hasPath(MongodbConfig.CURSOR_NO_TIMEOUT.key())) {
            mongoReadOptionsBuilder.setNoCursorTimeout(
                    pluginConfig.getBoolean(MongodbConfig.CURSOR_NO_TIMEOUT.key()));
        }
        return mongoReadOptionsBuilder.build();
    }
}

