/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.internal.PartitionSubscriberFactory;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Preconditions;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AssigningSubscriber
extends ProxyService
implements Subscriber {
    private final PartitionSubscriberFactory subscriberFactory;
    private final CloseableMonitor monitor = new CloseableMonitor();
    @GuardedBy(value="monitor.monitor")
    private final Map<Partition, Subscriber> liveSubscriberMap = new HashMap<Partition, Subscriber>();
    @GuardedBy(value="monitor.monitor")
    private final List<Subscriber> stoppingSubscribers = new ArrayList<Subscriber>();
    @GuardedBy(value="monitor.monitor")
    private boolean shutdown = false;

    public AssigningSubscriber(PartitionSubscriberFactory subscriberFactory, AssignerFactory assignerFactory) throws StatusException {
        this.subscriberFactory = subscriberFactory;
        Assigner assigner = assignerFactory.New(this::handleAssignment);
        this.addServices(assigner);
    }

    @Override
    protected void start() {
    }

    @Override
    protected void stop() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.shutdown = true;
            this.liveSubscriberMap.values().forEach(subscriber -> subscriber.stopAsync().awaitTerminated());
            this.liveSubscriberMap.clear();
            this.stoppingSubscribers.forEach(ApiService::awaitTerminated);
        }
    }

    @Override
    protected void handlePermanentError(StatusException error) {
        this.stop();
    }

    private void handleAssignment(Set<Partition> assignment) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                return;
            }
            ImmutableSet livePartitions = ImmutableSet.copyOf(this.liveSubscriberMap.keySet());
            for (Partition partition : livePartitions) {
                if (assignment.contains(partition)) continue;
                this.stopSubscriber(this.liveSubscriberMap.remove(partition));
            }
            for (Partition partition : assignment) {
                if (this.liveSubscriberMap.containsKey(partition)) continue;
                this.startSubscriber(partition);
            }
        }
        catch (StatusException e) {
            this.onPermanentError(e);
        }
    }

    @GuardedBy(value="monitor.monitor")
    private void startSubscriber(Partition partition) throws StatusException {
        Preconditions.checkState(!this.liveSubscriberMap.containsKey(partition));
        final Subscriber subscriber = this.subscriberFactory.New(partition);
        subscriber.addListener(new ApiService.Listener(){

            public void failed(ApiService.State from, Throwable failure) {
                AssigningSubscriber.this.onPermanentError(ExtractStatus.toCanonical(failure));
            }

            public void terminated(ApiService.State from) {
                try (CloseableMonitor.Hold h = AssigningSubscriber.this.monitor.enter();){
                    AssigningSubscriber.this.stoppingSubscribers.remove(subscriber);
                }
            }
        }, MoreExecutors.directExecutor());
        this.liveSubscriberMap.put(partition, subscriber);
        subscriber.startAsync();
    }

    @GuardedBy(value="monitor.monitor")
    private void stopSubscriber(Subscriber subscriber) {
        this.stoppingSubscribers.add(subscriber);
        subscriber.stopAsync();
    }
}

