/*
 * Decompiled with CFR 0.152.
 */
package org.apache.shardingsphere.data.pipeline.cdc.client.handler;

import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.context.ClientConnectionContext;
import org.apache.shardingsphere.data.pipeline.cdc.client.handler.ExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RetryStreamingExceptionHandler
implements ExceptionHandler {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(RetryStreamingExceptionHandler.class);
    private final CDCClient cdcClient;
    private final int maxRetryTimes;
    private final int retryIntervalMills;
    private final AtomicInteger retryTimes = new AtomicInteger(0);

    public RetryStreamingExceptionHandler(CDCClient cdcClient, int maxRetryTimes, int retryIntervalMills) {
        this.cdcClient = cdcClient;
        this.maxRetryTimes = maxRetryTimes;
        this.retryIntervalMills = retryIntervalMills;
    }

    @Override
    public void handleException(ChannelHandlerContext ctx, Throwable throwable) {
        log.error("Catch exception: ", throwable);
        this.reconnect(ctx);
    }

    private void reconnect(ChannelHandlerContext ctx) {
        this.retryTimes.incrementAndGet();
        ClientConnectionContext connectionContext = (ClientConnectionContext)ctx.channel().attr(ClientConnectionContext.CONTEXT_KEY).get();
        if (this.retryTimes.get() > this.maxRetryTimes) {
            log.warn("Stop try to reconnect, stop streaming ids: {}", connectionContext.getStreamingIds());
            connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> this.cdcClient.stopStreaming((String)each)));
            return;
        }
        TimeUnit.MILLISECONDS.sleep(this.retryIntervalMills);
        log.info("Retry to restart streaming, retry times: {}", (Object)this.retryTimes.get());
        connectionContext.getStreamingIds().forEach(each -> CompletableFuture.runAsync(() -> this.cdcClient.restartStreaming((String)each)));
    }
}

