/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.socket.sink;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.socket.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.socket.exception.SocketConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketClient {
    private static final Logger log = LoggerFactory.getLogger(SocketClient.class);
    private final String hostName;
    private final int port;
    private int retries;
    private final int maxNumRetries;
    private transient Socket client;
    private transient OutputStream outputStream;
    private final SerializationSchema serializationSchema;
    private volatile boolean isRunning = Boolean.TRUE;
    private static final int CONNECTION_RETRY_DELAY = 500;

    public SocketClient(SinkConfig config, SerializationSchema serializationSchema) {
        this.hostName = config.getHost();
        this.port = config.getPort();
        this.serializationSchema = serializationSchema;
        this.retries = config.getMaxNumRetries();
        this.maxNumRetries = config.getMaxNumRetries();
    }

    private void createConnection() throws IOException {
        this.client = new Socket(this.hostName, this.port);
        this.client.setKeepAlive(true);
        this.client.setTcpNoDelay(true);
        this.outputStream = this.client.getOutputStream();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void open() throws IOException {
        try {
            Class<SocketClient> clazz = SocketClient.class;
            synchronized (SocketClient.class) {
                this.createConnection();
                // ** MonitorExit[var1_1] (shouldn't be in output)
            }
        }
        catch (IOException e) {
            throw new SocketConnectorException(SocketConnectorErrorCode.SOCKET_SERVER_CONNECT_FAILED, String.format("Cannot connect to socket server at %s:%d", this.hostName, this.port), e);
        }
        {
            return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void write(SeaTunnelRow row) throws IOException {
        byte[] msg = this.serializationSchema.serialize(row);
        try {
            this.outputStream.write(msg);
            this.outputStream.flush();
            return;
        }
        catch (IOException e) {
            if (this.maxNumRetries == 0) {
                throw new SocketConnectorException(SocketConnectorErrorCode.SEND_MESSAGE_TO_SOCKET_SERVER_FAILED, String.format("Failed to send message '%s' to socket server at %s:%d. Connection re-tries are not enabled.", row, this.hostName, this.port), e);
            }
            log.error("Failed to send message '{}' to socket server at {}:{}. Trying to reconnect...", new Object[]{row, this.hostName, this.port, e});
            Class<SocketClient> clazz = SocketClient.class;
            synchronized (SocketClient.class) {
                IOException lastException = null;
                this.retries = 0;
                while (this.isRunning && (this.maxNumRetries < 0 || this.retries < this.maxNumRetries)) {
                    try {
                        if (this.outputStream != null) {
                            this.outputStream.close();
                        }
                    }
                    catch (IOException ee) {
                        log.error("Could not close output stream from failed write attempt", (Throwable)ee);
                    }
                    try {
                        if (this.client != null) {
                            this.client.close();
                        }
                    }
                    catch (IOException ee) {
                        log.error("Could not close socket from failed write attempt", (Throwable)ee);
                    }
                    ++this.retries;
                    try {
                        this.createConnection();
                        this.outputStream.write(msg);
                        // ** MonitorExit[var4_4] (shouldn't be in output)
                        return;
                    }
                    catch (IOException ee) {
                        lastException = ee;
                        log.error("Re-connect to socket server and send message failed. Retry time(s): {}", (Object)this.retries, (Object)ee);
                        try {
                            this.wait(500L);
                        }
                        catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                            throw new SocketConnectorException(SocketConnectorErrorCode.SOCKET_WRITE_FAILED, "unable to write; interrupted while doing another attempt", e);
                        }
                    }
                }
                if (!this.isRunning) return;
                throw new SocketConnectorException(SocketConnectorErrorCode.SEND_MESSAGE_TO_SOCKET_SERVER_FAILED, String.format("Failed to send message '%s' to socket server at %s:%d. Failed after %d retries.", row, this.hostName, this.port, this.retries), lastException);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws IOException {
        this.isRunning = false;
        SocketClient socketClient = this;
        synchronized (socketClient) {
            this.notifyAll();
            try {
                if (this.outputStream != null) {
                    this.outputStream.close();
                }
            }
            finally {
                if (this.client != null) {
                    this.client.close();
                }
            }
        }
    }
}

