/*
 * Decompiled with CFR 0.152.
 */
package dev.openfeature.contrib.providers.flagd.resolver.grpc;

import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.EventStreamObserver;
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
import dev.openfeature.flagd.grpc.evaluation.Evaluation;
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc;
import dev.openfeature.sdk.ProviderState;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressFBWarnings(justification="cache needs to be read and write by multiple objects")
public class GrpcConnector {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(GrpcConnector.class);
    private final Object sync = new Object();
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final Random random = new Random();
    private final ServiceGrpc.ServiceBlockingStub serviceBlockingStub;
    private final ServiceGrpc.ServiceStub serviceStub;
    private final ManagedChannel channel;
    private final int maxEventStreamRetries;
    private final int startEventStreamRetryBackoff;
    private final long deadline;
    private final Cache cache;
    private final Consumer<ProviderState> stateConsumer;
    private int eventStreamAttempt = 1;
    private int eventStreamRetryBackoff;
    private Thread eventObserverThread;

    public GrpcConnector(FlagdOptions options, Cache cache, Consumer<ProviderState> stateConsumer) {
        this.channel = ChannelBuilder.nettyChannel(options);
        this.serviceStub = ServiceGrpc.newStub((Channel)this.channel);
        this.serviceBlockingStub = ServiceGrpc.newBlockingStub((Channel)this.channel);
        this.maxEventStreamRetries = options.getMaxEventStreamRetries();
        this.startEventStreamRetryBackoff = options.getRetryBackoffMs();
        this.eventStreamRetryBackoff = options.getRetryBackoffMs();
        this.deadline = options.getDeadline();
        this.cache = cache;
        this.stateConsumer = stateConsumer;
    }

    public void initialize() throws Exception {
        this.eventObserverThread = new Thread(this::observeEventStream);
        this.eventObserverThread.setDaemon(true);
        this.eventObserverThread.start();
        Util.busyWaitAndCheck(this.deadline, this.connected);
    }

    public void shutdown() throws Exception {
        if (this.eventObserverThread != null) {
            this.eventObserverThread.interrupt();
        }
        try {
            if (this.channel != null && !this.channel.isShutdown()) {
                this.channel.shutdown();
                this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
            }
            this.cache.clear();
        }
        catch (Throwable throwable) {
            this.cache.clear();
            if (this.channel != null && !this.channel.isShutdown()) {
                this.channel.shutdownNow();
                this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
                log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
            }
            this.stateConsumer.accept(ProviderState.NOT_READY);
            throw throwable;
        }
        if (this.channel != null && !this.channel.isShutdown()) {
            this.channel.shutdownNow();
            this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
            log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
        }
        this.stateConsumer.accept(ProviderState.NOT_READY);
    }

    public ServiceGrpc.ServiceBlockingStub getResolver() {
        return (ServiceGrpc.ServiceBlockingStub)this.serviceBlockingStub.withDeadlineAfter(this.deadline, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void observeEventStream() {
        while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
            EventStreamObserver responseObserver = new EventStreamObserver(this.sync, this.cache, this::grpcStateConsumer);
            this.serviceStub.eventStream(Evaluation.EventStreamRequest.getDefaultInstance(), responseObserver);
            try {
                Object object = this.sync;
                synchronized (object) {
                    this.sync.wait();
                }
            }
            catch (InterruptedException e) {
                log.debug("interruption while waiting for condition", (Throwable)e);
                Thread.currentThread().interrupt();
            }
            ++this.eventStreamAttempt;
            this.eventStreamRetryBackoff = 2 * this.eventStreamRetryBackoff + this.random.nextInt(100);
            try {
                Thread.sleep(this.eventStreamRetryBackoff);
            }
            catch (InterruptedException e) {
                log.warn("interrupted while restarting gRPC Event Stream");
                Thread.currentThread().interrupt();
            }
        }
        log.error("failed to connect to event stream, exhausted retries");
        this.grpcStateConsumer(ProviderState.ERROR);
    }

    private void grpcStateConsumer(ProviderState state) {
        if (ProviderState.READY.equals((Object)state)) {
            this.eventStreamAttempt = 1;
            this.eventStreamRetryBackoff = this.startEventStreamRetryBackoff;
            this.connected.set(true);
        } else if (ProviderState.ERROR.equals((Object)state)) {
            this.connected.set(false);
        }
        this.stateConsumer.accept(state);
    }
}

