/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.airlift.discovery.store;

import com.facebook.airlift.concurrent.Threads;
import com.facebook.airlift.discovery.client.ServiceDescriptor;
import com.facebook.airlift.discovery.client.ServiceSelector;
import com.facebook.airlift.discovery.store.BatchProcessor;
import com.facebook.airlift.discovery.store.Entry;
import com.facebook.airlift.discovery.store.RemoteStore;
import com.facebook.airlift.discovery.store.StoreConfig;
import com.facebook.airlift.http.client.BodyGenerator;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.ResponseHandler;
import com.facebook.airlift.http.client.StatusResponseHandler;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.node.NodeInfo;
import com.facebook.airlift.units.Duration;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.weakref.jmx.MBeanExporter;
import org.weakref.jmx.Managed;
import org.weakref.jmx.ObjectNames;

public class HttpRemoteStore
implements RemoteStore {
    private static final Logger log = Logger.get(HttpRemoteStore.class);
    private final int maxBatchSize;
    private final int queueSize;
    private final Duration updateInterval;
    private final ConcurrentMap<String, BatchProcessor<Entry>> processors = new ConcurrentHashMap<String, BatchProcessor<Entry>>();
    private final String name;
    private final NodeInfo node;
    private final ServiceSelector selector;
    private final HttpClient httpClient;
    private Future<?> future;
    private ScheduledExecutorService executor;
    private final AtomicLong lastRemoteServerRefreshTimestamp = new AtomicLong();
    private final MBeanExporter mbeanExporter;

    @Inject
    public HttpRemoteStore(String name, NodeInfo node, ServiceSelector selector, StoreConfig config, HttpClient httpClient, MBeanExporter mbeanExporter) {
        Objects.requireNonNull(name, "name is null");
        Objects.requireNonNull(node, "node is null");
        Objects.requireNonNull(selector, "selector is null");
        Objects.requireNonNull(httpClient, "httpClient is null");
        Objects.requireNonNull(config, "config is null");
        Objects.requireNonNull(mbeanExporter, "mBeanExporter is null");
        this.name = name;
        this.node = node;
        this.selector = selector;
        this.httpClient = httpClient;
        this.mbeanExporter = mbeanExporter;
        this.maxBatchSize = config.getMaxBatchSize();
        this.queueSize = config.getQueueSize();
        this.updateInterval = config.getRemoteUpdateInterval();
    }

    @PostConstruct
    public synchronized void start() {
        if (this.future == null) {
            this.executor = Executors.newSingleThreadScheduledExecutor(Threads.daemonThreadsNamed((String)("http-remote-store-" + this.name)));
            this.future = this.executor.scheduleWithFixedDelay(new Runnable(){

                @Override
                public void run() {
                    try {
                        HttpRemoteStore.this.updateProcessors(HttpRemoteStore.this.selector.selectAllServices());
                    }
                    catch (Throwable e) {
                        log.warn(e, "Error refreshing batch processors");
                    }
                }
            }, 0L, this.updateInterval.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    @PreDestroy
    public synchronized void shutdown() {
        if (this.future != null) {
            this.future.cancel(true);
            try {
                this.executor.submit(new Runnable(){

                    @Override
                    public void run() {
                        HttpRemoteStore.this.updateProcessors(Collections.emptyList());
                    }
                }).get();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
            this.executor.shutdownNow();
            this.future = null;
        }
    }

    private void updateProcessors(List<ServiceDescriptor> descriptors) {
        Set nodeIds = (Set)descriptors.stream().map(HttpRemoteStore.getNodeIdFunction()).collect(ImmutableSet.toImmutableSet());
        Iterator iterator = this.processors.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry entry = iterator.next();
            if (nodeIds.contains(entry.getKey())) continue;
            iterator.remove();
            ((BatchProcessor)entry.getValue()).stop();
            this.mbeanExporter.unexport(this.nameFor((String)entry.getKey()));
        }
        Iterable newDescriptors = (Iterable)descriptors.stream().filter(descriptor -> !descriptor.getNodeId().equals(this.node.getNodeId()) && !this.processors.containsKey(descriptor.getNodeId())).collect(ImmutableList.toImmutableList());
        for (ServiceDescriptor descriptor2 : newDescriptors) {
            BatchProcessor<Entry> processor = new BatchProcessor<Entry>(descriptor2.getNodeId(), new MyBatchHandler(this.name, descriptor2, this.httpClient), this.maxBatchSize, this.queueSize);
            processor.start();
            this.processors.put(descriptor2.getNodeId(), processor);
            this.mbeanExporter.export(this.nameFor(descriptor2.getNodeId()), processor);
        }
        this.lastRemoteServerRefreshTimestamp.set(System.currentTimeMillis());
    }

    private String nameFor(String id) {
        return ObjectNames.generatedNameOf(BatchProcessor.class, (Named)Names.named((String)(this.name + "-" + id)));
    }

    @Managed
    public long getLastRemoteServerRefreshTimestamp() {
        return this.lastRemoteServerRefreshTimestamp.get();
    }

    private static Function<ServiceDescriptor, String> getNodeIdFunction() {
        return new Function<ServiceDescriptor, String>(){

            @Override
            public String apply(ServiceDescriptor descriptor) {
                return descriptor.getNodeId();
            }
        };
    }

    @Override
    public void put(Entry entry) {
        for (BatchProcessor processor : this.processors.values()) {
            processor.put(entry);
        }
    }

    private static class MyBatchHandler
    implements BatchProcessor.BatchHandler<Entry> {
        private final ObjectMapper mapper = new ObjectMapper((JsonFactory)new SmileFactory());
        private final URI uri;
        private final HttpClient httpClient;

        public MyBatchHandler(String name, ServiceDescriptor descriptor, HttpClient httpClient) {
            this.httpClient = httpClient;
            this.uri = descriptor.getProperties().get("https") != null ? URI.create((String)descriptor.getProperties().get("https") + "/v1/store/" + name) : URI.create((String)descriptor.getProperties().get("http") + "/v1/store/" + name);
        }

        @Override
        public void processBatch(final Collection<Entry> entries) {
            Request request = Request.Builder.preparePost().setUri(this.uri).setHeader("Content-Type", "application/x-jackson-smile").setBodyGenerator(new BodyGenerator(){

                public void write(OutputStream out) throws Exception {
                    mapper.writeValue(out, (Object)entries);
                }
            }).build();
            try {
                this.httpClient.execute(request, (ResponseHandler)StatusResponseHandler.createStatusResponseHandler());
            }
            catch (RuntimeException runtimeException) {
                // empty catch block
            }
        }
    }
}

