/*
 * Decompiled with CFR 0.152.
 */
package io.streamnative.oxia.client.shard;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import io.opentelemetry.api.common.Attributes;
import io.streamnative.oxia.client.CompositeConsumer;
import io.streamnative.oxia.client.grpc.CustomStatusCode;
import io.streamnative.oxia.client.grpc.OxiaStub;
import io.streamnative.oxia.client.metrics.Counter;
import io.streamnative.oxia.client.metrics.InstrumentProvider;
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.shard.HashRangeShardStrategy;
import io.streamnative.oxia.client.shard.NamespaceNotFoundException;
import io.streamnative.oxia.client.shard.Shard;
import io.streamnative.oxia.client.shard.ShardAssignmentsContainer;
import io.streamnative.oxia.client.util.Backoff;
import io.streamnative.oxia.proto.NamespaceShardsAssignment;
import io.streamnative.oxia.proto.ShardAssignments;
import io.streamnative.oxia.proto.ShardAssignmentsRequest;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ShardManager
implements AutoCloseable,
StreamObserver<ShardAssignments> {
    private static final Logger log = LoggerFactory.getLogger(ShardManager.class);
    private final ScheduledExecutorService executor;
    private final OxiaStub stub;
    @NonNull
    private final ShardAssignmentsContainer assignments;
    @NonNull
    private final CompositeConsumer<ShardAssignmentChanges> callbacks;
    private final Counter shardAssignmentsEvents;
    private final Backoff backoff = new Backoff();
    private volatile boolean closed;
    private final CompletableFuture<Void> initialAssignmentsFuture = new CompletableFuture();

    @VisibleForTesting
    ShardManager(@NonNull ScheduledExecutorService executor, @NonNull OxiaStub stub, @NonNull ShardAssignmentsContainer assignments, @NonNull CompositeConsumer<ShardAssignmentChanges> callbacks, @NonNull InstrumentProvider instrumentProvider) {
        if (executor == null) {
            throw new NullPointerException("executor is marked non-null but is null");
        }
        if (stub == null) {
            throw new NullPointerException("stub is marked non-null but is null");
        }
        if (assignments == null) {
            throw new NullPointerException("assignments is marked non-null but is null");
        }
        if (callbacks == null) {
            throw new NullPointerException("callbacks is marked non-null but is null");
        }
        if (instrumentProvider == null) {
            throw new NullPointerException("instrumentProvider is marked non-null but is null");
        }
        this.stub = stub;
        this.executor = executor;
        this.assignments = assignments;
        this.callbacks = callbacks;
        this.shardAssignmentsEvents = instrumentProvider.newCounter("oxia.client.shard.assignments.count", Unit.None, "The total count of received shard assignment events", Attributes.empty());
    }

    public ShardManager(ScheduledExecutorService executor, @NonNull OxiaStub stub, @NonNull InstrumentProvider instrumentProvider, @NonNull String namespace) {
        this(executor, stub, new ShardAssignmentsContainer(HashRangeShardStrategy.Xxh332HashRangeShardStrategy, namespace), new CompositeConsumer<ShardAssignmentChanges>(), instrumentProvider);
        if (stub == null) {
            throw new NullPointerException("stub is marked non-null but is null");
        }
        if (instrumentProvider == null) {
            throw new NullPointerException("instrumentProvider is marked non-null but is null");
        }
        if (namespace == null) {
            throw new NullPointerException("namespace is marked non-null but is null");
        }
    }

    @Override
    public void close() {
        this.closed = true;
    }

    public CompletableFuture<Void> start() {
        ShardAssignmentsRequest req = ShardAssignmentsRequest.newBuilder().setNamespace(this.assignments.getNamespace()).build();
        this.stub.async().getShardAssignments(req, this);
        return this.initialAssignmentsFuture;
    }

    public void onNext(ShardAssignments assignments) {
        this.shardAssignmentsEvents.increment();
        this.updateAssignments(assignments);
        this.backoff.reset();
        if (!this.initialAssignmentsFuture.isDone()) {
            this.initialAssignmentsFuture.complete(null);
        }
    }

    public void onError(Throwable error) {
        CustomStatusCode customStatusCode;
        String description;
        StatusRuntimeException statusError;
        Status status;
        if (this.closed) {
            return;
        }
        if (error instanceof StatusRuntimeException && (status = (statusError = (StatusRuntimeException)error).getStatus()).getCode() == Status.Code.UNKNOWN && (description = status.getDescription()) != null && (customStatusCode = CustomStatusCode.fromDescription(description)) == CustomStatusCode.ErrorNamespaceNotFound) {
            log.error("Namespace not found: {}", (Object)this.assignments.getNamespace());
            if (!this.initialAssignmentsFuture.isDone() && this.initialAssignmentsFuture.completeExceptionally(new NamespaceNotFoundException(this.assignments.getNamespace()))) {
                this.close();
            }
        }
        log.warn("Failed receiving shard assignments: {}", (Object)error.getMessage());
        this.executor.schedule(() -> {
            if (!this.closed) {
                log.info("Retry creating stream for shard assignments namespace={}", (Object)this.assignments.getNamespace());
                this.start();
            }
        }, this.backoff.nextDelayMillis(), TimeUnit.MILLISECONDS);
    }

    public void onCompleted() {
        if (this.closed) {
            return;
        }
        log.warn("Stream closed while receiving shard assignments");
        this.executor.schedule(() -> {
            if (!this.closed) {
                log.info("Retry creating stream for shard assignments after stream closed namespace={}", (Object)this.assignments.getNamespace());
                this.start();
            }
        }, this.backoff.nextDelayMillis(), TimeUnit.MILLISECONDS);
    }

    private void updateAssignments(ShardAssignments shardAssignments) {
        NamespaceShardsAssignment nsSharedAssignments = shardAssignments.getNamespacesMap().get(this.assignments.getNamespace());
        if (nsSharedAssignments == null) {
            throw new NamespaceNotFoundException(this.assignments.getNamespace(), true);
        }
        Set<Shard> updates = nsSharedAssignments.getAssignmentsList().stream().map(Shard::fromProto).collect(Collectors.toSet());
        Map<Long, Shard> updatedMap = ShardManager.recomputeShardHashBoundaries(this.assignments.allShards(), updates);
        ShardAssignmentChanges changes = ShardManager.computeShardLeaderChanges(this.assignments.allShards(), updatedMap);
        this.assignments.update(changes);
        this.callbacks.accept(changes);
    }

    @VisibleForTesting
    static Map<Long, Shard> recomputeShardHashBoundaries(Map<Long, Shard> assignments, Set<Shard> updates) {
        ArrayList toDelete = new ArrayList();
        updates.forEach(update -> update.findOverlapping(assignments.values()).forEach(existing -> {
            log.info("Deleting shard {} as it overlaps with {}", existing, update);
            toDelete.add(existing.id());
        }));
        return Collections.unmodifiableMap(Stream.concat(assignments.entrySet().stream().filter(e -> !toDelete.contains(e.getKey())).map(Map.Entry::getValue), updates.stream()).collect(Collectors.toMap(Shard::id, Function.identity())));
    }

    @VisibleForTesting
    static ShardAssignmentChanges computeShardLeaderChanges(Map<Long, Shard> oldAssignments, Map<Long, Shard> newAssignments) {
        Set removed = oldAssignments.values().stream().filter(shard -> !newAssignments.containsKey(shard.id())).collect(Collectors.toSet());
        Set added = newAssignments.values().stream().filter(shard -> !oldAssignments.containsKey(shard.id())).collect(Collectors.toSet());
        Set changed = oldAssignments.values().stream().filter(s -> newAssignments.containsKey(s.id())).filter(s -> !((Shard)newAssignments.get(s.id())).leader().equals(s.leader())).collect(Collectors.toSet());
        return new ShardAssignmentChanges(Collections.unmodifiableSet(added), Collections.unmodifiableSet(removed), Collections.unmodifiableSet(changed));
    }

    public long getShardForKey(String key) {
        return this.assignments.getShardForKey(key);
    }

    public Collection<Shard> allShards() {
        return this.assignments.allShards().values();
    }

    public Set<Long> allShardIds() {
        return this.assignments.allShardIds();
    }

    public String leader(long shardId) {
        return this.assignments.leader(shardId);
    }

    public void addCallback(@NonNull Consumer<ShardAssignmentChanges> callback) {
        if (callback == null) {
            throw new NullPointerException("callback is marked non-null but is null");
        }
        this.callbacks.add(callback);
    }

    private boolean isErrorRetryable(@NonNull Throwable ex) {
        if (ex == null) {
            throw new NullPointerException("ex is marked non-null but is null");
        }
        if (ex instanceof NamespaceNotFoundException) {
            NamespaceNotFoundException nsNotFoundError = (NamespaceNotFoundException)ex;
            return nsNotFoundError.isRetryable();
        }
        return true;
    }

    public record ShardAssignmentChanges(Set<Shard> added, Set<Shard> removed, Set<Shard> reassigned) {
    }
}

