Class BidirectionalBatchExchangeClient

java.lang.Object
org.elasticsearch.compute.operator.exchange.BidirectionalBatchExchangeClient
All Implemented Interfaces:
Closeable, AutoCloseable, org.elasticsearch.core.Releasable

public final class BidirectionalBatchExchangeClient extends Object
Client-side handler for bidirectional batch exchange.

The client:

  • Sends batches to the server via the clientToServer exchange (using an ExchangeSink)
  • Receives results from the server via the serverToClient exchange (using an ExchangeSource)
  • Tracks batch completion via markBatchCompleted(long)
  • Field Details

    • sessionId

      protected final String sessionId
    • exchangeService

      protected final ExchangeService exchangeService
    • executor

      protected final Executor executor
    • maxBufferSize

      protected final int maxBufferSize
    • transportService

      protected final TransportService transportService
    • task

      protected final Task task
    • settings

      protected final Settings settings
  • Constructor Details

    • BidirectionalBatchExchangeClient

      public BidirectionalBatchExchangeClient(String sessionId, ExchangeService exchangeService, Executor executor, int maxBufferSize, TransportService transportService, Task task, ActionListener<Void> batchExchangeStatusListener, Settings settings, BidirectionalBatchExchangeClient.ServerSetupCallback serverSetupCallback, @Nullable BiConsumer<String,String> lookupPlanConsumer, int maxWorkers, Supplier<DiscoveryNode> serverNodeSupplier)
      Create a new BidirectionalBatchExchangeClient.
      Parameters:
      sessionId - session ID for the client
      exchangeService - the exchange service
      executor - executor for async operations
      maxBufferSize - maximum buffer size for exchanges
      transportService - transport service for the transport-based remote sink
      task - task for the transport-based remote sink
      batchExchangeStatusListener - listener called when the batch exchange status (success or failure) is received
      settings - settings for exchange configuration
      serverSetupCallback - callback that sends a setup request when a new worker is connected
      lookupPlanConsumer - optional callback to receive (workerKey, planString) from server setup response
      maxWorkers - maximum number of workers (parallel connections) to create
      serverNodeSupplier - supplier for getting server nodes for new workers
  • Method Details

    • getServerToClientSourceHandler

      public ExchangeSourceHandler getServerToClientSourceHandler()
      Get the server-to-client source handler. The server's sink can be connected to it directly by calling addRemoteSink().
    • getSessionId

      public String getSessionId()
      Get the session ID (streaming session ID) used by this client.
      Returns:
      the session ID
    • hasFailed

      public boolean hasFailed()
      Check if the exchange client has failed.
      Returns:
      true if a failure has occurred, false otherwise
    • getPrimaryFailure

      public Exception getPrimaryFailure()
      Returns the highest-priority failure, or null if no failure has occurred.
    • getWorkerCount

      public int getWorkerCount()
      Get the number of workers in the pool. Used for testing to verify parallel worker distribution.
      Returns:
      the number of workers created
    • pollPage

      public Page pollPage()
      Polls a page from the cache. The consumer should call this to retrieve pages and check each page's BatchMetadata to detect batch completion via page.batchMetadata().isLastPageInBatch().

      With failFast=true, the exchange source may throw TaskCancelledException when aborted. This method catches it and returns null; the real error arrives via the response coordinator and notifyFailure(), which sets the operator's failure field.

      Returns:
      the next page, or null if no pages are available or exchange is aborted
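
      A minimal sketch of the catch-and-return-null pattern described above, in plain Java. TaskCancelledException, Page, PageSource, and FailFastPoller are hypothetical stand-ins defined in the sketch, not the Elasticsearch classes, and the way the real error is later surfaced is not modeled.

```java
// Hypothetical stand-in for the Elasticsearch exception of the same name.
class TaskCancelledException extends RuntimeException {
    TaskCancelledException(String message) {
        super(message);
    }
}

// Hypothetical minimal page type; only the batch ID matters here.
record Page(long batchId) {}

// Hypothetical source: returns null when no page is buffered and may throw
// TaskCancelledException once aborted (failFast=true).
interface PageSource {
    Page pollPage();
}

final class FailFastPoller {
    private final PageSource source;

    FailFastPoller(PageSource source) {
        this.source = source;
    }

    // Returns the next page, or null if none is available or the source was aborted.
    // The cancellation is swallowed because the real error is reported separately.
    Page pollPage() {
        try {
            return source.pollPage();
        } catch (TaskCancelledException e) {
            return null;
        }
    }
}
```

      A caller treats null the same way in both cases ("nothing to read right now") and learns about the failure through the separate failure path.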
    • waitForPage

      public IsBlockedResult waitForPage()
      Returns an IsBlockedResult that resolves when a page is available or when finished.

      If the exchange source is aborted (failFast=true), this method catches TaskCancelledException and blocks on failureNotified until the real error is available.

      Returns:
      NOT_BLOCKED if a page is available or finished, otherwise a blocked result
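
      A minimal sketch of this blocking behavior, assuming a CompletableFuture<Void> as a stand-in for IsBlockedResult. BlockingSource, PageWaiter, and waitForReading() are hypothetical; failureNotified mirrors the name used above, and notifyFailure(Exception) is an assumed hook that records the real error and releases the waiter.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the Elasticsearch exception of the same name.
class TaskCancelledException extends RuntimeException {
    TaskCancelledException(String message) {
        super(message);
    }
}

// Hypothetical source: the returned future is complete when a page is available
// or the source is finished; throws TaskCancelledException once aborted.
interface BlockingSource {
    CompletableFuture<Void> waitForReading();
}

final class PageWaiter {
    private final BlockingSource source;
    // Completed once the real failure has been recorded.
    private final CompletableFuture<Void> failureNotified = new CompletableFuture<>();
    private volatile Exception failure;

    PageWaiter(BlockingSource source) {
        this.source = source;
    }

    CompletableFuture<Void> waitForPage() {
        try {
            return source.waitForReading();
        } catch (TaskCancelledException e) {
            // Aborted: block until the real error is available instead of surfacing the cancellation.
            return failureNotified;
        }
    }

    // Records the real error and releases anyone blocked in waitForPage().
    void notifyFailure(Exception e) {
        failure = e;
        failureNotified.complete(null);
    }

    Exception failure() {
        return failure;
    }
}
```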
    • waitUntilPageReady

      public IsBlockedResult waitUntilPageReady()
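      Blocks until a page is ready to be polled or the exchange is finished. This guarantees that when NOT_BLOCKED is returned, either a page is ready to be polled (in order) or the exchange is finished. Waiting for a ready page, rather than merely a buffered one, prevents busy-spinning when pages arrive out of order in multi-node scenarios.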

      If the exchange source is aborted (failFast=true), this method catches TaskCancelledException and blocks on failureNotified until the real error is available.

      Returns:
      NOT_BLOCKED if a page is ready or finished, otherwise a blocked result
    • isExchangeDone

      public boolean isExchangeDone()
      Returns true if the exchange is done (upstream finished and no buffered pages).

      With failFast=true, the exchange source throws TaskCancelledException when aborted. This method catches it and returns false: the exchange is not "done", it is aborting.

    • isFinished

      public boolean isFinished()
      Returns true if the client has fully finished: all batches are complete and all worker responses have been received. The worker responses confirm whether each worker's server succeeded or failed. If any worker failed, hasFailed() returns true and the failure can be retrieved with getPrimaryFailure().
    • waitForServerResponse

      public IsBlockedResult waitForServerResponse()
      Returns an IsBlockedResult that resolves when all worker channels have completed (all sink and status references released). Use this to block while waiting for every worker's success or failure confirmation.
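
      One way to build such a completion signal is plain reference counting, sketched below under stated assumptions: each worker holds two references (its sink and its status response), the client holds one more until finish() so the signal cannot fire before all workers exist, and a CompletableFuture<Void> stands in for IsBlockedResult. WorkerCompletion and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class WorkerCompletion {
    // Starts at 1: the client's own reference, released by finish(), so the future
    // cannot complete while workers may still be added.
    private final AtomicInteger refs = new AtomicInteger(1);
    private final CompletableFuture<Void> allWorkersCompleted = new CompletableFuture<>();

    // A new worker holds two references: its sink and its pending status response.
    void workerStarted() {
        refs.addAndGet(2);
    }

    // Called when a worker's sink finishes or its status response arrives.
    void release() {
        int remaining = refs.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("reference released too many times");
        }
        if (remaining == 0) {
            allWorkersCompleted.complete(null);
        }
    }

    // The client releases its own reference once no more workers will be created.
    void finish() {
        release();
    }

    CompletableFuture<Void> waitForServerResponse() {
        return allWorkersCompleted;
    }
}
```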
    • bufferedPageCount

      public int bufferedPageCount()
      Returns the current number of buffered pages.
    • hasReadyPages

      public boolean hasReadyPages()
      Returns true if there are pages ready to be output in the correct order. Unlike bufferedPageCount(), this returns true only when pages are actually ready for consumption, not when they are buffered while waiting for out-of-order pages to arrive.
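
      The difference between buffered and ready pages can be illustrated with a small reorder buffer. This sketch assumes single-page batches numbered consecutively from 0 and is not the BatchSortedExchangeSource implementation; ReorderBuffer and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reorder buffer: single-page batches, batch IDs consecutive from 0.
final class ReorderBuffer<P> {
    private final Map<Long, P> buffered = new HashMap<>();
    private long nextBatchId = 0;

    // Pages may arrive in any order (e.g. from different nodes).
    void add(long batchId, P page) {
        buffered.put(batchId, page);
    }

    // Counts every buffered page, including ones that cannot be emitted yet.
    int bufferedPageCount() {
        return buffered.size();
    }

    // True only when the next page in batch order has arrived.
    boolean hasReadyPages() {
        return buffered.containsKey(nextBatchId);
    }

    // Returns the next in-order page, or null if it has not arrived yet.
    P poll() {
        P page = buffered.remove(nextBatchId);
        if (page != null) {
            nextBatchId++;
        }
        return page;
    }
}
```

      With batch 1 buffered but batch 0 missing, bufferedPageCount() is 1 while hasReadyPages() is false; a loop that waited only on "something is buffered" would keep waking up without being able to emit anything.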
    • getSortedSource

      public BatchSortedExchangeSource getSortedSource()
      Returns the sorted source for diagnostic purposes (e.g. toString in timeout messages).
    • sendPage

      public void sendPage(Page page)
      Send a Page with BatchMetadata for processing. The worker is selected using a least-loaded assignment strategy, and workers are lazily initialized as needed, up to maxWorkers. The batchId should increase monotonically across calls. Currently only single-page batches are supported, so isLastPageInBatch must always be true. Called only from the single-threaded Driver, so no synchronization is needed.
      Parameters:
      page - the page with BatchMetadata to send
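
      A minimal sketch of least-loaded assignment with lazy worker creation and the monotonic batchId check. The rule for when to create a new worker (only when every existing worker has pending batches) is an assumption, as are WorkerPool and its methods; opening connections and actually sending pages are not modeled. Its markBatchCompleted mirrors the per-worker decrement described for markBatchCompleted(long).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical worker pool illustrating least-loaded assignment with lazy creation.
final class WorkerPool {
    private final int maxWorkers;
    private final List<Integer> pending = new ArrayList<>(); // pending batches per worker
    private final Map<Long, Integer> batchToWorker = new HashMap<>();
    private long lastBatchId = -1;

    WorkerPool(int maxWorkers) {
        if (maxWorkers < 1) throw new IllegalArgumentException("maxWorkers must be >= 1");
        this.maxWorkers = maxWorkers;
    }

    // Returns the index of the worker that should receive this batch.
    int assign(long batchId) {
        if (batchId <= lastBatchId) {
            throw new IllegalArgumentException("batchId must increase monotonically: " + batchId);
        }
        lastBatchId = batchId;
        int best = -1;
        for (int i = 0; i < pending.size(); i++) {
            if (best == -1 || pending.get(i) < pending.get(best)) best = i;
        }
        // Assumption: lazily create a worker only if every existing worker is busy.
        if ((best == -1 || pending.get(best) > 0) && pending.size() < maxWorkers) {
            pending.add(0);
            best = pending.size() - 1;
        }
        pending.set(best, pending.get(best) + 1);
        batchToWorker.put(batchId, best);
        return best;
    }

    // Decrements the pending count of the worker that processed the batch.
    void markBatchCompleted(long batchId) {
        Integer worker = batchToWorker.remove(batchId);
        if (worker == null) throw new IllegalArgumentException("unknown batch " + batchId);
        pending.set(worker, pending.get(worker) - 1);
    }

    int workerCount() {
        return pending.size();
    }
}
```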
    • sendBatchMarker

      public void sendBatchMarker(long batchId)
      Send a marker page to signal completion of an empty batch. Called only from the single-threaded Driver, so no synchronization is needed.
      Parameters:
      batchId - the batch ID
    • markBatchCompleted

      public void markBatchCompleted(long batchId)
      Mark a batch as completed. Called by the consumer when it finishes processing a batch. This decrements the pending count for the worker that processed the batch.
      Parameters:
      batchId - the completed batch ID
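
      A sketch of the consumer side: drain ready pages, detect the end of each batch through isLastPageInBatch(), and report it with markBatchCompleted. BatchMetadata, Page, BatchResultSource, BatchConsumer, and FakeClient are simplified stand-ins defined here, not the Elasticsearch types.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-ins for BatchMetadata and Page.
record BatchMetadata(long batchId, boolean isLastPageInBatch) {}
record Page(BatchMetadata batchMetadata) {}

// Hypothetical subset of the client API used by the consumer.
interface BatchResultSource {
    Page pollPage(); // null when no page is ready
    void markBatchCompleted(long batchId);
}

final class BatchConsumer {
    // Drains all currently ready pages; returns the IDs of batches completed in this call.
    static List<Long> drain(BatchResultSource client) {
        List<Long> completed = new ArrayList<>();
        Page page;
        while ((page = client.pollPage()) != null) {
            // ... process the page's data here ...
            BatchMetadata meta = page.batchMetadata();
            if (meta.isLastPageInBatch()) {
                client.markBatchCompleted(meta.batchId());
                completed.add(meta.batchId());
            }
        }
        return completed;
    }
}

// In-memory fake used to exercise the loop.
final class FakeClient implements BatchResultSource {
    final Deque<Page> pages = new ArrayDeque<>();
    final List<Long> marked = new ArrayList<>();

    public Page pollPage() {
        return pages.poll();
    }

    public void markBatchCompleted(long batchId) {
        marked.add(batchId);
    }
}
```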
    • finish

      public void finish()
      Finish all client-to-server exchanges (no more batches will be sent to any worker).

      If there are pending worker connections (setup request sent but connection not yet established), this method will defer finishing until all connections are established. This prevents workers from finishing prematurely, which would cause the server-to-client exchange to close before all worker connections are added.
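
      A minimal sketch of deferring finish() while connections are pending. DeferredFinisher and its methods are hypothetical; the methods are synchronized on the assumption that connection callbacks may arrive on a thread other than the Driver's, and the actual finishing of client-to-server sinks is left as a comment.

```java
// Hypothetical sketch of deferring finish() until pending connections are established.
final class DeferredFinisher {
    private int pendingConnections;
    private boolean finishRequested;
    private boolean finished;

    // Setup request sent; connection not yet established.
    synchronized void connectionStarted() {
        pendingConnections++;
    }

    synchronized void connectionEstablished() {
        pendingConnections--;
        maybeFinish();
    }

    synchronized void finish() {
        finishRequested = true;
        maybeFinish();
    }

    // Finishes only once finish() was requested and no connection is still pending.
    private void maybeFinish() {
        if (finishRequested && pendingConnections == 0 && !finished) {
            finished = true;
            // This is where every client-to-server sink would be finished (not modeled here).
        }
    }

    synchronized boolean isFinished() {
        return finished;
    }
}
```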

    • finishCollectingResponseHeaders

      public void finishCollectingResponseHeaders()
      Applies collected response headers (e.g. warnings from server-side drivers) to the current thread's context. Must be called on the coordinator driver thread (typically from the operator's close() method) so that DriverRunner can propagate them.
    • close

      public void close()
      Closes the client. The driver guarantees all workers have completed (via waitForServerResponse() blocking on allWorkersCompleted) before calling this method, so cleanup is purely synchronous.
    • buildClientToServerId

      protected static String buildClientToServerId(String sessionId)
      Constructs the client-to-server exchange ID from the session ID.
    • buildServerToClientId

      protected static String buildServerToClientId(String sessionId)
      Constructs the server-to-client exchange ID from the session ID.
    • connectRemoteSink

      protected void connectRemoteSink(DiscoveryNode node, String exchangeId, ExchangeSourceHandler sourceHandler, ActionListener<Void> listener, String errorMessagePrefix)
      Connects a remote sink to a source handler via transport. This is a common pattern used by both server and client to establish transport-based connections for bidirectional exchange.

      Always uses failFast=true so the source handler aborts immediately on sink failure. The caller collects the real error via the listener and an EsqlRefCountingListener whose FailureCollector picks it over the generic TaskCancelledException thrown by the aborted source.
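
      A simplified sketch of the preference described above: a collector that returns the first non-cancellation failure and falls back to a cancellation only when nothing else was reported. PreferRealFailure and the local TaskCancelledException are hypothetical stand-ins, not the FailureCollector or EsqlRefCountingListener APIs.

```java
// Hypothetical stand-in for the Elasticsearch exception of the same name.
class TaskCancelledException extends RuntimeException {
    TaskCancelledException(String message) {
        super(message);
    }
}

// Keeps the first "real" failure; a cancellation is returned only if nothing else arrived.
final class PreferRealFailure {
    private Exception real;
    private Exception cancellation;

    synchronized void onFailure(Exception e) {
        if (e instanceof TaskCancelledException) {
            if (cancellation == null) cancellation = e;
        } else if (real == null) {
            real = e;
        }
    }

    synchronized Exception getFailure() {
        return real != null ? real : cancellation;
    }
}
```

      Order does not matter: even if the generic cancellation from the aborted source is reported first, the sink's real error wins once it arrives.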