/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.log.LogWriteCallback;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.LogOffsetCommittable;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;

/**
 * A {@link TableWriteOperator} for {@link InternalRow} input that writes every incoming record
 * through the store write and, when a {@link LogSinkFunction} is configured, additionally forwards
 * the written record to that function.
 *
 * <p>When a log sink function is present, its lifecycle (state restore/snapshot, open, finish,
 * close, checkpoint notifications) is driven from the corresponding operator callbacks, and the
 * offsets reported through {@link LogWriteCallback} are emitted as {@code LOG_OFFSET}
 * committables in {@link #prepareCommit(boolean, long)}.
 */
public class RowDataStoreWriteOperator
extends TableWriteOperator<InternalRow> {
    private static final long serialVersionUID = 3L;

    /** Optional log sink; {@code null} means records are only written to the store. */
    @Nullable
    private final LogSinkFunction logSinkFunction;

    /** Context handed to the log sink function on every invoke; created in {@link #open()}. */
    private transient SimpleContext sinkContext;

    /** Collects offsets written by the log sink function; only set when a log sink exists. */
    @Nullable
    private transient LogWriteCallback logCallback;

    /** Value of {@link CoreOptions#LOG_IGNORE_DELETE} read from the table options in open(). */
    private transient boolean logIgnoreDelete;

    /** Timestamp of the last watermark seen by {@link #processWatermark(Watermark)}. */
    private long currentWatermark = Long.MIN_VALUE;

    /**
     * Creates the operator and, if a log sink function is given, hands it this operator's runtime
     * context.
     *
     * @param parameters operator parameters passed to the parent operator
     * @param table table to write to
     * @param logSinkFunction optional log sink function, may be {@code null}
     * @param storeSinkWriteProvider provider passed to the parent operator
     * @param initialCommitUser commit user passed to the parent operator
     */
    protected RowDataStoreWriteOperator(StreamOperatorParameters<Committable> parameters, FileStoreTable table, @Nullable LogSinkFunction logSinkFunction, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser) {
        super(parameters, table, storeSinkWriteProvider, initialCommitUser);
        this.logSinkFunction = logSinkFunction;
        if (logSinkFunction != null) {
            FunctionUtils.setFunctionRuntimeContext((Function)logSinkFunction, (RuntimeContext)this.getRuntimeContext());
        }
    }

    /** Initializes the parent state and then restores the log sink function's state, if any. */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        if (this.logSinkFunction != null) {
            StreamingFunctionUtils.restoreFunctionState(context, (Function)this.logSinkFunction);
        }
    }

    /** Returns {@code true} iff a log sink function was supplied at construction time. */
    @Override
    protected boolean containLogSystem() {
        return this.logSinkFunction != null;
    }

    /**
     * Opens the parent operator, creates the sink context and, if a log sink function exists,
     * opens it, installs a fresh {@link LogWriteCallback} and reads the
     * {@link CoreOptions#LOG_IGNORE_DELETE} option from the table options.
     */
    public void open() throws Exception {
        super.open();
        this.sinkContext = new SimpleContext(this.getProcessingTimeService());
        if (this.logSinkFunction != null) {
            RowDataStoreWriteOperator.openFunction((Function)this.logSinkFunction);
            this.logCallback = new LogWriteCallback();
            this.logSinkFunction.setWriteCallback(this.logCallback);
            this.logIgnoreDelete = Options.fromMap(this.table.options()).get(CoreOptions.LOG_IGNORE_DELETE);
        }
    }

    /**
     * Opens {@code function} reflectively if it is a {@link RichFunction}; does nothing otherwise.
     *
     * <p>{@code open(OpenContext)} is preferred. The {@code open(Configuration)} variant is used
     * only when {@link RichFunction} does not declare {@code open(OpenContext)} (presumably an
     * older Flink version). A failure raised by the function's own {@code open} is never treated
     * as "method missing": it is propagated to the caller instead of triggering a second open.
     *
     * @param function the function to open
     * @throws Exception whatever the function's {@code open} throws, or a reflection failure
     */
    private static void openFunction(Function function) throws Exception {
        if (!(function instanceof RichFunction)) {
            return;
        }
        RichFunction richFunction = (RichFunction)function;

        Method openMethod;
        Object openArgument;
        try {
            // Only the lookup is guarded, so NoSuchMethodException can only mean "not declared".
            openMethod = RichFunction.class.getDeclaredMethod("open", OpenContext.class);
            openArgument = new OpenContext(){};
        }
        catch (NoSuchMethodException ignored) {
            openMethod = RichFunction.class.getDeclaredMethod("open", Configuration.class);
            openArgument = new Configuration();
        }
        RowDataStoreWriteOperator.invokeOpen(openMethod, richFunction, openArgument);
    }

    /**
     * Invokes the resolved {@code open} method and rethrows the underlying cause of an
     * {@link InvocationTargetException} so callers see the function's real failure.
     */
    private static void invokeOpen(Method openMethod, RichFunction richFunction, Object openArgument) throws Exception {
        try {
            openMethod.invoke(richFunction, openArgument);
        }
        catch (InvocationTargetException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception)cause;
            }
            if (cause instanceof Error) {
                throw (Error)cause;
            }
            throw e;
        }
    }

    /**
     * Lets the parent process the watermark, remembers its timestamp for the sink context and
     * forwards it to the log sink function, if any.
     */
    public void processWatermark(Watermark mark) throws Exception {
        super.processWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
        if (this.logSinkFunction != null) {
            this.logSinkFunction.writeWatermark(new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
        }
    }

    /**
     * Writes the element's row through the store write and, when applicable, forwards the
     * resulting record to the log sink function.
     *
     * <p>The record is forwarded only if the write returned a non-null record, a log sink function
     * exists, and either deletes are not ignored or the row kind is an add.
     *
     * @throws IOException wrapping any exception thrown by the store write
     */
    public void processElement(StreamRecord<InternalRow> element) throws Exception {
        this.sinkContext.timestamp = element.hasTimestamp() ? Long.valueOf(element.getTimestamp()) : null;

        SinkRecord record;
        try {
            record = this.write.write(element.getValue());
        }
        catch (Exception e) {
            throw new IOException(e);
        }

        if (record == null || this.logSinkFunction == null) {
            return;
        }
        if (!this.logIgnoreDelete || record.row().getRowKind().isAdd()) {
            SinkRecord logRecord = this.write.toLogRecord(record);
            this.logSinkFunction.invoke(logRecord, this.sinkContext);
        }
    }

    /** Snapshots the parent state and then the log sink function's state, if any. */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        if (this.logSinkFunction != null) {
            StreamingFunctionUtils.snapshotFunctionState(context, (OperatorStateBackend)this.getOperatorStateBackend(), (Function)this.logSinkFunction);
        }
    }

    /** Finishes the parent operator and then the log sink function, if any. */
    public void finish() throws Exception {
        super.finish();
        if (this.logSinkFunction != null) {
            this.logSinkFunction.finish();
        }
    }

    /**
     * Closes the parent operator and the log sink function.
     *
     * <p>The log sink function is closed even when closing the parent fails with an exception, so
     * it is not leaked. The first failure is rethrown; a second failure is attached to it as a
     * suppressed exception.
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            super.close();
        }
        catch (Exception e) {
            failure = e;
        }

        if (this.logSinkFunction != null) {
            try {
                FunctionUtils.closeFunction((Function)this.logSinkFunction);
            }
            catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Notifies the parent and, if the log sink function is a {@link CheckpointListener}, the
     * function as well that the checkpoint completed.
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (this.logSinkFunction instanceof CheckpointListener) {
            ((CheckpointListener)this.logSinkFunction).notifyCheckpointComplete(checkpointId);
        }
    }

    /**
     * Notifies the parent and, if the log sink function is a {@link CheckpointListener}, the
     * function as well that the checkpoint was aborted.
     */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        super.notifyCheckpointAborted(checkpointId);
        if (this.logSinkFunction instanceof CheckpointListener) {
            ((CheckpointListener)this.logSinkFunction).notifyCheckpointAborted(checkpointId);
        }
    }

    /**
     * Collects the parent's committables and, when a log callback is installed, flushes the log
     * sink function and appends one {@code LOG_OFFSET} committable per entry reported by the
     * callback's offsets.
     *
     * @throws IOException wrapping any exception thrown while flushing the log sink function
     */
    @Override
    protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId) throws IOException {
        List<Committable> committables = super.prepareCommit(waitCompaction, checkpointId);
        if (this.logCallback == null) {
            return committables;
        }

        try {
            Objects.requireNonNull(this.logSinkFunction).flush();
        }
        catch (Exception e) {
            throw new IOException(e);
        }
        this.logCallback.offsets().forEach((k, v) -> committables.add(new Committable(checkpointId, Committable.Kind.LOG_OFFSET, new LogOffsetCommittable((int)k, (long)v))));
        return committables;
    }

    /** Factory that creates {@link RowDataStoreWriteOperator} instances. */
    public static class Factory
    extends TableWriteOperator.Factory<InternalRow> {
        /** Optional log sink function handed to every created operator. */
        @Nullable
        private final LogSinkFunction logSinkFunction;

        /**
         * @param table table to write to
         * @param logSinkFunction optional log sink function, may be {@code null}
         * @param storeSinkWriteProvider provider passed to the parent factory
         * @param initialCommitUser commit user passed to the parent factory
         */
        public Factory(FileStoreTable table, @Nullable LogSinkFunction logSinkFunction, StoreSinkWrite.Provider storeSinkWriteProvider, String initialCommitUser) {
            super(table, storeSinkWriteProvider, initialCommitUser);
            this.logSinkFunction = logSinkFunction;
        }

        /** Creates a new {@link RowDataStoreWriteOperator} from this factory's settings. */
        @SuppressWarnings("unchecked")
        public <T extends StreamOperator<Committable>> T createStreamOperator(StreamOperatorParameters<Committable> parameters) {
            return (T)((Object)new RowDataStoreWriteOperator(parameters, this.table, this.logSinkFunction, this.storeSinkWriteProvider, this.initialCommitUser));
        }

        /** Returns {@code RowDataStoreWriteOperator.class}; the class loader is not used. */
        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return RowDataStoreWriteOperator.class;
        }
    }

    /**
     * {@link SinkFunction.Context} backed by the operator's processing time service, its last
     * seen watermark and the timestamp of the element currently being processed.
     */
    private class SimpleContext
    implements SinkFunction.Context {
        /** Timestamp of the current element, or {@code null} if the element has none. */
        @Nullable
        private Long timestamp;
        private final ProcessingTimeService processingTimeService;

        public SimpleContext(ProcessingTimeService processingTimeService) {
            this.processingTimeService = processingTimeService;
        }

        /** Returns the current processing time from the processing time service. */
        public long currentProcessingTime() {
            return this.processingTimeService.getCurrentProcessingTime();
        }

        /** Returns the enclosing operator's last recorded watermark timestamp. */
        public long currentWatermark() {
            return RowDataStoreWriteOperator.this.currentWatermark;
        }

        /** Returns the timestamp set for the current element, possibly {@code null}. */
        public Long timestamp() {
            return this.timestamp;
        }
    }
}

