SplitSource.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.common.UserException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* If there are many files, splitting them into scan ranges can consume a lot of time.
* Even the simplest queries (e.g. select * from large_table limit 1) can get stuck or crash due to the split process.
* Furthermore, the backends sit idle while the splitting is in progress.
* It is entirely possible to split files while already scanning data from the splits that are ready.
* `SplitSource` introduces a lazy, batched mode for providing file splits. Each `SplitSource` has a unique ID,
* which backends use to call `FrontendServiceImpl#fetchSplitBatch` and fetch splits batch by batch.
* `SplitSource`s are managed by `SplitSourceManager`, which stores each `SplitSource` as a weak reference and cleans up
* the split source once its related scan node has been garbage-collected.
*/
public class SplitSource {
    private static final AtomicLong UNIQUE_ID_GENERATOR = new AtomicLong(0);
    // Upper bound (ms) on a single blocking poll of the backend's split queue.
    private static final long WAIT_TIME_OUT = 100; // 100ms

    private final long uniqueId;
    private final Backend backend;
    private final SplitAssignment splitAssignment;
    // Set once splitAssignment returns no queue for this backend; after that,
    // getNextBatch short-circuits and returns an empty list.
    private final AtomicBoolean isLastBatch;
    // Maximum time (ms) a single getNextBatch call may spend collecting splits.
    private final long maxWaitTime;

    /**
     * Creates a split source for {@code backend} and registers its freshly generated id with
     * {@code splitAssignment}.
     *
     * @param backend the backend whose assigned splits this source hands out
     * @param splitAssignment provider of the split queues assigned to backends
     * @param maxWaitTime maximum time in milliseconds one {@link #getNextBatch(int)} call may wait
     */
    public SplitSource(Backend backend, SplitAssignment splitAssignment, long maxWaitTime) {
        this.uniqueId = UNIQUE_ID_GENERATOR.getAndIncrement();
        this.backend = backend;
        this.splitAssignment = splitAssignment;
        this.maxWaitTime = maxWaitTime;
        this.isLastBatch = new AtomicBoolean(false);
        splitAssignment.registerSource(uniqueId);
    }

    /**
     * Returns the id of this split source, generated from a static counter at construction time.
     */
    public long getUniqueId() {
        return uniqueId;
    }

    /**
     * Get the next batch of file splits. If there's no more split, return empty list.
     *
     * <p>Splits are drained from the queue that {@code splitAssignment} holds for this backend
     * until either {@code maxBatchSize} ranges have been collected or {@code maxWaitTime} ms have
     * elapsed. Whole split collections are appended, so a batch may slightly exceed
     * {@code maxBatchSize}. Once the assignment returns no queue for this backend, the
     * collected ranges are returned and every later call returns an empty list.
     *
     * @param maxBatchSize target number of scan ranges in the returned batch
     * @return the collected scan ranges, or an empty list when there are no more splits
     * @throws UserException if nothing was collected within {@code maxWaitTime} while more splits
     *         may still arrive, or if the calling thread is interrupted while waiting
     */
    public List<TScanRangeLocations> getNextBatch(int maxBatchSize) throws UserException {
        if (isLastBatch.get()) {
            return Collections.emptyList();
        }
        List<TScanRangeLocations> scanRanges = Lists.newArrayListWithExpectedSize(maxBatchSize);
        // Use the monotonic clock: wall-clock adjustments must not shorten or extend the wait.
        long startNanos = System.nanoTime();
        long maxWaitNanos = TimeUnit.MILLISECONDS.toNanos(maxWaitTime);
        while (scanRanges.size() < maxBatchSize && !hasElapsed(startNanos, maxWaitNanos)) {
            BlockingQueue<Collection<TScanRangeLocations>> splits = splitAssignment.getAssignedSplits(backend);
            if (splits == null) {
                isLastBatch.set(true);
                break;
            }
            while (scanRanges.size() < maxBatchSize) {
                Collection<TScanRangeLocations> splitCollection;
                try {
                    splitCollection = splits.poll(WAIT_TIME_OUT, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Restore the interrupt status so callers up the stack can still observe it.
                    Thread.currentThread().interrupt();
                    throw new UserException("Interrupted while waiting for splits of split source " + uniqueId, e);
                }
                if (splitCollection != null) {
                    scanRanges.addAll(splitCollection);
                }
                // Out of time but holding some splits: hand back a partial batch rather than wait longer.
                if (!scanRanges.isEmpty() && hasElapsed(startNanos, maxWaitNanos)) {
                    return scanRanges;
                }
                if (splitCollection == null) {
                    // Nothing arrived within WAIT_TIME_OUT; re-check the assignment in the outer loop.
                    break;
                }
            }
        }
        if (scanRanges.isEmpty() && !isLastBatch.get()) {
            // This is timeout
            throw new UserException("Timeout. Max wait time(ms): " + maxWaitTime);
        }
        return scanRanges;
    }

    /**
     * Returns whether at least {@code timeoutNanos} have passed since {@code startNanos}.
     * It compares a {@link System#nanoTime()} difference, which stays correct even if the raw
     * counter wraps.
     */
    private static boolean hasElapsed(long startNanos, long timeoutNanos) {
        return System.nanoTime() - startNanos >= timeoutNanos;
    }
}