/*
 * Decompiled with CFR 0.152.
 */
package com.tencent.polaris.discovery.client.flow;

import com.tencent.polaris.api.exception.PolarisException;
import com.tencent.polaris.api.listener.ServiceListener;
import com.tencent.polaris.api.plugin.compose.Extensions;
import com.tencent.polaris.api.plugin.registry.AbstractResourceEventListener;
import com.tencent.polaris.api.plugin.registry.ResourceEventListener;
import com.tencent.polaris.api.pojo.Instance;
import com.tencent.polaris.api.pojo.RegistryCacheValue;
import com.tencent.polaris.api.pojo.ServiceChangeEvent;
import com.tencent.polaris.api.pojo.ServiceEventKey;
import com.tencent.polaris.api.pojo.ServiceKey;
import com.tencent.polaris.api.rpc.InstancesResponse;
import com.tencent.polaris.api.rpc.WatchServiceResponse;
import com.tencent.polaris.api.utils.CollectionUtils;
import com.tencent.polaris.client.pojo.ServiceInstancesByProto;
import com.tencent.polaris.client.util.NamedThreadFactory;
import com.tencent.polaris.client.util.Utils;
import com.tencent.polaris.discovery.client.flow.CommonUnWatchServiceRequest;
import com.tencent.polaris.discovery.client.flow.CommonWatchServiceRequest;
import com.tencent.polaris.discovery.client.flow.SyncFlow;
import com.tencent.polaris.logging.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;

/**
 * Keeps track of the {@link ServiceListener}s watching each service and pushes
 * {@link ServiceChangeEvent}s to them through a {@link DispatchExecutor}.
 *
 * <p>The watcher registry is a static map, so it is shared by every {@code WatchFlow}
 * instance of this class.
 */
public class WatchFlow {
    private static final Logger LOG = LoggerFactory.getLogger(WatchFlow.class);
    private static final Logger UPDATE_EVENT_LOG = LoggerFactory.getLogger("polaris-update-event-async");
    /** Listeners registered per service; each value is a synchronized set. */
    private static final Map<ServiceKey, Set<ServiceListener>> watchers = new ConcurrentHashMap<ServiceKey, Set<ServiceListener>>();
    /** Guards {@link #initFlow()} so the registry listener and executor are set up only once. */
    private final AtomicBoolean initialize = new AtomicBoolean(false);
    private Extensions extensions;
    private SyncFlow syncFlow;
    private DispatchExecutor executor;

    /**
     * Stores the collaborators and performs the one-time flow initialization.
     *
     * @param extensions source of the local registry and of the subscribe configuration
     * @param syncFlow flow used to fetch the current instances when a watch is added
     */
    public void init(Extensions extensions, SyncFlow syncFlow) {
        this.extensions = extensions;
        this.syncFlow = syncFlow;
        this.initFlow();
    }

    /**
     * Registers the listeners carried by the request for the request's service.
     *
     * <p>All instances of the service are fetched first. Every listener that was not
     * registered yet receives an initial event whose "added" and "all" instance lists are
     * both the fetched instances.
     *
     * @param request the watch request
     * @return a response wrapping the fetched instances and a flag that is {@code true} when
     *         the set of registered listeners changed
     * @throws PolarisException declared for callers; raised by the underlying instance query
     */
    public WatchServiceResponse commonWatchService(CommonWatchServiceRequest request) throws PolarisException {
        ServiceKey serviceKey = request.getSvcEventKey().getServiceKey();
        InstancesResponse response = this.syncFlow.commonSyncGetAllInstances(request.getAllRequest());
        // Use the set returned by computeIfAbsent: a separate get() could observe null if a
        // concurrent unwatch removed the entry in between.
        Set<ServiceListener> existListeners = watchers.computeIfAbsent(serviceKey,
                key -> Collections.synchronizedSet(new HashSet<ServiceListener>()));
        List<ServiceListener> addListeners = request.getWatchServiceRequest().getListeners();
        List<ServiceListener> firstAddedListeners = addListeners.stream()
                .filter(serviceListener -> !existListeners.contains(serviceListener))
                .collect(Collectors.toList());
        if (CollectionUtils.isNotEmpty(firstAddedListeners)) {
            List<Instance> currentInstances = Arrays.asList(response.getInstances());
            ServiceChangeEvent event = ServiceChangeEvent.builder()
                    .serviceKey(serviceKey)
                    .addInstances(currentInstances)
                    .allInstances(currentInstances)
                    .build();
            firstAddedListeners.forEach(serviceListener -> this.dispatch(event, serviceListener));
        }
        boolean result = existListeners.addAll(addListeners);
        return new WatchServiceResponse(response, result);
    }

    /**
     * Unregisters listeners for the request's service.
     *
     * <p>When the request asks to remove all, the whole entry for the service is dropped.
     * Otherwise only the listeners named in the request are removed from the registered set.
     *
     * @param request the unwatch request
     * @return a response without instance data; its flag is the result of the listener
     *         removal, or {@code true} when everything was removed or nothing was registered
     * @throws PolarisException declared for callers
     */
    public WatchServiceResponse commonUnWatchService(CommonUnWatchServiceRequest request) throws PolarisException {
        ServiceKey serviceKey = request.getSvcEventKey().getServiceKey();
        boolean result = true;
        if (request.getRequest().isRemoveAll()) {
            watchers.remove(serviceKey);
        } else {
            Set<ServiceListener> listeners = watchers.get(serviceKey);
            if (CollectionUtils.isNotEmpty(listeners)) {
                result = listeners.removeAll(request.getRequest().getListeners());
            }
        }
        return new WatchServiceResponse(null, result);
    }

