package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.LogicalResult;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexOperations;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroup;
import org.apache.flink.runtime.scheduler.adaptivebatch.forwardgroup.ForwardGroupComputeUtil;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.class */
public class AdaptiveBatchScheduler extends DefaultScheduler implements SchedulerOperations {
    private final DefaultLogicalTopology logicalTopology;
    private final VertexParallelismDecider vertexParallelismDecider;
    private final Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AdaptiveBatchScheduler(Logger logger, JobGraph jobGraph, Executor executor, Configuration configuration, Consumer<ComponentMainThreadExecutor> consumer, ScheduledExecutor scheduledExecutor, ClassLoader classLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory factory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ExecutionVertexOperations executionVertexOperations, ExecutionVertexVersioner executionVertexVersioner, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, long j, ComponentMainThreadExecutor componentMainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory, ShuffleMaster<?> shuffleMaster, Time time, VertexParallelismDecider vertexParallelismDecider, int i) throws Exception {
        super(logger, jobGraph, executor, configuration, consumer, scheduledExecutor, classLoader, checkpointsCleaner, checkpointRecoveryFactory, jobManagerJobMetricGroup, schedulingStrategyFactory, factory, restartBackoffTimeStrategy, executionVertexOperations, executionVertexVersioner, executionSlotAllocatorFactory, j, componentMainThreadExecutor, jobStatusListener, executionGraphFactory, shuffleMaster, time, computeVertexParallelismStoreForDynamicGraph(jobGraph.getVertices(), i));
        this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph);
        this.vertexParallelismDecider = vertexParallelismDecider;
        List<JobVertex> verticesSortedTopologicallyFromSources = jobGraph.getVerticesSortedTopologicallyFromSources();
        ExecutionGraph executionGraph = getExecutionGraph();
        executionGraph.getClass();
        this.forwardGroupsByJobVertexId = ForwardGroupComputeUtil.computeForwardGroups(verticesSortedTopologicallyFromSources, executionGraph::getJobVertex);
    }

    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerBase
    public void startSchedulingInternal() {
        initializeVerticesIfPossible();
        super.startSchedulingInternal();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.runtime.scheduler.DefaultScheduler, org.apache.flink.runtime.scheduler.SchedulerBase
    public void updateTaskExecutionStateInternal(ExecutionVertexID executionVertexID, TaskExecutionStateTransition taskExecutionStateTransition) {
        initializeVerticesIfPossible();
        super.updateTaskExecutionStateInternal(executionVertexID, taskExecutionStateTransition);
    }

    private void initializeVerticesIfPossible() {
        List<ExecutionJobVertex> arrayList = new ArrayList<>();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            Iterator<ExecutionJobVertex> it = getExecutionGraph().getVerticesTopologically().iterator();
            while (it.hasNext()) {
                maybeSetParallelism(it.next());
            }
            for (ExecutionJobVertex executionJobVertex : getExecutionGraph().getVerticesTopologically()) {
                if (canInitialize(executionJobVertex)) {
                    getExecutionGraph().initializeJobVertex(executionJobVertex, currentTimeMillis);
                    arrayList.add(executionJobVertex);
                }
            }
        } catch (JobException e) {
            this.log.error("Unexpected error occurred when initializing ExecutionJobVertex", e);
            failJob(e, System.currentTimeMillis());
        }
        if (arrayList.size() > 0) {
            updateTopology(arrayList);
        }
    }

    private void maybeSetParallelism(ExecutionJobVertex executionJobVertex) {
        int decideParallelismForVertex;
        if (executionJobVertex.isParallelismDecided()) {
            return;
        }
        Optional<List<BlockingResultInfo>> tryGetConsumedResultsInfo = tryGetConsumedResultsInfo(executionJobVertex);
        if (tryGetConsumedResultsInfo.isPresent()) {
            ForwardGroup forwardGroup = this.forwardGroupsByJobVertexId.get(executionJobVertex.getJobVertexId());
            if (forwardGroup == null || !forwardGroup.isParallelismDecided()) {
                decideParallelismForVertex = this.vertexParallelismDecider.decideParallelismForVertex(tryGetConsumedResultsInfo.get());
                if (forwardGroup != null) {
                    forwardGroup.setParallelism(decideParallelismForVertex);
                }
                this.log.info("Parallelism of JobVertex: {} ({}) is decided to be {}.", new Object[]{executionJobVertex.getName(), executionJobVertex.getJobVertexId(), Integer.valueOf(decideParallelismForVertex)});
            } else {
                decideParallelismForVertex = forwardGroup.getParallelism();
                this.log.info("Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", new Object[]{executionJobVertex.getName(), executionJobVertex.getJobVertexId(), Integer.valueOf(decideParallelismForVertex)});
            }
            changeJobVertexParallelism(executionJobVertex, decideParallelismForVertex);
        }
    }

    private void changeJobVertexParallelism(ExecutionJobVertex executionJobVertex, int i) {
        executionJobVertex.getJobVertex().setParallelism(i);
        try {
            getExecutionGraph().setJsonPlan(JsonPlanGenerator.generatePlan(getJobGraph()));
        } catch (Throwable th) {
            this.log.warn("Cannot create JSON plan for job", th);
            getExecutionGraph().setJsonPlan("{}");
        }
        executionJobVertex.setParallelism(i);
    }

    private Optional<List<BlockingResultInfo>> tryGetConsumedResultsInfo(ExecutionJobVertex executionJobVertex) {
        ArrayList arrayList = new ArrayList();
        Iterator<? extends LogicalResult> it = this.logicalTopology.getVertex(executionJobVertex.getJobVertexId()).getConsumedResults().iterator();
        while (it.hasNext()) {
            DefaultLogicalResult defaultLogicalResult = (DefaultLogicalResult) it.next();
            if (!getExecutionJobVertex(defaultLogicalResult.getProducer2().getId()).isFinished()) {
                return Optional.empty();
            }
            IntermediateResult intermediateResult = getExecutionGraph().getAllIntermediateResults().get(defaultLogicalResult.getId());
            Preconditions.checkNotNull(intermediateResult);
            arrayList.add(BlockingResultInfo.createFromIntermediateResult(intermediateResult));
        }
        return Optional.of(arrayList);
    }

    private boolean canInitialize(ExecutionJobVertex executionJobVertex) {
        if (executionJobVertex.isInitialized() || !executionJobVertex.isParallelismDecided()) {
            return false;
        }
        Iterator<JobEdge> it = executionJobVertex.getJobVertex().getInputs().iterator();
        while (it.hasNext()) {
            ExecutionJobVertex jobVertex = getExecutionGraph().getJobVertex(it.next().getSource().getProducer().getID());
            Preconditions.checkNotNull(jobVertex);
            if (!jobVertex.isInitialized()) {
                return false;
            }
        }
        return true;
    }

    private void updateTopology(List<ExecutionJobVertex> list) {
        Iterator<ExecutionJobVertex> it = list.iterator();
        while (it.hasNext()) {
            initializeOperatorCoordinatorsFor(it.next());
        }
        getExecutionGraph().notifyNewlyInitializedJobVertices(list);
    }

    private void initializeOperatorCoordinatorsFor(ExecutionJobVertex executionJobVertex) {
        this.operatorCoordinatorHandler.registerAndStartNewCoordinators(executionJobVertex.getOperatorCoordinators(), getMainThreadExecutor());
    }

    @VisibleForTesting
    public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(Iterable<JobVertex> iterable, int i) {
        return computeVertexParallelismStore(iterable, jobVertex -> {
            return jobVertex.getParallelism() > 0 ? Integer.valueOf(getDefaultMaxParallelism(jobVertex)) : Integer.valueOf(i);
        }, Function.identity());
    }
}
