/*
 * Decompiled with CFR 0.152.
 */
package com.antgroup.geaflow.shuffle.api.writer;

import com.antgroup.geaflow.common.metric.ShuffleWriteMetrics;
import com.antgroup.geaflow.common.shuffle.ShuffleDescriptor;
import com.antgroup.geaflow.shuffle.api.writer.IShuffleWriter;
import com.antgroup.geaflow.shuffle.api.writer.IWriterContext;
import com.antgroup.geaflow.shuffle.api.writer.ShardBuffer;
import com.antgroup.geaflow.shuffle.api.writer.ShardBufferFactory;
import com.antgroup.geaflow.shuffle.network.IConnectionManager;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
 * {@link IShuffleWriter} implementation that forwards every operation to a
 * {@link ShardBuffer} obtained from {@link ShardBufferFactory}.
 *
 * <p>The buffer is created in {@link #init(IWriterContext)}. Calling any of the
 * emit/flush/metrics methods before {@code init} fails with a
 * {@link NullPointerException} that carries an explanatory message;
 * {@link #close()} is a no-op in that state.
 *
 * @param <T> type of the records handed to the writer
 * @param <R> type of the result produced when a batch is flushed
 */
public class PipelineWriter<T, R>
implements IShuffleWriter<T, R> {
    /** Delegate buffer; {@code null} until {@link #init(IWriterContext)} has run. */
    private ShardBuffer shardBuffer;
    /** Connection manager handed to {@link ShardBufferFactory} when the buffer is created. */
    private final IConnectionManager connectionManager;

    /**
     * Creates a writer that has no buffer yet.
     *
     * @param connectionManager connection manager passed to the buffer factory during
     *                          {@link #init(IWriterContext)}
     */
    public PipelineWriter(IConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }

    /**
     * Creates the shard buffer for the exchange mode named by the context's shuffle
     * descriptor and initializes it with the same context.
     *
     * @param writerContext context supplying the shuffle descriptor and passed on to the buffer
     */
    @Override
    public void init(IWriterContext writerContext) {
        ShuffleDescriptor descriptor = writerContext.getShuffleDescriptor();
        this.shardBuffer = ShardBufferFactory.getShardBuffer(descriptor.getExchangeMode(), this.connectionManager);
        this.shardBuffer.init(writerContext);
    }

    /**
     * Forwards a single record to the shard buffer.
     *
     * @param batchId   batch the record belongs to
     * @param value     record to emit
     * @param isRetract retract flag, forwarded unchanged
     * @param channels  target channels, forwarded unchanged
     * @throws IOException if the buffer fails to emit
     * @throws NullPointerException if the writer has not been initialized
     */
    @Override
    public void emit(long batchId, T value, boolean isRetract, int[] channels) throws IOException {
        this.requireBuffer().emit(batchId, value, isRetract, channels);
    }

    /**
     * Forwards a list of records for one channel to the shard buffer.
     *
     * <p>NOTE(review): {@code isRetract} is not forwarded to the buffer by this overload;
     * confirm that the buffer's list-based emit is not expected to receive it.
     *
     * @param batchId   batch the records belong to
     * @param data      records to emit
     * @param isRetract retract flag; currently unused by this method
     * @param channel   target channel, forwarded unchanged
     * @throws IOException if the buffer fails to emit
     * @throws NullPointerException if the writer has not been initialized
     */
    @Override
    public void emit(long batchId, List<T> data, boolean isRetract, int channel) throws IOException {
        this.requireBuffer().emit(batchId, data, channel);
    }

    /**
     * Finishes the given batch on the shard buffer and returns whatever it yields.
     *
     * @param batchId batch to finish
     * @return the result of the buffer's {@code finish} call
     * @throws IOException if the buffer fails to finish the batch
     * @throws NullPointerException if the writer has not been initialized
     */
    @Override
    public Optional<R> flush(long batchId) throws IOException {
        return this.requireBuffer().finish(batchId);
    }

    /**
     * Returns the write metrics reported by the shard buffer.
     *
     * @return the buffer's shuffle write metrics
     * @throws NullPointerException if the writer has not been initialized
     */
    @Override
    public ShuffleWriteMetrics getShuffleWriteMetrics() {
        return this.requireBuffer().getShuffleWriteMetrics();
    }

    /** Closes the shard buffer if one has been created; otherwise does nothing. */
    @Override
    public void close() {
        if (this.shardBuffer != null) {
            this.shardBuffer.close();
        }
    }

    /**
     * Returns the shard buffer, failing with a descriptive message when
     * {@link #init(IWriterContext)} has not been called yet.
     */
    private ShardBuffer requireBuffer() {
        // Same exception type as the bare dereference it replaces, but with a message
        // that points at the actual cause instead of an anonymous NPE.
        return Objects.requireNonNull(this.shardBuffer,
            "PipelineWriter has not been initialized; call init(IWriterContext) first");
    }
}

