/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.MockOutput;
import org.apache.flink.streaming.util.MockStreamConfig;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.AppendOnlySingleTableCompactionWorkerOperator;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class AppendOnlySingleTableCompactionWorkerOperatorTest
extends TableTestBase {
    @Test
    public void testAsyncCompactionWorks() throws Exception {
        this.createTableDefault();
        AppendOnlySingleTableCompactionWorkerOperator workerOperator = (AppendOnlySingleTableCompactionWorkerOperator)new AppendOnlySingleTableCompactionWorkerOperator.Factory(this.getTableDefault(), "user", false).createStreamOperator(new StreamOperatorParameters((StreamTask)new SourceOperatorStreamTask((Environment)new DummyEnvironment()), (StreamConfig)new MockStreamConfig(new Configuration(), 1), (Output)new MockOutput(new ArrayList()), null, null, null));
        List commitMessages = this.writeDataDefault(200, 20);
        List<AppendCompactTask> tasks = AppendOnlySingleTableCompactionWorkerOperatorTest.packTask(commitMessages, 5);
        List records = tasks.stream().map(StreamRecord::new).collect(Collectors.toList());
        Assertions.assertThat((int)tasks.size()).isEqualTo(4);
        workerOperator.open();
        for (StreamRecord record : records) {
            workerOperator.processElement(record);
        }
        ArrayList committables = new ArrayList();
        Long timeStart = System.currentTimeMillis();
        long timeout = 60000L;
        Assertions.assertThatCode(() -> {
            while (committables.size() != 4) {
                committables.addAll(workerOperator.prepareCommit(false, Long.MAX_VALUE));
                Long now = System.currentTimeMillis();
                if (now - timeStart > timeout && committables.size() != 4) {
                    throw new RuntimeException("Timeout waiting for compaction, maybe some error happens in " + AppendOnlySingleTableCompactionWorkerOperator.class.getName());
                }
                Thread.sleep(1000L);
            }
        }).doesNotThrowAnyException();
        committables.forEach(a -> Assertions.assertThat((((CommitMessageImpl)a.wrappedCommittable()).compactIncrement().compactAfter().size() == 1 ? 1 : 0) != 0).isTrue());
        workerOperator.close();
        Thread.sleep(2000L);
    }

    @Test
    public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception {
        List fileMetas;
        CommitMessage commitMessage;
        Future f2;
        this.createTableDefault();
        AppendOnlySingleTableCompactionWorkerOperator workerOperator = (AppendOnlySingleTableCompactionWorkerOperator)new AppendOnlySingleTableCompactionWorkerOperator.Factory(this.getTableDefault(), "user", false).createStreamOperator(new StreamOperatorParameters((StreamTask)new SourceOperatorStreamTask((Environment)new DummyEnvironment()), (StreamConfig)new MockStreamConfig(new Configuration(), 1), (Output)new MockOutput(new ArrayList()), null, null, null));
        List commitMessages = this.writeDataDefault(200, 40);
        List<AppendCompactTask> tasks = AppendOnlySingleTableCompactionWorkerOperatorTest.packTask(commitMessages, 5);
        List records = tasks.stream().map(StreamRecord::new).collect(Collectors.toList());
        Assertions.assertThat((int)tasks.size()).isEqualTo(8);
        workerOperator.open();
        for (StreamRecord record : records) {
            workerOperator.processElement(record);
        }
        Thread.sleep(5000L);
        LocalFileIO localFileIO = LocalFileIO.create();
        DataFilePathFactory dataFilePathFactory = this.getTableDefault().store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
        int i = 0;
        Iterator iterator = workerOperator.result().iterator();
        while (iterator.hasNext() && (f2 = (Future)iterator.next()).isDone()) {
            commitMessage = (CommitMessage)f2.get();
            fileMetas = ((CommitMessageImpl)commitMessage).compactIncrement().compactAfter();
            for (DataFileMeta fileMeta : fileMetas) {
                Assertions.assertThat((boolean)localFileIO.exists(dataFilePathFactory.toPath(fileMeta))).isTrue();
            }
            if (i++ <= 2) continue;
            break;
        }
        workerOperator.close();
        Thread.sleep(2000L);
        for (Future f2 : workerOperator.result()) {
            try {
                if (!f2.isDone()) {
                    try {
                        f2.get(5L, TimeUnit.SECONDS);
                    }
                    catch (Exception e) {
                        break;
                    }
                }
                commitMessage = (CommitMessage)f2.get();
                fileMetas = ((CommitMessageImpl)commitMessage).compactIncrement().compactAfter();
                for (DataFileMeta fileMeta : fileMetas) {
                    Assertions.assertThat((boolean)localFileIO.exists(dataFilePathFactory.toPath(fileMeta))).isFalse();
                }
            }
            catch (Exception exception) {
            }
        }
    }

    protected Schema schemaDefault() {
        Schema.Builder schemaBuilder = Schema.newBuilder();
        schemaBuilder.column("f0", (DataType)DataTypes.INT());
        schemaBuilder.column("f1", (DataType)DataTypes.BIGINT());
        schemaBuilder.column("f2", (DataType)DataTypes.STRING());
        schemaBuilder.option(CoreOptions.BUCKET.key(), "-1");
        return schemaBuilder.build();
    }

    protected InternalRow dataDefault(int time, int size) {
        return GenericRow.of((Object[])new Object[]{RANDOM.nextInt(), RANDOM.nextLong(), this.randomString()});
    }

    public static List<AppendCompactTask> packTask(List<CommitMessage> messages, int fileSize) {
        ArrayList<AppendCompactTask> result = new ArrayList<AppendCompactTask>();
        List metas = messages.stream().flatMap(m -> ((CommitMessageImpl)m).newFilesIncrement().newFiles().stream()).collect(Collectors.toList());
        for (int i = 0; i < metas.size(); i += fileSize) {
            if (i < metas.size() - fileSize) {
                result.add(new AppendCompactTask(BinaryRow.EMPTY_ROW, metas.subList(i, i + fileSize)));
                continue;
            }
            result.add(new AppendCompactTask(BinaryRow.EMPTY_ROW, metas.subList(i, metas.size())));
        }
        return result;
    }
}

