/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.IOException;
import java.util.Optional;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator;
import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
import org.apache.paimon.table.FileStoreTable;

/**
 * Write operator that consumes {@code Tuple2<CdcRecord, Integer>} elements, converts the
 * {@link CdcRecord} into a {@link GenericRow} using the current table schema, and hands the row
 * together with the tuple's {@code Integer} to the inherited {@code write} handle.
 *
 * <p>When a record cannot be converted with the currently known schema, the operator reloads the
 * latest table schema and retries the conversion up to {@code maxRetryNumTimes} times, sleeping
 * {@code retrySleepMillis} between attempts. Records that still cannot be converted are either
 * skipped (with a warning) or cause a {@link RuntimeException}, depending on
 * {@code skipCorruptRecord}.
 *
 * <p>NOTE(review): the {@code Integer} in the tuple is presumably the pre-assigned target bucket
 * (going by the class name) - confirm against the upstream operator that produces it.
 */
public class CdcDynamicBucketWriteOperator
        extends TableWriteOperator<Tuple2<CdcRecord, Integer>> {
    private static final long serialVersionUID = 1L;

    /** Pause between schema-refresh attempts, read from {@code RETRY_SLEEP_TIME}. */
    private final long retrySleepMillis;

    /** Upper bound on schema-refresh attempts, read from {@code MAX_RETRY_NUM_TIMES}. */
    private final int maxRetryNumTimes;

    /** Whether unconvertible records are dropped instead of failing the operator. */
    private final boolean skipCorruptRecord;

    /** Whether the record content may appear in logs and exception messages. */
    private final boolean logCorruptRecord;

    /**
     * Creates the operator and reads the retry / corrupt-record options from the table's core
     * options. Instances are created through {@link Factory}.
     *
     * @param parameters Flink operator parameters forwarded to the parent operator
     * @param table table to write to; also the source of the option values
     * @param storeSinkWriteProvider provider forwarded to the parent operator
     * @param initialCommitUser commit user forwarded to the parent operator
     */
    private CdcDynamicBucketWriteOperator(
            StreamOperatorParameters<Committable> parameters,
            FileStoreTable table,
            StoreSinkWrite.Provider storeSinkWriteProvider,
            String initialCommitUser) {
        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
        this.retrySleepMillis =
                table.coreOptions()
                        .toConfiguration()
                        .get(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME)
                        .toMillis();
        this.maxRetryNumTimes =
                table.coreOptions()
                        .toConfiguration()
                        .get(CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES);
        this.skipCorruptRecord =
                table.coreOptions()
                        .toConfiguration()
                        .get(CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD);
        this.logCorruptRecord =
                table.coreOptions()
                        .toConfiguration()
                        .get(CdcRecordStoreWriteOperator.LOG_CORRUPT_RECORD);
    }

    /**
     * Replaces the table with a copy carrying the latest schema before delegating state
     * initialization to the parent operator.
     *
     * @param context state initialization context supplied by Flink
     * @throws Exception if the parent's state initialization fails
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        this.table = this.table.copyWithLatestSchema();
        super.initializeState(context);
    }

    /** Always returns {@code false} for this operator. */
    @Override
    protected boolean containLogSystem() {
        return false;
    }

    /**
     * Converts the incoming CDC record to a row and writes it.
     *
     * <p>If the first conversion attempt yields no row, the table schema is refreshed and the
     * conversion retried up to {@code maxRetryNumTimes} times; afterwards the write handle is
     * re-pointed at the (possibly refreshed) table regardless of whether a retry succeeded.
     *
     * @param element stream record holding the CDC record ({@code f0}) and the {@code Integer}
     *     passed as second argument to the write call ({@code f1})
     * @throws RuntimeException if the record cannot be converted and {@code skipCorruptRecord}
     *     is {@code false}
     * @throws IOException if the underlying write fails; the original exception is the cause
     * @throws Exception e.g. {@link InterruptedException} when interrupted while sleeping between
     *     retries
     */
    @Override
    public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) throws Exception {
        Tuple2<CdcRecord, Integer> record = element.getValue();
        Optional<GenericRow> optionalConverted = convertWithCurrentSchema(record.f0);
        if (!optionalConverted.isPresent()) {
            for (int retry = 0; retry < this.maxRetryNumTimes; ++retry) {
                // The schema may have evolved since the table was last loaded; reload and retry.
                this.table = this.table.copyWithLatestSchema();
                optionalConverted = convertWithCurrentSchema(record.f0);
                if (optionalConverted.isPresent()) {
                    break;
                }
                Thread.sleep(this.retrySleepMillis);
            }
            // Keep the writer in sync with whatever table instance the retries ended up with.
            this.write.replace(this.table);
        }

        if (!optionalConverted.isPresent()) {
            // Only expose the record content when logging of corrupt records is enabled.
            Object loggable = this.logCorruptRecord ? record : "<redacted>";
            if (!this.skipCorruptRecord) {
                throw new RuntimeException(
                        "Unable to process element. Possibly a corrupt record: " + loggable);
            }
            LOG.warn("Skipping corrupt or unparsable record {}", loggable);
            return;
        }

        try {
            this.write.write(optionalConverted.get(), record.f1);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Attempts to convert a CDC record into a row using the fields of the table's current schema.
     *
     * @param cdcRecord record to convert
     * @return the converted row, or an empty {@link Optional} when conversion did not succeed
     */
    private Optional<GenericRow> convertWithCurrentSchema(CdcRecord cdcRecord) {
        return CdcRecordUtils.toGenericRow(
                cdcRecord, this.table.schema().fields(), this.logCorruptRecord);
    }

    /** Factory that creates {@link CdcDynamicBucketWriteOperator} instances. */
    public static class Factory
            extends TableWriteOperator.Factory<Tuple2<CdcRecord, Integer>> {

        /**
         * @param table table handed to every created operator
         * @param storeSinkWriteProvider write provider handed to every created operator
         * @param initialCommitUser commit user handed to every created operator
         */
        public Factory(
                FileStoreTable table,
                StoreSinkWrite.Provider storeSinkWriteProvider,
                String initialCommitUser) {
            super(table, storeSinkWriteProvider, initialCommitUser);
        }

        /**
         * Creates a new {@link CdcDynamicBucketWriteOperator} from this factory's settings.
         *
         * @param parameters Flink operator parameters for the new operator
         * @return the new operator, cast to the caller's expected operator type
         */
        @Override
        @SuppressWarnings("unchecked") // the created operator is a StreamOperator<Committable>
        public <T extends StreamOperator<Committable>> T createStreamOperator(
                StreamOperatorParameters<Committable> parameters) {
            return (T)
                    new CdcDynamicBucketWriteOperator(
                            parameters,
                            this.table,
                            this.storeSinkWriteProvider,
                            this.initialCommitUser);
        }

        /**
         * @param classLoader unused
         * @return {@code CdcDynamicBucketWriteOperator.class}
         */
        @Override
        @SuppressWarnings("rawtypes") // raw StreamOperator is dictated by the overridden signature
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return CdcDynamicBucketWriteOperator.class;
        }
    }
}

