/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.data.load.stream;

import com.fasterxml.jackson.databind.JsonNode;
import com.starrocks.data.load.stream.DefaultStreamLoader;
import com.starrocks.data.load.stream.StreamLoadConstants;
import com.starrocks.data.load.stream.StreamLoadManager;
import com.starrocks.data.load.stream.StreamLoadResponse;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.StreamLoadUtils;
import com.starrocks.data.load.stream.TableRegion;
import com.starrocks.data.load.stream.TransactionStatus;
import com.starrocks.data.load.stream.exception.StreamLoadFailException;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import org.apache.http.Header;
import org.apache.http.client.RedirectStrategy;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.DefaultRedirectStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionStreamLoader
extends DefaultStreamLoader {
    private static final Logger log = LoggerFactory.getLogger(TransactionStreamLoader.class);
    private Header[] defaultTxnHeaders;
    private Header[] beginTxnHeader;
    private HttpClientBuilder clientBuilder;
    private StreamLoadManager manager;

    protected void initTxHeaders(StreamLoadProperties properties) {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("Authorization", StreamLoadUtils.getBasicAuthHeader(properties.getUsername(), properties.getPassword()));
        this.defaultTxnHeaders = (Header[])headers.entrySet().stream().map(entry -> new BasicHeader((String)entry.getKey(), (String)entry.getValue())).toArray(Header[]::new);
        HashMap<String, String> beginHeaders = new HashMap<String, String>(headers);
        String timeout = properties.getHeaders().get("timeout");
        if (timeout == null) {
            beginHeaders.put("timeout", "600");
        } else {
            beginHeaders.put("timeout", timeout);
        }
        this.beginTxnHeader = (Header[])beginHeaders.entrySet().stream().map(entry -> new BasicHeader((String)entry.getKey(), (String)entry.getValue())).toArray(Header[]::new);
    }

    @Override
    public void start(StreamLoadProperties properties, StreamLoadManager manager) {
        super.start(properties, manager);
        this.manager = manager;
        this.enableTransaction();
        this.initTxHeaders(properties);
        this.clientBuilder = HttpClients.custom().setRedirectStrategy((RedirectStrategy)new DefaultRedirectStrategy(){

            protected boolean isRedirectable(String method) {
                return true;
            }
        });
    }

    @Override
    public boolean begin(TableRegion region) {
        if (region.getLabel() == null) {
            region.setLabel(region.getLabelGenerator().next());
            if (this.doBegin(region)) {
                return true;
            }
            region.setLabel(null);
            return false;
        }
        return true;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected boolean doBegin(TableRegion region) {
        String host = this.getAvailableHost();
        String beginUrl = StreamLoadConstants.getBeginUrl(host);
        String label = region.getLabel();
        log.info("Transaction start, label : {}", (Object)label);
        HttpPost httpPost = new HttpPost(beginUrl);
        httpPost.setHeaders(this.beginTxnHeader);
        httpPost.addHeader("label", label);
        httpPost.addHeader("db", region.getDatabase());
        httpPost.addHeader("table", region.getTable());
        httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());
        String db = region.getDatabase();
        String table = region.getTable();
        log.info("Transaction start, db: {}, table: {}, label: {}, request : {}", new Object[]{db, table, label, httpPost});
        try (CloseableHttpClient client = this.clientBuilder.build();){
            String status;
            String responseBody;
            try (CloseableHttpResponse response = client.execute((HttpUriRequest)httpPost);){
                responseBody = this.parseHttpResponse("begin transaction", region.getDatabase(), region.getTable(), label, response);
            }
            log.info("Transaction started, db: {}, table: {}, label: {}, body : {}", new Object[]{db, table, label, responseBody});
            JsonNode node = this.objectMapper.readTree(responseBody);
            JsonNode statusNode = node.get("Status");
            String string = status = statusNode == null ? null : statusNode.asText();
            if (status == null) {
                String errMsg = String.format("Can't find 'Status' in the response of transaction begin request. Transaction load is supported since StarRocks 2.4, and please make sure your StarRocks version support transaction load first. db: %s, table: %s, label: %s, response: %s", db, table, label, responseBody);
                log.error(errMsg);
                throw new StreamLoadFailException(errMsg);
            }
            if ("OK".equals(status)) {
                boolean errMsg = true;
                return errMsg;
            }
            String errMsg = String.format("Transaction start failed, db: %s, table: %s, label: %s, responseBody: %s", region.getDatabase(), region.getTable(), label, responseBody);
            throw new StreamLoadFailException(errMsg);
        }
        catch (StreamLoadFailException se) {
            throw se;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public boolean prepare(StreamLoadSnapshot.Transaction transaction) {
        String host = this.getAvailableHost();
        String prepareUrl = StreamLoadConstants.getPrepareUrl(host);
        HttpPost httpPost = new HttpPost(prepareUrl);
        httpPost.setHeaders(this.defaultTxnHeaders);
        httpPost.addHeader("label", transaction.getLabel());
        httpPost.addHeader("db", transaction.getDatabase());
        httpPost.addHeader("table", transaction.getTable());
        httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());
        log.info("Transaction prepare, label : {}, request : {}", (Object)transaction.getLabel(), (Object)httpPost);
        try (CloseableHttpClient client = this.clientBuilder.build();){
            String responseBody;
            try (CloseableHttpResponse response = client.execute((HttpUriRequest)httpPost);){
                responseBody = this.parseHttpResponse("prepare transaction", transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), response);
            }
            log.info("Transaction prepared, label : {}, body : {}", (Object)transaction.getLabel(), (Object)responseBody);
            StreamLoadResponse streamLoadResponse = new StreamLoadResponse();
            StreamLoadResponse.StreamLoadResponseBody streamLoadBody = (StreamLoadResponse.StreamLoadResponseBody)this.objectMapper.readValue(responseBody, StreamLoadResponse.StreamLoadResponseBody.class);
            streamLoadResponse.setBody(streamLoadBody);
            String status = streamLoadBody.getStatus();
            if (status == null) {
                throw new StreamLoadFailException(String.format("Prepare transaction status is null. db: %s, table: %s, label: %s, response body: %s", transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), responseBody));
            }
            switch (status) {
                case "OK": {
                    this.manager.callback(streamLoadResponse);
                    boolean bl = true;
                    return bl;
                }
                case "TXN_NOT_EXISTS": {
                    String labelState = this.getLabelState(host, transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), Collections.singleton(TransactionStatus.PREPARE.name()));
                    if (!TransactionStatus.PREPARED.isSame(labelState)) {
                        String errMsg = String.format("Transaction prepare failed because of unexpected state, label: %s, state: %s", transaction.getLabel(), labelState);
                        log.error(errMsg);
                        throw new StreamLoadFailException(errMsg);
                    }
                    boolean bl = true;
                    return bl;
                }
            }
            String errorLog = this.getErrorLog(streamLoadBody.getErrorURL());
            String errorMsg = String.format("Transaction prepare failed, db: %s, table: %s, label: %s, \nresponseBody: %s\nerrorLog: %s", transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), responseBody, errorLog);
            log.error(errorMsg);
            throw new StreamLoadFailException(errorMsg);
        }
        catch (StreamLoadFailException se) {
            throw se;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public boolean commit(StreamLoadSnapshot.Transaction transaction) {
        String host = this.getAvailableHost();
        String commitUrl = StreamLoadConstants.getCommitUrl(host);
        HttpPost httpPost = new HttpPost(commitUrl);
        httpPost.setHeaders(this.defaultTxnHeaders);
        httpPost.addHeader("label", transaction.getLabel());
        httpPost.addHeader("db", transaction.getDatabase());
        httpPost.addHeader("table", transaction.getTable());
        httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());
        log.info("Transaction commit, label: {}, request : {}", (Object)transaction.getLabel(), (Object)httpPost);
        try (CloseableHttpClient client = this.clientBuilder.build();){
            String responseBody;
            try (CloseableHttpResponse response = client.execute((HttpUriRequest)httpPost);){
                responseBody = this.parseHttpResponse("commit transaction", transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), response);
            }
            log.info("Transaction committed, lable: {}, body : {}", (Object)transaction.getLabel(), (Object)responseBody);
            StreamLoadResponse streamLoadResponse = new StreamLoadResponse();
            StreamLoadResponse.StreamLoadResponseBody streamLoadBody = (StreamLoadResponse.StreamLoadResponseBody)this.objectMapper.readValue(responseBody, StreamLoadResponse.StreamLoadResponseBody.class);
            streamLoadResponse.setBody(streamLoadBody);
            String status = streamLoadBody.getStatus();
            if (status == null) {
                throw new StreamLoadFailException(String.format("Commit transaction status is null. db: %s, table: %s, label: %s, response body: %s", transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), responseBody));
            }
            if ("OK".equals(status)) {
                this.manager.callback(streamLoadResponse);
                boolean bl = true;
                return bl;
            }
            String labelState = this.getLabelState(host, transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), Collections.emptySet());
            if (TransactionStatus.COMMITTED.isSame(labelState) || TransactionStatus.VISIBLE.isSame(labelState)) {
                boolean bl = true;
                return bl;
            }
            String errorLog = this.getErrorLog(streamLoadBody.getErrorURL());
            log.error("Transaction commit failed, db: {}, table: {}, label: {}, label state: {}, \nresponseBody: {}\nerrorLog: {}", new Object[]{transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), labelState, responseBody, errorLog});
            String exceptionMsg = String.format("Transaction commit failed, db: %s, table: %s, label: %s, commit response status: %s, label state: %s", transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), status, labelState);
            if (!"TXN_NOT_EXISTS".equals(status)) {
                if (!TransactionStatus.UNKNOWN.isSame(labelState)) throw new StreamLoadFailException(exceptionMsg);
            }
            exceptionMsg = exceptionMsg + ". commit response status with TXN_NOT_EXISTS or label state with UNKNOWN often happens when transaction timeouts, and please check StarRocks FE leader's log to confirm it. You can find the transaction id for the label in the FE log first, and search with the transaction id and the keyword 'expired'";
            throw new StreamLoadFailException(exceptionMsg);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public boolean rollback(StreamLoadSnapshot.Transaction transaction) {
        String host = this.getAvailableHost();
        String rollbackUrl = StreamLoadConstants.getRollbackUrl(host);
        log.info("Transaction rollback, label : {}", (Object)transaction.getLabel());
        HttpPost httpPost = new HttpPost(rollbackUrl);
        httpPost.setHeaders(this.defaultTxnHeaders);
        httpPost.addHeader("label", transaction.getLabel());
        httpPost.addHeader("db", transaction.getDatabase());
        httpPost.addHeader("table", transaction.getTable());
        try (CloseableHttpClient client = this.clientBuilder.build();){
            String status;
            String responseBody;
            try (CloseableHttpResponse response = client.execute((HttpUriRequest)httpPost);){
                responseBody = this.parseHttpResponse("abort transaction", transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), response);
            }
            log.info("Transaction rollback, label: {}, body : {}", (Object)transaction.getLabel(), (Object)responseBody);
            JsonNode node = this.objectMapper.readTree(responseBody);
            JsonNode statusNode = node.get("Status");
            String string = status = statusNode == null ? null : statusNode.asText();
            if (status == null) {
                String errMsg = String.format("Abort transaction status is null. db: %s, table: %s, label: %s, response: %s", transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), responseBody);
                log.error(errMsg);
                throw new StreamLoadFailException(errMsg);
            }
            if ("Success".equals(status) || "OK".equals(status)) {
                boolean errMsg = true;
                return errMsg;
            }
            JsonNode msgNode = node.get("Message");
            String msg = msgNode == null ? "" : msgNode.asText();
            log.error("Transaction rollback failed, db: {}, table: {}, label : {}, message: {}", new Object[]{transaction.getDatabase(), transaction.getTable(), transaction.getLabel(), msg});
            boolean bl = false;
            return bl;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected String getSendUrl(String host, String database, String table) {
        return StreamLoadConstants.getSendUrl(host);
    }
}

