/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.repository.store.graph.v2.bulkimport.pc;

import org.apache.atlas.model.impexp.AtlasImportResult;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.pc.StatusReporter;
import org.apache.atlas.pc.WorkItemBuilder;
import org.apache.atlas.pc.WorkItemManager;
import org.apache.atlas.repository.migration.DataMigrationStatusService;
import org.apache.atlas.repository.store.graph.v2.BulkImporterImpl;
import org.apache.atlas.repository.store.graph.v2.EntityImportStream;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Work-item manager that reads entities from an {@link EntityImportStream} and hands
 * them to the underlying {@link WorkItemManager}, while tracking which entities have
 * been acknowledged so the import position and progress metrics can be saved.
 *
 * <p>NOTE(review): the class-level type parameter is named {@code AtlasEntityWithExtInfo}
 * but is not used anywhere in this class, and {@link WorkItemManager} is extended as a raw
 * type. Both are kept as-is so existing callers continue to compile.
 */
public class EntityCreationManager<AtlasEntityWithExtInfo> extends WorkItemManager {
    private static final Logger LOG = LoggerFactory.getLogger(EntityCreationManager.class);
    private static final String WORKER_PREFIX = "migration-import";
    private static final long STATUS_REPORT_TIMEOUT_DURATION = 60000L;

    /** Maps produced entity GUIDs to their stream index and yields acknowledged indexes. */
    private final StatusReporter<String, Long> statusReporter;
    private final AtlasImportResult importResult;
    private final DataMigrationStatusService dataMigrationStatusService;

    /** Type name of the entity most recently handed to {@link #produce}. */
    private String currentTypeName;
    /** Last progress value returned by {@code BulkImporterImpl.updateImportProgress}. */
    private float currentPercent;
    /** Stream passed to the most recent {@link #read} call; {@code null} until then. */
    private EntityImportStream entityImportStream;

    /**
     * Creates the manager.
     *
     * @param builder                    forwarded to the {@link WorkItemManager} constructor
     * @param batchSize                  forwarded to the {@link WorkItemManager} constructor
     * @param numWorkers                 forwarded to the {@link WorkItemManager} constructor
     * @param importResult               result object whose metrics counters are incremented
     * @param dataMigrationStatusService service used to record status and last saved position
     */
    public EntityCreationManager(WorkItemBuilder builder, int batchSize, int numWorkers, AtlasImportResult importResult, DataMigrationStatusService dataMigrationStatusService) {
        // The trailing 'true' is a flag defined by WorkItemManager; its meaning is not visible here.
        super(builder, WORKER_PREFIX, batchSize, numWorkers, true);
        this.importResult = importResult;
        this.dataMigrationStatusService = dataMigrationStatusService;
        // Use the named constant (previously a duplicated literal) and a typed instance.
        this.statusReporter = new StatusReporter<>(STATUS_REPORT_TIMEOUT_DURATION);
    }

    /**
     * Reads every remaining entry of the stream and submits it for processing.
     *
     * <p>Entries whose entity is {@code null} are skipped without consuming an index.
     * If submitting an entry throws, the failure is logged and reading stops.
     *
     * @param entityStream stream to read from; its current position is the starting index
     * @return the starting position plus the number of entries for which submission was attempted
     */
    public long read(EntityImportStream entityStream) {
        long currentIndex = entityStream.getPosition();
        this.entityImportStream = entityStream;
        this.dataMigrationStatusService.setStatus("IN_PROGRESS");

        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo;
        while ((entityWithExtInfo = entityStream.getNextEntityWithExtInfo()) != null) {
            AtlasEntity entity = entityWithExtInfo.getEntity();
            if (entity == null) {
                continue;
            }

            try {
                // The index is advanced before produce() runs, so it is consumed even on failure.
                produce(currentIndex++, entity.getTypeName(), entityWithExtInfo);
            } catch (Throwable e) {
                LOG.warn("Exception: {}", entity.getGuid(), e);
                break;
            }
        }

        // NOTE(review): the status becomes "DONE" even when the loop was aborted by an
        // exception above - confirm whether a distinct failure status should be recorded.
        this.dataMigrationStatusService.setStatus("DONE");
        return currentIndex;
    }

    /**
     * Submits one entity. When the type name differs from the previously submitted one,
     * {@code drain()} is called first so the previous type finishes before the switch.
     *
     * @param currentIndex      stream index recorded against the entity's GUID
     * @param typeName          type name of the entity being submitted
     * @param entityWithExtInfo the item handed to {@code checkProduce}
     */
    private void produce(long currentIndex, String typeName, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {
        String previousTypeName = getCurrentTypeName();
        boolean typeChanged = StringUtils.isNotEmpty(typeName)
                && StringUtils.isNotEmpty(previousTypeName)
                && !StringUtils.equals(previousTypeName, typeName);

        if (typeChanged) {
            LOG.info("Waiting: '{}' to complete...", previousTypeName);
            super.drain();
            LOG.info("Switching entity type processing: From: '{}' To: '{}'...", previousTypeName, typeName);
        }

        setCurrentTypeName(typeName);
        this.statusReporter.produced(entityWithExtInfo.getEntity().getGuid(), currentIndex);
        super.checkProduce(entityWithExtInfo);
        extractResults();
    }

    /**
     * Drains the results queue, marks each result as processed in the status reporter,
     * and then records progress if the reporter has an index to acknowledge.
     */
    public void extractResults() {
        Object result;
        while ((result = this.getResults().poll()) != null) {
            // Results are cast to String, matching the key type given to produced().
            this.statusReporter.processed((String) result);
        }

        logStatus();
    }

    /**
     * Asks the status reporter for an acknowledged index. If there is one, increments the
     * metrics counter for the current type, saves the position, and updates the progress value.
     */
    private void logStatus() {
        Long ack = this.statusReporter.ack();
        if (ack == null) {
            return;
        }

        this.importResult.incrementMeticsCounter(getCurrentTypeName());
        this.dataMigrationStatusService.savePosition(ack);
        this.currentPercent = updateImportMetrics(getCurrentTypeName(), ack, this.entityImportStream.size(), getCurrentPercent());
    }

    /**
     * Formats the "last imported" label and delegates to
     * {@code BulkImporterImpl.updateImportProgress}.
     *
     * @param typeName       type name placed in the label
     * @param currentIndex   acknowledged index; narrowed to {@code int} for the delegate call
     * @param streamSize     stream size passed through to the delegate
     * @param currentPercent previous progress value passed through to the delegate
     * @return the value returned by the delegate
     */
    private static float updateImportMetrics(String typeName, long currentIndex, int streamSize, float currentPercent) {
        String lastEntityImported = String.format("entity:last-imported:%s:(%s)", typeName, currentIndex);
        return BulkImporterImpl.updateImportProgress(LOG, (int) currentIndex, streamSize, currentPercent, lastEntityImported);
    }

    private String getCurrentTypeName() {
        return this.currentTypeName;
    }

    private void setCurrentTypeName(String typeName) {
        this.currentTypeName = typeName;
    }

    private float getCurrentPercent() {
        return this.currentPercent;
    }
}

