/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.coordinator;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.coordinator.CoordinationResponseUtils;
import org.apache.paimon.flink.sink.coordinator.LatestIdentifierRequest;
import org.apache.paimon.flink.sink.coordinator.LatestIdentifierResponse;
import org.apache.paimon.flink.sink.coordinator.PagedCoordinationRequest;
import org.apache.paimon.flink.sink.coordinator.TableWriteCoordinator;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SegmentsCache;
import org.apache.paimon.utils.ThreadPoolUtils;

public class WriteOperatorCoordinator
implements OperatorCoordinator,
CoordinationRequestHandler {
    private final FileStoreTable table;
    private ThreadPoolExecutor executor;
    private TableWriteCoordinator coordinator;

    public WriteOperatorCoordinator(FileStoreTable table) {
        this.table = table;
    }

    public void start() throws Exception {
        this.executor = ThreadPoolUtils.createCachedThreadPool(1, "WriteCoordinator");
        MemorySize cacheMemory = this.table.coreOptions().toConfiguration().get(FlinkConnectorOptions.SINK_WRITER_COORDINATOR_CACHE_MEMORY);
        SegmentsCache<Path> manifestCache = SegmentsCache.create(cacheMemory, Long.MAX_VALUE);
        this.table.setManifestCache(manifestCache);
        this.coordinator = new TableWriteCoordinator(this.table);
    }

    public void close() throws Exception {
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
    }

    public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
        CompletableFuture<CoordinationResponse> future = new CompletableFuture<CoordinationResponse>();
        this.executor.execute(() -> {
            try {
                Object response;
                if (request instanceof PagedCoordinationRequest) {
                    response = this.coordinator.scan((PagedCoordinationRequest)request);
                } else if (request instanceof LatestIdentifierRequest) {
                    response = new LatestIdentifierResponse(this.coordinator.latestCommittedIdentifier(((LatestIdentifierRequest)request).user()));
                } else {
                    throw new UnsupportedOperationException("Unsupported request type: " + request);
                }
                future.complete(CoordinationResponseUtils.wrap(response));
            }
            catch (Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        this.coordinator.checkpoint();
        this.executor.execute(() -> {
            try {
                result.complete(new byte[0]);
            }
            catch (Throwable throwable) {
                result.completeExceptionally(new CompletionException(throwable));
            }
        });
    }

    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
    }

    public void notifyCheckpointComplete(long checkpointId) {
    }

    public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
    }

    public void subtaskReset(int subtask, long checkpointId) {
    }

    public void executionAttemptFailed(int subtask, int attemptNumber, Throwable reason) {
    }

    public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {
    }

    public static class Provider
    implements OperatorCoordinator.Provider {
        private final OperatorID operatorId;
        private final FileStoreTable table;

        public Provider(OperatorID operatorId, FileStoreTable table) {
            this.operatorId = operatorId;
            this.table = table;
        }

        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        public OperatorCoordinator create(OperatorCoordinator.Context context) {
            return new WriteOperatorCoordinator(this.table);
        }
    }
}

