/*
 * Decompiled with CFR 0.152.
 */
package io.scalecube.cluster.metadata;

import io.scalecube.cluster.ClusterConfig;
import io.scalecube.cluster.CorrelationIdGenerator;
import io.scalecube.cluster.Member;
import io.scalecube.cluster.metadata.GetMetadataRequest;
import io.scalecube.cluster.metadata.GetMetadataResponse;
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.net.Address;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

public class MetadataStoreImpl
implements MetadataStore {
    private static final Logger LOGGER = LoggerFactory.getLogger(MetadataStoreImpl.class);
    public static final String GET_METADATA_REQ = "sc/metadata/req";
    public static final String GET_METADATA_RESP = "sc/metadata/resp";
    private Object localMetadata;
    private final Member localMember;
    private final Transport transport;
    private final ClusterConfig config;
    private final CorrelationIdGenerator cidGenerator;
    private final Map<Member, ByteBuffer> membersMetadata = new ConcurrentHashMap<Member, ByteBuffer>();
    private final Scheduler scheduler;
    private final Disposable.Composite actionsDisposables = Disposables.composite();

    public MetadataStoreImpl(Member localMember, Transport transport, Object localMetadata, ClusterConfig config, Scheduler scheduler, CorrelationIdGenerator cidGenerator) {
        this.localMember = Objects.requireNonNull(localMember);
        this.transport = Objects.requireNonNull(transport);
        this.config = Objects.requireNonNull(config);
        this.scheduler = Objects.requireNonNull(scheduler);
        this.cidGenerator = Objects.requireNonNull(cidGenerator);
        this.localMetadata = Objects.requireNonNull(localMetadata);
    }

    public void start() {
        this.actionsDisposables.add(this.transport.listen().publishOn(this.scheduler).subscribe(this::onMessage, this::onError));
    }

    public void stop() {
        this.actionsDisposables.dispose();
        this.membersMetadata.clear();
    }

    public Object metadata() {
        return this.localMetadata;
    }

    public Optional<ByteBuffer> metadata(Member member) {
        return Optional.ofNullable(this.membersMetadata.get(member)).map(ByteBuffer::slice);
    }

    public void updateMetadata(Object metadata) {
        Objects.requireNonNull(metadata, "updateMetadata(): metadata must be not null");
        this.localMetadata = metadata;
    }

    public ByteBuffer updateMetadata(Member member, ByteBuffer metadata) {
        if (this.localMember.equals((Object)member)) {
            throw new IllegalArgumentException("removeMetadata must not accept local member");
        }
        if (metadata == null) {
            return this.removeMetadata(member);
        }
        ByteBuffer value = metadata.slice();
        ByteBuffer result = this.membersMetadata.put(member, value);
        if (result == null) {
            LOGGER.debug("Added metadata: {} for member {} [at {}]", new Object[]{value.remaining(), member, this.localMember});
        } else {
            LOGGER.debug("Updated metadata: {} for member {} [at {}]", new Object[]{value.remaining(), member, this.localMember});
        }
        return result;
    }

    public ByteBuffer removeMetadata(Member member) {
        if (this.localMember.equals((Object)member)) {
            throw new IllegalArgumentException("removeMetadata must not accept local member");
        }
        ByteBuffer metadata = this.membersMetadata.remove(member);
        if (metadata != null) {
            LOGGER.debug("Removed metadata for member {} [at {}]", (Object)member, (Object)this.localMember);
            return metadata;
        }
        return null;
    }

    public Mono<ByteBuffer> fetchMetadata(Member member) {
        return Mono.create(sink -> {
            LOGGER.debug("Getting metadata for member {} [at {}]", (Object)member, (Object)this.localMember);
            String cid = this.cidGenerator.nextCid();
            Address targetAddress = member.address();
            Message request = Message.builder().qualifier(GET_METADATA_REQ).correlationId(cid).data((Object)new GetMetadataRequest(member)).build();
            this.transport.requestResponse(targetAddress, request).timeout(Duration.ofMillis(this.config.metadataTimeout()), this.scheduler).publishOn(this.scheduler).subscribe(response -> {
                LOGGER.debug("Received GetMetadataResp[{}] from {} [at {}]", new Object[]{cid, targetAddress, this.localMember});
                GetMetadataResponse respData = (GetMetadataResponse)response.data();
                ByteBuffer metadata = respData.getMetadata();
                sink.success((Object)metadata);
            }, th -> {
                LOGGER.warn("Failed getting GetMetadataResp[{}] from {} within {} ms [at {}], cause : {}", new Object[]{cid, targetAddress, this.config.metadataTimeout(), this.localMember, th.toString()});
                sink.error(th);
            });
        });
    }

    private void onMessage(Message message) {
        if (GET_METADATA_REQ.equals(message.qualifier())) {
            this.onMetadataRequest(message);
        }
    }

    private void onError(Throwable throwable) {
        LOGGER.error("Received unexpected error: ", throwable);
    }

    private void onMetadataRequest(Message message) {
        LOGGER.debug("Received GetMetadataReq: {} [at {}]", (Object)message, (Object)this.localMember);
        GetMetadataRequest reqData = (GetMetadataRequest)message.data();
        Member targetMember = reqData.getMember();
        if (!targetMember.id().equals(this.localMember.id())) {
            LOGGER.warn("Received GetMetadataReq: {} to {}, but local member is {}", new Object[]{message, targetMember, this.localMember});
            return;
        }
        ByteBuffer byteBuffer = this.config.metadataEncoder().encode(this.localMetadata);
        GetMetadataResponse respData = new GetMetadataResponse(this.localMember, byteBuffer);
        Message response = Message.builder().qualifier(GET_METADATA_RESP).correlationId(message.correlationId()).data((Object)respData).build();
        Address responseAddress = message.sender();
        LOGGER.debug("Send GetMetadataResp: {} to {} [at {}]", new Object[]{response, responseAddress, this.localMember});
        this.transport.send(responseAddress, response).subscribe(null, ex -> LOGGER.debug("Failed to send GetMetadataResp: {} to {} [at {}], cause: {}", new Object[]{response, responseAddress, this.localMember, ex.toString()}));
    }
}

