/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.internal.PartitionSubscriberFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.ApiServiceUtils;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AssigningSubscriber
extends ProxyService
implements Subscriber {
    private static final GoogleLogger LOG = GoogleLogger.forEnclosingClass();
    private final PartitionSubscriberFactory subscriberFactory;
    private final CloseableMonitor monitor = new CloseableMonitor();
    @GuardedBy(value="monitor.monitor")
    private final Map<Partition, Subscriber> liveSubscriberMap = new HashMap<Partition, Subscriber>();
    @GuardedBy(value="monitor.monitor")
    private final List<Subscriber> stoppingSubscribers = new ArrayList<Subscriber>();
    @GuardedBy(value="monitor.monitor")
    private boolean shutdown = false;

    public AssigningSubscriber(PartitionSubscriberFactory subscriberFactory, AssignerFactory assignerFactory) throws ApiException {
        this.subscriberFactory = subscriberFactory;
        Assigner assigner = assignerFactory.New(this::handleAssignment);
        this.addServices(assigner);
    }

    @Override
    protected void start() {
    }

    @Override
    protected void stop() {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.shutdown = true;
            ApiServiceUtils.blockingShutdown(this.liveSubscriberMap.values());
            this.liveSubscriberMap.clear();
            ApiServiceUtils.blockingShutdown(this.stoppingSubscribers);
        }
    }

    @Override
    protected void handlePermanentError(CheckedApiException error) {
        this.stop();
    }

    private void handleAssignment(Set<Partition> assignment) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                return;
            }
            ImmutableSet livePartitions = ImmutableSet.copyOf(this.liveSubscriberMap.keySet());
            for (Partition partition : livePartitions) {
                if (assignment.contains(partition)) continue;
                this.stopSubscriber(this.liveSubscriberMap.remove(partition));
            }
            for (Partition partition : assignment) {
                if (this.liveSubscriberMap.containsKey(partition)) continue;
                this.startSubscriber(partition);
            }
        }
        catch (Throwable t) {
            this.onPermanentError(ExtractStatus.toCanonical(t));
        }
    }

    @GuardedBy(value="monitor.monitor")
    private void startSubscriber(Partition partition) throws CheckedApiException {
        CheckedApiPreconditions.checkState(!this.liveSubscriberMap.containsKey(partition));
        final Subscriber subscriber = this.subscriberFactory.newSubscriber(partition);
        subscriber.addListener(new ApiService.Listener(){

            public void failed(ApiService.State from, Throwable failure) {
                if (ApiService.State.STOPPING.equals((Object)from)) {
                    return;
                }
                AssigningSubscriber.this.onPermanentError(ExtractStatus.toCanonical(failure));
            }

            public void terminated(ApiService.State from) {
                try (CloseableMonitor.Hold h = AssigningSubscriber.this.monitor.enter();){
                    AssigningSubscriber.this.stoppingSubscribers.remove(subscriber);
                }
            }
        }, SystemExecutors.getFuturesExecutor());
        this.liveSubscriberMap.put(partition, subscriber);
        subscriber.startAsync();
    }

    @GuardedBy(value="monitor.monitor")
    private void stopSubscriber(Subscriber subscriber) {
        this.stoppingSubscribers.add(subscriber);
        subscriber.stopAsync();
    }
}

