/*
 * Decompiled with CFR 0.152.
 */
package io.apicurio.registry.utils.streams.diservice;

import io.grpc.Channel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DistributedService<K, S>
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(DistributedService.class);
    private final KafkaStreams streams;
    private final HostInfo localApplicationServer;
    private final String storeName;
    private final Serde<K> keySerde;
    private final Function<? super HostInfo, ? extends Channel> grpcChannelProvider;
    private final boolean parallel;
    private final ConcurrentMap<HostInfo, S> hostInfo2service = new ConcurrentHashMap<HostInfo, S>();

    public DistributedService(KafkaStreams streams, HostInfo localApplicationServer, String storeName, Serde<K> keySerde, Function<? super HostInfo, ? extends Channel> grpcChannelProvider, boolean parallel) {
        this.streams = Objects.requireNonNull(streams, "streams");
        this.localApplicationServer = Objects.requireNonNull(localApplicationServer, "localApplicationServer");
        this.storeName = Objects.requireNonNull(storeName, "storeName");
        this.keySerde = Objects.requireNonNull(keySerde, "keySerde");
        this.grpcChannelProvider = Objects.requireNonNull(grpcChannelProvider, "grpcChannelProvider");
        this.parallel = parallel;
    }

    protected Serde<K> getKeySerde() {
        return this.keySerde;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        ConcurrentMap<HostInfo, S> concurrentMap = this.hostInfo2service;
        synchronized (concurrentMap) {
            Iterator serviceIter = this.hostInfo2service.entrySet().iterator();
            while (serviceIter.hasNext()) {
                Map.Entry entry = serviceIter.next();
                HostInfo hostInfo = (HostInfo)entry.getKey();
                Object service = entry.getValue();
                if (!this.localApplicationServer.equals((Object)hostInfo)) {
                    try {
                        ((AutoCloseable)service).close();
                    }
                    catch (Error | RuntimeException e) {
                        throw e;
                    }
                    catch (Exception e) {
                        throw new RuntimeException("Exception occurred closing the service", e);
                    }
                }
                serviceIter.remove();
            }
        }
    }

    protected final S serviceForKey(K key) {
        KeyQueryMetadata smeta = this.streams.queryMetadataForKey(this.storeName, key, this.keySerde.serializer());
        if (smeta == null) {
            throw new InvalidStateStoreException("StreamsMetadata is null?! Store-name: " + this.storeName + " Key: " + key);
        }
        if (smeta == KeyQueryMetadata.NOT_AVAILABLE) {
            throw new InvalidStateStoreException("StreamsMetadata is currently unavailable. This can occur during rebalance operations. Store-name: " + this.storeName + " Key: " + key);
        }
        return this.serviceForHostInfo(smeta.getActiveHost());
    }

    protected final Collection<S> allServicesForStore() {
        Collection smetas = this.streams.allMetadataForStore(this.storeName);
        if (smetas.isEmpty()) {
            throw new InvalidStateStoreException("StreamsMetadata is currently unavailable. This can occur during rebalance operations. Store-name: " + this.storeName);
        }
        ArrayList<S> services = new ArrayList<S>(smetas.size());
        for (StreamsMetadata smeta : smetas) {
            if (smeta.topicPartitions().size() <= 0) continue;
            services.add(this.serviceForHostInfo(smeta.hostInfo()));
        }
        return services;
    }

    protected final Stream<S> allServicesForStoreStream() {
        Collection<S> services = this.allServicesForStore();
        return this.parallel && services.size() > 1 ? services.parallelStream() : services.stream();
    }

    protected final Collection<S> allServices() {
        Collection smetas = this.streams.allMetadata();
        if (smetas.isEmpty()) {
            throw new StreamsException("StreamsMetadata is currently unavailable. This can occur during rebalance operations. ");
        }
        ArrayList<S> services = new ArrayList<S>(smetas.size());
        for (StreamsMetadata smeta : smetas) {
            if (smeta.topicPartitions().size() <= 0) continue;
            services.add(this.serviceForHostInfo(smeta.hostInfo()));
        }
        return services;
    }

    protected final Stream<S> allServicesStream() {
        Collection<S> services = this.allServices();
        return this.parallel && services.size() > 1 ? services.parallelStream() : services.stream();
    }

    private S serviceForHostInfo(HostInfo hostInfo) {
        return (S)this.hostInfo2service.computeIfAbsent(hostInfo, hInfo -> {
            if (this.localApplicationServer.equals(hInfo)) {
                log.info("Obtaining local service '{}' for host info '{}'", (Object)this.storeName, hInfo);
                return this.localService(this.storeName, this.streams);
            }
            log.info("Obtaining remote service '{}' for host info '{}'", (Object)this.storeName, hInfo);
            return this.remoteServiceGrpcClient(this.storeName, this.grpcChannelProvider.apply((HostInfo)hInfo), this.keySerde);
        });
    }

    protected abstract S localService(String var1, KafkaStreams var2);

    protected abstract S remoteServiceGrpcClient(String var1, Channel var2, Serde<K> var3);
}

