java.lang.Object
org.elasticsearch.common.component.AbstractLifecycleComponent
org.elasticsearch.compute.operator.exchange.ExchangeService
All Implemented Interfaces:
Closeable, AutoCloseable, LifecycleComponent, org.elasticsearch.core.Releasable

public final class ExchangeService extends AbstractLifecycleComponent
ExchangeService is responsible for exchanging pages between exchange sinks and sources on the same or different nodes. It holds a map of ExchangeSinkHandler instances for each node in the cluster to serve ExchangeRequests. To connect exchange sources to exchange sinks, use ExchangeSourceHandler.addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener).
  • Field Details

    • EXCHANGE_ACTION_NAME

      public static final String EXCHANGE_ACTION_NAME
    • EXCHANGE_ACTION_NAME_FOR_CCS

      public static final String EXCHANGE_ACTION_NAME_FOR_CCS
    • OPEN_EXCHANGE_ACTION_NAME

      public static final String OPEN_EXCHANGE_ACTION_NAME
    • BATCH_EXCHANGE_STATUS_ACTION_NAME

      public static final String BATCH_EXCHANGE_STATUS_ACTION_NAME
    • INACTIVE_SINKS_INTERVAL_SETTING

      public static final String INACTIVE_SINKS_INTERVAL_SETTING
      The time interval for an exchange sink handler to be considered inactive and subsequently removed from the exchange service if no sinks are attached (i.e., no computation uses that sink handler).
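      The inactivity rule described above can be illustrated with a minimal, self-contained sketch. It is not the ExchangeService implementation: IdleSinkReaperSketch, its Entry bookkeeping, and the injected clock are hypothetical names chosen for this example, and the assumption that a handler's idle timer restarts whenever a sink attaches or detaches is ours.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical model of "remove a sink handler once it has had no attached
// sinks for longer than the inactive interval". Not the real ExchangeService.
final class IdleSinkReaperSketch {
    // Per-handler bookkeeping assumed for this sketch.
    private static final class Entry {
        int attachedSinks;
        long lastActiveMillis;
    }

    private final Map<String, Entry> handlers = new HashMap<>();
    private final long inactiveIntervalMillis;
    private final LongSupplier clock; // injected so the example is deterministic

    IdleSinkReaperSketch(long inactiveIntervalMillis, LongSupplier clock) {
        this.inactiveIntervalMillis = inactiveIntervalMillis;
        this.clock = clock;
    }

    synchronized void register(String exchangeId) {
        Entry entry = new Entry();
        entry.lastActiveMillis = clock.getAsLong();
        handlers.put(exchangeId, entry);
    }

    synchronized void attachSink(String exchangeId) {
        Entry entry = require(exchangeId);
        entry.attachedSinks++;
        entry.lastActiveMillis = clock.getAsLong();
    }

    synchronized void detachSink(String exchangeId) {
        Entry entry = require(exchangeId);
        entry.attachedSinks--;
        entry.lastActiveMillis = clock.getAsLong();
    }

    synchronized boolean contains(String exchangeId) {
        return handlers.containsKey(exchangeId);
    }

    // Removes every handler that has no attached sinks and has been idle
    // for longer than the interval; returns how many were removed.
    synchronized int removeInactive() {
        long now = clock.getAsLong();
        int before = handlers.size();
        handlers.values().removeIf(
            e -> e.attachedSinks == 0 && now - e.lastActiveMillis > inactiveIntervalMillis);
        return before - handlers.size();
    }

    private Entry require(String exchangeId) {
        Entry entry = handlers.get(exchangeId);
        if (entry == null) {
            throw new IllegalArgumentException("unknown exchange id [" + exchangeId + "]");
        }
        return entry;
    }
}
```

      In this sketch a handler with an attached sink is never removed, regardless of how long ago it was last touched; only handlers that no computation is using become eligible once the interval has elapsed.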
    • INACTIVE_SINKS_INTERVAL_DEFAULT

      public static final org.elasticsearch.core.TimeValue INACTIVE_SINKS_INTERVAL_DEFAULT
  • Constructor Details

  • Method Details

    • registerTransportHandler

      public void registerTransportHandler(TransportService transportService)
    • registerBatchExchangeServer

      public void registerBatchExchangeServer(String serverToClientId, BidirectionalBatchExchangeServer server)
      Register a bidirectional batch exchange server. The server will receive BatchExchangeStatusRequest messages for its exchangeId.
    • unregisterBatchExchangeServer

      public void unregisterBatchExchangeServer(String serverToClientId)
      Unregister a bidirectional batch exchange server.
    • createSinkHandler

      public ExchangeSinkHandler createSinkHandler(String exchangeId, int maxBufferSize)
      Creates an ExchangeSinkHandler for the specified exchange id.
      Throws:
      IllegalStateException - if a sink handler for the given id already exists
    • getSinkHandler

      public ExchangeSinkHandler getSinkHandler(String exchangeId)
      Returns an exchange sink handler for the given id.
    • getOrCreateSinkHandler

      public ExchangeSinkHandler getOrCreateSinkHandler(String exchangeId, int maxBufferSize)
      Gets an existing ExchangeSinkHandler for the specified exchange id, or creates one if it doesn't exist. This is useful when the sink handler may have been pre-registered (e.g., for test setup coordination).
    • finishSinkHandler

      public void finishSinkHandler(String exchangeId, @Nullable Exception failure)
      Removes the exchange sink handler associated with the given exchange id. The sink handler is aborted if the given failure is not null.
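      The contract of createSinkHandler, getSinkHandler, getOrCreateSinkHandler, and finishSinkHandler can be modeled with a small, self-contained sketch. The classes below (SketchSinkHandler, SketchSinkRegistry) are hypothetical stand-ins, not the real ExchangeSinkHandler or ExchangeService. The behavior of getSinkHandler for an unknown id is not stated above, so the exception thrown here is an assumption of the sketch.

```java
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ExchangeSinkHandler; not the real class.
final class SketchSinkHandler {
    final int maxBufferSize;
    volatile Exception abortedWith; // stays null unless the handler is aborted

    SketchSinkHandler(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    void abort(Exception failure) {
        this.abortedWith = failure;
    }
}

// Hypothetical registry mirroring the documented create/get/getOrCreate/finish contract.
final class SketchSinkRegistry {
    private final Map<String, SketchSinkHandler> sinks = new ConcurrentHashMap<>();

    SketchSinkHandler createSinkHandler(String exchangeId, int maxBufferSize) {
        SketchSinkHandler handler = new SketchSinkHandler(maxBufferSize);
        // putIfAbsent returns the previous mapping, so non-null means a duplicate id.
        if (sinks.putIfAbsent(exchangeId, handler) != null) {
            throw new IllegalStateException("sink handler for id [" + exchangeId + "] already exists");
        }
        return handler;
    }

    SketchSinkHandler getSinkHandler(String exchangeId) {
        SketchSinkHandler handler = sinks.get(exchangeId);
        if (handler == null) {
            // Assumption of this sketch: an unknown id is reported with an exception.
            throw new NoSuchElementException("sink handler for id [" + exchangeId + "] not found");
        }
        return handler;
    }

    SketchSinkHandler getOrCreateSinkHandler(String exchangeId, int maxBufferSize) {
        // Reuses a pre-registered handler; maxBufferSize only applies on creation.
        return sinks.computeIfAbsent(exchangeId, id -> new SketchSinkHandler(maxBufferSize));
    }

    void finishSinkHandler(String exchangeId, Exception failure) {
        SketchSinkHandler handler = sinks.remove(exchangeId);
        // Abort only when a failure is supplied; a null failure is a plain removal.
        if (handler != null && failure != null) {
            handler.abort(failure);
        }
    }

    boolean isEmpty() {
        return sinks.isEmpty();
    }
}
```

      The sketch uses putIfAbsent for the strict create path and computeIfAbsent for the lenient get-or-create path, so both remain atomic when several threads race on the same exchange id.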
    • openExchange

      public static void openExchange(TransportService transportService, Transport.Connection connection, String sessionId, int exchangeBuffer, Executor responseExecutor, ActionListener<Void> listener)
      Opens a remote sink handler on the remote node for the given session ID.
    • addExchangeSourceHandler

      public void addExchangeSourceHandler(String sessionId, ExchangeSourceHandler sourceHandler)
      Remember the exchange source handler for the given session ID. This can be used for async/stop requests.
    • removeExchangeSourceHandler

      public ExchangeSourceHandler removeExchangeSourceHandler(String sessionId)
    • finishSessionEarly

      public void finishSessionEarly(String sessionId, ActionListener<Void> listener)
      Finishes the session early, i.e., before all sources are finished. It is called by the async/stop API and should be called on the node that coordinates the async request. It closes all sources and returns the results; unlike cancel, it does not discard them.
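      The difference between finishing a session early and cancelling it can be shown with a simplified, self-contained model. Everything here is hypothetical: SketchSourceHandler and SketchSessionRegistry are illustrative stand-ins, pages are plain strings, the listener is a Runnable rather than an ActionListener<Void>, and completing the listener for an unknown session id is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for an exchange source handler; pages are plain strings.
final class SketchSourceHandler {
    private final List<String> pages = new ArrayList<>();
    private boolean closed;

    synchronized void addPage(String page) {
        if (!closed) { // pages arriving after close are ignored
            pages.add(page);
        }
    }

    // Finish early: stop accepting pages but keep what was already collected.
    synchronized void finishEarly() {
        closed = true;
    }

    // Cancel: stop accepting pages and discard what was collected.
    synchronized void cancel() {
        closed = true;
        pages.clear();
    }

    synchronized List<String> results() {
        return new ArrayList<>(pages);
    }
}

// Hypothetical registry that remembers source handlers by session id.
final class SketchSessionRegistry {
    private final Map<String, SketchSourceHandler> sources = new ConcurrentHashMap<>();

    void addExchangeSourceHandler(String sessionId, SketchSourceHandler handler) {
        sources.put(sessionId, handler);
    }

    SketchSourceHandler removeExchangeSourceHandler(String sessionId) {
        return sources.remove(sessionId);
    }

    void finishSessionEarly(String sessionId, Runnable listener) {
        SketchSourceHandler handler = sources.get(sessionId);
        if (handler != null) {
            handler.finishEarly();
        }
        // Assumption: the listener completes even if the session is unknown.
        listener.run();
    }
}
```

      In this model a coordinator registers the handler when the session starts, calls finishSessionEarly when a stop request arrives, and then reads the partial results from the handler before removing it.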
    • newRemoteSink

      public RemoteSink newRemoteSink(Task parentTask, String exchangeId, TransportService transportService, Transport.Connection conn)
      Creates a new RemoteSink that fetches pages from an exchange sink located on the remote node.
      Parameters:
      parentTask - the parent task that initialized the ESQL request
      exchangeId - the exchange ID
      transportService - the transport service
      conn - the connection to the remote node where the remote exchange sink is located
    • sendBatchExchangeStatusRequest

      public static void sendBatchExchangeStatusRequest(TransportService transportService, Transport.Connection connection, String exchangeId, Executor responseExecutor, ActionListener<BatchExchangeStatusResponse> listener)
      Sends a batch exchange status request from client to server. The server will reply after batch processing completes.
    • isEmpty

      public boolean isEmpty()
    • sinkKeys

      public Set<String> sinkKeys()
    • doStart

      protected void doStart()
      Specified by:
      doStart in class AbstractLifecycleComponent
    • doStop

      protected void doStop()
      Specified by:
      doStop in class AbstractLifecycleComponent
    • doClose

      protected void doClose()
      Specified by:
      doClose in class AbstractLifecycleComponent
    • toString

      public String toString()
      Overrides:
      toString in class Object