Module org.elasticsearch.compute
Class ExchangeService
java.lang.Object
org.elasticsearch.common.component.AbstractLifecycleComponent
org.elasticsearch.compute.operator.exchange.ExchangeService
All Implemented Interfaces:
Closeable, AutoCloseable, LifecycleComponent, org.elasticsearch.core.Releasable
ExchangeService is responsible for exchanging pages between exchange sinks and sources on the same or different nodes.
It holds a map of ExchangeSinkHandler instances for each node in the cluster to serve ExchangeRequests.
To connect exchange sources to exchange sinks, use ExchangeSourceHandler.addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener).
-
Field Summary
Fields
Modifier and Type / Field / Description
static final String BATCH_EXCHANGE_STATUS_ACTION_NAME
static final String EXCHANGE_ACTION_NAME
static final String EXCHANGE_ACTION_NAME_FOR_CCS
static final org.elasticsearch.core.TimeValue INACTIVE_SINKS_INTERVAL_DEFAULT
static final String INACTIVE_SINKS_INTERVAL_SETTING
    The time interval for an exchange sink handler to be considered inactive and subsequently removed from the exchange service if no sinks are attached (i.e., no computation uses that sink handler).
static final String OPEN_EXCHANGE_ACTION_NAME
Fields inherited from class org.elasticsearch.common.component.AbstractLifecycleComponent
lifecycle
-
Constructor Summary
Constructors
Constructor / Description
ExchangeService(Settings settings, ThreadPool threadPool, String executorName, BlockFactory blockFactory)
-
Method Summary
Modifier and Type / Method / Description
void addExchangeSourceHandler(String sessionId, ExchangeSourceHandler sourceHandler)
    Remember the exchange source handler for the given session ID.
createSinkHandler(String exchangeId, int maxBufferSize)
    Creates an ExchangeSinkHandler for the specified exchange id.
protected void doClose()
protected void doStart()
protected void doStop()
void finishSessionEarly(String sessionId, ActionListener<Void> listener)
    Finishes the session early, i.e., before all sources are finished.
void finishSinkHandler(String exchangeId, Exception failure)
    Removes the exchange sink handler associated with the given exchange id.
getOrCreateSinkHandler(String exchangeId, int maxBufferSize)
    Gets an existing ExchangeSinkHandler for the specified exchange id, or creates one if it doesn't exist.
getSinkHandler(String exchangeId)
    Returns an exchange sink handler for the given id.
boolean isEmpty()
RemoteSink newRemoteSink(Task parentTask, String exchangeId, TransportService transportService, Transport.Connection conn)
    Creates a new RemoteSink that fetches pages from an exchange sink located on the remote node.
static void openExchange(TransportService transportService, Transport.Connection connection, String sessionId, int exchangeBuffer, Executor responseExecutor, ActionListener<Void> listener)
    Opens a remote sink handler on the remote node for the given session ID.
void registerBatchExchangeServer(String serverToClientId, BidirectionalBatchExchangeServer server)
    Register a bidirectional batch exchange server.
void registerTransportHandler(TransportService transportService)
removeExchangeSourceHandler(String sessionId)
static void sendBatchExchangeStatusRequest(TransportService transportService, Transport.Connection connection, String exchangeId, Executor responseExecutor, ActionListener<BatchExchangeStatusResponse> listener)
    Sends a batch exchange status request from client to server.
sinkKeys()
toString()
void unregisterBatchExchangeServer(String serverToClientId)
    Unregister a bidirectional batch exchange server.
Methods inherited from class org.elasticsearch.common.component.AbstractLifecycleComponent
addLifecycleListener, close, lifecycleState, start, stop
-
Field Details
-
EXCHANGE_ACTION_NAME
-
EXCHANGE_ACTION_NAME_FOR_CCS
-
OPEN_EXCHANGE_ACTION_NAME
-
BATCH_EXCHANGE_STATUS_ACTION_NAME
-
INACTIVE_SINKS_INTERVAL_SETTING
The time interval for an exchange sink handler to be considered inactive and subsequently removed from the exchange service if no sinks are attached (i.e., no computation uses that sink handler).
-
INACTIVE_SINKS_INTERVAL_DEFAULT
public static final org.elasticsearch.core.TimeValue INACTIVE_SINKS_INTERVAL_DEFAULT
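The inactivity rule described for INACTIVE_SINKS_INTERVAL_SETTING can be illustrated with a small JDK-only model. This is a sketch under stated assumptions, not the ExchangeService implementation: the class InactiveSinkReaperSketch, its Entry bookkeeping, and the methods track, contains, and removeInactive are all hypothetical names, and the 10-second interval is an arbitrary value chosen for the example.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of inactive sink-handler cleanup; not the ExchangeService implementation. */
final class InactiveSinkReaperSketch {

    /** Hypothetical bookkeeping for one sink handler. */
    private static final class Entry {
        final int attachedSinks;    // number of sinks currently attached
        final long lastActiveNanos; // last time the handler was used

        Entry(int attachedSinks, long lastActiveNanos) {
            this.attachedSinks = attachedSinks;
            this.lastActiveNanos = lastActiveNanos;
        }
    }

    private final Map<String, Entry> handlers = new ConcurrentHashMap<>();
    private final long inactiveIntervalNanos;

    InactiveSinkReaperSketch(Duration inactiveInterval) {
        this.inactiveIntervalNanos = inactiveInterval.toNanos();
    }

    /** Records a handler with its attached sink count and last activity time. */
    void track(String exchangeId, int attachedSinks, long nowNanos) {
        handlers.put(exchangeId, new Entry(attachedSinks, nowNanos));
    }

    boolean contains(String exchangeId) {
        return handlers.containsKey(exchangeId);
    }

    /** Removes handlers that have no attached sinks and have been idle longer than the interval. */
    int removeInactive(long nowNanos) {
        int before = handlers.size();
        handlers.values().removeIf(
            e -> e.attachedSinks == 0 && nowNanos - e.lastActiveNanos > inactiveIntervalNanos
        );
        return before - handlers.size(); // exact only when called from a single thread
    }

    public static void main(String[] args) {
        // Arbitrary interval for the example; the real default is INACTIVE_SINKS_INTERVAL_DEFAULT.
        InactiveSinkReaperSketch reaper = new InactiveSinkReaperSketch(Duration.ofSeconds(10));
        reaper.track("idle", 0, 0L); // no sinks attached
        reaper.track("busy", 1, 0L); // one sink still attached
        int removed = reaper.removeInactive(Duration.ofSeconds(11).toNanos());
        System.out.println(removed + " removed; busy kept: " + reaper.contains("busy"));
    }
}
```

Only the handler with no attached sinks is eligible for removal; a handler that still has a sink attached stays registered regardless of how long it has been idle.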
-
-
Constructor Details
-
ExchangeService
public ExchangeService(Settings settings, ThreadPool threadPool, String executorName, BlockFactory blockFactory)
-
-
Method Details
-
registerTransportHandler
-
registerBatchExchangeServer
public void registerBatchExchangeServer(String serverToClientId, BidirectionalBatchExchangeServer server)
Register a bidirectional batch exchange server. The server will receive BatchExchangeStatusRequest messages for its exchangeId.
-
unregisterBatchExchangeServer
Unregister a bidirectional batch exchange server.
-
createSinkHandler
Creates an ExchangeSinkHandler for the specified exchange id.
Throws:
IllegalStateException - if a sink handler for the given id already exists
-
getSinkHandler
Returns an exchange sink handler for the given id.
-
getOrCreateSinkHandler
Gets an existing ExchangeSinkHandler for the specified exchange id, or creates one if it doesn't exist. This is useful when the sink handler may have been pre-registered (e.g., for test setup coordination).
-
finishSinkHandler
Removes the exchange sink handler associated with the given exchange id. The sink handler is aborted if the given failure is not null.
-
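The documented contract of createSinkHandler, getSinkHandler, getOrCreateSinkHandler, and finishSinkHandler can be sketched with a plain concurrent map. This is a minimal JDK-only model, assuming a map keyed by exchange id; SketchSinkHandler and SinkRegistrySketch are hypothetical stand-ins, the exception message is invented, and returning null from getSinkHandler for an unknown id is an assumption that the page above does not confirm.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for ExchangeSinkHandler; it only records an abort cause. */
final class SketchSinkHandler {
    final int maxBufferSize;
    private volatile Exception failure;

    SketchSinkHandler(int maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    void abort(Exception cause) {
        this.failure = cause;
    }

    boolean isAborted() {
        return failure != null;
    }
}

/** Hypothetical registry mirroring the documented create/get/getOrCreate/finish contract. */
final class SinkRegistrySketch {
    private final Map<String, SketchSinkHandler> sinks = new ConcurrentHashMap<>();

    /** Creates a handler; fails if one is already registered under the same id. */
    SketchSinkHandler createSinkHandler(String exchangeId, int maxBufferSize) {
        SketchSinkHandler handler = new SketchSinkHandler(maxBufferSize);
        if (sinks.putIfAbsent(exchangeId, handler) != null) {
            throw new IllegalStateException("sink handler for id [" + exchangeId + "] already exists");
        }
        return handler;
    }

    /** Returns the existing handler, or atomically creates and registers a new one. */
    SketchSinkHandler getOrCreateSinkHandler(String exchangeId, int maxBufferSize) {
        return sinks.computeIfAbsent(exchangeId, id -> new SketchSinkHandler(maxBufferSize));
    }

    /** Assumption: this sketch returns null for unknown ids; the real method may behave differently. */
    SketchSinkHandler getSinkHandler(String exchangeId) {
        return sinks.get(exchangeId);
    }

    /** Removes the handler and aborts it only when a failure is supplied. */
    void finishSinkHandler(String exchangeId, Exception failure) {
        SketchSinkHandler handler = sinks.remove(exchangeId);
        if (handler != null && failure != null) {
            handler.abort(failure);
        }
    }

    boolean isEmpty() {
        return sinks.isEmpty();
    }

    public static void main(String[] args) {
        SinkRegistrySketch registry = new SinkRegistrySketch();
        SketchSinkHandler created = registry.createSinkHandler("exchange-1", 10);
        // getOrCreate returns the handler registered above rather than a new one.
        System.out.println(registry.getOrCreateSinkHandler("exchange-1", 99) == created);
        registry.finishSinkHandler("exchange-1", new RuntimeException("simulated failure"));
        System.out.println(created.isAborted() + " " + registry.isEmpty());
    }
}
```

Using putIfAbsent for creation and computeIfAbsent for get-or-create keeps both operations atomic, so two callers racing on the same id cannot register two handlers.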
openExchange
public static void openExchange(TransportService transportService, Transport.Connection connection, String sessionId, int exchangeBuffer, Executor responseExecutor, ActionListener<Void> listener)
Opens a remote sink handler on the remote node for the given session ID.
-
addExchangeSourceHandler
Remember the exchange source handler for the given session ID. This can be used for async/stop requests.
-
removeExchangeSourceHandler
-
finishSessionEarly
Finishes the session early, i.e., before all sources are finished. It is called by the async/stop API and should be called on the node that coordinates the async request. It closes all sources and returns the results; unlike cancel, it does not discard them.
-
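The difference between finishSessionEarly and cancellation, as described above, is whether the results gathered so far survive. The following toy model illustrates only that distinction. EarlyFinishSketch and its methods are hypothetical, pages are represented as strings, and nothing here reflects how ExchangeSourceHandler actually closes its sources.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical session model contrasting an early finish with a cancel. */
final class EarlyFinishSketch {
    private final List<String> collected = new ArrayList<>();
    private boolean open = true;

    /** Accepts a page only while the session is still open. */
    boolean addPage(String page) {
        if (!open) {
            return false;
        }
        collected.add(page);
        return true;
    }

    /** Early finish: stop accepting pages but keep everything received so far. */
    List<String> finishEarly() {
        open = false;
        return List.copyOf(collected);
    }

    /** Cancel: stop accepting pages and discard everything received so far. */
    List<String> cancel() {
        open = false;
        collected.clear();
        return List.of();
    }

    public static void main(String[] args) {
        EarlyFinishSketch finished = new EarlyFinishSketch();
        finished.addPage("page-1");
        finished.addPage("page-2");
        System.out.println("finishEarly keeps " + finished.finishEarly().size() + " pages");

        EarlyFinishSketch cancelled = new EarlyFinishSketch();
        cancelled.addPage("page-1");
        System.out.println("cancel keeps " + cancelled.cancel().size() + " pages");
    }
}
```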
newRemoteSink
public RemoteSink newRemoteSink(Task parentTask, String exchangeId, TransportService transportService, Transport.Connection conn)
Creates a new RemoteSink that fetches pages from an exchange sink located on the remote node.
Parameters:
parentTask - the parent task that initialized the ESQL request
exchangeId - the exchange ID
transportService - the transport service
conn - the connection to the remote node where the remote exchange sink is located
-
sendBatchExchangeStatusRequest
public static void sendBatchExchangeStatusRequest(TransportService transportService, Transport.Connection connection, String exchangeId, Executor responseExecutor, ActionListener<BatchExchangeStatusResponse> listener)
Sends a batch exchange status request from client to server. The server will reply after batch processing completes.
-
isEmpty
public boolean isEmpty()
-
sinkKeys
-
doStart
protected void doStart()
Specified by:
doStart in class AbstractLifecycleComponent
-
doStop
protected void doStop()
Specified by:
doStop in class AbstractLifecycleComponent
-
doClose
protected void doClose()
Specified by:
doClose in class AbstractLifecycleComponent
-
toString
-