/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axondb.client;

import io.axoniq.axondb.Event;
import io.axoniq.axondb.client.AppendEventTransaction;
import io.axoniq.axondb.client.AxonDBConfiguration;
import io.axoniq.axondb.client.ChannelManager;
import io.axoniq.axondb.client.ClientConnectionException;
import io.axoniq.axondb.client.ContextAddingInterceptor;
import io.axoniq.axondb.client.TokenAddingInterceptor;
import io.axoniq.axondb.client.util.Broadcaster;
import io.axoniq.axondb.client.util.EventCipher;
import io.axoniq.axondb.client.util.EventStoreClientException;
import io.axoniq.axondb.client.util.GrpcExceptionParser;
import io.axoniq.axondb.grpc.Confirmation;
import io.axoniq.axondb.grpc.EventStoreGrpc;
import io.axoniq.axondb.grpc.EventWithToken;
import io.axoniq.axondb.grpc.GetAggregateEventsRequest;
import io.axoniq.axondb.grpc.GetEventsRequest;
import io.axoniq.axondb.grpc.QueryEventsRequest;
import io.axoniq.axondb.grpc.QueryEventsResponse;
import io.axoniq.platform.grpc.ClientIdentification;
import io.axoniq.platform.grpc.NodeInfo;
import io.axoniq.platform.grpc.PlatformInfo;
import io.axoniq.platform.grpc.PlatformServiceGrpc;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonDBClient {
    private final Logger logger = LoggerFactory.getLogger(AxonDBClient.class);
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
    private final AxonDBConfiguration eventStoreConfiguration;
    private final ClientInterceptor[] interceptors;
    private final EventCipher eventCipher;
    private final AtomicReference<PlatformInfo> eventStoreServer = new AtomicReference();
    private final ChannelManager channelManager;
    private boolean shutdown;

    public AxonDBClient(AxonDBConfiguration eventStoreConfiguration) {
        this.eventStoreConfiguration = eventStoreConfiguration;
        this.interceptors = new ClientInterceptor[]{new TokenAddingInterceptor(eventStoreConfiguration.getToken()), new ContextAddingInterceptor(eventStoreConfiguration.getContext())};
        this.channelManager = new ChannelManager(eventStoreConfiguration.isSslEnabled(), eventStoreConfiguration.getCertFile());
        this.eventCipher = eventStoreConfiguration.getEventCipher();
    }

    public void shutdown() {
        this.shutdown = true;
        this.channelManager.cleanup();
    }

    private EventStoreGrpc.EventStoreStub eventStoreStub() {
        return (EventStoreGrpc.EventStoreStub)EventStoreGrpc.newStub(this.getChannelToEventStore()).withInterceptors(this.interceptors);
    }

    private PlatformInfo discoverEventStore() {
        this.eventStoreServer.set(null);
        Broadcaster<PlatformInfo> b = new Broadcaster<PlatformInfo>(this.eventStoreConfiguration.getServerNodes(), this::retrieveClusterInfo, this::nodeReceived);
        try {
            b.broadcast(1, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ClientConnectionException("Thread was interrupted while attempting to connect to the server", e);
        }
        return this.eventStoreServer.get();
    }

    private void nodeReceived(PlatformInfo node) {
        this.logger.info("Received: {}:{}", (Object)node.getPrimary().getHostName(), (Object)node.getPrimary().getGrpcPort());
        this.eventStoreServer.set(node);
    }

    private void retrieveClusterInfo(NodeInfo nodeInfo, StreamObserver<PlatformInfo> streamObserver) {
        ManagedChannel channel = this.channelManager.getChannel(nodeInfo);
        PlatformServiceGrpc.PlatformServiceStub clusterManagerStub = (PlatformServiceGrpc.PlatformServiceStub)PlatformServiceGrpc.newStub((Channel)channel).withInterceptors(new ClientInterceptor[]{new TokenAddingInterceptor(this.eventStoreConfiguration.getToken())});
        clusterManagerStub.getPlatformServer(ClientIdentification.newBuilder().build(), streamObserver);
    }

    private Channel getChannelToEventStore() {
        if (this.shutdown) {
            return null;
        }
        CompletableFuture<PlatformInfo> masterInfoCompletableFuture = new CompletableFuture<PlatformInfo>();
        this.getEventStoreAsync(this.eventStoreConfiguration.getConnectionRetryCount(), masterInfoCompletableFuture);
        try {
            return this.channelManager.getChannel(masterInfoCompletableFuture.get().getPrimary());
        }
        catch (ExecutionException e) {
            throw (RuntimeException)e.getCause();
        }
        catch (InterruptedException e) {
            throw new EventStoreClientException("AXONIQ-0001", e.getMessage(), e);
        }
    }

    private void getEventStoreAsync(int retries, CompletableFuture<PlatformInfo> result) {
        PlatformInfo currentEventStore = this.eventStoreServer.get();
        if (currentEventStore != null) {
            result.complete(currentEventStore);
        } else {
            currentEventStore = this.discoverEventStore();
            if (currentEventStore != null) {
                result.complete(currentEventStore);
            } else if (retries > 0) {
                this.executorService.schedule(() -> this.getEventStoreAsync(retries - 1, result), this.eventStoreConfiguration.getConnectionRetry(), TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(new EventStoreClientException("AXONIQ-0001", "No available event store server"));
            }
        }
    }

    private void stopChannelToEventStore() {
        PlatformInfo current = this.eventStoreServer.getAndSet(null);
        if (current != null) {
            this.logger.info("Shutting down gRPC channel");
            this.channelManager.shutdown(current);
        }
    }

    public Stream<Event> listAggregateEvents(final GetAggregateEventsRequest request) throws ExecutionException, InterruptedException {
        final CompletableFuture stream = new CompletableFuture();
        final long before = System.currentTimeMillis();
        this.eventStoreStub().listAggregateEvents(request, new StreamObserver<Event>(){
            Stream.Builder<Event> eventStream = Stream.builder();
            int count;

            public void onNext(Event event) {
                this.eventStream.accept(AxonDBClient.this.eventCipher.decrypt(event));
                ++this.count;
            }

            public void onError(Throwable throwable) {
                AxonDBClient.this.checkConnectionException(throwable);
                stream.completeExceptionally(GrpcExceptionParser.parse(throwable));
            }

            public void onCompleted() {
                stream.complete(this.eventStream.build());
                if (AxonDBClient.this.logger.isDebugEnabled()) {
                    AxonDBClient.this.logger.debug("Done request for {}: {}ms, {} events", new Object[]{request.getAggregateId(), System.currentTimeMillis() - before, this.count});
                }
            }
        });
        return (Stream)stream.get();
    }

    public StreamObserver<GetEventsRequest> listEvents(final StreamObserver<EventWithToken> responseStreamObserver) {
        StreamObserver<EventWithToken> wrappedStreamObserver = new StreamObserver<EventWithToken>(){

            public void onNext(EventWithToken eventWithToken) {
                responseStreamObserver.onNext((Object)AxonDBClient.this.eventCipher.decrypt(eventWithToken));
            }

            public void onError(Throwable throwable) {
                AxonDBClient.this.checkConnectionException(throwable);
                responseStreamObserver.onError((Throwable)GrpcExceptionParser.parse(throwable));
            }

            public void onCompleted() {
                responseStreamObserver.onCompleted();
            }
        };
        return this.eventStoreStub().listEvents(wrappedStreamObserver);
    }

    public CompletableFuture<Confirmation> appendSnapshot(Event snapshot) {
        final CompletableFuture<Confirmation> confirmationFuture = new CompletableFuture<Confirmation>();
        this.eventStoreStub().appendSnapshot(this.eventCipher.encrypt(snapshot), new StreamObserver<Confirmation>(){

            public void onNext(Confirmation confirmation) {
                confirmationFuture.complete(confirmation);
            }

            public void onError(Throwable throwable) {
                AxonDBClient.this.checkConnectionException(throwable);
                confirmationFuture.completeExceptionally(GrpcExceptionParser.parse(throwable));
            }

            public void onCompleted() {
            }
        });
        return confirmationFuture;
    }

    public AppendEventTransaction createAppendEventConnection() {
        final CompletableFuture<Confirmation> futureConfirmation = new CompletableFuture<Confirmation>();
        return new AppendEventTransaction(this.eventStoreStub().appendEvent(new StreamObserver<Confirmation>(){

            public void onNext(Confirmation confirmation) {
                futureConfirmation.complete(confirmation);
            }

            public void onError(Throwable throwable) {
                AxonDBClient.this.checkConnectionException(throwable);
                futureConfirmation.completeExceptionally(GrpcExceptionParser.parse(throwable));
            }

            public void onCompleted() {
            }
        }), futureConfirmation, this.eventCipher);
    }

    private void checkConnectionException(Throwable ex) {
        if (ex instanceof StatusRuntimeException && ((StatusRuntimeException)ex).getStatus().getCode().equals((Object)Status.UNAVAILABLE.getCode())) {
            this.stopChannelToEventStore();
        }
    }

    public StreamObserver<QueryEventsRequest> query(final StreamObserver<QueryEventsResponse> responseStreamObserver) {
        StreamObserver<QueryEventsResponse> wrappedStreamObserver = new StreamObserver<QueryEventsResponse>(){

            public void onNext(QueryEventsResponse eventWithToken) {
                responseStreamObserver.onNext((Object)eventWithToken);
            }

            public void onError(Throwable throwable) {
                AxonDBClient.this.checkConnectionException(throwable);
                responseStreamObserver.onError((Throwable)GrpcExceptionParser.parse(throwable));
            }

            public void onCompleted() {
                responseStreamObserver.onCompleted();
            }
        };
        return this.eventStoreStub().queryEvents(wrappedStreamObserver);
    }
}

