/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.protocol.management;

import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.core.cluster.ClusterCache;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.supports.protocol.management.ProtocolSupportDefinition;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link ProtocolSupportManager} implementation that keeps protocol support
 * definitions in a {@link ClusterCache} obtained from a {@link ClusterManager}
 * and publishes every saved definition on a cluster topic.
 */
public class ClusterProtocolSupportManager
implements ProtocolSupportManager {
    /** Name of the cluster cache that holds the definitions, keyed by definition id. */
    private static final String CACHE_NAME = "__protocol_supports";

    /** Name of the cluster topic on which saved definitions are published. */
    private static final String CHANGED_TOPIC = "_protocol_changed";

    /** State value written to a definition by {@link #remove(String)} before it is re-saved. */
    private static final byte STATE_REMOVED = (byte) -1;

    private final ClusterManager clusterManager;
    private final ClusterCache<String, ProtocolSupportDefinition> cache;

    /**
     * Creates a manager backed by the given cluster manager.
     *
     * @param clusterManager source of the definition cache and of the change topic
     */
    public ClusterProtocolSupportManager(ClusterManager clusterManager) {
        this.clusterManager = clusterManager;
        this.cache = clusterManager.getCache(CACHE_NAME);
    }

    /**
     * Collects every definition of the given flux into a map keyed by
     * {@link ProtocolSupportDefinition#getId()} and writes that map to the cache
     * with a single {@code putAll} call. Nothing is published on the change topic.
     *
     * <p>{@link Collectors#toMap(Function, Function)} rejects duplicate keys with an
     * {@link IllegalStateException}, so two definitions sharing an id make the
     * returned {@code Mono} terminate with that error.
     *
     * @param all the definitions to store
     * @return the result of the cache's {@code putAll}
     */
    @Override
    public Mono<Boolean> store(Flux<ProtocolSupportDefinition> all) {
        return all
            .collect(Collectors.toMap(ProtocolSupportDefinition::getId, Function.identity()))
            .flatMap(this.cache::putAll);
    }

    /**
     * Returns every definition currently held in the cache.
     *
     * @return the cache's values
     */
    @Override
    public Flux<ProtocolSupportDefinition> loadAll() {
        return this.cache.values();
    }

    /**
     * Puts the definition into the cache under its id and, once the put has emitted
     * its result, publishes the definition on the change topic.
     *
     * @param definition the definition to save
     * @return the value emitted by the cache {@code put}, emitted after the publish completes
     */
    @Override
    public Mono<Boolean> save(ProtocolSupportDefinition definition) {
        return this.cache
            .put(definition.getId(), definition)
            .flatMap(putResult -> this.clusterManager
                .getTopic(CHANGED_TOPIC)
                .publish(Mono.just(definition))
                // The publish result is discarded; callers only see the put outcome.
                .thenReturn(putResult));
    }

    /**
     * Removes the definition stored under the given id.
     *
     * <p>If the cache emits a definition for the id, its state is first set to
     * {@code -1} and it is passed through {@link #save(ProtocolSupportDefinition)},
     * so the updated definition is published on the change topic before the cache
     * entry is removed. NOTE(review): {@code -1} presumably marks the definition as
     * removed for topic subscribers - confirm against the consumers of the topic.
     *
     * <p>If the lookup completes without emitting, the save step is skipped and the
     * cache removal still runs.
     *
     * @param id id of the definition to remove
     * @return the result of the cache's {@code remove}
     */
    @Override
    public Mono<Boolean> remove(String id) {
        return this.cache
            .get(id)
            .doOnNext(def -> def.setState(STATE_REMOVED))
            .flatMap(this::save)
            // NOTE(review): cache.remove(id) is invoked while this pipeline is being
            // assembled; this relies on the returned Mono doing its work only when
            // subscribed - confirm for the ClusterCache implementation in use.
            .then(this.cache.remove(id));
    }
}

