Module org.elasticsearch.compute
Class BidirectionalBatchExchangeServer
java.lang.Object
org.elasticsearch.compute.operator.exchange.BidirectionalBatchExchangeServer
- All Implemented Interfaces:
Closeable,AutoCloseable,org.elasticsearch.core.Releasable
Server-side handler for bidirectional batch exchange.
The server:
- Receives batches from the client via clientToServer exchange (using ExchangeSource)
- Sends results to the client via serverToClient exchange (using ExchangeSink)
- Uses BatchDriver to process batches
- Sends empty marker page when batch completes (via onBatchDone callback)
Only one batch can be active at a time. BatchDriver will throw an exception if multiple batches are sent concurrently.
-
Field Summary
FieldsModifier and TypeFieldDescriptionprotected final ExchangeServiceprotected final Executorprotected final intprotected final Stringprotected final Settingsprotected final Taskprotected final TransportService -
Constructor Summary
ConstructorsConstructorDescriptionBidirectionalBatchExchangeServer(String sessionId, String clientToServerId, String serverToClientId, ExchangeService exchangeService, Executor executor, int maxBufferSize, TransportService transportService, Task task, DiscoveryNode clientNode, Settings settings) Create a new BidirectionalBatchExchangeServer with explicit exchange IDs. -
Method Summary
Modifier and TypeMethodDescriptionprotected static StringbuildClientToServerId(String sessionId) Constructs the client-to-server exchange ID from the session ID.protected static StringbuildServerToClientId(String sessionId) Constructs the server-to-client exchange ID from the session ID.voidclose()protected voidconnectRemoteSink(DiscoveryNode node, String exchangeId, ExchangeSourceHandler sourceHandler, ActionListener<Void> listener, String errorMessagePrefix) Connects a remote sink to a source handler via transport.Get the source operator factory for use in planning.voidhandleBatchExchangeStatusRequest(BatchExchangeStatusRequest request, TransportChannel channel, Task task) Handle BatchExchangeStatusRequest from the client.voidstartWithOperators(DriverContext driverContext, ThreadContext threadContext, List<Operator> intermediateOperators, String clusterName, org.elasticsearch.core.Releasable releasable, ActionListener<Void> readyListener) Start batch processing with the intermediate operators.
-
Field Details
-
sessionId
-
exchangeService
-
executor
-
maxBufferSize
protected final int maxBufferSize -
transportService
-
task
-
settings
-
-
Constructor Details
-
BidirectionalBatchExchangeServer
public BidirectionalBatchExchangeServer(String sessionId, String clientToServerId, String serverToClientId, ExchangeService exchangeService, Executor executor, int maxBufferSize, TransportService transportService, Task task, DiscoveryNode clientNode, Settings settings) Create a new BidirectionalBatchExchangeServer with explicit exchange IDs. CallstartWithOperators(DriverContext, ThreadContext, List, String, Releasable, ActionListener)to complete setup and start processing after planning is complete
-
-
Method Details
-
getSourceOperatorFactory
Get the source operator factory for use in planning. This can be called after construction to get the factory before calling startWithOperators.- Returns:
- the source operator factory
-
startWithOperators
public void startWithOperators(DriverContext driverContext, ThreadContext threadContext, List<Operator> intermediateOperators, String clusterName, org.elasticsearch.core.Releasable releasable, ActionListener<Void> readyListener) Start batch processing with the intermediate operators. This must be called after planning is complete. -
handleBatchExchangeStatusRequest
public void handleBatchExchangeStatusRequest(BatchExchangeStatusRequest request, TransportChannel channel, Task task) Handle BatchExchangeStatusRequest from the client. Called by ExchangeService's singleton handler which routes requests to the appropriate server.The server stores the response channel BEFORE starting processing, ensuring it can always reply if an error occurs. Processing only starts after this request is received.
-
close
public void close() -
buildClientToServerId
Constructs the client-to-server exchange ID from the session ID. -
buildServerToClientId
Constructs the server-to-client exchange ID from the session ID. -
connectRemoteSink
protected void connectRemoteSink(DiscoveryNode node, String exchangeId, ExchangeSourceHandler sourceHandler, ActionListener<Void> listener, String errorMessagePrefix) Connects a remote sink to a source handler via transport. This is a common pattern used by both server and client to establish transport-based connections for bidirectional exchange.Always uses failFast=true so the source handler aborts immediately on sink failure. The caller collects the real error via the listener and an
EsqlRefCountingListenerwhoseFailureCollectorpicks it over the genericTaskCancelledExceptionthrown by the aborted source.
-