/*
 * Decompiled with CFR 0.152.
 */
package io.cdap.plugin.salesforce.plugin.source.streaming;

import com.sforce.ws.ConnectionException;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.format.UnexpectedFormatException;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.streaming.StreamingContext;
import io.cdap.plugin.salesforce.SObjectDescriptor;
import io.cdap.plugin.salesforce.SalesforceSchemaUtil;
import io.cdap.plugin.salesforce.plugin.source.streaming.SalesforceReceiver;
import io.cdap.plugin.salesforce.plugin.source.streaming.SalesforceStreamingSourceConfig;
import java.io.Serializable;
import java.time.Instant;
import java.time.LocalTime;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers that turn the JSON messages delivered by {@link SalesforceReceiver} into a
 * Spark {@link JavaDStream} of {@link StructuredRecord}s.
 *
 * <p>The class is a non-instantiable utility holder.
 */
final class SalesforceStreamingSourceUtil {
    private static final Logger LOG = LoggerFactory.getLogger(SalesforceStreamingSourceUtil.class);

    /** Length of one UTC day in seconds, used to derive an epoch-day from an instant. */
    private static final long SECONDS_PER_DAY = TimeUnit.DAYS.toSeconds(1L);

    /** Number of microseconds in one second. */
    private static final long MICROS_PER_SECOND = TimeUnit.SECONDS.toMicros(1L);

    /**
     * Builds the stream of records for the streaming source.
     *
     * <p>Calls {@code config.ensurePushTopicExistAndWithCorrectFields()} first, then resolves the
     * record schema: the output schema of the streaming context is used when it is set, otherwise
     * the schema is derived from the configured query through {@link SalesforceSchemaUtil}.
     *
     * @param streamingContext context supplying the output schema and the Spark streaming context
     * @param config source configuration supplying credentials, query and push topic name
     * @return stream of records converted from the receiver's JSON messages
     * @throws ConnectionException declared by the calls made while preparing the stream
     */
    static JavaDStream<StructuredRecord> getStructuredRecordJavaDStream(StreamingContext streamingContext, SalesforceStreamingSourceConfig config) throws ConnectionException {
        config.ensurePushTopicExistAndWithCorrectFields();

        Schema outputSchema = streamingContext.getOutputSchema();
        // The lambda below needs a final reference, so resolve the schema in one expression.
        final Schema schema = outputSchema != null
            ? outputSchema
            : SalesforceSchemaUtil.getSchema(config.getAuthenticatorCredentials(), SObjectDescriptor.fromQuery(config.getQuery()));
        LOG.debug("Schema is {}", schema);

        JavaStreamingContext jssc = streamingContext.getSparkStreamingContext();
        // The receiver's elements are handed to getStructuredRecord(String, Schema), so it must
        // produce String messages.
        Receiver<String> receiver = new SalesforceReceiver(config.getAuthenticatorCredentials(), config.getPushTopicName());
        return jssc.receiverStream(receiver)
            .map((Function<String, StructuredRecord>) jsonMessage -> getStructuredRecord(jsonMessage, schema))
            // Kept from the original pipeline; getStructuredRecord as written never returns null.
            .filter(Objects::nonNull);
    }

    /**
     * Converts one JSON message into a record of the given schema.
     *
     * <p>Only the entries of the message's {@code sobject} object are read. Entries that have no
     * matching schema field are skipped.
     *
     * @param jsonMessage raw JSON text of the message
     * @param schema schema of the record to build
     * @return the built record
     * @throws IllegalStateException if the message is not valid JSON or has no {@code sobject} object
     */
    private static StructuredRecord getStructuredRecord(String jsonMessage, Schema schema) {
        JSONObject sObjectFields;
        try {
            // NOTE(review): the error text below mentions /data/sobject while the lookup is a
            // top-level "sobject" - confirm against what SalesforceReceiver actually emits.
            sObjectFields = new JSONObject(jsonMessage).getJSONObject("sobject");
        } catch (JSONException e) {
            throw new IllegalStateException(String.format("Cannot retrieve /data/sobject from json message %s", jsonMessage), e);
        }

        StructuredRecord.Builder builder = StructuredRecord.builder(schema);
        for (Map.Entry<String, Object> entry : sObjectFields.toMap().entrySet()) {
            // Second argument presumably requests a case-insensitive lookup - verify against Schema.
            Schema.Field field = schema.getField(entry.getKey(), true);
            if (field == null) {
                continue;
            }
            builder.set(field.getName(), convertValue(entry.getValue(), field));
        }
        return builder.build();
    }

