/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.translation.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SupportResourceShare;
import org.apache.seatunnel.translation.flink.sink.CommitWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkGlobalCommitter<CommT, GlobalCommT>
implements GlobalCommitter<CommitWrapper<CommT>, GlobalCommT> {
    private static final Logger log = LoggerFactory.getLogger(FlinkGlobalCommitter.class);
    private final SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter;
    private MultiTableResourceManager resourceManager;

    FlinkGlobalCommitter(SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter) {
        this.aggregatedCommitter = aggregatedCommitter;
        if (this.aggregatedCommitter instanceof SupportResourceShare) {
            this.resourceManager = ((SupportResourceShare)this.aggregatedCommitter).initMultiTableResourceManager(1, 1);
        }
        aggregatedCommitter.init();
        if (this.resourceManager != null) {
            ((SupportResourceShare)this.aggregatedCommitter).setMultiTableResourceManager(this.resourceManager, 0);
        }
    }

    public List<GlobalCommT> filterRecoveredCommittables(List globalCommittables) throws IOException {
        return Collections.emptyList();
    }

    public GlobalCommT combine(List<CommitWrapper<CommT>> committables) throws IOException {
        return (GlobalCommT)this.aggregatedCommitter.combine(committables.stream().map(CommitWrapper::getCommit).collect(Collectors.toList()));
    }

    public List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException {
        List reCommittable = this.aggregatedCommitter.commit(globalCommittables);
        if (reCommittable != null && !reCommittable.isEmpty()) {
            log.warn("this version not support re-commit when use flink engine");
        }
        return new ArrayList();
    }

    public void endOfInput() throws IOException {
    }

    public void close() throws Exception {
        this.aggregatedCommitter.close();
        try {
            if (this.resourceManager != null) {
                this.resourceManager.close();
            }
        }
        catch (Throwable e) {
            log.error("close resourceManager error", e);
        }
    }
}

