/*
 * Decompiled with CFR 0.152.
 */
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcResponseModel;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc.GrpcStreamHandler;
import dev.openfeature.flagd.grpc.sync.FlagSyncServiceGrpc;
import dev.openfeature.flagd.grpc.sync.Sync;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressFBWarnings(value={"PREDICTABLE_RANDOM", "EI_EXPOSE_REP"}, justification="Random is used to generate a variation & flag configurations require exposing")
public class GrpcStreamConnector
implements Connector {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GrpcStreamConnector.class);
    private static final Random RANDOM = new Random();
    private static final int INIT_BACK_OFF = 2000;
    private static final int MAX_BACK_OFF = 120000;
    private static final int QUEUE_SIZE = 5;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final BlockingQueue<StreamPayload> blockingQueue = new LinkedBlockingQueue<StreamPayload>(5);
    private final ManagedChannel channel;
    private final FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub;
    private final int deadline;
    private final String selector;

    public GrpcStreamConnector(FlagdOptions options) {
        this.channel = ChannelBuilder.nettyChannel(options);
        this.serviceStub = FlagSyncServiceGrpc.newStub((Channel)this.channel);
        this.deadline = options.getDeadline();
        this.selector = options.getSelector();
    }

    @Override
    public void init() {
        Thread listener = new Thread(() -> {
            try {
                Sync.SyncFlagsRequest.Builder requestBuilder = Sync.SyncFlagsRequest.newBuilder();
                if (this.selector != null) {
                    requestBuilder.setSelector(this.selector);
                }
                GrpcStreamConnector.observeEventStream(this.blockingQueue, this.shutdown, this.serviceStub, requestBuilder.build());
            }
            catch (InterruptedException e) {
                log.warn("gRPC event stream interrupted, flag configurations are stale", (Throwable)e);
                Thread.currentThread().interrupt();
            }
        });
        listener.setDaemon(true);
        listener.start();
    }

    @Override
    public BlockingQueue<StreamPayload> getStream() {
        return this.blockingQueue;
    }

    @Override
    public void shutdown() throws InterruptedException {
        block5: {
            if (this.shutdown.getAndSet(true)) {
                return;
            }
            try {
                if (this.channel == null || this.channel.isShutdown()) break block5;
                this.channel.shutdown();
                this.channel.awaitTermination((long)this.deadline, TimeUnit.MILLISECONDS);
            }
            catch (Throwable throwable) {
                if (this.channel != null && !this.channel.isShutdown()) {
                    this.channel.shutdownNow();
                    this.channel.awaitTermination((long)this.deadline, TimeUnit.MILLISECONDS);
                    log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
                }
                throw throwable;
            }
        }
        if (this.channel != null && !this.channel.isShutdown()) {
            this.channel.shutdownNow();
            this.channel.awaitTermination((long)this.deadline, TimeUnit.MILLISECONDS);
            log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
        }
    }

    static void observeEventStream(BlockingQueue<StreamPayload> writeTo, AtomicBoolean shutdown, FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub, Sync.SyncFlagsRequest request) throws InterruptedException {
        LinkedBlockingQueue<GrpcResponseModel> streamReceiver = new LinkedBlockingQueue<GrpcResponseModel>(5);
        int retryDelay = 2000;
        log.info("Initializing sync stream observer");
        while (!shutdown.get()) {
            log.debug("Initializing sync stream request");
            serviceStub.syncFlags(request, new GrpcStreamHandler(streamReceiver));
            while (!shutdown.get()) {
                GrpcResponseModel response = (GrpcResponseModel)streamReceiver.take();
                if (response.isComplete()) {
                    log.info("Sync stream completed");
                    break;
                }
                if (response.getError() != null) {
                    log.error(String.format("Error from grpc connection, retrying in %dms", retryDelay), response.getError());
                    if (writeTo.offer(new StreamPayload(StreamPayloadType.ERROR, "Error from stream connection, retrying"))) break;
                    log.error("Failed to convey ERROR status, queue is full");
                    break;
                }
                Sync.SyncFlagsResponse flagsResponse = response.getSyncFlagsResponse();
                String data = flagsResponse.getFlagConfiguration();
                log.debug("Got stream response: " + data);
                if (!writeTo.offer(new StreamPayload(StreamPayloadType.DATA, data))) {
                    log.error("Stream writing failed");
                }
                retryDelay = 2000;
            }
            if (shutdown.get()) {
                log.info("Shutdown invoked, exiting event stream listener");
                return;
            }
            log.warn(String.format("Stream failed, retrying in %dms", retryDelay));
            Thread.sleep(retryDelay + RANDOM.nextInt(2000));
            if (retryDelay >= 120000) continue;
            retryDelay = 2 * retryDelay;
        }
        log.info("Shutdown invoked, exiting event stream listener");
    }
}

