Class AsyncExternalSourceOperatorFactory
java.lang.Object
org.elasticsearch.xpack.esql.datasources.AsyncExternalSourceOperatorFactory
- All Implemented Interfaces:
Describable, Operator.OperatorFactory, SourceOperator.SourceOperatorFactory
public class AsyncExternalSourceOperatorFactory
extends Object
implements SourceOperator.SourceOperatorFactory
Dual-mode async factory for creating source operators that read from external storage.
This factory automatically selects the optimal execution mode based on the FormatReader's capabilities:
- Sync Wrapper Mode: For simple formats (CSV, JSON) that don't have native async support. The sync FormatReader.read(org.elasticsearch.xpack.esql.datasources.spi.StorageObject, org.elasticsearch.xpack.esql.datasources.spi.FormatReadContext) method is wrapped in a background thread from the ES ThreadPool.
- Native Async Mode: For async-capable formats (Parquet with parallel row groups) that implement FormatReader.readAsync(org.elasticsearch.xpack.esql.datasources.spi.StorageObject, org.elasticsearch.xpack.esql.datasources.spi.FormatReadContext, java.util.concurrent.Executor, org.elasticsearch.action.ActionListener<org.elasticsearch.compute.operator.CloseableIterator<org.elasticsearch.compute.data.Page>>). This avoids wrapper thread overhead by letting the reader control its own threading.
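The two modes above can be illustrated with a minimal, self-contained sketch. It does not use the real Elasticsearch types: the nested Reader interface is a hypothetical stand-in for FormatReader, rows are plain strings instead of Page objects, and supportsNativeAsync() is an assumed capability flag, since this page does not show how the factory actually detects async support.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

final class DualModeSketch {

    /** Hypothetical, simplified stand-in for FormatReader (not the real SPI). */
    interface Reader {
        /** Blocking read; simple formats implement only this method. */
        Iterator<String> read(String source);

        /** Assumed capability flag; the real detection mechanism may differ. */
        default boolean supportsNativeAsync() {
            return false;
        }

        /** Async-capable readers override this and manage their own threading. */
        default void readAsync(String source, Executor executor, Consumer<Iterator<String>> listener) {
            throw new UnsupportedOperationException("no native async support");
        }
    }

    /** Starts a read in whichever mode the reader supports and returns the collected rows. */
    static CompletableFuture<List<String>> start(Reader reader, String source, Executor executor) {
        CompletableFuture<List<String>> result = new CompletableFuture<>();
        if (reader.supportsNativeAsync()) {
            // Native async mode: hand the executor to the reader, no wrapper task.
            try {
                reader.readAsync(source, executor, rows -> result.complete(drain(rows)));
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        } else {
            // Sync wrapper mode: run the blocking read() on the supplied executor.
            executor.execute(() -> {
                try {
                    result.complete(drain(reader.read(source)));
                } catch (RuntimeException e) {
                    result.completeExceptionally(e);
                }
            });
        }
        return result;
    }

    private static List<String> drain(Iterator<String> rows) {
        List<String> out = new ArrayList<>();
        rows.forEachRemaining(out::add);
        return out;
    }
}
```

In this sketch a CSV-style reader only supplies read(), while a Parquet-style reader overrides both supportsNativeAsync() and readAsync(). The caller passes the same executor in either case, which mirrors the principle that executors come from the ES ThreadPool rather than from standalone threads.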
Key design principles:
- Simple things stay simple - CSV/JSON readers just implement sync read()
- Async when beneficial - Parquet can override readAsync() for parallel I/O
- ES ThreadPool integration - All executors come from ES, not standalone threads
- Backpressure via buffer - Uses AsyncExternalSourceBuffer with waitForSpace()
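As a rough illustration of the backpressure principle, the following sketch shows a bounded buffer whose producer asks for space before adding more pages. It is not the real AsyncExternalSourceBuffer: the class name BoundedPageBufferSketch, the use of CompletableFuture, and the exact semantics of waitForSpace() here are assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical bounded buffer; illustrates the idea only, not the real class. */
final class BoundedPageBufferSketch<T> {
    private final int maxSize;
    private final Queue<T> pages = new ArrayDeque<>();
    // Completed while there is room; replaced by a pending future once the buffer fills.
    private CompletableFuture<Void> spaceAvailable = CompletableFuture.completedFuture(null);

    BoundedPageBufferSketch(int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
    }

    /** Producer side: adds a page and marks the buffer as full when the limit is reached. */
    synchronized void add(T page) {
        pages.add(page);
        if (pages.size() >= maxSize && spaceAvailable.isDone()) {
            spaceAvailable = new CompletableFuture<>();
        }
    }

    /** Producer side: a future that is already done if there is room, pending otherwise. */
    synchronized CompletableFuture<Void> waitForSpace() {
        return spaceAvailable;
    }

    /** Consumer side: removes a page (or returns null) and wakes a waiting producer. */
    synchronized T poll() {
        T page = pages.poll();
        if (pages.size() < maxSize && !spaceAvailable.isDone()) {
            // Completed under the lock to keep the sketch simple; callbacks run on this thread.
            spaceAvailable.complete(null);
        }
        return page;
    }

    synchronized int size() {
        return pages.size();
    }
}
```

A producer in this sketch would chain its next read onto waitForSpace() (for example with thenRun) instead of blocking a thread, so a slow consumer naturally throttles the reader.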
-
Constructor Summary
Constructors
Constructor
- AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, int rowLimit, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue)
- AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, int rowLimit, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue, ErrorPolicy errorPolicy)
- AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, int rowLimit, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue, ErrorPolicy errorPolicy, int parsingParallelism)
- AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, int rowLimit, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue, ErrorPolicy errorPolicy, int parsingParallelism, org.elasticsearch.core.TimeValue drainTimeout)
- AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, int rowLimit, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue, ErrorPolicy errorPolicy, int parsingParallelism, org.elasticsearch.core.TimeValue drainTimeout, List<Expression> pushedExpressions, FilterPushdownSupport pushdownSupport)
- AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, Executor executor)
- AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, Executor executor, FileList fileList)
- AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues)
- AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue)
-
Method Summary
-
Constructor Details
-
AsyncExternalSourceOperatorFactory
public AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, int rowLimit, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue, ErrorPolicy errorPolicy, int parsingParallelism, org.elasticsearch.core.TimeValue drainTimeout, @Nullable List<Expression> pushedExpressions, @Nullable FilterPushdownSupport pushdownSupport) -
AsyncExternalSourceOperatorFactory
public AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, int rowLimit, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue, ErrorPolicy errorPolicy, int parsingParallelism, org.elasticsearch.core.TimeValue drainTimeout) -
AsyncExternalSourceOperatorFactory
public AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, int rowLimit, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue, ErrorPolicy errorPolicy, int parsingParallelism) -
AsyncExternalSourceOperatorFactory
public AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, int rowLimit, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue, ErrorPolicy errorPolicy) -
AsyncExternalSourceOperatorFactory
public AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, int rowLimit, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue) -
AsyncExternalSourceOperatorFactory
public AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues, ExternalSliceQueue sliceQueue) -
AsyncExternalSourceOperatorFactory
public AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, Executor executor, FileList fileList, Set<String> partitionColumnNames, Map<String, Object> partitionValues) -
AsyncExternalSourceOperatorFactory
public AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, Executor executor, FileList fileList) -
AsyncExternalSourceOperatorFactory
public AsyncExternalSourceOperatorFactory(StorageProvider storageProvider, FormatReader formatReader, StoragePath path, List<Attribute> attributes, int batchSize, int maxBufferSize, Executor executor)
-
-
Method Details
-
get
- Specified by:
get in interface Operator.OperatorFactory
- Specified by:
get in interface SourceOperator.SourceOperatorFactory
-
describe
- Specified by:
describe in interface Describable
-
storageProvider
-
formatReader
-
path
-
attributes
-
batchSize
public int batchSize() -
maxBufferSize
public int maxBufferSize() -
rowLimit
public int rowLimit() -
executor
-
fileList
-
partitionColumnNames
-
partitionValues
-
sliceQueue
-
errorPolicy
-
parsingParallelism
public int parsingParallelism() -
drainTimeout
public org.elasticsearch.core.TimeValue drainTimeout()
-