/*
 * Decompiled with CFR 0.152.
 */
package org.apache.carbondata.processing.loading.steps;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.keygenerator.KeyGenException;
import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import org.apache.carbondata.processing.store.CarbonFactHandler;
import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

/**
 * Data load step that takes the row-batch iterators produced by its child step and
 * writes every iterator ("range") through its own {@link CarbonFactHandler}, one pool
 * thread per iterator.
 *
 * <p>{@link #execute()} blocks until all ranges have been written and then returns
 * {@code null}; this step does not hand any iterators on to a further step.
 */
public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
    private static final LogService LOGGER = LogServiceFactory.getLogService(DataWriterProcessorStepImpl.class.getName());

    /**
     * Number of rows handed to a fact handler. Updated from several writer threads,
     * hence atomic (a plain {@code long} with {@code ++} loses updates under contention).
     */
    private final AtomicLong readCounter = new AtomicLong();

    /*
     * NOTE(review): every call to processRange()/getDataHandlerModel() overwrites this
     * single field, while close() finishes only the listener stored here. With several
     * ranges only the listener assigned last is finished by close() - confirm whether
     * the other listeners are finished elsewhere.
     */
    private DataMapWriterListener listener;
    private final Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
    private ExecutorService rangeExecutorService;

    /** Handlers that are currently open; whatever is still in here when close() runs gets cleaned up. */
    private List<CarbonFactHandler> carbonFactHandlers;

    /**
     * Creates the step on top of a child step.
     *
     * @param configuration load configuration; its table spec supplies the local dictionary model
     * @param child step whose {@code execute()} output is written by this step
     */
    public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration, AbstractDataLoadProcessorStep child) {
        super(configuration, child);
        this.localDictionaryGeneratorMap = CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
    }

    /**
     * Creates the step without a child step (the child is passed to the super class as {@code null}).
     *
     * @param configuration load configuration; its table spec supplies the local dictionary model
     */
    public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration) {
        super(configuration, null);
        this.localDictionaryGeneratorMap = CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
    }

    /**
     * Initializes the super class, the child step (when there is one) and the list of open handlers.
     *
     * @throws IOException if the super class or the child fails to initialize
     */
    @Override
    public void initialize() throws IOException {
        super.initialize();
        // The single-argument constructor leaves the child null; do not fail with an NPE in that case.
        if (this.child != null) {
            this.child.initialize();
        }
        this.carbonFactHandlers = new CopyOnWriteArrayList<CarbonFactHandler>();
    }

    /**
     * Resolves the local data folder locations for this task/segment and creates them.
     *
     * @param tableIdentifier identifies the database and table the locations belong to
     * @return the created store locations
     */
    private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
        String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
                tableIdentifier.getDatabaseName(),
                tableIdentifier.getTableName(),
                String.valueOf(this.configuration.getTaskNo()),
                this.configuration.getSegmentId(),
                false,
                false);
        CarbonDataProcessorUtil.createLocations(storeLocation);
        return storeLocation;
    }

    /**
     * Builds the handler model for range 0, creating the store locations and
     * (re)assigning this step's datamap writer listener on the way.
     *
     * @return a model carrying this step's local dictionary generators
     */
    public CarbonFactDataHandlerModel getDataHandlerModel() {
        CarbonTableIdentifier tableIdentifier = this.configuration.getTableIdentifier().getCarbonTableIdentifier();
        String[] storeLocation = this.getStoreLocation(tableIdentifier);
        this.listener = this.getDataMapWriterListener(0);
        CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(this.configuration, storeLocation, 0, 0, this.listener);
        carbonFactDataHandlerModel.setColumnLocalDictGenMap(this.localDictionaryGeneratorMap);
        return carbonFactDataHandlerModel;
    }

    /**
     * Runs the child step, writes each returned iterator on its own pool thread and
     * waits until every writer task has completed.
     *
     * @return always {@code null}
     * @throws CarbonDataLoadingException if any writer task fails or the wait is interrupted;
     *         the original failure is attached as the cause
     */
    @Override
    public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
        Iterator<CarbonRowBatch>[] iterators = this.child.execute();
        CarbonTableIdentifier tableIdentifier = this.configuration.getTableIdentifier().getCarbonTableIdentifier();
        String tableName = tableIdentifier.getTableName();
        try {
            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDictionaryValue2MdkAdd2FileTime("0", Long.valueOf(System.currentTimeMillis()));
            this.rangeExecutorService = Executors.newFixedThreadPool(iterators.length, new CarbonThreadFactory("WriterForwardPool: " + tableName));
            List<Future<Void>> pendingRanges = new ArrayList<Future<Void>>(iterators.length);
            for (int rangeId = 0; rangeId < iterators.length; ++rangeId) {
                pendingRanges.add(this.rangeExecutorService.submit(new WriterForwarder(iterators[rangeId], tableIdentifier, rangeId)));
            }
            try {
                this.rangeExecutorService.shutdown();
                this.rangeExecutorService.awaitTermination(2L, TimeUnit.DAYS);
                // get() rethrows whatever a writer task threw, wrapped in an ExecutionException.
                for (Future<Void> pendingRange : pendingRanges) {
                    pendingRange.get();
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new CarbonDataWriterException(e);
            }
            catch (ExecutionException e) {
                throw new CarbonDataWriterException(e);
            }
        }
        catch (CarbonDataWriterException e) {
            LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
            // Keep the cause: the message alone hides which writer failed and why.
            throw new CarbonDataLoadingException("Error while initializing data handler : " + e.getMessage(), e);
        }
        catch (Exception e) {
            LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
            throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
        }
        return null;
    }

    @Override
    protected String getStepName() {
        return "Data Writer";
    }

    /**
     * Writes all batches of one range. The fact handler is created lazily on the first
     * batch, so a range without any batch never creates a handler.
     *
     * @param insideRangeIterator batches belonging to the range
     * @param tableIdentifier table the range is written for
     * @param rangeId index of the range, passed on to the listener and the handler model
     */
    private void processRange(Iterator<CarbonRowBatch> insideRangeIterator, CarbonTableIdentifier tableIdentifier, int rangeId) {
        String[] storeLocation = this.getStoreLocation(tableIdentifier);
        this.listener = this.getDataMapWriterListener(rangeId);
        CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(this.configuration, storeLocation, rangeId, 0, this.listener);
        model.setColumnLocalDictGenMap(this.localDictionaryGeneratorMap);
        CarbonFactHandler dataHandler = null;
        boolean rowsNotExist = true;
        while (insideRangeIterator.hasNext()) {
            if (rowsNotExist) {
                rowsNotExist = false;
                dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
                // Registered before use so that close() can clean it up if this range fails midway.
                this.carbonFactHandlers.add(dataHandler);
                dataHandler.initialise();
            }
            this.processBatch(insideRangeIterator.next(), dataHandler);
        }
        if (!rowsNotExist) {
            this.finish(dataHandler);
        }
        // Only reached when the range completed without an exception; removing null is a no-op.
        this.carbonFactHandlers.remove(dataHandler);
    }

    /**
     * Finishes and closes the given handler and records the load statistics.
     * A failure of {@code dataHandler.finish()} is logged and does not stop the
     * handler from being closed afterwards.
     *
     * @param dataHandler handler to finish; must not be {@code null}
     * @throws CarbonDataLoadingException if closing the handler fails
     */
    public void finish(CarbonFactHandler dataHandler) {
        CarbonTableIdentifier tableIdentifier = this.configuration.getTableIdentifier().getCarbonTableIdentifier();
        String tableName = tableIdentifier.getTableName();
        try {
            dataHandler.finish();
        }
        catch (Exception e) {
            LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
        }
        LOGGER.info("Record Processed For table: " + tableName);
        String logMessage = "Finished Carbon DataWriterProcessorStepImpl: Read: " + this.readCounter.get() + ": Write: " + this.rowCounter.get();
        LOGGER.info(logMessage);
        CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(this.rowCounter.get());
        this.processingComplete(dataHandler);
        CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDictionaryValue2MdkAdd2FileTime("0", Long.valueOf(System.currentTimeMillis()));
        CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordMdkGenerateTotalTime("0", Long.valueOf(System.currentTimeMillis()));
    }

    /**
     * Closes the handler, if there is one.
     *
     * @param dataHandler handler to close; {@code null} is ignored
     * @throws CarbonDataLoadingException if closing fails; the original failure is the cause
     */
    private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
        if (null == dataHandler) {
            return;
        }
        try {
            dataHandler.closeHandler();
        }
        catch (CarbonDataWriterException e) {
            LOGGER.error(e, e.getMessage());
            throw new CarbonDataLoadingException(e.getMessage(), e);
        }
        catch (Exception e) {
            LOGGER.error(e, e.getMessage());
            // Keep the cause so the stack trace of the real failure is not lost.
            throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
        }
    }

    /**
     * Adds every row of the batch to the handler and updates the read and write counters.
     *
     * @param batch rows to write
     * @param dataHandler handler receiving the rows
     * @throws CarbonDataLoadingException if the handler rejects a row
     */
    private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
        long rowsRead = 0L;
        try {
            while (batch.hasNext()) {
                CarbonRow row = batch.next();
                dataHandler.addDataToStore(row);
                ++rowsRead;
            }
        }
        catch (Exception e) {
            throw new CarbonDataLoadingException(e);
        }
        finally {
            // One atomic update per batch; also counts the rows stored before a failure.
            this.readCounter.addAndGet(rowsRead);
        }
        this.rowCounter.getAndAdd(batch.getSize());
    }

    /**
     * Adds a single row to the handler and updates the read and write counters.
     *
     * @param row row to write
     * @param dataHandler handler receiving the row
     * @throws CarbonDataLoadingException if the handler rejects the row
     * @throws KeyGenException declared for callers; not thrown by the code visible here
     */
    public void processRow(CarbonRow row, CarbonFactHandler dataHandler) throws KeyGenException {
        try {
            this.readCounter.incrementAndGet();
            dataHandler.addDataToStore(row);
        }
        catch (Exception e) {
            throw new CarbonDataLoadingException("unable to generate the mdkey", e);
        }
        this.rowCounter.getAndAdd(1L);
    }

    /**
     * Closes the step once: finishes the datamap writer listener, stops the writer pool
     * and cleans up every handler that is still registered as open.
     */
    @Override
    public void close() {
        if (this.closed) {
            return;
        }
        super.close();
        if (this.listener != null) {
            try {
                LOGGER.info("closing all the DataMap writers registered to DataMap writer listener");
                this.listener.finish();
            }
            catch (IOException e) {
                LOGGER.error(e, "error while closing the datamap writers");
            }
        }
        if (null != this.rangeExecutorService) {
            this.rangeExecutorService.shutdownNow();
        }
        this.closeRemainingHandlers();
    }

    /**
     * Finishes and closes every handler still in the open-handler list. Each handler is
     * closed even when its finish() fails, and one failing handler does not prevent the
     * remaining ones from being cleaned up; the first failure is rethrown at the end with
     * later failures attached as suppressed exceptions.
     */
    private void closeRemainingHandlers() {
        if (null == this.carbonFactHandlers) {
            return;
        }
        RuntimeException firstFailure = null;
        for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) {
            try {
                try {
                    carbonFactHandler.finish();
                }
                finally {
                    carbonFactHandler.closeHandler();
                }
            }
            catch (RuntimeException e) {
                LOGGER.error(e, "error while closing the fact handler");
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Pool task that writes one range by delegating to
     * {@link DataWriterProcessorStepImpl#processRange}.
     */
    private final class WriterForwarder implements Callable<Void> {
        private final Iterator<CarbonRowBatch> insideRangeIterator;
        private final CarbonTableIdentifier tableIdentifier;
        private final int rangeId;

        public WriterForwarder(Iterator<CarbonRowBatch> insideRangeIterator, CarbonTableIdentifier tableIdentifier, int rangeId) {
            this.insideRangeIterator = insideRangeIterator;
            this.tableIdentifier = tableIdentifier;
            this.rangeId = rangeId;
        }

        @Override
        public Void call() throws Exception {
            LOGGER.info("Process writer forward for table " + this.tableIdentifier.getTableName() + ", range: " + this.rangeId);
            DataWriterProcessorStepImpl.this.processRange(this.insideRangeIterator, this.tableIdentifier, this.rangeId);
            return null;
        }
    }
}

