/*
 * Decompiled with CFR 0.152.
 */
package com.hedera.hashgraph.sdk;

import com.hedera.hashgraph.sdk.Client;
import com.hedera.hashgraph.sdk.Executable;
import com.hedera.hashgraph.sdk.InstantConverter;
import com.hedera.hashgraph.sdk.SubscriptionHandle;
import com.hedera.hashgraph.sdk.TopicId;
import com.hedera.hashgraph.sdk.TopicMessage;
import com.hedera.hashgraph.sdk.proto.Timestamp;
import com.hedera.hashgraph.sdk.proto.TransactionID;
import com.hedera.hashgraph.sdk.proto.mirror.ConsensusServiceGrpc;
import com.hedera.hashgraph.sdk.proto.mirror.ConsensusTopicQuery;
import com.hedera.hashgraph.sdk.proto.mirror.ConsensusTopicResponse;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class TopicMessageQuery {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicMessageQuery.class);
    private final ConsensusTopicQuery.Builder builder;
    private Runnable completionHandler = this::onComplete;
    private BiConsumer<Throwable, TopicMessage> errorHandler = this::onError;
    private int maxAttempts = 10;
    private Duration maxBackoff = Duration.ofSeconds(8L);
    private Predicate<Throwable> retryHandler = this::shouldRetry;

    public TopicMessageQuery() {
        this.builder = ConsensusTopicQuery.newBuilder();
    }

    public TopicMessageQuery setTopicId(TopicId topicId) {
        Objects.requireNonNull(topicId, "topicId must not be null");
        this.builder.setTopicID(topicId.toProtobuf());
        return this;
    }

    public TopicMessageQuery setStartTime(Instant startTime) {
        Objects.requireNonNull(startTime, "startTime must not be null");
        this.builder.setConsensusStartTime(InstantConverter.toProtobuf(startTime));
        return this;
    }

    public TopicMessageQuery setEndTime(Instant endTime) {
        Objects.requireNonNull(endTime, "endTime must not be null");
        this.builder.setConsensusEndTime(InstantConverter.toProtobuf(endTime));
        return this;
    }

    public TopicMessageQuery setLimit(long limit) {
        this.builder.setLimit(limit);
        return this;
    }

    public TopicMessageQuery setCompletionHandler(Runnable completionHandler) {
        Objects.requireNonNull(completionHandler, "completionHandler must not be null");
        this.completionHandler = completionHandler;
        return this;
    }

    public TopicMessageQuery setErrorHandler(BiConsumer<Throwable, TopicMessage> errorHandler) {
        Objects.requireNonNull(errorHandler, "errorHandler must not be null");
        this.errorHandler = errorHandler;
        return this;
    }

    public TopicMessageQuery setMaxAttempts(int maxAttempts) {
        if (maxAttempts < 0) {
            throw new IllegalArgumentException("maxAttempts must be positive");
        }
        this.maxAttempts = maxAttempts;
        return this;
    }

    @SuppressFBWarnings(value={"EI_EXPOSE_REP2"}, justification="A Duration can't actually be mutated")
    public TopicMessageQuery setMaxBackoff(Duration maxBackoff) {
        if (maxBackoff == null || maxBackoff.toMillis() < 500L) {
            throw new IllegalArgumentException("maxBackoff must be at least 500 ms");
        }
        this.maxBackoff = maxBackoff;
        return this;
    }

    public TopicMessageQuery setRetryHandler(Predicate<Throwable> retryHandler) {
        Objects.requireNonNull(retryHandler, "retryHandler must not be null");
        this.retryHandler = retryHandler;
        return this;
    }

    private void onComplete() {
        TopicId topicId = TopicId.fromProtobuf(this.builder.getTopicID());
        LOGGER.info("Subscription to topic {} complete", (Object)topicId);
    }

    private void onError(Throwable throwable, TopicMessage topicMessage) {
        TopicId topicId = TopicId.fromProtobuf(this.builder.getTopicID());
        LOGGER.error("Error attempting to subscribe to topic {}:", (Object)topicId, (Object)throwable);
    }

    private boolean shouldRetry(Throwable throwable) {
        if (throwable instanceof StatusRuntimeException) {
            StatusRuntimeException statusRuntimeException = (StatusRuntimeException)throwable;
            Status.Code code = statusRuntimeException.getStatus().getCode();
            String description = statusRuntimeException.getStatus().getDescription();
            return code == Status.Code.NOT_FOUND || code == Status.Code.UNAVAILABLE || code == Status.Code.RESOURCE_EXHAUSTED || code == Status.Code.INTERNAL && description != null && Executable.RST_STREAM.matcher(description).matches();
        }
        return false;
    }

    public SubscriptionHandle subscribe(Client client, Consumer<TopicMessage> onNext) {
        SubscriptionHandle subscriptionHandle = new SubscriptionHandle();
        HashMap<TransactionID, ArrayList<ConsensusTopicResponse>> pendingMessages = new HashMap<TransactionID, ArrayList<ConsensusTopicResponse>>();
        try {
            this.makeStreamingCall(client, subscriptionHandle, onNext, 0, new AtomicLong(), new AtomicReference<ConsensusTopicResponse>(), pendingMessages);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return subscriptionHandle;
    }

    private void makeStreamingCall(final Client client, final SubscriptionHandle subscriptionHandle, final Consumer<TopicMessage> onNext, final int attempt, final AtomicLong counter, final AtomicReference<ConsensusTopicResponse> lastMessage, final HashMap<TransactionID, ArrayList<ConsensusTopicResponse>> pendingMessages) throws InterruptedException {
        final ClientCall call = client.mirrorNetwork.getNextMirrorNode().getChannel().newCall(ConsensusServiceGrpc.getSubscribeTopicMethod(), CallOptions.DEFAULT);
        subscriptionHandle.setOnUnsubscribe(() -> call.cancel("unsubscribe", null));
        ConsensusTopicQuery.Builder newBuilder = this.builder;
        if (lastMessage.get() != null) {
            newBuilder = (ConsensusTopicQuery.Builder)this.builder.clone();
            if (this.builder.getLimit() > 0L) {
                newBuilder.setLimit(this.builder.getLimit() - counter.get());
            }
            Timestamp lastStartTime = lastMessage.get().getConsensusTimestamp();
            Timestamp.Builder nextStartTime = Timestamp.newBuilder(lastStartTime).setNanos(lastStartTime.getNanos() + 1);
            newBuilder.setConsensusStartTime(nextStartTime);
        }
        ClientCalls.asyncServerStreamingCall((ClientCall)call, (Object)((ConsensusTopicQuery)newBuilder.build()), (StreamObserver)new StreamObserver<ConsensusTopicResponse>(){

            public void onNext(ConsensusTopicResponse consensusTopicResponse) {
                counter.incrementAndGet();
                lastMessage.set(consensusTopicResponse);
                if (!consensusTopicResponse.hasChunkInfo() || consensusTopicResponse.getChunkInfo().getTotal() == 1) {
                    TopicMessage message = TopicMessage.ofSingle(consensusTopicResponse);
                    try {
                        onNext.accept(message);
                    }
                    catch (Throwable t) {
                        TopicMessageQuery.this.errorHandler.accept(t, message);
                    }
                    return;
                }
                TransactionID initialTransactionID = consensusTopicResponse.getChunkInfo().getInitialTransactionID();
                if (!pendingMessages.containsKey(initialTransactionID)) {
                    pendingMessages.put(initialTransactionID, new ArrayList());
                }
                ArrayList chunks = (ArrayList)pendingMessages.get(initialTransactionID);
                Objects.requireNonNull(chunks).add(consensusTopicResponse);
                if (chunks.size() == consensusTopicResponse.getChunkInfo().getTotal()) {
                    TopicMessage message = TopicMessage.ofMany(chunks);
                    try {
                        onNext.accept(message);
                    }
                    catch (Throwable t) {
                        TopicMessageQuery.this.errorHandler.accept(t, message);
                    }
                }
            }

            public void onError(Throwable t) {
                if (attempt >= TopicMessageQuery.this.maxAttempts || !TopicMessageQuery.this.retryHandler.test(t)) {
                    TopicMessageQuery.this.errorHandler.accept(t, null);
                    return;
                }
                long delay = Math.min(500L * (long)Math.pow(2.0, attempt), TopicMessageQuery.this.maxBackoff.toMillis());
                TopicId topicId = TopicId.fromProtobuf(TopicMessageQuery.this.builder.getTopicID());
                LOGGER.warn("Error subscribing to topic {} during attempt #{}. Waiting {} ms before next attempt: {}", new Object[]{topicId, attempt, delay, t.getMessage()});
                call.cancel("unsubscribed", null);
                try {
                    Thread.sleep(delay);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                try {
                    TopicMessageQuery.this.makeStreamingCall(client, subscriptionHandle, onNext, attempt + 1, counter, lastMessage, pendingMessages);
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            public void onCompleted() {
                TopicMessageQuery.this.completionHandler.run();
            }
        });
    }
}

