/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.dcp.transport.netty;

import com.couchbase.client.dcp.conductor.BucketConfigSink;
import com.couchbase.client.dcp.conductor.DcpChannel;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.core.logging.RedactableArgument;
import com.couchbase.client.dcp.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.dcp.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.dcp.deps.io.netty.channel.SimpleChannelInboundHandler;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.Future;
import com.couchbase.client.dcp.deps.io.netty.util.concurrent.GenericFutureListener;
import com.couchbase.client.dcp.message.HelloFeature;
import com.couchbase.client.dcp.message.MessageUtil;
import com.couchbase.client.dcp.message.ResponseStatus;
import com.couchbase.client.dcp.message.ServerRequest;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketConfigHandler
extends SimpleChannelInboundHandler<ByteBuf> {
    private static final Logger log = LoggerFactory.getLogger(BucketConfigHandler.class);
    private final BucketConfigSink bucketConfigSink;
    private final Duration configRefreshInterval;

    BucketConfigHandler(BucketConfigSink bucketConfigSink, Duration configRefreshInterval) {
        this.bucketConfigSink = Objects.requireNonNull(bucketConfigSink);
        this.configRefreshInterval = Objects.requireNonNull(configRefreshInterval);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte magic = msg.getByte(0);
        if (magic == -126) {
            ServerRequest.handleServerRequest(ctx, msg, this.bucketConfigSink);
            return;
        }
        if (magic != -127) {
            ctx.fireChannelRead(msg.retain());
            return;
        }
        byte opcode = msg.getByte(1);
        if (opcode == -75) {
            this.updateConfig(ctx, msg, "client request");
            boolean serverPushesConfigUpdates = HelloFeature.CLUSTERMAP_CHANGE_NOTIFICATION.isEnabled(ctx);
            if (!serverPushesConfigUpdates) {
                this.scheduleConfigRefresh(ctx);
            }
            return;
        }
        if (BucketConfigHandler.isNotMyVbucketError(msg)) {
            this.updateConfig(ctx, msg, "NotMyVbucket");
        }
        ctx.fireChannelRead(msg.retain());
    }

    private void scheduleConfigRefresh(ChannelHandlerContext ctx) {
        log.debug("Scheduling bucket config refresh in {}", (Object)this.configRefreshInterval);
        ctx.executor().schedule(() -> {
            ByteBuf request = ctx.alloc().buffer();
            MessageUtil.initRequest((byte)-75, request);
            ctx.writeAndFlush(request).addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)future -> {
                if (!future.isSuccess() && ctx.channel().isActive()) {
                    log.warn("Config refresh failed; rescheduling.", future.cause());
                    this.scheduleConfigRefresh(ctx);
                }
            }));
        }, this.configRefreshInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    private static boolean isNotMyVbucketError(ByteBuf msg) {
        return MessageUtil.getResponseStatus(msg) == ResponseStatus.NOT_MY_VBUCKET;
    }

    private void updateConfig(ChannelHandlerContext ctx, ByteBuf msg, String source) {
        String clustermap = MessageUtil.getContentAsString(msg);
        if (!clustermap.isEmpty()) {
            log.info("{} Received bucket config from {}", (Object)RedactableArgument.system(ctx.channel()), (Object)source);
            HostAndPort remote = DcpChannel.getHostAndPort(ctx.channel());
            this.bucketConfigSink.accept(remote, clustermap);
        }
    }
}

