/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.flink.sink;

import java.io.IOException;
import java.io.InvalidClassException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.seatunnel.api.common.metrics.Counter;
import org.apache.seatunnel.api.common.metrics.Meter;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.Event;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportResourceShare;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.flink.sink.CommitWrapper;
import org.apache.seatunnel.translation.flink.sink.FlinkWriterState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSinkWriter<InputT, CommT, WriterStateT>
implements org.apache.flink.api.connector.sink.SinkWriter<InputT, CommitWrapper<CommT>, FlinkWriterState<WriterStateT>> {
    private static final Logger log = LoggerFactory.getLogger(FlinkSinkWriter.class);
    private final SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter;
    private final SinkWriter.Context context;
    private final Counter sinkWriteCount;
    private final Counter sinkWriteBytes;
    private final Meter sinkWriterQPS;
    private long checkpointId;
    private MultiTableResourceManager resourceManager;

    FlinkSinkWriter(SinkWriter<SeaTunnelRow, CommT, WriterStateT> sinkWriter, long checkpointId, SinkWriter.Context context) {
        this.context = context;
        this.sinkWriter = sinkWriter;
        this.checkpointId = checkpointId;
        MetricsContext metricsContext = context.getMetricsContext();
        this.sinkWriteCount = metricsContext.counter("SinkWriteCount");
        this.sinkWriteBytes = metricsContext.counter("SinkWriteBytes");
        this.sinkWriterQPS = metricsContext.meter("SinkWriteQPS");
        if (sinkWriter instanceof SupportResourceShare) {
            this.resourceManager = ((SupportResourceShare)sinkWriter).initMultiTableResourceManager(1, 1);
            ((SupportResourceShare)sinkWriter).setMultiTableResourceManager(this.resourceManager, 0);
        }
    }

    public void write(InputT element, SinkWriter.Context context) throws IOException {
        if (element == null) {
            return;
        }
        if (!(element instanceof SeaTunnelRow)) {
            throw new InvalidClassException("only support SeaTunnelRow at now, the element Class is " + element.getClass());
        }
        this.sinkWriter.write((Object)((SeaTunnelRow)element));
        this.sinkWriteCount.inc();
        this.sinkWriteBytes.inc((long)((SeaTunnelRow)element).getBytesSize());
        this.sinkWriterQPS.markEvent();
    }

    public List<CommitWrapper<CommT>> prepareCommit(boolean flush) throws IOException {
        Optional commTOptional = this.sinkWriter.prepareCommit(this.checkpointId);
        return commTOptional.map(CommitWrapper::new).map(Collections::singletonList).orElse(Collections.emptyList());
    }

    public List<FlinkWriterState<WriterStateT>> snapshotState() throws IOException {
        List<FlinkWriterState<WriterStateT>> states = this.sinkWriter.snapshotState(this.checkpointId).stream().map(state -> new FlinkWriterState<Object>(this.checkpointId, state)).collect(Collectors.toList());
        ++this.checkpointId;
        return states;
    }

    public void close() throws Exception {
        this.sinkWriter.close();
        this.context.getEventListener().onEvent((Event)new WriterCloseEvent());
        try {
            if (this.resourceManager != null) {
                this.resourceManager.close();
            }
        }
        catch (Throwable e) {
            log.error("close resourceManager error", e);
        }
    }
}

