/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.loaddump.writer.oceanbase;

import com.lmax.disruptor.WorkHandler;
import com.oceanbase.tools.loaddump.common.model.Insertion;
import com.oceanbase.tools.loaddump.common.model.LoadParameter;
import com.oceanbase.tools.loaddump.common.model.SubFile;
import com.oceanbase.tools.loaddump.common.model.TaskState;
import com.oceanbase.tools.loaddump.directpath.DirectPathConnection;
import com.oceanbase.tools.loaddump.directpath.DirectPathPreparedStatement;
import com.oceanbase.tools.loaddump.parser.record.Record;
import com.oceanbase.tools.loaddump.vmoption.JavaOpts;
import com.oceanbase.tools.loaddump.writer.oceanbase.AbstractOceanBaseWriter;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectPathWriter
extends AbstractOceanBaseWriter
implements WorkHandler<Insertion> {
    private static final Logger log = LoggerFactory.getLogger(DirectPathWriter.class);
    private final Map<String, DirectPathConnection> directLoadConnMap;

    public DirectPathWriter(LoadParameter parameter, Map<String, DirectPathConnection> directLoadConnMap) {
        super(parameter);
        this.directLoadConnMap = directLoadConnMap;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onEvent(Insertion insertion) throws Exception {
        SubFile subFile = insertion.getSubFile();
        if (subFile.getTaskState() == TaskState.FAILURE) {
            this.loadCtx.batchConsumed(insertion);
            return;
        }
        String tableName = insertion.getTable();
        List<Record> recordList = insertion.getRecordList();
        if (JavaOpts.isDryRunMode) {
            int size = Optional.ofNullable(recordList).map(List::size).orElse(0);
            this.meter.mark(insertion.getLeaderServer(), size, insertion.getByteSize(), this.getConsumedSlots(insertion.getLeaderServer()));
            subFile.addLoadedCount(size);
            subFile.addLoadedBytes(insertion.getByteSize());
            this.loadCtx.batchConsumed(insertion);
            return;
        }
        DirectPathConnection conn = this.directLoadConnMap.get(tableName);
        try {
            if (conn.isInit()) {
                conn.begin();
            }
            try (DirectPathPreparedStatement stmt = conn.createStatement();){
                for (Record record : recordList) {
                    stmt.addBatch(record.getValues());
                }
                long start = System.currentTimeMillis();
                int[] result = stmt.executeBatch();
                if (JavaOpts.isDebugable) {
                    log.info("[{}] Insert {} rows success. Elapsed: {} ms", new Object[]{Thread.currentThread().getName(), Arrays.stream(result).sum(), System.currentTimeMillis() - start});
                }
                stmt.clearBatch();
                this.meter.mark(insertion.getLeaderServer(), result.length, insertion.getByteSize(), this.getConsumedSlots(insertion.getLeaderServer()));
                subFile.addLoadedCount(result.length);
                subFile.addLoadedBytes(insertion.getByteSize());
            }
        }
        catch (Throwable ex) {
            subFile.setMessage("Write into table \"" + tableName + "\" failed. " + ex.getMessage());
            subFile.setTaskState(TaskState.FAILURE);
            this.getLoadCtx().markTableFail(tableName);
        }
        finally {
            this.loadCtx.batchConsumed(insertion);
        }
    }
}