    /**
     * Converts a decoded JSON value into the representation expected for the given field.
     *
     * <ul>
     *   <li>{@code null} is returned unchanged.</li>
     *   <li>For fields with a logical type the value must be a string: DATE becomes days since
     *       epoch, TIMESTAMP_MICROS becomes microseconds since epoch, TIME_MICROS becomes
     *       microseconds of the day.</li>
     *   <li>A {@link Map} value is rendered with {@code toString()} for STRING fields and rejected
     *       for every other type.</li>
     *   <li>Any other value is returned as is.</li>
     * </ul>
     *
     * @param value decoded JSON value, may be {@code null}
     * @param field schema field the value belongs to
     * @return the converted value, or {@code null} when {@code value} is {@code null}
     * @throws UnexpectedFormatException if the logical type is unsupported or the value does not
     *     fit the type of the field
     */
    private static Object convertValue(Object value, Schema.Field field) {
        if (value == null) {
            return null;
        }

        Schema fieldSchema = field.getSchema();
        if (fieldSchema.isNullable()) {
            fieldSchema = fieldSchema.getNonNullable();
        }
        Schema.Type fieldSchemaType = fieldSchema.getType();
        Schema.LogicalType logicalType = fieldSchema.getLogicalType();

        if (logicalType != null) {
            // Fail with the field name instead of a bare ClassCastException.
            if (!(value instanceof String)) {
                throw new UnexpectedFormatException(String.format("Field '%s' is of type '%s', but value found is '%s'", field.getName(), logicalType.getToken(), value));
            }
            String valueString = (String) value;
            switch (logicalType) {
                case DATE:
                    return toEpochDay(Instant.parse(valueString));
                case TIMESTAMP_MICROS:
                    return toEpochMicros(Instant.parse(valueString));
                case TIME_MICROS:
                    return TimeUnit.NANOSECONDS.toMicros(LocalTime.parse(valueString).toNanoOfDay());
                default:
                    throw new UnexpectedFormatException(String.format("Field '%s' is of unsupported type '%s'", field.getName(), logicalType.getToken()));
            }
        }

        if (value instanceof Map) {
            if (fieldSchemaType == Schema.Type.STRING) {
                return value.toString();
            }
            throw new UnexpectedFormatException(String.format("Field '%s' is of type '%s', but value found is '%s'", field.getName(), fieldSchemaType, value));
        }
        return value;
    }

    /**
     * Returns the UTC epoch-day containing the instant. Floor division keeps instants before
     * 1970-01-01 on their own calendar day instead of rounding them toward day zero.
     */
    private static int toEpochDay(Instant instant) {
        return Math.toIntExact(Math.floorDiv(instant.getEpochSecond(), SECONDS_PER_DAY));
    }

    /**
     * Returns microseconds since the epoch, keeping the sub-millisecond digits of the instant.
     * Overflow raises {@link ArithmeticException} rather than wrapping silently.
     */
    private static long toEpochMicros(Instant instant) {
        long wholeSecondMicros = Math.multiplyExact(instant.getEpochSecond(), MICROS_PER_SECOND);
        return Math.addExact(wholeSecondMicros, TimeUnit.NANOSECONDS.toMicros(instant.getNano()));
    }

    private SalesforceStreamingSourceUtil() {
    }
}

