/*
 * Decompiled with CFR 0.152.
 */
package co.decodable.sdk.pipeline.internal;

import co.decodable.sdk.pipeline.DecodableStreamSink;
import co.decodable.sdk.pipeline.DecodableStreamSinkBuilder;
import co.decodable.sdk.pipeline.EnvironmentAccess;
import co.decodable.sdk.pipeline.internal.DecodableStreamSinkImpl;
import co.decodable.sdk.pipeline.internal.config.StreamConfig;
import co.decodable.sdk.pipeline.internal.config.StreamConfigMapping;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

/**
 * Default {@link DecodableStreamSinkBuilder} implementation. It collects the target stream's
 * name and/or id together with a value {@link SerializationSchema}, and on {@link #build()}
 * assembles a {@link KafkaSink} which is wrapped in a {@link DecodableStreamSinkImpl}.
 *
 * <p>Connection details (bootstrap servers, topic, delivery guarantee, transactional id prefix
 * and additional producer properties) are not set on this builder; they are taken from the
 * {@link StreamConfig} that {@link StreamConfigMapping} derives from the environment
 * configuration.
 *
 * @param <T> type of the elements written by the resulting sink
 */
public class DecodableStreamSinkBuilderImpl<T>
implements DecodableStreamSinkBuilder<T> {
    private String streamId;
    private String streamName;
    private SerializationSchema<T> serializationSchema;

    /**
     * Sets the name of the stream to write to.
     *
     * @param streamName the stream name, passed on to the stream config lookup as-is
     * @return this builder
     */
    @Override
    public DecodableStreamSinkBuilder<T> withStreamName(String streamName) {
        this.streamName = streamName;
        return this;
    }

    /**
     * Sets the id of the stream to write to.
     *
     * @param streamId the stream id, passed on to the stream config lookup as-is
     * @return this builder
     */
    @Override
    public DecodableStreamSinkBuilder<T> withStreamId(String streamId) {
        this.streamId = streamId;
        return this;
    }

    /**
     * Sets the schema used to serialize record values.
     *
     * @param serializationSchema the value serialization schema; must be non-null by the time
     *     {@link #build()} is called
     * @return this builder
     */
    @Override
    public DecodableStreamSinkBuilder<T> withSerializationSchema(SerializationSchema<T> serializationSchema) {
        this.serializationSchema = serializationSchema;
        return this;
    }

    /**
     * Builds the sink from the collected settings and the environment-provided stream config.
     *
     * @return a sink delegating to a freshly built {@link KafkaSink}
     * @throws NullPointerException if no serialization schema has been set
     */
    @Override
    public DecodableStreamSink<T> build() {
        Objects.requireNonNull(this.serializationSchema, "serializationSchema");
        Map<String, String> environment = EnvironmentAccess.getEnvironment().getEnvironmentConfiguration();
        // NOTE(review): how a missing or unknown name/id is handled is decided by
        // StreamConfigMapping.determineConfig, which is not visible here.
        StreamConfig streamConfig = new StreamConfigMapping(environment).determineConfig(this.streamName, this.streamId);
        // Explicit <T> type witnesses keep the whole chain typed to the element type; without
        // them the builders are inferred as Object and the result is only usable as a raw type.
        KafkaSink<T> delegate = KafkaSink.<T>builder()
                .setBootstrapServers(streamConfig.bootstrapServers())
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<T>builder()
                                .setTopic(streamConfig.topic())
                                .setValueSerializationSchema(this.serializationSchema)
                                .build())
                .setDeliveryGuarantee(DecodableStreamSinkBuilderImpl.toDeliveryGuarantee(streamConfig.deliveryGuarantee()))
                .setTransactionalIdPrefix(streamConfig.transactionalIdPrefix())
                .setKafkaProducerConfig(DecodableStreamSinkBuilderImpl.toProperties(streamConfig.kafkaProperties()))
                .build();
        return new DecodableStreamSinkImpl<>(delegate);
    }

    /**
     * Maps the configured delivery guarantee string to Flink's {@link DeliveryGuarantee}.
     *
     * @param configured the configured value; may be {@code null}
     * @return {@code EXACTLY_ONCE} for {@code "exactly-once"}, {@code AT_LEAST_ONCE} for
     *     {@code "at-least-once"}, and {@code NONE} for any other value including {@code null}
     */
    private static DeliveryGuarantee toDeliveryGuarantee(String configured) {
        if ("exactly-once".equals(configured)) {
            return DeliveryGuarantee.EXACTLY_ONCE;
        }
        if ("at-least-once".equals(configured)) {
            return DeliveryGuarantee.AT_LEAST_ONCE;
        }
        return DeliveryGuarantee.NONE;
    }

    /**
     * Copies all entries of the given map into a new {@link Properties} instance.
     *
     * @param map the entries to copy
     * @return a new {@link Properties} object containing every entry of {@code map}
     */
    private static Properties toProperties(Map<String, String> map) {
        Properties p = new Properties();
        p.putAll(map);
        return p;
    }
}

