/*
 * Decompiled with CFR 0.152.
 */
package co.decodable.sdk.pipeline.internal;

import co.decodable.sdk.pipeline.DecodableStreamSource;
import co.decodable.sdk.pipeline.DecodableStreamSourceBuilder;
import co.decodable.sdk.pipeline.EnvironmentAccess;
import co.decodable.sdk.pipeline.StartupMode;
import co.decodable.sdk.pipeline.internal.DecodableStreamSourceImpl;
import co.decodable.sdk.pipeline.internal.config.StreamConfig;
import co.decodable.sdk.pipeline.internal.config.StreamConfigMapping;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class DecodableStreamSourceBuilderImpl<T>
implements DecodableStreamSourceBuilder<T> {
    private String streamId;
    private String streamName;
    private StartupMode startupMode;
    private DeserializationSchema<T> deserializationSchema;

    @Override
    public DecodableStreamSourceBuilder<T> withStreamName(String streamName) {
        this.streamName = streamName;
        return this;
    }

    @Override
    public DecodableStreamSourceBuilder<T> withStreamId(String streamId) {
        this.streamId = streamId;
        return this;
    }

    @Override
    public DecodableStreamSourceBuilder<T> withStartupMode(StartupMode startupMode) {
        this.startupMode = startupMode;
        return this;
    }

    @Override
    public DecodableStreamSourceBuilder<T> withDeserializationSchema(DeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
        return this;
    }

    @Override
    public DecodableStreamSource<T> build() {
        Objects.requireNonNull(this.deserializationSchema, "deserializationSchema");
        Map<String, String> environment = EnvironmentAccess.getEnvironment().getEnvironmentConfiguration();
        StreamConfig streamConfig = new StreamConfigMapping(environment).determineConfig(this.streamName, this.streamId);
        KafkaSourceBuilder builder = KafkaSource.builder().setBootstrapServers(streamConfig.bootstrapServers()).setTopics(new String[]{streamConfig.topic()}).setProperties(DecodableStreamSourceBuilderImpl.toProperties(streamConfig.kafkaProperties())).setValueOnlyDeserializer(this.deserializationSchema);
        if (streamConfig.startupMode() != null) {
            builder.setStartingOffsets(this.toOffsetsInitializer(streamConfig.startupMode()));
        } else if (this.startupMode != null) {
            builder.setStartingOffsets(this.toOffsetsInitializer(this.startupMode));
        }
        KafkaSource delegate = builder.build();
        return new DecodableStreamSourceImpl(delegate);
    }

    private static Properties toProperties(Map<String, String> map) {
        Properties p = new Properties();
        p.putAll(map);
        return p;
    }

    private OffsetsInitializer toOffsetsInitializer(StartupMode startupMode) {
        switch (startupMode) {
            case EARLIEST_OFFSET: {
                return OffsetsInitializer.earliest();
            }
            case LATEST_OFFSET: {
                return OffsetsInitializer.latest();
            }
        }
        throw new IllegalArgumentException("Unexpected startup mode: " + String.valueOf((Object)startupMode));
    }
}

