Class BaseForExecOpParallelBindJoin<QueryType extends Query,MemberType extends FederationMember,ReqType extends DataRetrievalRequest,RespType extends DataRetrievalResponse<?>>

All Implemented Interfaces:
StatsProvider, ExecutableOperator, UnaryExecutableOp
Direct Known Subclasses:
BaseForExecOpParallelBindJoinSPARQL

public abstract class BaseForExecOpParallelBindJoin<QueryType extends Query,MemberType extends FederationMember,ReqType extends DataRetrievalRequest,RespType extends DataRetrievalResponse<?>> extends BaseForUnaryExecOpWithCollectedInput
A generic implementation of a batch-based bind-join algorithm that issues its bind-join requests without blocking and processes their responses in parallel (in the threads that the federation access manager uses to perform the requests). The implementation is generic in the sense that it works with any form of bind-join request. Each concrete implementation that extends this base class needs to implement the createRequest(Set) method to create the requests that are specific to that concrete implementation.
The algorithm works as follows: For every sequence of solution mappings from the input, the algorithm splits this sequence into batches, each of which is then used for a separate bind-join request. Each batch is associated with the sub-multiset of the input solution mappings that it covers, whereas the batch itself consists of versions of these input solution mappings that are restricted to the join variables (and that contain no blank nodes, see below). Since several input solution mappings may be restricted to the same solution mapping, the number of restricted solution mappings per batch is fixed (see the batchSize argument of the constructor), but the sub-multiset of input solution mappings associated with a batch may be larger than the batch size. After the current sequence of input solution mappings has been split into batches, the last batch may not be full; in this case, it is kept and populated further once the next sequence of input solution mappings is passed to the operator.
The full batches are used to create bind-join requests, one per batch. The response to such a request is the subset of the solution mappings for the query/pattern of this operator that are join partners for at least one of the solution mappings used to create the request. Each request is issued using the asynchronous functionality of the federation access manager, which results in a CompletableFuture.
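The batching step described above can be sketched as follows. This is a minimal, hypothetical sketch, not the operator's actual code: it assumes that a solution mapping is modeled as a Map from variable name to term string, that terms starting with "_:" are blank nodes, and that BatchAssembler and addInput are illustrative names. The field names mirror the ones documented below; full-retrieval mode and the outer-join handling of blank-node mappings are omitted.

```java
import java.util.*;

// Hypothetical model: a solution mapping is a Map from variable name to term;
// terms starting with "_:" are treated as blank nodes (assumption).
class BatchAssembler {
    final Set<String> joinVars;
    final int batchSize;

    // Mirrors the operator's fields; null means "no batch under construction".
    Set<Map<String, String>> currentBatch = null;
    List<Map<String, String>> solMapsCoveredByCurrentBatch = null;
    final List<Set<Map<String, String>>> batches = new ArrayList<>();
    final List<List<Map<String, String>>> solMapsCoveredPerBatch = new ArrayList<>();

    BatchAssembler(Set<String> joinVars, int batchSize) {
        this.joinVars = joinVars;
        this.batchSize = batchSize;
    }

    // Processes one sequence of input solution mappings; an incomplete last
    // batch stays in currentBatch and is filled further by the next call.
    void addInput(List<Map<String, String>> input) {
        for (Map<String, String> sm : input) {
            Map<String, String> restricted = new HashMap<>();
            boolean hasBlankNode = false;
            for (String v : joinVars) {
                String term = sm.get(v);
                if (term == null) continue;
                if (term.startsWith("_:")) { hasBlankNode = true; break; }
                restricted.put(v, term); // keep only join-variable bindings
            }
            // A blank node for a join variable rules out any join partner,
            // so such a mapping is never used for a request.
            if (hasBlankNode) continue;

            if (currentBatch == null) {
                currentBatch = new HashSet<>();
                solMapsCoveredByCurrentBatch = new ArrayList<>();
            }
            currentBatch.add(restricted);        // duplicates collapse in the set
            solMapsCoveredByCurrentBatch.add(sm); // but every input is covered

            if (currentBatch.size() == batchSize) {
                batches.add(currentBatch);
                solMapsCoveredPerBatch.add(solMapsCoveredByCurrentBatch);
                currentBatch = null;
                solMapsCoveredByCurrentBatch = null;
            }
        }
    }
}
```

With batchSize = 2 and join variable x, the inputs {x=1,y=a}, {x=1,y=b}, {x=2,y=c} form one full batch {x=1}, {x=2} that covers three input mappings, which illustrates why the covered sub-multiset can exceed the batch size.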
The algorithm connects each such future to a BaseForExecOpParallelBindJoin.MyResponseProcessor that processes the response once it arrives (joining the solution mappings from the response with the input solution mappings covered by the corresponding batch). All these futures are collected such that the algorithm can wait for their completion after the child operator has stopped producing input for this operator.
This implementation separates out every input solution mapping that assigns a blank node to any of the join variables. Such solution mappings are not considered at all when creating the requests because they cannot have any join partners in the results obtained from the federation member. Of course, if the algorithm is used with outer-join semantics, these solution mappings are still returned to the output (without joining them with anything).
Another feature of this implementation is that it switches into a full-retrieval mode as soon as there is an input solution mapping that does not have a binding for any of the join variables (which may happen only if none of the join variables is a certain variable). Such an input solution mapping is compatible with (and, thus, can be joined with) every solution mapping that the federation member has for the query/pattern of this bind-join operator. Therefore, when switching into full-retrieval mode, this implementation performs a request to retrieve the complete set of these solution mappings and then uses this set to find join partners for the current and all future batches of input solution mappings (with the complete set available locally, there is no need to issue further bind-join requests).
This capability relies on the createExecutableReqOpForAll() method that needs to be provided by each concrete implementation that extends this base class.
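The non-blocking request/response pattern described above can be sketched with the standard java.util.concurrent API. This is a hypothetical sketch under stated assumptions: solution mappings are modeled as Map objects, the `request` function stands in for the federation access manager and the federation member, and ParallelBindJoinSketch, run, compatible, and merge are illustrative names. Only inner-join semantics are shown.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;

class ParallelBindJoinSketch {
    // Two solution mappings are compatible if they agree on all shared variables.
    static boolean compatible(Map<String, String> a, Map<String, String> b) {
        for (Map.Entry<String, String> e : a.entrySet()) {
            String other = b.get(e.getKey());
            if (other != null && !other.equals(e.getValue())) return false;
        }
        return true;
    }

    static Map<String, String> merge(Map<String, String> a, Map<String, String> b) {
        Map<String, String> m = new HashMap<>(a);
        m.putAll(b);
        return m;
    }

    // Issues one non-blocking request per batch. Each response is joined with the
    // input mappings covered by its batch, typically in the thread that completed
    // the request, so the output collection must be thread-safe.
    static List<Map<String, String>> run(
            List<Set<Map<String, String>>> batches,
            List<List<Map<String, String>>> coveredPerBatch,
            // hypothetical stand-in for issuing a bind-join request
            Function<Set<Map<String, String>>, List<Map<String, String>>> request,
            ExecutorService pool) {
        Queue<Map<String, String>> output = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<?>> futures = new ArrayList<>();
        for (int i = 0; i < batches.size(); i++) {
            Set<Map<String, String>> batch = batches.get(i);
            List<Map<String, String>> covered = coveredPerBatch.get(i);
            CompletableFuture<Void> f = CompletableFuture
                    .supplyAsync(() -> request.apply(batch), pool)
                    .thenAccept(response -> {
                        for (Map<String, String> r : response)
                            for (Map<String, String> in : covered)
                                if (compatible(in, r)) output.add(merge(in, r));
                    });
            futures.add(f);
        }
        // After the input is exhausted, wait for all outstanding requests.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return new ArrayList<>(output);
    }
}
```

Collecting the futures and calling allOf(...).join() only once, at the end, lets the requests for all batches be in flight at the same time instead of waiting for each response before sending the next request.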
  • Field Details

    • DEFAULT_BATCH_SIZE

      public static final int DEFAULT_BATCH_SIZE
    • query

      protected final QueryType query
    • fm

      protected final MemberType fm
    • useOuterJoinSemantics

      protected final boolean useOuterJoinSemantics
    • varsInQuery

      protected final Set<org.apache.jena.sparql.core.Var> varsInQuery
    • allJoinVarsAreCertain

      protected final boolean allJoinVarsAreCertain
    • batchSize

      protected final int batchSize
      The number of solution mappings that this operator uses for each of the bind-join requests.
    • solMapsCoveredByCurrentBatch

      protected List<SolutionMapping> solMapsCoveredByCurrentBatch
      This list is used to collect up the input solution mappings (obtained from the child operator in the execution plan) that are covered by the currently-assembled batch of solution mappings, where that batch is in currentBatch and will be used for a bind-join request. Note that the solution mappings in this collection and the ones in currentBatch are not necessarily the same; the ones in currentBatch are versions of the ones in this collection, restricted to join variables. Once currentBatch is full (i.e., the number of solution mappings in it is equal to batchSize), this list is appended to solMapsCoveredPerBatch and then set back to null, to be initialized to a new list when another input solution mapping arrives.
    • solMapsCoveredPerBatch

      protected final List<List<SolutionMapping>> solMapsCoveredPerBatch
    • currentBatch

      protected Set<org.apache.jena.sparql.engine.binding.Binding> currentBatch
      This set is used to collect the solution mappings for the currently-assembled batch, which will then be used for a bind-join request. These solution mappings are created by restricting relevant input solution mappings (obtained from the child operator in the execution plan) to the join variables; i.e., by projecting away the non-join variables, as the bindings for non-join variables do not need to be shipped in the bind-join requests. The input solution mappings from which the solution mappings in this set have been created are collected in parallel in solMapsCoveredByCurrentBatch. Multiple input solution mappings may result in the same restricted solution mapping. Once the number of solution mappings in this set is equal to batchSize (i.e., the current batch is complete), the set is appended to batches and currentBatch is set back to null, to be initialized with a new set when another input solution mapping arrives.
    • batches

      protected final List<Set<org.apache.jena.sparql.engine.binding.Binding>> batches
    • futures

      protected final List<CompletableFuture<?>> futures
      Used to collect the completable futures created for processing the batch requests and their responses.
    • fullResult

      protected Iterable<SolutionMapping> fullResult
      If this operator had to switch to full-retrieval mode, this field contains all solution mappings retrieved for the query of this operator.
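How full-retrieval mode is triggered and how fullResult can then replace further requests may be sketched as follows. This is a hypothetical sketch, not the operator's code: it assumes solution mappings are modeled as Map objects, and FullRetrievalSketch, needsFullRetrieval, and joinLocally are illustrative names. The Iterable parameter mirrors the declared type of fullResult.

```java
import java.util.*;

class FullRetrievalSketch {
    static boolean compatible(Map<String, String> a, Map<String, String> b) {
        for (Map.Entry<String, String> e : a.entrySet()) {
            String other = b.get(e.getKey());
            if (other != null && !other.equals(e.getValue())) return false;
        }
        return true;
    }

    // An input mapping without a binding for any join variable is compatible
    // with every result mapping; this is what triggers full-retrieval mode.
    static boolean needsFullRetrieval(Map<String, String> input, Set<String> joinVars) {
        for (String v : joinVars)
            if (input.containsKey(v)) return false;
        return true;
    }

    // With the complete result available locally, join partners for the covered
    // input mappings are found without issuing any further bind-join request.
    static List<Map<String, String>> joinLocally(List<Map<String, String>> covered,
                                                 Iterable<Map<String, String>> fullResult) {
        List<Map<String, String>> out = new ArrayList<>();
        for (Map<String, String> in : covered)
            for (Map<String, String> r : fullResult)
                if (compatible(in, r)) {
                    Map<String, String> m = new HashMap<>(in);
                    m.putAll(r);
                    out.add(m);
                }
        return out;
    }
}
```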
    • numberOfRequestsUsed

      protected int numberOfRequestsUsed
    • requestDurationsInMS

      protected List<Long> requestDurationsInMS
    • numOfSolMapsRetrievedPerReq

      protected List<Integer> numOfSolMapsRetrievedPerReq
    • statsOfFullRetrievalReqOp

      protected ExecutableOperatorStats statsOfFullRetrievalReqOp
  • Constructor Details

    • BaseForExecOpParallelBindJoin

      public BaseForExecOpParallelBindJoin(QueryType query, Set<org.apache.jena.sparql.core.Var> varsInQuery, MemberType fm, ExpectedVariables inputVars, boolean useOuterJoinSemantics, int batchSize, boolean collectExceptions, QueryPlanningInfo qpInfo)
      Parameters:
      query - the graph pattern (or other kind of query) to be evaluated (in a bind-join manner) at the federation member given as 'fm'
      varsInQuery - the variables that occur in the 'query'
      fm - the federation member targeted by this operator
      inputVars - the variables to be expected in the solution mappings that will be pushed as input to this operator
      useOuterJoinSemantics - true if the 'query' is to be evaluated under outer-join semantics; false for inner-join semantics
      batchSize - the number of solution mappings to be included in each bind-join request
      collectExceptions - true if this operator has to collect exceptions (which is handled entirely by one of the super classes); false if the operator should immediately throw every ExecOpExecutionException
  • Method Details