/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.ververica.connectors.common.sink;

import com.alibaba.ververica.connectors.common.sink.HasRetryTimeout;
import com.alibaba.ververica.connectors.common.sink.Syncable;
import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OutputFormatSinkFunction<RECORD>
extends RichSinkFunction<RECORD>
implements ListCheckpointed<byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(OutputFormatSinkFunction.class);
    private static final long RETRY_INTERVAL = 100L;
    private OutputFormat<RECORD> outputFormat;
    private long retryTimeout = 1800000L;

    public OutputFormatSinkFunction(OutputFormat<RECORD> outputFormat) {
        this.outputFormat = outputFormat;
    }

    public void open(Configuration config) throws IOException {
        if (RichOutputFormat.class.isAssignableFrom(this.outputFormat.getClass())) {
            ((RichOutputFormat)this.outputFormat).setRuntimeContext(this.getRuntimeContext());
        }
        this.outputFormat.configure(config);
        this.outputFormat.open(this.getRuntimeContext().getIndexOfThisSubtask(), this.getRuntimeContext().getNumberOfParallelSubtasks());
        if (this.outputFormat instanceof HasRetryTimeout) {
            this.retryTimeout = ((HasRetryTimeout)this.outputFormat).getRetryTimeout();
        }
        LOG.info("Initialized OutputFormatSinkFunction of {}/{} task.", (Object)this.getRuntimeContext().getIndexOfThisSubtask(), (Object)this.getRuntimeContext().getNumberOfParallelSubtasks());
    }

    public void close() throws IOException {
        LOG.info("Closing OutputFormatSinkFunction.");
        this.outputFormat.close();
    }

    public void invoke(RECORD record) throws Exception {
        this.outputFormat.writeRecord(record);
    }

    public OutputFormat<RECORD> getOutputFormat() {
        return this.outputFormat;
    }

    public List<byte[]> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        if (this.outputFormat instanceof Syncable) {
            long startSyncing = System.currentTimeMillis();
            while (true) {
                try {
                    ((Syncable)this.outputFormat).sync();
                }
                catch (IOException e) {
                    long retryingTimeCost;
                    LOG.error("Sync output format failed", (Throwable)e);
                    try {
                        Thread.sleep(100L);
                        continue;
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    if ((retryingTimeCost = System.currentTimeMillis() - startSyncing) <= this.retryTimeout) continue;
                    throw new IOException(String.format("Retry time exceed timeout Error: %s, %s", retryingTimeCost, this.retryTimeout));
                }
                break;
            }
        }
        return null;
    }

    public void restoreState(List<byte[]> state) throws Exception {
    }

    public String toString() {
        return ((Object)((Object)this)).getClass().getSimpleName() + ":" + this.outputFormat.toString();
    }
}

