/*
 * Decompiled with CFR 0.152.
 */
package com.oceanbase.tools.loaddump.writer.oceanbase;

import com.alipay.oceanbase.rpc.protocol.payload.impl.direct_load.ObLoadDupActionType;
import com.google.common.cache.Cache;
import com.lmax.disruptor.WorkHandler;
import com.oceanbase.tools.loaddump.common.model.Insertion;
import com.oceanbase.tools.loaddump.common.model.LoadParameter;
import com.oceanbase.tools.loaddump.common.model.SubFile;
import com.oceanbase.tools.loaddump.common.model.TaskState;
import com.oceanbase.tools.loaddump.directpath.DirectPathConnection;
import com.oceanbase.tools.loaddump.directpath.DirectPathPreparedStatement;
import com.oceanbase.tools.loaddump.parser.record.Record;
import com.oceanbase.tools.loaddump.writer.oceanbase.AbstractOceanBaseWriter;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectPathWriter
extends AbstractOceanBaseWriter
implements WorkHandler<Insertion> {
    private static final Logger log = LoggerFactory.getLogger(DirectPathWriter.class);
    private static final long SERVER_TIMEOUT = 86400000000L;
    private final Map<String, List<SubFile>> subFileMap;
    private final Cache<String, DirectPathConnection> connectionCache;

    public DirectPathWriter(LoadParameter parameter, Cache<String, DirectPathConnection> connectionCache) {
        super(parameter);
        this.subFileMap = parameter.getSubFiles().stream().collect(Collectors.groupingBy(SubFile::getObjectName));
        this.connectionCache = connectionCache;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onEvent(Insertion event) throws Exception {
        DirectPathConnection conn;
        SubFile subFile = event.getSubFile();
        if (subFile.getTaskState() == TaskState.FAILURE) {
            subFile.markdone();
            return;
        }
        String tableName = event.getTable();
        try {
            conn = this.getDirectPathConnection(tableName);
        }
        catch (Exception e) {
            subFile.markdone();
            throw new IllegalStateException("Load task failed to begin.", e);
        }
        List<Record> recordList = event.getRecordList();
        if (CollectionUtils.isNotEmpty(recordList)) {
            try (DirectPathPreparedStatement stmt = conn.createStatement();){
                for (Record record : recordList) {
                    stmt.addBatch(record.getValues());
                }
                int[] result = stmt.executeBatch();
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Insert {} rows success", (Object)Thread.currentThread().getName(), (Object)Arrays.stream(result).sum());
                }
                stmt.clearBatch();
                this.meter.mark(event.getLeaderServer(), result.length, event.getByteSize(), this.getConsumedSlots(event.getLeaderServer()));
                subFile.addLoadedCount(result.length);
                subFile.addLoadedBytes(event.getByteSize());
            }
            catch (Exception ex) {
                String msg = MessageFormat.format("Load data into table \"{0}\" failed. Error: {1}", tableName, ex.getMessage());
                log.error(msg);
                subFile.setMessage(msg);
                subFile.setTaskState(TaskState.FAILURE);
            }
            finally {
                subFile.markdone();
            }
        } else {
            if (!event.isCommit()) {
                subFile.markdone();
                throw new IllegalStateException("Invalid event without records, expected a commit");
            }
            try {
                conn.compareAndCommit(subFile.getParsedCount(), subFile.getLoadedCount());
            }
            catch (Exception ex) {
                String msg = MessageFormat.format("Commit direct load transaction for table \"{0}\" failed. Error: {1}", tableName, ex.getMessage());
                log.error(msg);
                subFile.setMessage(msg);
                subFile.setTaskState(TaskState.FAILURE);
            }
            finally {
                subFile.markdone();
            }
        }
    }

    private DirectPathConnection getDirectPathConnection(String tableName) throws Exception {
        return (DirectPathConnection)this.connectionCache.get((Object)tableName, () -> {
            int blocks = Optional.ofNullable(this.subFileMap.get(tableName)).map(List::size).orElse(0);
            return new DirectPathConnection.Builder().host(this.parameter.getHost()).port(this.parameter.getRpcPort()).tenant(this.parameter.getTenant()).user(this.parameter.getUser()).password(Optional.ofNullable(this.parameter.getPassword()).orElse("")).schema(this.parameter.getDatabaseName()).table(tableName).blocks(blocks).parallel(this.parameter.getThreads()).maxErrorCount(this.parameter.getMaxErrors()).duplicateKeyAction(this.parameter.isReplaceData() ? ObLoadDupActionType.REPLACE : ObLoadDupActionType.IGNORE).serverTimeout(86400000000L).build();
        });
    }
}

