/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.clogproxy.client.connection;

import com.oceanbase.clogproxy.client.config.ClientConf;
import com.oceanbase.clogproxy.client.connection.Connection;
import com.oceanbase.clogproxy.client.connection.ConnectionFactory;
import com.oceanbase.clogproxy.client.connection.ConnectionParams;
import com.oceanbase.clogproxy.client.connection.StreamContext;
import com.oceanbase.clogproxy.client.enums.ErrorCode;
import com.oceanbase.clogproxy.client.exception.LogProxyClientException;
import com.oceanbase.clogproxy.client.listener.RecordListener;
import com.oceanbase.clogproxy.client.listener.StatusListener;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream that owns a {@link Connection} created through {@link ConnectionFactory} and a
 * background worker thread which polls packets from the context's record queue and forwards
 * them to the registered {@link RecordListener}s and {@link StatusListener}s.
 *
 * <p>Lifecycle: {@link #start()} spawns the worker, {@link #stop()} closes the connection and
 * waits for the worker to finish. The worker itself (re)connects whenever the internal
 * reconnect flag is raised, see {@link #triggerReconnect()}.
 */
public class ClientStream {
    private static final Logger logger = LoggerFactory.getLogger(ClientStream.class);

    /** Run flag: set by {@link #start()}, cleared by {@link #stop()}, polled by the worker. */
    private final AtomicBoolean started = new AtomicBoolean(false);

    /** Ensures the JVM shutdown hook is registered at most once per instance. */
    private final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false);

    /** Worker thread; volatile because it is written by start()/stop() and read by join(). */
    private volatile Thread thread = null;

    private final StreamContext context;

    /** Largest safe timestamp seen so far; only touched by the worker thread. */
    private String checkpointString;

    /** Number of connect attempts made since the last {@link #start()}. */
    private int retryTimes = 0;

    /** Current connection; volatile because the worker replaces it while stop() may close it. */
    private volatile Connection connection = null;

    /** True from a reconnect request until the corresponding connect attempt has finished. */
    private final AtomicBoolean reconnecting = new AtomicBoolean(true);

    /** True when the worker must (re)connect on its next loop iteration. */
    private final AtomicBoolean reconnect = new AtomicBoolean(true);

    // Copy-on-write lists: listeners may be added while the worker thread is iterating.
    private final List<RecordListener> listeners = new CopyOnWriteArrayList<>();
    private final List<StatusListener> statusListeners = new CopyOnWriteArrayList<>();

    /**
     * Creates a stream bound to a new {@link StreamContext}.
     *
     * @param clientConf client configuration handed to the context
     * @param connectionParams connection parameters handed to the context
     */
    public ClientStream(ClientConf clientConf, ConnectionParams connectionParams) {
        this.context = new StreamContext(this, clientConf, connectionParams);
    }

    /**
     * Stops the stream: closes the current connection and waits for the worker thread to exit.
     * Does nothing when the stream is not started.
     */
    public void stop() {
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        logger.info("Try to stop this client");
        this.closeConnection();
        this.join();
        this.thread = null;
        logger.info("Client stopped successfully");
    }

    /**
     * Blocks until the worker thread terminates. Returns immediately when there is no worker or
     * when called from the worker thread itself (a thread joining itself would never return).
     * If the wait is interrupted, the interrupt flag is restored and an asynchronous stop is
     * requested.
     */
    public void join() {
        // Snapshot: stop() may null the field concurrently.
        Thread worker = this.thread;
        if (worker == null || worker == Thread.currentThread()) {
            return;
        }
        try {
            worker.join();
        } catch (InterruptedException e) {
            logger.warn("Waits for process thread failed : {}", e.getMessage());
            Thread.currentThread().interrupt();
            this.triggerStop();
        }
    }

    /** Requests a stop from a freshly spawned thread so the caller is never blocked by it. */
    public void triggerStop() {
        new Thread(this::stop).start();
    }

    /**
     * Notifies every record listener about an error. A failure inside one listener is logged and
     * does not prevent the remaining listeners from being notified.
     *
     * @param e the error to report; {@code null} is ignored
     */
    public void triggerException(LogProxyClientException e) {
        if (e == null) {
            return;
        }
        for (RecordListener listener : this.listeners) {
            try {
                listener.onException(e);
            } catch (Throwable throwable) {
                logger.error("Failed to notify listener on exception: {}, cause: {}", e, throwable);
            }
        }
    }

    /**
     * Starts the worker thread if the stream is not already started. Monitoring is enabled on
     * the connection parameters when at least one status listener is registered, and the retry
     * counter is reset. A JVM shutdown hook calling {@link #stop()} is registered once.
     */
    public void start() {
        this.context.params().setEnableMonitor(!this.statusListeners.isEmpty());
        this.retryTimes = 0;
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        // Restore the initial reconnect state so that a stream restarted after stop() connects
        // again instead of polling an empty queue without a connection.
        this.reconnecting.set(true);
        this.reconnect.set(true);

        Thread worker = new Thread(this::runLoop);
        worker.setDaemon(false);
        this.thread = worker;
        worker.start();

        if (this.shutdownHookRegistered.compareAndSet(false, true)) {
            Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
        }
    }

    /**
     * Worker body: (re)connects when requested, polls one packet at a time and dispatches it.
     * The loop ends when the stream is stopped, the reconnect limit is exceeded, or dispatching
     * a packet fails; in every case an asynchronous stop is requested on exit.
     */
    private void runLoop() {
        while (this.isRunning()) {
            ReconnectState state = this.reconnect();
            if (state == ReconnectState.EXIT) {
                this.triggerException(new LogProxyClientException(ErrorCode.E_MAX_RECONNECT, "Exceed max retry times", true));
                break;
            }
            if (state == ReconnectState.RETRY) {
                try {
                    TimeUnit.SECONDS.sleep(this.context.config().getRetryIntervalS());
                } catch (InterruptedException ignored) {
                    // Intentionally ignored: the loop condition re-checks the run flag.
                }
                continue;
            }

            StreamContext.TransferPacket packet = this.pollPacket();
            if (packet == null) {
                continue;
            }

            try {
                switch (packet.getType()) {
                    case DATA_CLIENT: {
                        for (RecordListener recordListener : this.listeners) {
                            recordListener.notify(packet.getRecord());
                        }
                        // The checkpoint is derived from the record, so it is only updated for
                        // data packets. NOTE(review): STATUS packets are presumed to carry no
                        // record - reading it for them would fail; confirm against TransferPacket.
                        try {
                            this.setCheckpointString(packet.getRecord().getSafeTimestamp());
                        } catch (IllegalArgumentException e) {
                            logger.error("Failed to update checkpoint for log message: " + packet.getRecord(), e);
                            throw new LogProxyClientException(ErrorCode.E_INNER, "Failed to update checkpoint");
                        }
                        break;
                    }
                    case STATUS: {
                        for (StatusListener statusListener : this.statusListeners) {
                            statusListener.notify(packet.getStatus());
                        }
                        break;
                    }
                    default: {
                        throw new LogProxyClientException(ErrorCode.E_PROTOCOL, "Unsupported Packet Type: " + packet.getType());
                    }
                }
            } catch (LogProxyClientException e) {
                this.triggerException(e);
                break;
            } catch (Exception e) {
                // Anything else is treated as a failure raised by user listener code.
                this.triggerException(new LogProxyClientException(ErrorCode.E_USER, e));
                break;
            }
        }
        this.triggerStop();
        logger.info("Client process thread exit");
    }

    /**
     * Polls the record queue once, waiting up to the configured read timeout. An interrupted
     * wait is retried for as long as the stream is running.
     *
     * @return the polled packet, or {@code null} on timeout or when the stream was stopped
     */
    private StreamContext.TransferPacket pollPacket() {
        while (this.isRunning()) {
            try {
                return this.context.recordQueue().poll(this.context.config().getReadWaitTimeMs(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException ignored) {
                // Intentionally ignored: retry while running. Re-interrupting here would make
                // the next poll fail immediately and turn this loop into a busy spin.
            }
        }
        return null;
    }

    /**
     * Tells whether the stream is started.
     *
     * @return {@code true} between a successful {@link #start()} and the next {@link #stop()}
     */
    public boolean isRunning() {
        return this.started.get();
    }

    /**
     * Performs one connect attempt if a reconnect has been requested; otherwise does nothing.
     * Every attempt, whatever its outcome, increments the retry counter and clears the
     * "reconnecting" flag so that new reconnect requests are accepted again.
     *
     * @return {@code SUCCESS} when no reconnect was pending or the connection was created,
     *     {@code RETRY} when the attempt failed and should be repeated, {@code EXIT} when the
     *     configured maximum (any value other than -1) has been exceeded
     */
    private ReconnectState reconnect() {
        if (!this.reconnect.compareAndSet(true, false)) {
            return ReconnectState.SUCCESS;
        }
        logger.info("Try to connect");
        try {
            if (this.context.config().getMaxReconnectTimes() != -1 && this.retryTimes > this.context.config().getMaxReconnectTimes()) {
                logger.error("Failed to connect, exceed max reconnect retry time: {}", this.context.config().getMaxReconnectTimes());
                return ReconnectState.EXIT;
            }
            this.closeConnection();
            // Resume from the last recorded checkpoint, if any.
            if (StringUtils.isNotEmpty(this.checkpointString)) {
                logger.warn("update checkpoint: {}", this.checkpointString);
                this.context.params().updateCheckpoint(this.checkpointString);
            }
            this.connection = ConnectionFactory.instance().createConnection(this.context);
            if (this.connection != null) {
                logger.info("Connect successfully");
                return ReconnectState.SUCCESS;
            }
            logger.error("Failed to connect, retry count: {}, max: {}", this.retryTimes, this.context.config().getMaxReconnectTimes());
            this.reconnect.set(true);
            return ReconnectState.RETRY;
        } catch (Exception e) {
            // The trailing throwable makes SLF4J log the stack trace in addition to the message.
            logger.error("Failed to connect, retry count: {}, max: {}, message: {}", this.retryTimes, this.context.config().getMaxReconnectTimes(), e.getMessage(), e);
            this.reconnect.set(true);
            return ReconnectState.RETRY;
        } finally {
            this.reconnecting.set(false);
            ++this.retryTimes;
        }
    }

    /** Closes and clears the current connection, if there is one. */
    private void closeConnection() {
        // Snapshot: the field may be changed by another thread between check and use.
        Connection current = this.connection;
        if (current != null) {
            current.close();
            this.connection = null;
        }
    }

    /**
     * Requests that the worker reconnects on its next loop iteration. The request is ignored
     * while a previous request is still pending or being processed.
     */
    public void triggerReconnect() {
        if (this.reconnecting.compareAndSet(false, true)) {
            this.reconnect.compareAndSet(false, true);
        }
    }

    /**
     * Records a new checkpoint if it is greater than the current one.
     *
     * @param checkpointString checkpoint as a decimal number string
     * @throws IllegalArgumentException if the value is not a parsable number (as
     *     {@link NumberFormatException}) or is not strictly positive
     */
    private void setCheckpointString(String checkpointString) {
        long timestamp = Long.parseLong(checkpointString);
        if (timestamp <= 0L) {
            throw new IllegalArgumentException("Update checkpoint with invalid value: " + timestamp);
        }
        if (this.checkpointString == null || Long.parseLong(this.checkpointString) < timestamp) {
            this.checkpointString = checkpointString;
        }
    }

    /**
     * Registers a listener for data records and errors.
     *
     * @param recordListener listener to add
     */
    public synchronized void addListener(RecordListener recordListener) {
        this.listeners.add(recordListener);
    }

    /**
     * Registers a listener for status packets.
     *
     * @param statusListener listener to add
     */
    public synchronized void addStatusListener(StatusListener statusListener) {
        this.statusListeners.add(statusListener);
    }

    /** Outcome of a call to {@code reconnect()}. */
    private static enum ReconnectState {
        /** Connected, or no reconnect was pending. */
        SUCCESS,
        /** The attempt failed; wait for the retry interval and try again. */
        RETRY,
        /** The maximum number of attempts was exceeded; the worker must terminate. */
        EXIT;

    }
}

