/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.connector.flink.sink;

import com.oceanbase.connector.flink.ConnectorOptions;
import com.oceanbase.connector.flink.OceanBaseConnectorOptions;
import com.oceanbase.connector.flink.connection.OceanBaseConnectionProvider;
import com.oceanbase.connector.flink.sink.AbstractDynamicTableSink;
import com.oceanbase.connector.flink.sink.OceanBaseRecordFlusher;
import com.oceanbase.connector.flink.sink.OceanBaseSink;
import com.oceanbase.connector.flink.sink.OceanBaseWriterEvent;
import com.oceanbase.connector.flink.sink.RecordFlusher;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.OceanBaseRowDataSerializationSchema;
import com.oceanbase.connector.flink.table.RecordSerializationSchema;
import com.oceanbase.connector.flink.table.TableId;
import com.oceanbase.connector.flink.table.TableInfo;
import com.oceanbase.connector.flink.table.TransactionRecord;
import java.io.Serializable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.util.function.SerializableFunction;

public class OceanBaseDynamicTableSink
extends AbstractDynamicTableSink {
    private final OceanBaseConnectorOptions connectorOptions;

    public OceanBaseDynamicTableSink(ResolvedSchema physicalSchema, OceanBaseConnectorOptions connectorOptions) {
        super(physicalSchema);
        this.connectorOptions = connectorOptions;
    }

    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        OceanBaseConnectionProvider connectionProvider = new OceanBaseConnectionProvider(this.connectorOptions);
        TableId tableId = new TableId(connectionProvider.getDialect()::getFullTableName, this.connectorOptions.getSchemaName(), this.connectorOptions.getTableName());
        OceanBaseRecordFlusher recordFlusher = new OceanBaseRecordFlusher(this.connectorOptions, connectionProvider);
        return new AbstractDynamicTableSink.SinkProvider((SerializableFunction & Serializable)typeSerializer -> new OceanBaseSink((ConnectorOptions)this.connectorOptions, typeSerializer, (RecordSerializationSchema)new OceanBaseRowDataSerializationSchema(new TableInfo(tableId, this.physicalSchema)), DataChangeRecord.KeyExtractor.simple(), (RecordFlusher)recordFlusher, this.getWriterEventListener(recordFlusher, tableId)));
    }

    protected OceanBaseWriterEvent.Listener getWriterEventListener(RecordFlusher recordFlusher, TableId tableId) {
        return (OceanBaseWriterEvent.Listener & Serializable)event -> {
            try {
                if (event == OceanBaseWriterEvent.INITIALIZED) {
                    recordFlusher.flush(new TransactionRecord(tableId, TransactionRecord.Type.BEGIN));
                }
                if (event == OceanBaseWriterEvent.CLOSING) {
                    recordFlusher.flush(new TransactionRecord(tableId, TransactionRecord.Type.COMMIT));
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to flush transaction record", e);
            }
        };
    }

    public DynamicTableSink copy() {
        return new OceanBaseDynamicTableSink(this.physicalSchema, this.connectorOptions);
    }

    public String asSummaryString() {
        return "OceanBase";
    }
}

