TaskTokenManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.manager;
import lombok.experimental.UtilityClass;
import lombok.extern.log4j.Log4j2;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
/**
* TaskTokenManager is responsible for managing semaphore tokens for different jobs.
* It provides a method to acquire a semaphore token for a specific job ID with the given maximum concurrency.
* If a semaphore doesn't exist for the job ID, it creates a new one and adds it to the map.
*/
@Log4j2
@UtilityClass
public class TaskTokenManager {
private static final Map<Long, Semaphore> taskTokenMap = new ConcurrentHashMap<>(16);
/**
* Tries to acquire a semaphore token for the specified job ID with the given maximum concurrency.
* If a semaphore doesn't exist for the job ID, it creates a new one and adds it to the map.
*
* @param jobId the ID of the job
* @param maxConcurrent the maximum concurrency for the job
* @return the acquired semaphore
*/
public static Semaphore tryAcquire(long jobId, long maxConcurrent) {
Semaphore semaphore = taskTokenMap.computeIfAbsent(jobId, id -> new Semaphore((int) maxConcurrent));
try {
semaphore.acquire();
} catch (InterruptedException e) {
log.warn("Interrupted while acquiring semaphore for job id: {} ", jobId, e);
Thread.currentThread().interrupt();
}
return semaphore;
}
}