/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.salesforce.internal.client;

import com.google.protobuf.ByteString;
import com.salesforce.eventbus.protobuf.ConsumerEvent;
import com.salesforce.eventbus.protobuf.FetchRequest;
import com.salesforce.eventbus.protobuf.FetchResponse;
import com.salesforce.eventbus.protobuf.ProducerEvent;
import com.salesforce.eventbus.protobuf.PubSubGrpc;
import com.salesforce.eventbus.protobuf.PublishRequest;
import com.salesforce.eventbus.protobuf.PublishResponse;
import com.salesforce.eventbus.protobuf.ReplayPreset;
import com.salesforce.eventbus.protobuf.SchemaRequest;
import com.salesforce.eventbus.protobuf.TopicInfo;
import com.salesforce.eventbus.protobuf.TopicRequest;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.camel.component.salesforce.PubSubApiConsumer;
import org.apache.camel.component.salesforce.PubSubDeserializeType;
import org.apache.camel.component.salesforce.SalesforceLoginConfig;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.api.dto.pubsub.PublishResult;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.component.salesforce.internal.client.TokenCredentials;
import org.apache.camel.support.service.ServiceSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class PubSubApiClient
extends ServiceSupport {
    public static final String PUBSUB_ERROR_AUTH_ERROR = "sfdc.platform.eventbus.grpc.service.auth.error";
    private static final String PUBSUB_ERROR_AUTH_REFRESH_INVALID = "sfdc.platform.eventbus.grpc.service.auth.refresh.invalid";
    protected PubSubGrpc.PubSubStub asyncStub;
    protected PubSubGrpc.PubSubBlockingStub blockingStub;
    protected String accessToken;
    private final long backoffIncrement;
    private final long maxBackoff;
    private final String pubSubHost;
    private final int pubSubPort;
    private final Logger LOG = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    private final SalesforceLoginConfig loginConfig;
    private final SalesforceSession session;
    private final Map<String, Schema> schemaCache = new ConcurrentHashMap<String, Schema>();
    private final Map<String, String> schemaJsonCache = new ConcurrentHashMap<String, String>();
    private final Map<String, TopicInfo> topicInfoCache = new ConcurrentHashMap<String, TopicInfo>();
    private final ConcurrentHashMap<PubSubApiConsumer, StreamObserver<FetchRequest>> observerMap = new ConcurrentHashMap();
    private ManagedChannel channel;
    private boolean usePlainTextConnection = false;
    private ReplayPreset initialReplayPreset;
    private String initialReplayId;

    public PubSubApiClient(SalesforceSession session, SalesforceLoginConfig loginConfig, String pubSubHost, int pubSubPort, long backoffIncrement, long maxBackoff) {
        this.session = session;
        this.loginConfig = loginConfig;
        this.pubSubHost = pubSubHost;
        this.pubSubPort = pubSubPort;
        this.maxBackoff = maxBackoff;
        this.backoffIncrement = backoffIncrement;
    }

    public List<PublishResult> publishMessage(String topic, List<?> bodies) throws IOException {
        this.LOG.debug("Preparing to publish on topic {}", (Object)topic);
        TopicInfo topicInfo = this.getTopicInfo(topic);
        String busTopicName = topicInfo.getTopicName();
        Schema schema = this.getSchema(topicInfo.getSchemaId());
        ArrayList<ProducerEvent> events = new ArrayList<ProducerEvent>(bodies.size());
        for (Object body : bodies) {
            ProducerEvent event = this.createProducerEvent(topicInfo.getSchemaId(), schema, body);
            events.add(event);
        }
        PublishRequest publishRequest = PublishRequest.newBuilder().setTopicName(busTopicName).addAllEvents(events).build();
        PublishResponse response = this.blockingStub.publish(publishRequest);
        this.LOG.debug("Published on topic {}", (Object)topic);
        List<com.salesforce.eventbus.protobuf.PublishResult> results = response.getResultsList();
        ArrayList<PublishResult> publishResults = new ArrayList<PublishResult>(results.size());
        for (com.salesforce.eventbus.protobuf.PublishResult rawResult : results) {
            if (rawResult.hasError()) {
                this.LOG.error("{} {} ", (Object)rawResult.getError().getCode(), (Object)rawResult.getError().getMsg());
            }
            publishResults.add(new PublishResult(rawResult));
        }
        return publishResults;
    }

    public void subscribe(PubSubApiConsumer consumer, ReplayPreset replayPreset, String initialReplayId) {
        this.LOG.debug("Starting subscribe {}", (Object)consumer.getTopic());
        this.initialReplayPreset = replayPreset;
        this.initialReplayId = initialReplayId;
        if (replayPreset == ReplayPreset.CUSTOM && initialReplayId == null) {
            throw new RuntimeException("initialReplayId is required for ReplayPreset.CUSTOM");
        }
        ByteString replayId = null;
        if (initialReplayId != null) {
            replayId = PubSubApiClient.base64DecodeToByteString(initialReplayId);
        }
        String topic = consumer.getTopic();
        this.LOG.info("Subscribing to topic: {}.", (Object)topic);
        FetchResponseObserver responseObserver = new FetchResponseObserver(consumer);
        StreamObserver<FetchRequest> serverStream = this.asyncStub.subscribe(responseObserver);
        this.LOG.info("Subscribe successful.");
        responseObserver.setServerStream(serverStream);
        this.observerMap.put(consumer, serverStream);
        FetchRequest.Builder fetchRequestBuilder = FetchRequest.newBuilder().setReplayPreset(replayPreset).setTopicName(topic).setNumRequested(consumer.getBatchSize());
        if (replayPreset == ReplayPreset.CUSTOM) {
            fetchRequestBuilder.setReplayId(replayId);
        }
        serverStream.onNext((Object)fetchRequestBuilder.build());
    }

    public TopicInfo getTopicInfo(String name) {
        return this.topicInfoCache.computeIfAbsent(name, topic -> this.blockingStub.getTopic(TopicRequest.newBuilder().setTopicName((String)topic).build()));
    }

    public String getSchemaJson(String schemaId) {
        return this.schemaJsonCache.computeIfAbsent(schemaId, s -> this.blockingStub.getSchema(SchemaRequest.newBuilder().setSchemaId((String)s).build()).getSchemaJson());
    }

    public Schema getSchema(String schemaId) {
        return this.schemaCache.computeIfAbsent(schemaId, id -> new Schema.Parser().parse(this.getSchemaJson((String)id)));
    }

    public static String base64EncodeByteString(ByteString bs) {
        ByteBuffer bb = bs.asReadOnlyByteBuffer();
        bb.position(0);
        byte[] bytes = new byte[bb.limit()];
        bb.get(bytes, 0, bytes.length);
        return Base64.getEncoder().encodeToString(bytes);
    }

    public static ByteString base64DecodeToByteString(String b64) {
        byte[] decode = Base64.getDecoder().decode(b64);
        return ByteString.copyFrom((byte[])decode);
    }

    protected void doStart() throws Exception {
        super.doStart();
        ManagedChannelBuilder channelBuilder = ManagedChannelBuilder.forAddress((String)this.pubSubHost, (int)this.pubSubPort);
        if (this.usePlainTextConnection) {
            channelBuilder.usePlaintext();
        }
        this.channel = channelBuilder.build();
        TokenCredentials callCredentials = new TokenCredentials(this.session);
        this.asyncStub = (PubSubGrpc.PubSubStub)PubSubGrpc.newStub((Channel)this.channel).withCallCredentials(callCredentials);
        this.blockingStub = (PubSubGrpc.PubSubBlockingStub)PubSubGrpc.newBlockingStub((Channel)this.channel).withCallCredentials(callCredentials);
        this.accessToken = this.session.getAccessToken();
        if (this.accessToken == null && !this.loginConfig.isLazyLogin()) {
            try {
                this.accessToken = this.session.login(null);
            }
            catch (SalesforceException e) {
                throw new RuntimeException((Throwable)((Object)e));
            }
        }
    }

    protected void doStop() throws Exception {
        this.LOG.warn("Stopping PubSubApiClient");
        this.observerMap.values().forEach(observer -> {
            this.LOG.debug("Stopping subscription");
            observer.onCompleted();
        });
        this.channel.shutdown();
        this.channel.awaitTermination(10L, TimeUnit.SECONDS);
        super.doStop();
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private ProducerEvent createProducerEvent(String schemaId, Schema schema, Object body) throws IOException {
        byte[] bytes;
        if (body instanceof ProducerEvent) {
            return (ProducerEvent)body;
        }
        if (body instanceof IndexedRecord) {
            IndexedRecord indexedRecord = (IndexedRecord)body;
            if (body instanceof GenericRecord) {
                GenericRecord record = (GenericRecord)body;
                bytes = this.getBytes(body, (DatumWriter<Object>)new GenericDatumWriter(record.getSchema()));
                return ProducerEvent.newBuilder().setSchemaId(schemaId).setPayload(ByteString.copyFrom((byte[])bytes)).build();
            } else {
                if (!(body instanceof SpecificRecord)) throw new IllegalArgumentException("Body is of unexpected type: " + indexedRecord.getClass().getName());
                bytes = this.getBytes(body, (DatumWriter<Object>)new SpecificDatumWriter());
            }
            return ProducerEvent.newBuilder().setSchemaId(schemaId).setPayload(ByteString.copyFrom((byte[])bytes)).build();
        } else if (body instanceof byte[]) {
            byte[] bodyBytes;
            bytes = bodyBytes = (byte[])body;
            return ProducerEvent.newBuilder().setSchemaId(schemaId).setPayload(ByteString.copyFrom((byte[])bytes)).build();
        } else if (body instanceof String) {
            String json = (String)body;
            JsonAvroConverter converter = new JsonAvroConverter();
            bytes = converter.convertToAvro(json.getBytes(), schema);
            return ProducerEvent.newBuilder().setSchemaId(schemaId).setPayload(ByteString.copyFrom((byte[])bytes)).build();
        } else {
            bytes = this.getBytes(body, (DatumWriter<Object>)new ReflectDatumWriter(schema));
        }
        return ProducerEvent.newBuilder().setSchemaId(schemaId).setPayload(ByteString.copyFrom((byte[])bytes)).build();
    }

    private byte[] getBytes(Object body, DatumWriter<Object> writer) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder((OutputStream)buffer, null);
        writer.write(body, (Encoder)encoder);
        byte[] bytes = buffer.toByteArray();
        return bytes;
    }

    public void setUsePlainTextConnection(boolean usePlainTextConnection) {
        this.usePlainTextConnection = usePlainTextConnection;
    }

    private class FetchResponseObserver
    implements StreamObserver<FetchResponse> {
        private final Logger LOG = LoggerFactory.getLogger(this.getClass());
        private final PubSubApiConsumer consumer;
        private final Map<String, Class<?>> eventClassMap;
        private final Class<?> pojoClass;
        private String replayId;
        private StreamObserver<FetchRequest> serverStream;

        public FetchResponseObserver(PubSubApiConsumer consumer) {
            this.consumer = consumer;
            this.eventClassMap = consumer.getEventClassMap();
            this.pojoClass = consumer.getPojoClass();
        }

        public void onNext(FetchResponse fetchResponse) {
            String topic = this.consumer.getTopic();
            this.LOG.debug("Received {} events on topic: {}", (Object)fetchResponse.getEventsList().size(), (Object)topic);
            this.LOG.debug("rpcId: {}", (Object)fetchResponse.getRpcId());
            this.LOG.debug("pending_num_requested: {}", (Object)fetchResponse.getPendingNumRequested());
            for (ConsumerEvent ce : fetchResponse.getEventsList()) {
                try {
                    this.processEvent(ce);
                }
                catch (Exception e) {
                    this.LOG.error(e.toString(), (Throwable)e);
                }
            }
            this.replayId = PubSubApiClient.base64EncodeByteString(fetchResponse.getLatestReplayId());
            int nextRequestSize = this.consumer.getBatchSize() - fetchResponse.getPendingNumRequested();
            if (nextRequestSize > 0) {
                FetchRequest fetchRequest = FetchRequest.newBuilder().setTopicName(topic).setNumRequested(nextRequestSize).build();
                this.LOG.debug("Sending FetchRequest, num_requested: {}", (Object)nextRequestSize);
                this.serverStream.onNext((Object)fetchRequest);
            }
        }

        public void onError(Throwable throwable) {
            PubSubApiClient.this.observerMap.remove((Object)this.consumer);
            if (throwable instanceof StatusRuntimeException) {
                StatusRuntimeException e = (StatusRuntimeException)throwable;
                this.LOG.error("GRPC Exception", (Throwable)e);
                Metadata trailers = e.getTrailers();
                String errorCode = "";
                this.LOG.error("Trailers:");
                if (trailers != null) {
                    trailers.keys().forEach(trailer -> this.LOG.error("Trailer: {}, Value: {}", trailer, trailers.get(Metadata.Key.of((String)trailer, (Metadata.AsciiMarshaller)Metadata.ASCII_STRING_MARSHALLER))));
                    errorCode = (String)trailers.get(Metadata.Key.of((String)"error-code", (Metadata.AsciiMarshaller)Metadata.ASCII_STRING_MARSHALLER));
                }
                if (errorCode != null) {
                    switch (errorCode) {
                        case "sfdc.platform.eventbus.grpc.service.auth.error": 
                        case "sfdc.platform.eventbus.grpc.service.auth.refresh.invalid": {
                            this.LOG.error("attempting login");
                            PubSubApiClient.this.session.attemptLoginUntilSuccessful(PubSubApiClient.this.backoffIncrement, PubSubApiClient.this.maxBackoff);
                            this.LOG.debug("logged in {}", (Object)this.consumer.getTopic());
                            break;
                        }
                        default: {
                            this.LOG.error("unexpected errorCode: {}", (Object)errorCode);
                        }
                    }
                }
            } else {
                this.LOG.error("An unexpected error occurred.", throwable);
            }
            this.LOG.debug("Attempting subscribe after error");
            this.resubscribeOnError();
        }

        private void resubscribeOnError() {
            if (this.replayId != null) {
                PubSubApiClient.this.subscribe(this.consumer, ReplayPreset.CUSTOM, this.replayId);
            } else if (PubSubApiClient.this.initialReplayPreset == ReplayPreset.CUSTOM) {
                PubSubApiClient.this.subscribe(this.consumer, PubSubApiClient.this.initialReplayPreset, PubSubApiClient.this.initialReplayId);
            } else {
                PubSubApiClient.this.subscribe(this.consumer, PubSubApiClient.this.initialReplayPreset, null);
            }
        }

        public void onCompleted() {
            this.LOG.debug("onCompleted() called by server");
            PubSubApiClient.this.observerMap.remove((Object)this.consumer);
        }

        public void setServerStream(StreamObserver<FetchRequest> serverStream) {
            this.serverStream = serverStream;
        }

        private void processEvent(ConsumerEvent ce) throws IOException {
            Schema schema = PubSubApiClient.this.getSchema(ce.getEvent().getSchemaId());
            Object record = switch (this.consumer.getDeserializeType()) {
                default -> throw new IncompatibleClassChangeError();
                case PubSubDeserializeType.AVRO -> this.deserializeAvro(ce, schema);
                case PubSubDeserializeType.GENERIC_RECORD -> this.deserializeGenericRecord(ce, schema);
                case PubSubDeserializeType.SPECIFIC_RECORD -> this.deserializeSpecificRecord(ce, schema);
                case PubSubDeserializeType.POJO -> this.deserializePojo(ce, schema);
                case PubSubDeserializeType.JSON -> this.deserializeJson(ce, schema);
            };
            String replayId = PubSubApiClient.base64EncodeByteString(ce.getReplayId());
            this.consumer.processEvent(record, replayId);
        }

        private Object deserializeAvro(ConsumerEvent ce, Schema schema) throws IOException {
            if (this.eventClassMap.containsKey(schema.getFullName())) {
                return this.deserializeSpecificRecord(ce, schema);
            }
            this.LOG.debug("No DTO found for schema: {}. Using GenericRecord.", (Object)schema.getFullName());
            return this.deserializeGenericRecord(ce, schema);
        }

        private Object deserializeJson(ConsumerEvent ce, Schema schema) throws IOException {
            GenericRecord record = this.deserializeGenericRecord(ce, schema);
            JsonAvroConverter converter = new JsonAvroConverter();
            byte[] bytes = converter.convertToJson(record);
            return new String(bytes);
        }

        private Object deserializePojo(ConsumerEvent ce, Schema schema) throws IOException {
            ReflectDatumReader reader = new ReflectDatumReader(this.pojoClass);
            reader.setSchema(schema);
            ByteArrayInputStream in = new ByteArrayInputStream(ce.getEvent().getPayload().toByteArray());
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder((InputStream)in, null);
            return reader.read(null, (Decoder)decoder);
        }

        private GenericRecord deserializeGenericRecord(ConsumerEvent ce, Schema schema) throws IOException {
            GenericDatumReader reader = new GenericDatumReader(schema);
            ByteArrayInputStream in = new ByteArrayInputStream(ce.getEvent().getPayload().toByteArray());
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder((InputStream)in, null);
            return (GenericRecord)reader.read(null, (Decoder)decoder);
        }

        private Object deserializeSpecificRecord(ConsumerEvent ce, Schema schema) throws IOException {
            Class<?> clas = this.eventClassMap.get(schema.getFullName());
            SpecificDatumReader reader = new SpecificDatumReader(clas);
            ByteArrayInputStream in = new ByteArrayInputStream(ce.getEvent().getPayload().toByteArray());
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder((InputStream)in, null);
            return reader.read(null, (Decoder)decoder);
        }
    }
}

