Class BaseForExecOpParallelBindJoin<QueryType extends Query,MemberType extends FederationMember,ReqType extends DataRetrievalRequest,RespType extends DataRetrievalResponse<?>>
java.lang.Object
se.liu.ida.hefquin.engine.queryplan.executable.impl.ops.BaseForExecOps
se.liu.ida.hefquin.engine.queryplan.executable.impl.ops.UnaryExecutableOpBase
se.liu.ida.hefquin.engine.queryplan.executable.impl.ops.BaseForUnaryExecOpWithCollectedInput
se.liu.ida.hefquin.engine.queryplan.executable.impl.ops.BaseForExecOpParallelBindJoin<QueryType,MemberType,ReqType,RespType>
- All Implemented Interfaces:
StatsProvider,ExecutableOperator,UnaryExecutableOp
- Direct Known Subclasses:
BaseForExecOpParallelBindJoinSPARQL
public abstract class BaseForExecOpParallelBindJoin<QueryType extends Query,MemberType extends FederationMember,ReqType extends DataRetrievalRequest,RespType extends DataRetrievalResponse<?>>
extends BaseForUnaryExecOpWithCollectedInput
A generic implementation of a batch-based bind-join algorithm that issues
the bind-join requests without blocking, handling the processing of their
responses in parallel (in the threads that the federation access manager
uses to perform the requests).
The implementation is generic in the sense that it works with any form of
bind-join requests. Each concrete implementation that extends this base
class needs to implement the
createRequest(Set) method to create
the requests that are specific to that concrete implementation.
The algorithm works as follows: For every sequence of solution mappings
from the input, the algorithm splits this sequence into batches where
each such batch will then be used for a separate bind-join request. Each
such batch is associated with a sub-multiset of the input solution mappings
that are covered by the batch, whereas the batch itself consists of versions
of these input solution mappings that are already restricted to the join
variables (and that contain no blank nodes, see below). Hence, while the
number of such already-restricted solution mappings per batch is fixed
(see the batchSize argument of the constructor), the size of the
sub-multiset of input solution mappings associated with each batch may be
greater than the batch size.
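The batching step described above can be sketched as follows. This is a simplified illustration, not the HeFQUIN code. Solution mappings are modelled as hypothetical Map<String, String> values (variable name to term) instead of Jena Bindings, and the class BatchSketch and its members are invented for this example. The sketch shows why the list of covered input mappings can grow beyond the batch size while the set of restricted mappings cannot.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: solution mappings are modelled as Map<String, String>
// (variable name -> term), not as Jena Bindings; all names are invented.
final class BatchSketch {
    private final Set<String> joinVars;
    private final int batchSize;

    // restricted (join-variables-only) mappings of the batch being assembled
    private Set<Map<String, String>> currentBatch = new HashSet<>();
    // the input mappings covered by the batch being assembled
    private List<Map<String, String>> coveredByCurrentBatch = new ArrayList<>();

    final List<Set<Map<String, String>>> batches = new ArrayList<>();
    final List<List<Map<String, String>>> coveredPerBatch = new ArrayList<>();

    BatchSketch(Set<String> joinVars, int batchSize) {
        this.joinVars = joinVars;
        this.batchSize = batchSize;
    }

    void add(Map<String, String> inputSolMap) {
        // project the input mapping onto the join variables
        Map<String, String> restricted = new HashMap<>();
        for (String v : joinVars) {
            String value = inputSolMap.get(v);
            if (value != null) restricted.put(v, value);
        }
        coveredByCurrentBatch.add(inputSolMap); // every input mapping is covered
        currentBatch.add(restricted);           // duplicates collapse in the set
        if (currentBatch.size() == batchSize) { // batch is full: hand it over
            batches.add(currentBatch);
            coveredPerBatch.add(coveredByCurrentBatch);
            currentBatch = new HashSet<>();
            coveredByCurrentBatch = new ArrayList<>();
        }
    }

    // size of the not-yet-full batch that is kept for the next input sequence
    int pendingSize() {
        return currentBatch.size();
    }
}
```

Take join variable x and a batch size of 2. The inputs {x=1, y=a}, {x=1, y=b}, and {x=2} produce one full batch. That batch holds two restricted mappings and covers three input mappings.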
After splitting the current sequence of input solution mappings into
batches, the last batch may not be full, in which case it is kept and
will be populated further once the next sequence of input solution mappings
is passed to the operator. The full batches are used to create bind-join
requests, one per batch. The response to such a request is the subset of
the solutions for the query/pattern of this operator that are join partners
for at least one of the solutions that were used for creating the request.
Each of the requests is issued using the asynchronous functionality of the
federation access manager, which results in a CompletableFuture.
The algorithm connects this future to a MyResponseProcessor to
process the response once it arrives (joining the solution mappings from
the response with the solution mappings covered by the corresponding batch).
All these futures are collected such that the algorithm can wait for their
completion after the child operator has stopped producing input for this
operator.
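Non-blocking request issuing followed by a final wait can be illustrated with plain java.util.concurrent. In this hedged sketch, the function issueRequest stands in for the asynchronous call to the federation access manager, and the thenAccept callback plays the role of the response processor. The callback only copies the response into the sink, whereas the real operator would join it with the covered input mappings. The names and the String-based response type are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch; 'issueRequest' stands in for the asynchronous request
// functionality of a federation access manager.
final class ParallelRequestsSketch {
    static List<String> run(List<List<Integer>> fullBatches,
                            Function<List<Integer>, CompletableFuture<List<String>>> issueRequest) {
        // the sink is shared by all callbacks, so it must be thread-safe
        List<String> sink = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<?>> futures = new ArrayList<>();

        for (List<Integer> batch : fullBatches) {
            // issue the request without blocking; the callback runs in the thread
            // that completes the future (or here, if it is already complete)
            CompletableFuture<Void> f = issueRequest.apply(batch)
                    .thenAccept(response -> sink.addAll(response));
            futures.add(f);
        }

        // once the input is exhausted, wait for all outstanding requests;
        // join() throws a CompletionException if any request or callback failed
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        return sink;
    }
}
```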
This implementation is capable of separating out each input solution mapping
that assigns a blank node to any of the join variables. Then, such solution
mappings are not even considered when creating the requests because they
cannot have any join partners in the results obtained from the federation
member. Of course, in case the algorithm is used with outer-join semantics,
these solution mappings are still returned to the output (without joining
them with anything).
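Here is a minimal sketch of separating out input solution mappings that bind a join variable to a blank node. Purely for illustration, it assumes that terms are strings and that blank nodes carry the N-Triples-style prefix "_:". The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: terms are strings and blank nodes start with "_:".
final class BlankNodeFilterSketch {
    static boolean bindsJoinVarToBlankNode(Map<String, String> solMap, Set<String> joinVars) {
        for (String v : joinVars) {
            String value = solMap.get(v);
            if (value != null && value.startsWith("_:")) return true;
        }
        return false;
    }

    // Returns the mappings that may go into bind-join requests; the others
    // are sent to 'output' unjoined, but only under outer-join semantics.
    static List<Map<String, String>> separate(List<Map<String, String>> input, Set<String> joinVars,
                                              boolean useOuterJoinSemantics,
                                              List<Map<String, String>> output) {
        List<Map<String, String>> toBatch = new ArrayList<>();
        for (Map<String, String> solMap : input) {
            if (!bindsJoinVarToBlankNode(solMap, joinVars)) {
                toBatch.add(solMap);
            } else if (useOuterJoinSemantics) {
                output.add(solMap); // no join partner is possible; keep it as is
            }
            // under inner-join semantics such a mapping yields no output
        }
        return toBatch;
    }
}
```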
Another feature of this implementation is that it switches into a
full-retrieval mode as soon as there is an input solution mapping that
does not have a binding for any of the join variables (which may happen
only in cases in which none of the join variables is a certain variable).
Such an input solution mapping is compatible with (and, thus, can be joined
with) every solution mapping that the federation member has for the query /
pattern of this bind-join operator. Therefore, when switching into
full-retrieval mode, this implementation performs a request to retrieve
the complete set of all these solution mappings and, then, uses this set to
find join partners for the current and the future batches of input solution
mappings (because, with the complete set available locally, there is no need
anymore to issue further bind-join requests). This capability relies on the
createExecutableReqOpForAll() method that needs to be provided by
each concrete implementation that extends this base class.
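The switch into full-retrieval mode can be sketched as follows, again with hypothetical Map-based solution mappings. The Supplier retrieveAll stands in for executing the operator returned by createExecutableReqOpForAll(). The sketch covers inner-join semantics only. It also leaves out the step in which input mappings that were already collected, but not yet requested, are joined with the full result.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of full-retrieval mode (inner-join semantics only).
final class FullRetrievalSketch {
    private final Set<String> joinVars;
    private final Supplier<List<Map<String, String>>> retrieveAll;
    private List<Map<String, String>> fullResult; // null until the switch happens

    FullRetrievalSketch(Set<String> joinVars, Supplier<List<Map<String, String>>> retrieveAll) {
        this.joinVars = joinVars;
        this.retrieveAll = retrieveAll;
    }

    // two mappings are compatible if they agree on every shared variable
    static boolean compatible(Map<String, String> a, Map<String, String> b) {
        for (Map.Entry<String, String> e : a.entrySet()) {
            String other = b.get(e.getKey());
            if (other != null && !other.equals(e.getValue())) return false;
        }
        return true;
    }

    // Returns true if the input mapping was joined locally, false if it
    // should go into a bind-join batch as usual.
    boolean tryHandleLocally(Map<String, String> inputSolMap, List<Map<String, String>> sink) {
        if (fullResult == null) {
            boolean bindsSomeJoinVar = false;
            for (String v : joinVars) {
                if (inputSolMap.containsKey(v)) { bindsSomeJoinVar = true; break; }
            }
            if (bindsSomeJoinVar) return false;
            // no join variable is bound: the mapping is compatible with every
            // solution, so fetch all solutions once and stay in this mode
            fullResult = retrieveAll.get();
        }
        for (Map<String, String> candidate : fullResult) {
            if (compatible(inputSolMap, candidate)) {
                Map<String, String> merged = new HashMap<>(inputSolMap);
                merged.putAll(candidate);
                sink.add(merged);
            }
        }
        return true;
    }
}
```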
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
protected class | BaseForExecOpParallelBindJoin.MyResponseProcessor | Such a response processor will obtain the result from a bind-join request and join that result with the solution mappings that have been covered by the batch used for creating the request.
Field Summary
Fields
Modifier and Type | Field | Description
protected final boolean | allJoinVarsAreCertain
protected final int | batchSize | The number of solution mappings that this operator uses for each of the bind-join requests.
protected Set<org.apache.jena.sparql.engine.binding.Binding> | currentBatch | This set is used to collect up the solution mappings for the currently-assembled batch, which will then be used for a bind-join request.
static final int | DEFAULT_BATCH_SIZE
protected final MemberType | fm
protected Iterable<SolutionMapping> | fullResult | If this operator had to switch to full-retrieval mode, this field contains all solution mappings retrieved for the query of this operator.
protected final List<CompletableFuture<?>> | futures | Used to collect the completable futures created for processing the batch requests and their responses.
protected int | numberOfRequestsUsed
protected final QueryType | query
protected List<SolutionMapping> | solMapsCoveredByCurrentBatch | This list is used to collect up the input solution mappings (obtained from the child operator in the execution plan) that are covered by the currently-assembled batch of solution mappings, where that batch is in currentBatch and will be used for a bind-join request.
protected final List<List<SolutionMapping>> | solMapsCoveredPerBatch
protected ExecutableOperatorStats | statsOfFullRetrievalReqOp
protected final boolean | useOuterJoinSemantics
protected final Set<org.apache.jena.sparql.core.Var> | varsInQuery
Fields inherited from class se.liu.ida.hefquin.engine.queryplan.executable.impl.ops.BaseForUnaryExecOpWithCollectedInput
collectedInputSolMaps, minimumCollectionSize
Fields inherited from class se.liu.ida.hefquin.engine.queryplan.executable.impl.ops.BaseForExecOps
collectExceptions, qpInfo
Constructor Summary
Constructors
Constructor | Description
BaseForExecOpParallelBindJoin(QueryType query, Set<org.apache.jena.sparql.core.Var> varsInQuery, MemberType fm, ExpectedVariables inputVars, boolean useOuterJoinSemantics, int batchSize, boolean collectExceptions, QueryPlanningInfo qpInfo)
Method Summary
Modifier and Type | Method | Description
protected void | _concludeExecution(List<SolutionMapping> inputSolMaps, IntermediateResultElementSink sink, ExecutionContext execCxt) | Implementations of this function need to process the given solution mappings as last input, conclude the execution of this operator, and send the remaining result elements (if any) to the given sink.
protected void | _processCollectedInput(List<SolutionMapping> inputSolMaps, IntermediateResultElementSink sink, ExecutionContext execCxt) | Implementations of this function need to process the given solution mappings as input and send the produced result elements (if any) to the given sink.
protected boolean | alreadyCovered(org.apache.jena.sparql.engine.binding.Binding inputSolMapRestricted)
protected void | batchUp(SolutionMapping inputSolMap, IntermediateResultElementSink sink, ExecutionContext execCxt)
protected abstract NullaryExecutableOp | createExecutableReqOpForAll() | Implementations of this function should create an executable operator that can perform a request to retrieve all solution mappings for the query of this operator (i.e., not a bind-join request).
protected abstract ReqType | createRequest(Set<org.apache.jena.sparql.engine.binding.Binding> batch) | Implementations of this function should create a bind-join request for the given batch of solution mappings.
protected ExecutableOperatorStatsImpl | createStats()
protected abstract Iterable<SolutionMapping> | extractSolMaps(RespType response) | Implementations of this function should extract solution mappings from the given response (obtained via a bind-join request).
protected void | handleCollectedSolMaps | Makes sure that all the input solution mappings collected so far are joined with the retrieved solution mappings in fullResult; resulting solution mappings are sent to the given sink.
protected void | initiateProcessingOfBatches(IntermediateResultElementSink sink, ExecutionContext execCxt) | Issues a bind-join request for every *full* batch of solution mappings that has been created from the given input solution mappings up to this point.
protected long | innerJoin(Iterable<SolutionMapping> left, Iterable<SolutionMapping> right, IntermediateResultElementSink sink)
protected void | joinInFullRetrievalMode(Iterable<SolutionMapping> batchOfSolMaps, IntermediateResultElementSink sink)
protected void | joinInFullRetrievalMode(SolutionMapping inputSolMap, IntermediateResultElementSink sink)
protected long | leftOuterJoin(Iterable<SolutionMapping> left, Iterable<SolutionMapping> right, IntermediateResultElementSink sink)
protected void | obtainFullResult(ExecutionContext execCxt) | Performs a request to retrieve all solution mappings for the query of this operator (see createExecutableReqOpForAll()) and puts the retrieved solution mappings into fullResult.
protected void | recordException(String msg, Exception cause)
void | resetStats()
protected void | switchToFullRetrievalMode(ExecutionContext execCxt, IntermediateResultElementSink sink) | Performs a request to retrieve all solution mappings for the query of this operator (see createExecutableReqOpForAll()), puts the retrieved solution mappings into fullResult, and joins these retrieved solution mappings with all the input solution mappings collected so far; resulting solution mappings are sent to the given sink.
Methods inherited from class se.liu.ida.hefquin.engine.queryplan.executable.impl.ops.BaseForUnaryExecOpWithCollectedInput
_concludeExecution, _process, _process
Methods inherited from class se.liu.ida.hefquin.engine.queryplan.executable.impl.ops.UnaryExecutableOpBase
concludeExecution, getStats, process, process
Methods inherited from class se.liu.ida.hefquin.engine.queryplan.executable.impl.ops.BaseForExecOps
getExceptionsCaughtDuringExecution, getQueryPlanningInfo, recordExceptionCaughtDuringExecution
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface se.liu.ida.hefquin.engine.queryplan.executable.ExecutableOperator
getExceptionsCaughtDuringExecution, getQueryPlanningInfo
Field Details
DEFAULT_BATCH_SIZE
public static final int DEFAULT_BATCH_SIZE
query
protected final QueryType query
fm
protected final MemberType fm
useOuterJoinSemantics
protected final boolean useOuterJoinSemantics
varsInQuery
protected final Set<org.apache.jena.sparql.core.Var> varsInQuery
allJoinVarsAreCertain
protected final boolean allJoinVarsAreCertain
batchSize
protected final int batchSize
The number of solution mappings that this operator uses for each of the bind-join requests.
solMapsCoveredByCurrentBatch
protected List<SolutionMapping> solMapsCoveredByCurrentBatch
This list is used to collect up the input solution mappings (obtained from the child operator in the execution plan) that are covered by the currently-assembled batch of solution mappings, where that batch is in currentBatch and will be used for a bind-join request. Note that the solution mappings in this collection and the ones in currentBatch are not necessarily the same; the ones in currentBatch are versions of the ones in this collection, restricted to the join variables. Once currentBatch is full (i.e., the number of solution mappings in it is equal to batchSize), this list is appended to solMapsCoveredPerBatch and then set back to null, to be initialized to a new list when another input solution mapping arrives.
solMapsCoveredPerBatch
protected final List<List<SolutionMapping>> solMapsCoveredPerBatch
currentBatch
protected Set<org.apache.jena.sparql.engine.binding.Binding> currentBatch
This set is used to collect up the solution mappings for the currently-assembled batch, which will then be used for a bind-join request. These solution mappings are created by restricting relevant input solution mappings (obtained from the child operator in the execution plan) to the join variables; i.e., by projecting away the non-join variables, because the bindings for non-join variables do not need to be shipped in the bind-join requests. The input solution mappings from which the solution mappings in this set have been created are collected in parallel in solMapsCoveredByCurrentBatch. Multiple input solution mappings may result in the same restricted solution mapping. Once the number of solution mappings in this set is equal to batchSize (i.e., the current batch is complete), the set is appended to batches and currentBatch is set back to null, to be initialized with a new set when another input solution mapping arrives.
batches
futures
protected final List<CompletableFuture<?>> futures
Used to collect the completable futures created for processing the batch requests and their responses.
fullResult
protected Iterable<SolutionMapping> fullResult
If this operator had to switch to full-retrieval mode, this field contains all solution mappings retrieved for the query of this operator.
numberOfRequestsUsed
protected int numberOfRequestsUsed
requestDurationsInMS
numOfSolMapsRetrievedPerReq
statsOfFullRetrievalReqOp
protected ExecutableOperatorStats statsOfFullRetrievalReqOp
Constructor Details
BaseForExecOpParallelBindJoin
public BaseForExecOpParallelBindJoin(QueryType query, Set<org.apache.jena.sparql.core.Var> varsInQuery, MemberType fm, ExpectedVariables inputVars, boolean useOuterJoinSemantics, int batchSize, boolean collectExceptions, QueryPlanningInfo qpInfo)
- Parameters:
query - the graph pattern (or other kind of query) to be evaluated (in a bind-join manner) at the federation member given as 'fm'
varsInQuery - the variables that occur in 'query'
fm - the federation member targeted by this operator
inputVars - the variables to be expected in the solution mappings that will be pushed as input to this operator
useOuterJoinSemantics - true if 'query' is to be evaluated under outer-join semantics; false for inner-join semantics
batchSize - the number of solution mappings to be included in each bind-join request
collectExceptions - true if this operator has to collect exceptions (which is handled entirely by one of the super classes); false if the operator should immediately throw every ExecOpExecutionException
Method Details
_processCollectedInput
protected void _processCollectedInput(List<SolutionMapping> inputSolMaps, IntermediateResultElementSink sink, ExecutionContext execCxt) throws ExecOpExecutionException
Description copied from class: BaseForUnaryExecOpWithCollectedInput
Implementations of this function need to process the given solution mappings as input and send the produced result elements (if any) to the given sink. If an exception occurs while processing the solution mappings, then this exception needs to be thrown.
- Specified by:
_processCollectedInput in class BaseForUnaryExecOpWithCollectedInput
- Throws:
ExecOpExecutionException
batchUp
protected void batchUp(SolutionMapping inputSolMap, IntermediateResultElementSink sink, ExecutionContext execCxt) throws ExecOpExecutionException - Throws:
ExecOpExecutionException
alreadyCovered
protected boolean alreadyCovered(org.apache.jena.sparql.engine.binding.Binding inputSolMapRestricted)
_concludeExecution
protected void _concludeExecution(List<SolutionMapping> inputSolMaps, IntermediateResultElementSink sink, ExecutionContext execCxt) throws ExecOpExecutionException
Description copied from class: BaseForUnaryExecOpWithCollectedInput
Implementations of this function need to process the given solution mappings as last input, conclude the execution of this operator, and send the remaining result elements (if any) to the given sink. Notice that the list of solution mappings given here may contain fewer solution mappings than the minimum collection size, and it may even be empty! If an exception occurs during this process, then this exception needs to be thrown.
- Specified by:
_concludeExecution in class BaseForUnaryExecOpWithCollectedInput
- Throws:
ExecOpExecutionException
initiateProcessingOfBatches
protected void initiateProcessingOfBatches(IntermediateResultElementSink sink, ExecutionContext execCxt) throws ExecOpExecutionException
Issues a bind-join request for every *full* batch of solution mappings that has been created from the given input solution mappings up to this point. The CompletableFuture obtained for each of these requests is connected to a MyResponseProcessor that handles the processing of the response (joining the solution mappings from the response with the solution mappings covered by the corresponding batch).
- Throws:
ExecOpExecutionException
recordException
protected void recordException(String msg, Exception cause)
innerJoin
protected long innerJoin(Iterable<SolutionMapping> left, Iterable<SolutionMapping> right, IntermediateResultElementSink sink)
leftOuterJoin
protected long leftOuterJoin(Iterable<SolutionMapping> left, Iterable<SolutionMapping> right, IntermediateResultElementSink sink)
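The two join helpers carry no description. One plausible nested-loop reading of their semantics, using hypothetical Map-based solution mappings, is sketched below. Here left stands for the input solution mappings covered by a batch and right for the solution mappings from the response. The long return value is taken to be the number of emitted mappings; that is an assumption for this sketch, not documented behaviour.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical nested-loop sketch; not HeFQUIN's implementation.
final class JoinSketch {
    // two mappings are compatible if they agree on every shared variable
    static boolean compatible(Map<String, String> a, Map<String, String> b) {
        for (Map.Entry<String, String> e : a.entrySet()) {
            String other = b.get(e.getKey());
            if (other != null && !other.equals(e.getValue())) return false;
        }
        return true;
    }

    static Map<String, String> merge(Map<String, String> a, Map<String, String> b) {
        Map<String, String> m = new HashMap<>(a);
        m.putAll(b);
        return m;
    }

    // emits every compatible left/right combination; returns how many were emitted
    static long innerJoin(Iterable<Map<String, String>> left, Iterable<Map<String, String>> right,
                          List<Map<String, String>> sink) {
        long count = 0;
        for (Map<String, String> l : left) {
            for (Map<String, String> r : right) {
                if (compatible(l, r)) { sink.add(merge(l, r)); count++; }
            }
        }
        return count;
    }

    // like innerJoin, but a left mapping without partners is emitted unchanged
    static long leftOuterJoin(Iterable<Map<String, String>> left, Iterable<Map<String, String>> right,
                              List<Map<String, String>> sink) {
        long count = 0;
        for (Map<String, String> l : left) {
            boolean matched = false;
            for (Map<String, String> r : right) {
                if (compatible(l, r)) { sink.add(merge(l, r)); count++; matched = true; }
            }
            if (!matched) { sink.add(l); count++; }
        }
        return count;
    }
}
```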
createRequest
protected abstract ReqType createRequest(Set<org.apache.jena.sparql.engine.binding.Binding> batch)
Implementations of this function should create a bind-join request for the given batch of solution mappings. Implementations can assume that the given solution mappings are already restricted to contain bindings only for the join variables, that none of the given solution mappings contains blank nodes, that none of the given solution mappings is the empty solution mapping, and that the given set of solution mappings is duplicate-free and nonempty.
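For a SPARQL endpoint, one way to realize such a request is to add a VALUES clause with one row per solution mapping of the batch. The sketch below builds such a query string. It illustrates that technique only and is not the code of BaseForExecOpParallelBindJoinSPARQL. It assumes that terms are already given in SPARQL syntax and that solution mappings are Maps; the class name is hypothetical. A join variable left unbound by some mapping becomes UNDEF. The no-blank-nodes guarantee above matters for this technique because SPARQL does not allow blank nodes in VALUES data blocks.

```java
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.StringJoiner;
import java.util.TreeSet;

// Hypothetical sketch of a VALUES-based bind-join request for SPARQL.
final class ValuesRequestSketch {
    // Assumes, as documented for createRequest, a nonempty, duplicate-free batch
    // of nonempty mappings restricted to join variables, without blank nodes.
    static String createRequest(String graphPattern, Set<Map<String, String>> batch) {
        // variables occurring in the batch, sorted for a stable column order
        SortedSet<String> vars = new TreeSet<>();
        for (Map<String, String> m : batch) vars.addAll(m.keySet());

        StringJoiner header = new StringJoiner(" ", "(", ")");
        for (String v : vars) header.add("?" + v);

        StringBuilder q = new StringBuilder();
        q.append("SELECT * WHERE {\n  ").append(graphPattern).append("\n");
        q.append("  VALUES ").append(header).append(" {\n");
        for (Map<String, String> m : batch) {
            StringJoiner row = new StringJoiner(" ", "    (", ")\n");
            for (String v : vars) row.add(m.getOrDefault(v, "UNDEF")); // unbound -> UNDEF
            q.append(row);
        }
        q.append("  }\n}");
        return q.toString();
    }
}
```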
extractSolMaps
protected abstract Iterable<SolutionMapping> extractSolMaps(RespType response) throws UnsupportedOperationDueToRetrievalError
Implementations of this function should extract solution mappings from the given response (obtained via a bind-join request).
- Throws:
UnsupportedOperationDueToRetrievalError - if the given response does not contain retrieved data but an indication of a data retrieval error
switchToFullRetrievalMode
protected void switchToFullRetrievalMode(ExecutionContext execCxt, IntermediateResultElementSink sink) throws ExecOpExecutionException
Performs a request to retrieve all solution mappings for the query of this operator (see createExecutableReqOpForAll()), puts the retrieved solution mappings into fullResult, and joins these retrieved solution mappings with all the input solution mappings collected so far; resulting solution mappings are sent to the given sink.
- Throws:
ExecOpExecutionException
obtainFullResult
protected void obtainFullResult(ExecutionContext execCxt) throws ExecOpExecutionException
Performs a request to retrieve all solution mappings for the query of this operator (see createExecutableReqOpForAll()) and puts the retrieved solution mappings into fullResult.
- Throws:
ExecOpExecutionException
handleCollectedSolMaps
Makes sure that all the input solution mappings collected so far are joined with the retrieved solution mappings in fullResult; resulting solution mappings are sent to the given sink.
joinInFullRetrievalMode
protected void joinInFullRetrievalMode(Iterable<SolutionMapping> batchOfSolMaps, IntermediateResultElementSink sink)
joinInFullRetrievalMode
protected void joinInFullRetrievalMode(SolutionMapping inputSolMap, IntermediateResultElementSink sink)
createExecutableReqOpForAll
protected abstract NullaryExecutableOp createExecutableReqOpForAll()
Implementations of this function should create an executable operator that can perform a request to retrieve all solution mappings for the query of this operator (i.e., not a bind-join request). The operator created by this function should throw exceptions instead of collecting them.
resetStats
public void resetStats()
- Specified by:
resetStats in interface StatsProvider
- Overrides:
resetStats in class BaseForUnaryExecOpWithCollectedInput
createStats
protected ExecutableOperatorStatsImpl createStats()
- Overrides:
createStats in class BaseForUnaryExecOpWithCollectedInput