/*
 * Decompiled with CFR 0.152.
 */
package com.starrocks.connector.flink.table;

import com.starrocks.connector.flink.manager.StarRocksSinkManager;
import com.starrocks.connector.flink.row.StarRocksIRowTransformer;
import com.starrocks.connector.flink.row.StarRocksISerializer;
import com.starrocks.connector.flink.row.StarRocksSerializerFactory;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.StarRocksSinkSemantic;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.alter.Alter;
import net.sf.jsqlparser.statement.truncate.Truncate;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.NestedRowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.InstantiationUtil;

public class StarRocksDynamicSinkFunction<T>
extends RichSinkFunction<T>
implements CheckpointedFunction {
    private static final long serialVersionUID = 1L;
    private StarRocksSinkManager sinkManager;
    private StarRocksIRowTransformer<T> rowTransformer;
    private StarRocksSinkOptions sinkOptions;
    private StarRocksISerializer serializer;
    private transient Counter totalInvokeRowsTime;
    private transient Counter totalInvokeRows;
    private static final String COUNTER_INVOKE_ROWS_COST_TIME = "totalInvokeRowsTimeNs";
    private static final String COUNTER_INVOKE_ROWS = "totalInvokeRows";
    private transient ListState<Tuple2<String, List<byte[]>>> checkpointedState;

    public StarRocksDynamicSinkFunction(StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<T> rowTransformer) {
        this.sinkManager = new StarRocksSinkManager(sinkOptions, schema);
        rowTransformer.setTableSchema(schema);
        this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames());
        this.rowTransformer = rowTransformer;
        this.sinkOptions = sinkOptions;
    }

    public StarRocksDynamicSinkFunction(StarRocksSinkOptions sinkOptions) {
        this.sinkManager = new StarRocksSinkManager(sinkOptions, null);
        this.sinkOptions = sinkOptions;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.sinkManager.setRuntimeContext(this.getRuntimeContext());
        this.totalInvokeRows = this.getRuntimeContext().getMetricGroup().counter(COUNTER_INVOKE_ROWS);
        this.totalInvokeRowsTime = this.getRuntimeContext().getMetricGroup().counter(COUNTER_INVOKE_ROWS_COST_TIME);
        if (null != this.rowTransformer) {
            this.rowTransformer.setRuntimeContext(this.getRuntimeContext());
        }
        this.sinkManager.startScheduler();
        this.sinkManager.startAsyncFlushing();
    }

    public synchronized void invoke(T value, SinkFunction.Context context) throws Exception {
        long start = System.nanoTime();
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals((Object)this.sinkOptions.getSemantic())) {
            this.flushPreviousState();
        }
        if (null == this.serializer) {
            this.sinkManager.writeRecord((String)value);
            this.totalInvokeRows.inc(1L);
            this.totalInvokeRowsTime.inc(System.nanoTime() - start);
            return;
        }
        if (value instanceof RowData && !this.sinkOptions.supportUpsertDelete() && !RowKind.INSERT.equals((Object)((RowData)value).getRowKind())) {
            return;
        }
        if (value instanceof NestedRowData) {
            int headerSize = 256;
            NestedRowData ddlData = (NestedRowData)value;
            if (ddlData.getSegments().length != 1 || ddlData.getSegments()[0].size() < 256) {
                return;
            }
            int totalSize = ddlData.getSegments()[0].size();
            byte[] data = new byte[totalSize - 256];
            ddlData.getSegments()[0].get(256, data);
            Map ddlMap = (Map)InstantiationUtil.deserializeObject((byte[])data, (ClassLoader)HashMap.class.getClassLoader());
            if (null == ddlMap || "true".equals(ddlMap.get("snapshot")) || Strings.isNullOrEmpty((String)((String)ddlMap.get("ddl"))) || Strings.isNullOrEmpty((String)((String)ddlMap.get("databaseName")))) {
                return;
            }
            Statement stmt = CCJSqlParserUtil.parse((String)((String)ddlMap.get("ddl")));
            if (stmt instanceof Truncate) {
                Truncate truncate = (Truncate)stmt;
                if (!this.sinkOptions.getTableName().equalsIgnoreCase(truncate.getTable().getName())) {
                    return;
                }
            } else if (stmt instanceof Alter) {
                Alter alter = (Alter)stmt;
            }
        }
        this.sinkManager.writeRecord(this.serializer.serialize(this.rowTransformer.transform(value, this.sinkOptions.supportUpsertDelete())));
        this.totalInvokeRows.inc(1L);
        this.totalInvokeRowsTime.inc(System.nanoTime() - start);
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        if (!StarRocksSinkSemantic.EXACTLY_ONCE.equals((Object)this.sinkOptions.getSemantic())) {
            return;
        }
        ListStateDescriptor descriptor = new ListStateDescriptor("buffered-rows", TypeInformation.of((TypeHint)new TypeHint<Tuple2<String, List<byte[]>>>(){}));
        this.checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    }

    public synchronized void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals((Object)this.sinkOptions.getSemantic())) {
            this.flushPreviousState();
            this.checkpointedState.add((Object)new Tuple2((Object)this.sinkManager.createBatchLabel(), new ArrayList<byte[]>(this.sinkManager.getBufferedBatchList())));
            return;
        }
        this.sinkManager.flush(this.sinkManager.createBatchLabel(), true);
    }

    public synchronized void close() throws Exception {
        super.close();
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals((Object)this.sinkOptions.getSemantic())) {
            this.flushPreviousState();
        }
        this.sinkManager.close();
    }

    private void flushPreviousState() throws Exception {
        for (Tuple2 state : (Iterable)this.checkpointedState.get()) {
            this.sinkManager.setBufferedBatchList((List)state.f1);
            this.sinkManager.flush((String)state.f0, true);
        }
        this.checkpointedState.clear();
    }
}

