/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.table.FileStoreTable;

public class CdcRecordStoreWriteOperator
extends TableWriteOperator<CdcRecord> {
    private static final long serialVersionUID = 1L;
    public static final ConfigOption<Duration> RETRY_SLEEP_TIME = ConfigOptions.key((String)"cdc.schema-change-retry-interval").durationType().defaultValue((Object)Duration.ofMillis(500L)).withFallbackKeys(new String[]{"cdc.retry-sleep-time"}).withDescription("The interval of retrying the schema change.");
    public static final ConfigOption<Integer> MAX_RETRY_NUM_TIMES = ConfigOptions.key((String)"cdc.schema-change-retry-max-num").intType().defaultValue((Object)100).withDescription("Max retry count for retrying the schema change before failing loudly");
    public static final ConfigOption<Boolean> SKIP_CORRUPT_RECORD = ConfigOptions.key((String)"cdc.skip-corrupt-record").booleanType().defaultValue((Object)false).withDescription("Skip corrupt record if we fail to parse it");
    private final long retrySleepMillis;
    private final int maxRetryNumTimes;
    private final boolean skipCorruptRecord;

    protected CdcRecordStoreWriteOperator(StreamOperatorParameters<Committable> parameters, FileStoreTable table, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser) {
        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
        this.retrySleepMillis = ((Duration)table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME)).toMillis();
        this.maxRetryNumTimes = (Integer)table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
        this.skipCorruptRecord = (Boolean)table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        this.table = this.table.copyWithLatestSchema();
        super.initializeState(context);
    }

    protected boolean containLogSystem() {
        return false;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void processElement(StreamRecord<CdcRecord> element) throws Exception {
        CdcRecord record = (CdcRecord)element.getValue();
        Optional<GenericRow> optionalConverted = CdcRecordUtils.toGenericRow(record, this.table.schema().fields());
        if (!optionalConverted.isPresent()) {
            for (int retry = 0; retry < this.maxRetryNumTimes; ++retry) {
                this.table = this.table.copyWithLatestSchema();
                optionalConverted = CdcRecordUtils.toGenericRow(record, this.table.schema().fields());
                if (optionalConverted.isPresent()) break;
                Thread.sleep(this.retrySleepMillis);
            }
            this.write.replace(this.table);
        }
        if (!optionalConverted.isPresent()) {
            if (!this.skipCorruptRecord) throw new RuntimeException("Unable to process element. Possibly a corrupt record");
            LOG.warn("Skipping corrupt or unparsable record {}", (Object)record);
            return;
        }
        try {
            this.write.write((InternalRow)optionalConverted.get());
            return;
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    public static class Factory
    extends TableWriteOperator.Factory<CdcRecord> {
        public Factory(FileStoreTable table, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser) {
            super(table, storeSinkWriteProvider, initialCommitUser);
        }

        public <T extends StreamOperator<Committable>> T createStreamOperator(StreamOperatorParameters<Committable> parameters) {
            return (T)((Object)new CdcRecordStoreWriteOperator(parameters, this.table, this.storeSinkWriteProvider, this.initialCommitUser));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return CdcRecordStoreWriteOperator.class;
        }
    }
}

