Class BidirectionalBatchExchangeClient
- All Implemented Interfaces:
Closeable, AutoCloseable, org.elasticsearch.core.Releasable
The client:
- Sends batches to the server via clientToServer exchange (using ExchangeSink)
- Receives results from the server via serverToClient exchange (using ExchangeSource)
- Tracks batch completion via markBatchCompleted(long)
-
Nested Class Summary
Nested Classes
static interface BidirectionalBatchExchangeClient.ServerSetupCallback - Functional interface for server setup callback.
-
Field Summary
Fields
protected final ExchangeService exchangeService
protected final Executor executor
protected final int maxBufferSize
protected final String sessionId
protected final Settings settings
protected final Task task
protected final TransportService transportService
-
Constructor Summary
Constructors
BidirectionalBatchExchangeClient(String sessionId, ExchangeService exchangeService, Executor executor, int maxBufferSize, TransportService transportService, Task task, ActionListener<Void> batchExchangeStatusListener, Settings settings, BidirectionalBatchExchangeClient.ServerSetupCallback serverSetupCallback, BiConsumer<String, String> lookupPlanConsumer, int maxWorkers, Supplier<DiscoveryNode> serverNodeSupplier) - Create a new BidirectionalBatchExchangeClient.
-
Method Summary
Methods
int bufferedPageCount() - Returns the current number of buffered pages.
protected static String buildClientToServerId(String sessionId) - Constructs the client-to-server exchange ID from the session ID.
protected static String buildServerToClientId(String sessionId) - Constructs the server-to-client exchange ID from the session ID.
void close() - Closes the client.
protected void connectRemoteSink(DiscoveryNode node, String exchangeId, ExchangeSourceHandler sourceHandler, ActionListener<Void> listener, String errorMessagePrefix) - Connects a remote sink to a source handler via transport.
void finish() - Finish all client-to-server exchanges (no more batches will be sent to any worker).
void finishCollectingResponseHeaders() - Applies collected response headers (e.g. warnings from server-side drivers) to the current thread's context.
getPrimaryFailure - Returns the highest-priority failure, or null if no failure has occurred.
getServerToClientSourceHandler - Get the server-to-client source handler.
getSessionId - Get the session ID (streaming session ID) used by this client.
getSortedSource - Returns the sorted source for diagnostic purposes (e.g. toString in timeout messages).
int getWorkerCount() - Get the number of workers in the pool.
boolean hasFailed() - Check if the exchange client has failed.
boolean hasReadyPages() - Returns true if there are pages ready to be output (in correct order).
boolean isExchangeDone() - Returns true if the exchange is done (upstream finished and no buffered pages).
boolean isFinished() - Returns true if the client has fully finished: all batches complete AND all worker responses were received.
void markBatchCompleted(long batchId) - Mark a batch as completed.
pollPage() - Polls a page from the cache.
void sendBatchMarker(long batchId) - Send a marker page to signal batch completion for an empty batch.
void sendPage - Send a Page with BatchMetadata for processing.
waitForPage - Returns an IsBlockedResult that resolves when a page is available or when finished.
waitForServerResponse() - Returns an IsBlockedResult that resolves when all worker channels have completed (all sink + status refs released).
waitUntilPageReady - Blocks until a page is ready to be polled OR the exchange is finished.
-
Field Details
-
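sessionId
protected final String sessionId
-
exchangeService
protected final ExchangeService exchangeService
-
executor
protected final Executor executor
-
maxBufferSize
protected final int maxBufferSize
-
transportService
protected final TransportService transportService
-
task
protected final Task task
-
settings
protected final Settings settings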
-
-
Constructor Details
-
BidirectionalBatchExchangeClient
public BidirectionalBatchExchangeClient(String sessionId, ExchangeService exchangeService, Executor executor, int maxBufferSize, TransportService transportService, Task task, ActionListener<Void> batchExchangeStatusListener, Settings settings, BidirectionalBatchExchangeClient.ServerSetupCallback serverSetupCallback, @Nullable BiConsumer<String, String> lookupPlanConsumer, int maxWorkers, Supplier<DiscoveryNode> serverNodeSupplier)
Create a new BidirectionalBatchExchangeClient.
- Parameters:
sessionId - session ID for the client
exchangeService - the exchange service
executor - executor for async operations
maxBufferSize - maximum buffer size for exchanges
transportService - transport service for transport-based remote sink
task - task for transport-based remote sink
batchExchangeStatusListener - listener that will be called when batch exchange status is received (success or failure)
settings - settings for exchange configuration
serverSetupCallback - callback to send setup request when a new worker is connected
lookupPlanConsumer - optional callback to receive (workerKey, planString) from server setup response
maxWorkers - maximum number of workers (parallel connections) to create
serverNodeSupplier - supplier for getting server nodes for new workers
-
-
Method Details
-
getServerToClientSourceHandler
Get the server-to-client source handler. Can be used to connect the server's sink by calling addRemoteSink() directly. -
getSessionId
Get the session ID (streaming session ID) used by this client.
- Returns:
- the session ID
-
hasFailed
public boolean hasFailed()
Check if the exchange client has failed.
- Returns:
- true if a failure has occurred, false otherwise
-
getPrimaryFailure
Returns the highest-priority failure, or null if no failure has occurred. -
getWorkerCount
public int getWorkerCount()
Get the number of workers in the pool. Used for testing to verify parallel worker distribution.
- Returns:
- the number of workers created
-
pollPage
Polls a page from the cache. The consumer should call this to retrieve pages and check the page's BatchMetadata to detect batch completion via page.batchMetadata().isLastPageInBatch().
With failFast=true, the exchange source may throw TaskCancelledException when aborted. This method catches it and returns null; the real error arrives via the response coordinator and notifyFailure(), which sets the operator's failure field.
- Returns:
- the next page, or null if no pages are available or exchange is aborted
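The abort handling described for pollPage can be pictured with a minimal sketch. It is not the Elasticsearch implementation: java.util.concurrent.CancellationException stands in for TaskCancelledException, and the class and method names are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.function.Supplier;

/** Hypothetical sketch; CancellationException stands in for TaskCancelledException. */
final class AbortTolerantPollSketch {
    /**
     * Polls the delegate source. If the source was aborted and throws the generic
     * cancellation, return null so the caller waits for the real error to be
     * reported through a separate failure channel.
     */
    static <T> T pollOrNull(Supplier<T> delegate) {
        try {
            return delegate.get();
        } catch (CancellationException e) {
            return null; // swallow the generic cancellation; the real failure arrives elsewhere
        }
    }
}
```

A caller that receives null should therefore check hasFailed() before assuming the exchange is merely empty.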
-
waitForPage
Returns an IsBlockedResult that resolves when a page is available or when finished.
If the exchange source is aborted (failFast=true), catches TaskCancelledException and blocks on failureNotified until the real error is available.
- Returns:
- NOT_BLOCKED if a page is available or finished, otherwise a blocked result
-
waitUntilPageReady
Blocks until a page is ready to be polled OR the exchange is finished. This guarantees that when NOT_BLOCKED is returned, either:
- hasReadyPages() == true (pollPage will return a page)
- isExchangeDone() == true (no more pages expected)
If the exchange source is aborted (failFast=true), catches TaskCancelledException and blocks on failureNotified until the real error is available.
- Returns:
- NOT_BLOCKED if a page is ready or finished, otherwise a blocked result
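The guarantee above suggests a simple consumer pattern: once unblocked, drain whatever is ready, then check whether the exchange is done. The sketch below illustrates that pattern against a hypothetical PageSource interface with an in-memory stand-in. Pages are plain strings here, and none of these types come from Elasticsearch.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/** Hypothetical consumer-side sketch; PageSource and QueueSource are illustrative only. */
final class ConsumerLoopSketch {
    interface PageSource {
        boolean hasReadyPages();
        boolean isExchangeDone();
        String pollPage();
    }

    /** In-memory stand-in: done means upstream finished and nothing left buffered. */
    static final class QueueSource implements PageSource {
        final Queue<String> pages = new ArrayDeque<>();
        boolean upstreamFinished;

        public boolean hasReadyPages() { return !pages.isEmpty(); }
        public boolean isExchangeDone() { return upstreamFinished && pages.isEmpty(); }
        public String pollPage() { return pages.poll(); }
    }

    /**
     * Drains every page that is currently ready into out.
     * Returns true when the exchange is done, false when the caller should wait again.
     */
    static boolean drainReady(PageSource source, List<String> out) {
        while (source.hasReadyPages()) {
            String page = source.pollPage();
            if (page == null) {
                break; // defensive: the source may have been aborted between the two calls
            }
            out.add(page);
        }
        return source.isExchangeDone();
    }
}
```

In a real driver the "wait again" branch would return the blocked result rather than spin.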
-
isExchangeDone
public boolean isExchangeDone()
Returns true if the exchange is done (upstream finished and no buffered pages).
With failFast=true, the exchange source throws TaskCancelledException when aborted. This method catches it and returns false: the exchange is not "done", it is aborting.
-
isFinished
public boolean isFinished()
Returns true if the client has fully finished: all batches complete AND all worker responses were received. The worker responses confirm whether each worker's server succeeded or failed. If any worker failed, hasFailed() will return true and the failure can be retrieved.
-
waitForServerResponse
Returns an IsBlockedResult that resolves when all worker channels have completed (all sink + status refs released). Use this to block while waiting for all workers' success/failure confirmation.
-
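One way to picture "resolves when all sink + status refs are released" is a reference count that completes a future when it reaches zero. The sketch below is an assumption about the general technique, not the class's actual code. CompletableFuture stands in for the IsBlockedResult listener, and the initial owner reference (released by the hypothetical doneRegistering()) keeps the count from reaching zero while workers are still being added.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical ref-counting sketch; names do not come from the Elasticsearch source. */
final class AllWorkersCompletedSketch {
    // Starts at 1: the owner holds a reference until all workers are registered.
    private final AtomicInteger refs = new AtomicInteger(1);
    private final CompletableFuture<Void> allWorkersCompleted = new CompletableFuture<>();

    /** Acquire one reference, e.g. for a worker's sink or for its status response. */
    void acquire() {
        refs.incrementAndGet();
    }

    /** Release one reference; the last release completes the future. */
    void release() {
        if (refs.decrementAndGet() == 0) {
            allWorkersCompleted.complete(null);
        }
    }

    /** Releases the owner's initial reference once no more workers will be added. */
    void doneRegistering() {
        release();
    }

    /** Completes when every acquired reference has been released. */
    CompletableFuture<Void> waitForServerResponse() {
        return allWorkersCompleted;
    }
}
```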
bufferedPageCount
public int bufferedPageCount()
Returns the current number of buffered pages.
-
hasReadyPages
public boolean hasReadyPages()
Returns true if there are pages ready to be output (in correct order). Unlike bufferedPageCount(), this only returns true when pages are actually ready for consumption, not when they're buffered waiting for out-of-order pages to arrive.
-
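The difference between bufferedPageCount() and hasReadyPages() is easiest to see in a small reorder buffer. The sketch below assumes pages are released strictly by an increasing sequence number; the actual ordering key used by the client is not stated here, and the class is a hypothetical stand-in with string payloads.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the client's page cache; not the Elasticsearch implementation. */
final class ReorderBufferSketch {
    private final Map<Long, String> buffered = new HashMap<>(); // sequence number -> page payload
    private long nextExpected = 0; // next sequence number allowed to leave the buffer

    void add(long seqNo, String page) {
        buffered.put(seqNo, page);
    }

    /** Counts every buffered page, including ones still waiting for a predecessor. */
    int bufferedPageCount() {
        return buffered.size();
    }

    /** True only when the next page in order has already arrived. */
    boolean hasReadyPages() {
        return buffered.containsKey(nextExpected);
    }

    /** Returns the next in-order page, or null if it has not arrived yet. */
    String pollPage() {
        String page = buffered.remove(nextExpected);
        if (page != null) {
            nextExpected++;
        }
        return page;
    }
}
```

If page 1 arrives before page 0, bufferedPageCount() is 1 while hasReadyPages() stays false until page 0 is added.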
getSortedSource
Returns the sorted source for diagnostic purposes (e.g. toString in timeout messages). -
sendPage
Send a Page with BatchMetadata for processing. The worker is selected using a least-loaded assignment strategy. Workers are lazily initialized as needed, up to maxWorkers. The batchId should be monotonically increasing for each call. Currently, only single-page batches are supported, so isLastPageInBatch must always be true. Called only from the single-threaded Driver, so no synchronization is needed.
- Parameters:
page - the page with BatchMetadata to send
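A minimal sketch of least-loaded assignment with lazy worker creation follows. The policy details are assumptions: a new worker is created only when every existing worker has pending batches and the pool is below maxWorkers, and ties go to the lowest index. The real client may decide differently. Like the method it illustrates, the sketch assumes single-threaded use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of least-loaded assignment; names and tie-breaking are assumptions. */
final class LeastLoadedPoolSketch {
    private final int maxWorkers;
    private final List<Integer> pending = new ArrayList<>();          // pending batches per worker
    private final Map<Long, Integer> batchToWorker = new HashMap<>(); // batchId -> worker index

    LeastLoadedPoolSketch(int maxWorkers) {
        if (maxWorkers < 1) {
            throw new IllegalArgumentException("maxWorkers must be at least 1");
        }
        this.maxWorkers = maxWorkers;
    }

    /** Assigns the batch to a worker and returns that worker's index. */
    int sendBatch(long batchId) {
        int chosen = -1;
        for (int i = 0; i < pending.size(); i++) {
            if (chosen == -1 || pending.get(i) < pending.get(chosen)) {
                chosen = i;
            }
        }
        // Lazily add a worker when none exists yet, or when all are busy and the pool has room.
        boolean allBusy = chosen == -1 || pending.get(chosen) > 0;
        if (allBusy && pending.size() < maxWorkers) {
            pending.add(0);
            chosen = pending.size() - 1;
        }
        pending.set(chosen, pending.get(chosen) + 1);
        batchToWorker.put(batchId, chosen);
        return chosen;
    }

    /** Decrements the pending count of the worker that processed the batch. */
    void markBatchCompleted(long batchId) {
        Integer worker = batchToWorker.remove(batchId);
        if (worker != null) {
            pending.set(worker, pending.get(worker) - 1);
        }
    }

    int getWorkerCount() {
        return pending.size();
    }
}
```

The decrement in markBatchCompleted is what lets a worker become the least-loaded choice again.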
-
sendBatchMarker
public void sendBatchMarker(long batchId)
Send a marker page to signal batch completion for an empty batch. Called only from the single-threaded Driver, so no synchronization is needed.
- Parameters:
batchId - the batch ID
-
markBatchCompleted
public void markBatchCompleted(long batchId)
Mark a batch as completed. Called by the consumer when it finishes processing a batch. This decrements the pending count for the worker that processed the batch.
- Parameters:
batchId - the completed batch ID
-
finish
public void finish()
Finish all client-to-server exchanges (no more batches will be sent to any worker).
If there are pending worker connections (setup request sent but connection not yet established), this method will defer finishing until all connections are established. This prevents workers from finishing prematurely, which would cause the server-to-client exchange to close before all worker connections are added.
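The deferral can be sketched as a pending-connection counter plus a "finish requested" flag. Whichever event happens last (the finish() call or the final connection being established) performs the actual finish. All names below are hypothetical and the real client's bookkeeping may differ.

```java
/** Hypothetical sketch of deferring finish() until pending connections are established. */
final class DeferredFinishSketch {
    private int pendingConnections = 0;   // setup request sent, connection not yet established
    private boolean finishRequested = false;
    private boolean sinksFinished = false;

    synchronized void connectionStarted() {
        pendingConnections++;
    }

    synchronized void connectionEstablished() {
        pendingConnections--;
        maybeFinish();
    }

    /** Records the request; finishing happens now or once the last connection is up. */
    synchronized void finish() {
        finishRequested = true;
        maybeFinish();
    }

    // Called with the monitor held by the synchronized callers above.
    private void maybeFinish() {
        if (finishRequested && pendingConnections == 0) {
            sinksFinished = true; // a real client would finish its client-to-server sinks here
        }
    }

    synchronized boolean sinksFinished() {
        return sinksFinished;
    }
}
```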
-
finishCollectingResponseHeaders
public void finishCollectingResponseHeaders()
Applies collected response headers (e.g. warnings from server-side drivers) to the current thread's context. Must be called on the coordinator driver thread (typically from the operator's close() method) so that DriverRunner can propagate them.
-
close
public void close()
Closes the client. The driver guarantees all workers have completed (via waitForServerResponse() blocking on allWorkersCompleted) before calling this method, so cleanup is purely synchronous.
-
buildClientToServerId
Constructs the client-to-server exchange ID from the session ID. -
buildServerToClientId
Constructs the server-to-client exchange ID from the session ID. -
connectRemoteSink
protected void connectRemoteSink(DiscoveryNode node, String exchangeId, ExchangeSourceHandler sourceHandler, ActionListener<Void> listener, String errorMessagePrefix)
Connects a remote sink to a source handler via transport. This is a common pattern used by both server and client to establish transport-based connections for bidirectional exchange.
Always uses failFast=true so the source handler aborts immediately on sink failure. The caller collects the real error via the listener and an EsqlRefCountingListener whose FailureCollector picks it over the generic TaskCancelledException thrown by the aborted source.
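The idea of preferring the real error over the generic cancellation can be sketched as a tiny collector. This is an illustration only: java.util.concurrent.CancellationException stands in for TaskCancelledException, the class is hypothetical, and the ranking rules of the actual FailureCollector may be richer than the two-level priority shown.

```java
import java.util.concurrent.CancellationException;

/** Hypothetical two-level failure collector; not the FailureCollector API. */
final class FailurePrioritySketch {
    private Exception primary;

    /** Keeps the first failure seen, but lets a real error replace a cancellation. */
    synchronized void collect(Exception e) {
        if (primary == null || (isCancellation(primary) && !isCancellation(e))) {
            primary = e;
        }
    }

    /** Returns the highest-priority failure collected so far, or null if none. */
    synchronized Exception getPrimaryFailure() {
        return primary;
    }

    synchronized boolean hasFailed() {
        return primary != null;
    }

    private static boolean isCancellation(Exception e) {
        return e instanceof CancellationException;
    }
}
```

With this ordering, a cancellation that arrives first is replaced as soon as the underlying sink failure is collected, and later cancellations never displace it.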
-