    /** Creates the dispatch executor and hooks into the local registry; runs at most once. */
    private void initFlow() {
        if (this.initialize.compareAndSet(false, true)) {
            // Build the executor before registering the listener so that a registry callback
            // can never find it unset.
            this.executor = new DispatchExecutor(this.extensions.getConfiguration().getConsumer().getSubscribe().getCallbackConcurrency());
            this.extensions.getLocalRegistry().registerResourceListener((ResourceEventListener)new InstanceChangeListener());
        }
    }

    /** Hands the event to the listener on the dispatch executor selected by the event's service key. */
    private void dispatch(ServiceChangeEvent event, ServiceListener listener) {
        this.executor.execute(event.getServiceKey(), () -> listener.onEvent(event));
    }

    /**
     * Fixed array of single-thread pools. A task is routed by the hash code of its service
     * key, so tasks for equal keys always go to the same pool.
     */
    private static class DispatchExecutor {
        private final Executor[] executors;

        /**
         * @param nThread number of single-thread pools to create; values below 1 are treated as 1
         */
        public DispatchExecutor(int nThread) {
            int poolCount = Math.max(nThread, 1);
            this.executors = new Executor[poolCount];
            for (int i = 0; i < poolCount; ++i) {
                this.executors[i] = Executors.newFixedThreadPool(1, (ThreadFactory)new NamedThreadFactory("service-watch-dispatch" + i));
            }
        }

        /**
         * Runs the command on the pool selected by the service key.
         *
         * @param serviceKey key whose hash code picks the pool
         * @param command task to run
         */
        public void execute(ServiceKey serviceKey, Runnable command) {
            // floorMod always yields an index in [0, length); Math.abs would stay negative for
            // Integer.MIN_VALUE and produce a negative index.
            int index = Math.floorMod(serviceKey.hashCode(), this.executors.length);
            this.executors[index].execute(command);
        }
    }

    /**
     * Registry listener that turns an instance-cache update into a {@link ServiceChangeEvent}
     * and dispatches it to every listener watching the affected service.
     */
    private class InstanceChangeListener
    extends AbstractResourceEventListener {

        private InstanceChangeListener() {
        }

        /**
         * Handles a cache update. Only updates whose new value has event type
         * {@code INSTANCE}, and whose old and new values are both
         * {@link ServiceInstancesByProto}, are processed.
         */
        public void onResourceUpdated(ServiceEventKey svcEventKey, RegistryCacheValue oldValue, RegistryCacheValue newValue) {
            if (newValue.getEventType() != ServiceEventKey.EventType.INSTANCE) {
                return;
            }
            if (!(oldValue instanceof ServiceInstancesByProto) || !(newValue instanceof ServiceInstancesByProto)) {
                return;
            }
            LOG.debug("receive service={} change event", svcEventKey);
            ServiceInstancesByProto oldIns = (ServiceInstancesByProto)oldValue;
            ServiceInstancesByProto newIns = (ServiceInstancesByProto)newValue;
            ServiceChangeEvent event = ServiceChangeEvent.builder()
                    .serviceKey(svcEventKey.getServiceKey())
                    .addInstances(Utils.checkAddInstances(oldIns, newIns))
                    .updateInstances(Utils.checkUpdateInstances(oldIns, newIns))
                    .deleteInstances(Utils.checkDeleteInstances(oldIns, newIns))
                    .allInstances(newIns.getInstances())
                    .build();
            this.logChangeInstances(svcEventKey, event, oldIns, newIns);
            Set<ServiceListener> listeners = watchers.getOrDefault(svcEventKey.getServiceKey(), Collections.emptySet());
            listeners.forEach(serviceListener -> WatchFlow.this.dispatch(event, serviceListener));
        }

        /** Writes a summary line plus one line per added, modified and deleted instance to the update-event log. */
        private void logChangeInstances(ServiceEventKey svcEventKey, ServiceChangeEvent event, ServiceInstancesByProto oldIns, ServiceInstancesByProto newIns) {
            UPDATE_EVENT_LOG.info("service instances of {} change, oldRevision {}, newRevision {}, oldCount {}, newCount {}.",
                    svcEventKey, oldIns.getRevision(), newIns.getRevision(), oldIns.getInstances().size(), newIns.getInstances().size());
            for (Instance addInst : event.getAddInstances()) {
                UPDATE_EVENT_LOG.info("add instance of {}: [{}:{}, status: {}].",
                        svcEventKey, addInst.getHost(), addInst.getPort(), this.totalInstanceInfo(addInst));
            }
            for (ServiceChangeEvent.OneInstanceUpdate oneInstanceUpdate : event.getUpdateInstances()) {
                Instance before = oneInstanceUpdate.getBefore();
                Instance after = oneInstanceUpdate.getAfter();
                UPDATE_EVENT_LOG.info("modify instance of {} from [{}:{}, status: {}] to [{}:{}, status: {}].",
                        svcEventKey, before.getHost(), before.getPort(), this.totalInstanceInfo(before),
                        after.getHost(), after.getPort(), this.totalInstanceInfo(after));
            }
            for (Instance delInst : event.getDeleteInstances()) {
                UPDATE_EVENT_LOG.info("delete instance of {}: [{}:{}, status: {}].",
                        svcEventKey, delInst.getHost(), delInst.getPort(), this.totalInstanceInfo(delInst));
            }
        }

        /** Formats the health, isolation and weight of an instance for the log lines. */
        private String totalInstanceInfo(Instance instance) {
            return String.format("healthy:%s;isolate:%s;weight:%s", instance.isHealthy(), instance.isIsolated(), instance.getWeight());
        }
    }
}

