/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.IOException;
import java.util.Optional;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator;
import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
import org.apache.paimon.table.FileStoreTable;

/**
 * A {@link TableWriteOperator} whose input elements are {@code (CdcRecord, Integer)} pairs.
 *
 * <p>Each incoming {@link CdcRecord} is converted to a {@link GenericRow} against the fields of
 * the operator's current table schema and then written through {@code write}, with the integer
 * half of the pair passed as the second argument (presumably the target bucket, given the class
 * name; confirm against the upstream assigner).
 *
 * <p>When the conversion yields no row, the operator re-reads the latest table schema and retries,
 * sleeping {@code retrySleepMillis} between attempts, until the conversion succeeds.
 */
public class CdcDynamicBucketWriteOperator
        extends TableWriteOperator<Tuple2<CdcRecord, Integer>> {

    private static final long serialVersionUID = 1L;

    /** Pause between two schema-refresh attempts, read from the table options at construction. */
    private final long retrySleepMillis;

    /**
     * Creates the operator and caches the retry interval configured under
     * {@link CdcRecordStoreWriteOperator#RETRY_SLEEP_TIME}.
     *
     * @param table table to write to
     * @param storeSinkWriteProvider provider handed to the superclass constructor
     * @param initialCommitUser commit user handed to the superclass constructor
     */
    public CdcDynamicBucketWriteOperator(
            FileStoreTable table,
            StoreSinkWrite.Provider storeSinkWriteProvider,
            String initialCommitUser) {
        super(table, storeSinkWriteProvider, initialCommitUser);
        this.retrySleepMillis =
                table.coreOptions()
                        .toConfiguration()
                        .get(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME)
                        .toMillis();
    }

    /**
     * Switches {@code table} to a copy with the latest schema before delegating to the superclass
     * state initialization.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        table = table.copyWithLatestSchema();
        super.initializeState(context);
    }

    /** Always returns {@code false}. */
    @Override
    protected boolean containLogSystem() {
        return false;
    }

    /**
     * Converts the record of {@code element} to a row and writes it.
     *
     * <p>If the record cannot be converted with the current schema fields, this method loops:
     * refresh the table schema, retry the conversion, and sleep {@code retrySleepMillis} on
     * failure. The loop has no upper bound on the number of attempts. Once the conversion
     * succeeds after a refresh, the writer is replaced with one for the refreshed table.
     *
     * @param element stream record carrying the CDC record ({@code f0}) and the integer passed
     *     to the writer ({@code f1})
     * @throws IOException if the underlying write throws; the original exception is the cause
     * @throws Exception for failures outside the write call, e.g. an interrupted sleep
     */
    @Override
    public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element)
            throws Exception {
        Tuple2<CdcRecord, Integer> record = element.getValue();
        Optional<GenericRow> optionalConverted =
                CdcRecordUtils.toGenericRow(record.f0, table.schema().fields());

        if (!optionalConverted.isPresent()) {
            // Conversion failed with the schema we hold; keep polling for a newer schema
            // until the record becomes convertible.
            while (true) {
                table = table.copyWithLatestSchema();
                optionalConverted =
                        CdcRecordUtils.toGenericRow(record.f0, table.schema().fields());
                if (optionalConverted.isPresent()) {
                    break;
                }
                Thread.sleep(retrySleepMillis);
            }
            // The table reference changed, so the writer must be rebuilt on top of it.
            write.replace(table);
        }

        try {
            write.write(optionalConverted.get(), record.f1);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}

