/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axondb.client;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.axoniq.axondb.Event;
import io.axoniq.axondb.client.AppendEventTransaction;
import io.axoniq.axondb.client.AxonDBConfiguration;
import io.axoniq.axondb.client.ChannelManager;
import io.axoniq.axondb.client.ClientConnectionException;
import io.axoniq.axondb.client.ContextAddingInterceptor;
import io.axoniq.axondb.client.SendingStreamObserver;
import io.axoniq.axondb.client.TokenAddingInterceptor;
import io.axoniq.axondb.client.util.Broadcaster;
import io.axoniq.axondb.client.util.EventCipher;
import io.axoniq.axondb.client.util.EventStoreClientException;
import io.axoniq.axondb.client.util.GrpcExceptionParser;
import io.axoniq.axondb.grpc.Confirmation;
import io.axoniq.axondb.grpc.EventStoreGrpc;
import io.axoniq.axondb.grpc.EventWithToken;
import io.axoniq.axondb.grpc.GetAggregateEventsRequest;
import io.axoniq.axondb.grpc.GetEventsRequest;
import io.axoniq.axondb.grpc.GetFirstTokenRequest;
import io.axoniq.axondb.grpc.GetLastTokenRequest;
import io.axoniq.axondb.grpc.GetTokenAtRequest;
import io.axoniq.axondb.grpc.QueryEventsRequest;
import io.axoniq.axondb.grpc.QueryEventsResponse;
import io.axoniq.axondb.grpc.ReadHighestSequenceNrRequest;
import io.axoniq.axondb.grpc.ReadHighestSequenceNrResponse;
import io.axoniq.axondb.grpc.TrackingToken;
import io.axoniq.platform.grpc.ClientIdentification;
import io.axoniq.platform.grpc.NodeInfo;
import io.axoniq.platform.grpc.PlatformInboundInstruction;
import io.axoniq.platform.grpc.PlatformInfo;
import io.axoniq.platform.grpc.PlatformOutboundInstruction;
import io.axoniq.platform.grpc.PlatformServiceGrpc;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AxonDBClient {
    private final Logger logger = LoggerFactory.getLogger(AxonDBClient.class);
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("axondbclient-pool-%d").build());
    private final AxonDBConfiguration eventStoreConfiguration;
    private final ClientInterceptor[] interceptors;
    private final EventCipher eventCipher;
    private final AtomicReference<PlatformInfo> eventStoreServer = new AtomicReference();
    private final ChannelManager channelManager;
    private final long commitTimeout;
    private boolean shutdown;
    private final Map<UUID, Runnable> connectionCloseListeners = new ConcurrentHashMap<UUID, Runnable>();
    private SendingStreamObserver<PlatformInboundInstruction> streamToAxonDB;

    public AxonDBClient(AxonDBConfiguration eventStoreConfiguration) {
        this.eventStoreConfiguration = eventStoreConfiguration;
        this.interceptors = new ClientInterceptor[]{new TokenAddingInterceptor(eventStoreConfiguration.getToken()), new ContextAddingInterceptor(eventStoreConfiguration.getContext())};
        this.channelManager = new ChannelManager(eventStoreConfiguration.isSslEnabled(), eventStoreConfiguration.getCertFile(), eventStoreConfiguration.getKeepAliveTime(), eventStoreConfiguration.getKeepAliveTimeout());
        this.eventCipher = eventStoreConfiguration.eventCipher();
        this.commitTimeout = eventStoreConfiguration.getCommitTimeout();
    }

    public void shutdown() {
        this.shutdown = true;
        this.channelManager.cleanup();
    }

    private EventStoreGrpc.EventStoreStub eventStoreStub() {
        return (EventStoreGrpc.EventStoreStub)EventStoreGrpc.newStub(this.getChannelToEventStore()).withInterceptors(this.interceptors);
    }

    private PlatformInfo discoverEventStore() {
        this.eventStoreServer.set(null);
        Broadcaster<PlatformInfo> b = new Broadcaster<PlatformInfo>(this.eventStoreConfiguration.serverNodes(), this::retrieveClusterInfo, this::nodeReceived);
        try {
            b.broadcast(1, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ClientConnectionException("Thread was interrupted while attempting to connect to the server", e);
        }
        return this.eventStoreServer.get();
    }

    private void nodeReceived(PlatformInfo node) {
        this.logger.info("Received: {}:{}", (Object)node.getPrimary().getHostName(), (Object)node.getPrimary().getGrpcPort());
        this.eventStoreServer.set(node);
    }

    private void retrieveClusterInfo(NodeInfo nodeInfo, StreamObserver<PlatformInfo> streamObserver) {
        ManagedChannel channel = this.channelManager.getChannel(nodeInfo);
        PlatformServiceGrpc.PlatformServiceStub clusterManagerStub = (PlatformServiceGrpc.PlatformServiceStub)PlatformServiceGrpc.newStub((Channel)channel).withInterceptors(this.interceptors);
        clusterManagerStub.getPlatformServer(ClientIdentification.newBuilder().build(), streamObserver);
    }

    private Channel getChannelToEventStore() {
        if (this.shutdown) {
            return null;
        }
        CompletableFuture<PlatformInfo> masterInfoCompletableFuture = new CompletableFuture<PlatformInfo>();
        this.getEventStoreAsync(this.eventStoreConfiguration.getConnectionRetryCount(), masterInfoCompletableFuture);
        try {
            return this.channelManager.getChannel(masterInfoCompletableFuture.get().getPrimary());
        }
        catch (ExecutionException e) {
            throw (RuntimeException)e.getCause();
        }
        catch (InterruptedException e) {
            throw new EventStoreClientException("AXONIQ-0001", e.getMessage(), e);
        }
    }

    private void getEventStoreAsync(int retries, CompletableFuture<PlatformInfo> result) {
        PlatformInfo currentEventStore = this.eventStoreServer.get();
        if (currentEventStore != null) {
            result.complete(currentEventStore);
        } else {
            currentEventStore = this.discoverEventStore();
            if (currentEventStore != null) {
                this.openStream(currentEventStore);
                result.complete(currentEventStore);
            } else if (retries > 0) {
                this.executorService.schedule(() -> this.getEventStoreAsync(retries - 1, result), this.eventStoreConfiguration.getConnectionRetry(), TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(new EventStoreClientException("AXONIQ-0001", "No available event store server"));
            }
        }
    }

    private void openStream(PlatformInfo currentEventStore) {
        ManagedChannel channel = this.channelManager.getChannel(currentEventStore.getPrimary());
        PlatformServiceGrpc.PlatformServiceStub platformService = (PlatformServiceGrpc.PlatformServiceStub)PlatformServiceGrpc.newStub((Channel)channel).withInterceptors(this.interceptors);
        this.streamToAxonDB = new SendingStreamObserver<PlatformInboundInstruction>(platformService.openStream(new StreamObserver<PlatformOutboundInstruction>(){

            public void onNext(PlatformOutboundInstruction value) {
            }

            public void onError(Throwable t) {
                AxonDBClient.this.stopChannelToEventStore();
            }

            public void onCompleted() {
                AxonDBClient.this.stopChannelToEventStore();
            }
        }));
        this.streamToAxonDB.onNext(PlatformInboundInstruction.newBuilder().setRegister(ClientIdentification.getDefaultInstance()).build());
    }

    private void stopChannelToEventStore() {
        PlatformInfo current = this.eventStoreServer.getAndSet(null);
        if (current != null) {
            this.logger.info("Shutting down gRPC channel");
            this.connectionCloseListeners.forEach((key, callback) -> callback.run());
            this.connectionCloseListeners.clear();
            this.channelManager.shutdown(current);
            this.closeStream();
        }
    }

    private void closeStream() {
        if (this.streamToAxonDB != null) {
            this.streamToAxonDB.onCompleted();
            this.streamToAxonDB = null;
        }
    }

    public Stream<Event> listAggregateEvents(final GetAggregateEventsRequest request) throws ExecutionException, InterruptedException {
        final CompletableFuture stream = new CompletableFuture();
        final long before = System.currentTimeMillis();
        this.eventStoreStub().listAggregateEvents(request, new StreamObserver<Event>(){
            Stream.Builder<Event> eventStream = Stream.builder();
            int count;

            public void onNext(Event event) {
                this.eventStream.accept(AxonDBClient.this.eventCipher.decrypt(event));
                ++this.count;
            }

            public void onError(Throwable throwable) {
                AxonDBClient.this.checkConnectionException(throwable);
                stream.completeExceptionally(GrpcExceptionParser.parse(throwable));
            }

            public void onCompleted() {
                stream.complete(this.eventStream.build());
                if (AxonDBClient.this.logger.isDebugEnabled()) {
                    AxonDBClient.this.logger.debug("Done request for {}: {}ms, {} events", new Object[]{request.getAggregateId(), System.currentTimeMillis() - before, this.count});
                }
            }
        });
        return (Stream)stream.get();
    }

    public StreamObserver<GetEventsRequest> listEvents(final StreamObserver<EventWithToken> responseStreamObserver) {
        final UUID id = UUID.randomUUID();
        StreamObserver<EventWithToken> wrappedStreamObserver = new StreamObserver<EventWithToken>(){

            public void onNext(EventWithToken eventWithToken) {
                responseStreamObserver.onNext((Object)AxonDBClient.this.eventCipher.decrypt(eventWithToken));
            }

            public void onError(Throwable throwable) {
                AxonDBClient.this.checkConnectionException(throwable);
                responseStreamObserver.onError((Throwable)GrpcExceptionParser.parse(throwable));
                AxonDBClient.this.connectionCloseListeners.remove(id);
            }

            public void onCompleted() {
                responseStreamObserver.onCompleted();
                AxonDBClient.this.connectionCloseListeners.remove(id);
            }
        };
        StreamObserver<GetEventsRequest> requestStream = this.eventStoreStub().listEvents(wrappedStreamObserver);
        this.connectionCloseListeners.put(id, () -> {
            try {
                requestStream.onCompleted();
            }
            catch (Exception exception) {
                // empty catch block
            }
            responseStreamObserver.onError((Throwable)new RuntimeException("Connection to AxonDB lost"));
        });
        return requestStream;
    }

    public CompletableFuture<Confirmation> appendSnapshot(Event snapshot) {
        CompletableFuture<Confirmation> confirmationFuture = new CompletableFuture<Confirmation>();
        this.eventStoreStub().appendSnapshot(this.eventCipher.encrypt(snapshot), new SingleResultStreamObserver<Confirmation>(confirmationFuture));
        return confirmationFuture;
    }

    public AppendEventTransaction createAppendEventConnection() {
        CompletableFuture<Confirmation> futureConfirmation = new CompletableFuture<Confirmation>();
        return new AppendEventTransaction(this.eventStoreStub().appendEvent(new SingleResultStreamObserver<Confirmation>(futureConfirmation)), futureConfirmation, this.commitTimeout, this.eventCipher);
    }

    private void checkConnectionException(Throwable ex) {
        if (ex instanceof StatusRuntimeException && ((StatusRuntimeException)ex).getStatus().getCode().equals((Object)Status.UNAVAILABLE.getCode())) {
            this.stopChannelToEventStore();
        }
    }

    public StreamObserver<QueryEventsRequest> query(final StreamObserver<QueryEventsResponse> responseStreamObserver) {
        StreamObserver<QueryEventsResponse> wrappedStreamObserver = new StreamObserver<QueryEventsResponse>(){

            public void onNext(QueryEventsResponse eventWithToken) {
                responseStreamObserver.onNext((Object)eventWithToken);
            }

            public void onError(Throwable throwable) {
                AxonDBClient.this.checkConnectionException(throwable);
                responseStreamObserver.onError((Throwable)GrpcExceptionParser.parse(throwable));
            }

            public void onCompleted() {
                responseStreamObserver.onCompleted();
            }
        };
        return this.eventStoreStub().queryEvents(wrappedStreamObserver);
    }

    public CompletableFuture<ReadHighestSequenceNrResponse> lastSequenceNumberFor(String aggregateIdentifier) {
        CompletableFuture<ReadHighestSequenceNrResponse> completableFuture = new CompletableFuture<ReadHighestSequenceNrResponse>();
        this.eventStoreStub().readHighestSequenceNr(ReadHighestSequenceNrRequest.newBuilder().setAggregateId(aggregateIdentifier).build(), new SingleResultStreamObserver<ReadHighestSequenceNrResponse>(completableFuture));
        return completableFuture;
    }

    public CompletableFuture<TrackingToken> getLastToken() {
        CompletableFuture<TrackingToken> trackingTokenFuture = new CompletableFuture<TrackingToken>();
        this.eventStoreStub().getLastToken(GetLastTokenRequest.getDefaultInstance(), new SingleResultStreamObserver<TrackingToken>(trackingTokenFuture));
        return trackingTokenFuture;
    }

    public CompletableFuture<TrackingToken> getFirstToken() {
        CompletableFuture<TrackingToken> trackingTokenFuture = new CompletableFuture<TrackingToken>();
        this.eventStoreStub().getFirstToken(GetFirstTokenRequest.getDefaultInstance(), new SingleResultStreamObserver<TrackingToken>(trackingTokenFuture));
        return trackingTokenFuture;
    }

    public CompletableFuture<TrackingToken> getTokenAt(Instant instant) {
        CompletableFuture<TrackingToken> trackingTokenFuture = new CompletableFuture<TrackingToken>();
        this.eventStoreStub().getTokenAt(GetTokenAtRequest.newBuilder().setInstant(instant.toEpochMilli()).build(), new SingleResultStreamObserver<TrackingToken>(trackingTokenFuture));
        return trackingTokenFuture;
    }

    private class SingleResultStreamObserver<T>
    implements StreamObserver<T> {
        private final CompletableFuture<T> future;

        private SingleResultStreamObserver(CompletableFuture<T> future) {
            this.future = future;
        }

        public void onNext(T t) {
            this.future.complete(t);
        }

        public void onError(Throwable throwable) {
            AxonDBClient.this.checkConnectionException(throwable);
            this.future.completeExceptionally(GrpcExceptionParser.parse(throwable));
        }

        public void onCompleted() {
            if (!this.future.isDone()) {
                this.future.completeExceptionally(new EventStoreClientException("AXONIQ-0001", "Async call completed before answer"));
            }
        }
    }
}

