Skip to content

Commit e76f594

Browse files
MB-60844: MapReduce for PreSearch (blevesearch#1999)
- Instead of merging all the PreSearch results in one shot at the main coordinator node of an alias tree, merge them incrementally at each level of the tree. This balances the reduction process across all the indexes in a distributed Bleve index, leading to a more even memory distribution.

Co-authored-by: Abhinav Dangeti <abhinav@couchbase.com>
1 parent a1e4a0e commit e76f594

6 files changed

Lines changed: 143 additions & 80 deletions

File tree

index_alias_impl.go

Lines changed: 41 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -163,16 +163,16 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
163163
return nil, ErrorAliasEmpty
164164
}
165165
if _, ok := ctx.Value(search.PreSearchKey).(bool); ok {
166-
// since presearchKey is set, it means that the request
167-
// is being executed as part of a presearch, which
166+
// since preSearchKey is set, it means that the request
167+
// is being executed as part of a preSearch, which
168168
// indicates that this index alias is set as an Index
169-
// in another alias, so we need to do a presearch search
169+
// in another alias, so we need to do a preSearch search
170170
// and NOT a real search
171171
return preSearchDataSearch(ctx, req, i.indexes...)
172172
}
173173

174174
// at this point we know we are doing a real search
175-
// either after a presearch is done, or directly
175+
// either after a preSearch is done, or directly
176176
// on the alias
177177

178178
// check if request has preSearchData which would indicate that the
@@ -203,9 +203,9 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
203203
// check if preSearchData needs to be gathered from all indexes
204204
// before executing the query
205205
var err error
206-
// only perform presearch if
206+
// only perform preSearch if
207207
// - the request does not already have preSearchData
208-
// - the request requires presearch
208+
// - the request requires preSearch
209209
var preSearchDuration time.Duration
210210
var sr *SearchResult
211211
if req.PreSearchData == nil && preSearchRequired(req) {
@@ -214,15 +214,19 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
214214
if err != nil {
215215
return nil, err
216216
}
217-
// check if the presearch result has any errors and if so
217+
// check if the preSearch result has any errors and if so
218218
// return the search result as is without executing the query
219219
// so that the errors are not lost
220-
if preSearchResult.Status.Failed > 0 {
220+
if preSearchResult.Status.Failed > 0 || len(preSearchResult.Status.Errors) > 0 {
221221
return preSearchResult, nil
222222
}
223+
// finalize the preSearch result now
224+
finalizePreSearchResult(req, preSearchResult)
223225

224-
// if there are no errors, then merge the data in the presearch result
225-
preSearchResult = mergePreSearchResult(req, preSearchResult, i.indexes)
226+
// if there are no errors, then merge the data in the preSearch result
227+
// and construct the preSearchData to be used in the actual search
228+
// if the request is satisfied by the preSearch result, then we can
229+
// directly return the preSearch result as the final result
226230
if requestSatisfiedByPreSearch(req) {
227231
sr = finalizeSearchResult(req, preSearchResult)
228232
// no need to run the 2nd phase MultiSearch(..)
@@ -235,7 +239,7 @@ func (i *indexAliasImpl) SearchInContext(ctx context.Context, req *SearchRequest
235239
preSearchDuration = time.Since(searchStart)
236240
}
237241

238-
// check if search result was generated as part of presearch itself
242+
// check if search result was generated as part of preSearch itself
239243
if sr == nil {
240244
sr, err = MultiSearch(ctx, req, preSearchData, i.indexes...)
241245
if err != nil {
@@ -533,13 +537,7 @@ func preSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*Sear
533537
return preSearchDataSearch(newCtx, dummyRequest, indexes...)
534538
}
535539

536-
func tagHitsWithIndexName(sr *SearchResult, indexName string) {
537-
for _, hit := range sr.Hits {
538-
hit.IndexNames = append(hit.IndexNames, indexName)
539-
}
540-
}
541-
542-
// if the request is satisfied by just the presearch result,
540+
// if the request is satisfied by just the preSearch result,
543541
// finalize the result and return it directly without
544542
// performing multi search
545543
func finalizeSearchResult(req *SearchRequest, preSearchResult *SearchResult) *SearchResult {
@@ -551,7 +549,7 @@ func finalizeSearchResult(req *SearchRequest, preSearchResult *SearchResult) *Se
551549
preSearchResult.Total = uint64(preSearchResult.Hits.Len())
552550
maxScore := float64(0)
553551
for i, hit := range preSearchResult.Hits {
554-
// since we are now using the presearch result as the final result
552+
// since we are now using the preSearch result as the final result
555553
// we can discard the indexNames from the hits as they are no longer
556554
// relevant.
557555
hit.IndexNames = nil
@@ -587,14 +585,6 @@ func finalizeSearchResult(req *SearchRequest, preSearchResult *SearchResult) *Se
587585
return preSearchResult
588586
}
589587

590-
func mergePreSearchResult(req *SearchRequest, res *SearchResult,
591-
indexes []Index) *SearchResult {
592-
if requestHasKNN(req) {
593-
res.Hits = mergeKNNDocumentMatches(req, res.Hits)
594-
}
595-
return res
596-
}
597-
598588
func requestSatisfiedByPreSearch(req *SearchRequest) bool {
599589
if requestHasKNN(req) && isKNNrequestSatisfiedByPreSearch(req) {
600590
return true
@@ -609,7 +599,7 @@ func constructPreSearchData(req *SearchRequest, preSearchResult *SearchResult, i
609599
}
610600
var err error
611601
if requestHasKNN(req) {
612-
mergedOut, err = constructKnnPresearchData(mergedOut, preSearchResult, indexes)
602+
mergedOut, err = constructKnnPreSearchData(mergedOut, preSearchResult, indexes)
613603
if err != nil {
614604
return nil, err
615605
}
@@ -619,50 +609,53 @@ func constructPreSearchData(req *SearchRequest, preSearchResult *SearchResult, i
619609

620610
func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Index) (*SearchResult, error) {
621611
asyncResults := make(chan *asyncSearchResult, len(indexes))
622-
623612
// run search on each index in separate go routine
624613
var waitGroup sync.WaitGroup
625-
626614
var searchChildIndex = func(in Index, childReq *SearchRequest) {
627615
rv := asyncSearchResult{Name: in.Name()}
628616
rv.Result, rv.Err = in.SearchInContext(ctx, childReq)
629617
asyncResults <- &rv
630618
waitGroup.Done()
631619
}
632-
633620
waitGroup.Add(len(indexes))
634621
for _, in := range indexes {
635622
go searchChildIndex(in, createChildSearchRequest(req, nil))
636623
}
637-
638624
// on another go routine, close after finished
639625
go func() {
640626
waitGroup.Wait()
641627
close(asyncResults)
642628
}()
643-
629+
// the final search result to be returned after combining the preSearch results
644630
var sr *SearchResult
631+
// the preSearch result processor
632+
var prp preSearchResultProcessor
633+
// error map
645634
indexErrors := make(map[string]error)
646-
647635
for asr := range asyncResults {
648636
if asr.Err == nil {
637+
// a valid preSearch result
638+
if prp == nil {
639+
// first valid preSearch result
640+
// create a new preSearch result processor
641+
prp = createPreSearchResultProcessor(req)
642+
}
643+
prp.add(asr.Result, asr.Name)
649644
if sr == nil {
650645
// first result
651-
sr = asr.Result
652-
tagHitsWithIndexName(sr, asr.Name)
646+
sr = &SearchResult{
647+
Status: asr.Result.Status,
648+
Cost: asr.Result.Cost,
649+
}
653650
} else {
654651
// merge with previous
655-
tagHitsWithIndexName(asr.Result, asr.Name)
656-
sr.Merge(asr.Result)
652+
sr.Status.Merge(asr.Result.Status)
653+
sr.Cost += asr.Result.Cost
657654
}
658655
} else {
659656
indexErrors[asr.Name] = asr.Err
660657
}
661658
}
662-
663-
// merge just concatenated all the hits
664-
// now lets clean it up
665-
666659
// handle case where no results were successful
667660
if sr == nil {
668661
sr = &SearchResult{
@@ -671,16 +664,12 @@ func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Ind
671664
},
672665
}
673666
}
674-
675-
// in presearch partial results are not allowed as it can lead to
667+
// in preSearch, partial results are not allowed as it can lead to
676668
// the real search giving incorrect results, and hence the search
677-
// result is reset.
678-
// discard partial hits if some child index has failed or
679-
// if some child alias has returned partial results.
669+
// result is not populated with any of the processed data from
670+
// the preSearch result processor if there are any errors
671+
// or the preSearch result status has any failures
680672
if len(indexErrors) > 0 || sr.Status.Failed > 0 {
681-
sr = &SearchResult{
682-
Status: sr.Status,
683-
}
684673
if sr.Status.Errors == nil {
685674
sr.Status.Errors = make(map[string]error)
686675
}
@@ -689,8 +678,9 @@ func preSearchDataSearch(ctx context.Context, req *SearchRequest, indexes ...Ind
689678
sr.Status.Total++
690679
sr.Status.Failed++
691680
}
681+
} else {
682+
prp.finalize(sr)
692683
}
693-
694684
return sr, nil
695685
}
696686

index_impl.go

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -549,7 +549,6 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
549549
ctx = context.WithValue(ctx, search.GeoBufferPoolCallbackKey,
550550
search.GeoBufferPoolCallbackFunc(getBufferPool))
551551

552-
553552
searcher, err := req.Query.Searcher(ctx, indexReader, i.m, search.SearcherOptions{
554553
Explain: req.Explain,
555554
IncludeTermVectors: req.IncludeLocations || req.Highlight != nil,

pre_search.go

Lines changed: 59 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,59 @@
1+
// Copyright (c) 2024 Couchbase, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package bleve
16+
17+
// A preSearchResultProcessor processes the data in
18+
// the preSearch result from multiple
19+
// indexes in an alias and merges them together to
20+
// create the final preSearch result
21+
type preSearchResultProcessor interface {
22+
// adds the preSearch result to the processor
23+
add(*SearchResult, string)
24+
// updates the final search result with the finalized
25+
// data from the processor
26+
finalize(*SearchResult)
27+
}
28+
29+
type knnPreSearchResultProcessor struct {
30+
addFn func(sr *SearchResult, indexName string)
31+
finalizeFn func(sr *SearchResult)
32+
}
33+
34+
func (k *knnPreSearchResultProcessor) add(sr *SearchResult, indexName string) {
35+
if k.addFn != nil {
36+
k.addFn(sr, indexName)
37+
}
38+
}
39+
40+
func (k *knnPreSearchResultProcessor) finalize(sr *SearchResult) {
41+
if k.finalizeFn != nil {
42+
k.finalizeFn(sr)
43+
}
44+
}
45+
46+
// -----------------------------------------------------------------------------
47+
48+
func finalizePreSearchResult(req *SearchRequest, preSearchResult *SearchResult) {
49+
if requestHasKNN(req) {
50+
preSearchResult.Hits = finalizeKNNResults(req, preSearchResult.Hits)
51+
}
52+
}
53+
54+
func createPreSearchResultProcessor(req *SearchRequest) preSearchResultProcessor {
55+
if requestHasKNN(req) {
56+
return newKnnPreSearchResultProcessor(req)
57+
}
58+
return &knnPreSearchResultProcessor{} // equivalent to nil
59+
}

search/scorer/scorer_knn.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -71,7 +71,7 @@ func NewKNNQueryScorer(queryVector []float32, queryField string, queryBoost floa
7171

7272
// Score used when the knnMatch.Score = 0 ->
7373
// the query and indexed vector are exactly the same.
74-
const maxKNNScore = math.MaxFloat64
74+
const maxKNNScore = math.MaxFloat32
7575

7676
func (sqs *KNNQueryScorer) Score(ctx *search.SearchContext,
7777
knnMatch *index.VectorDoc) *search.DocumentMatch {

search_knn.go

Lines changed: 33 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -294,9 +294,9 @@ func (i *indexImpl) runKnnCollector(ctx context.Context, req *SearchRequest, rea
294294
}
295295
knnHits := knnCollector.Results()
296296
if !preSearch {
297-
knnHits = finalizeKNNResults(req, knnHits, len(req.KNN))
297+
knnHits = finalizeKNNResults(req, knnHits)
298298
}
299-
// at this point, irrespective of whether it is a presearch or not,
299+
// at this point, irrespective of whether it is a preSearch or not,
300300
// the knn hits are populated with Sort and Fields.
301301
// it must be ensured downstream that the Sort and Fields are not
302302
// re-evaluated, for these hits.
@@ -324,13 +324,13 @@ func setKnnHitsInCollector(knnHits []*search.DocumentMatch, req *SearchRequest,
324324
}
325325
}
326326

327-
func finalizeKNNResults(req *SearchRequest, knnHits []*search.DocumentMatch, numKNNQueries int) []*search.DocumentMatch {
327+
func finalizeKNNResults(req *SearchRequest, knnHits []*search.DocumentMatch) []*search.DocumentMatch {
328328
// if the KNN operator is AND, then we need to filter out the hits that
329329
// do not match all of the KNN queries.
330330
if req.KNNOperator == knnOperatorAnd {
331331
idx := 0
332332
for _, hit := range knnHits {
333-
if len(hit.ScoreBreakdown) == numKNNQueries {
333+
if len(hit.ScoreBreakdown) == len(req.KNN) {
334334
knnHits[idx] = hit
335335
idx++
336336
}
@@ -362,22 +362,6 @@ func finalizeKNNResults(req *SearchRequest, knnHits []*search.DocumentMatch, num
362362
return knnHits
363363
}
364364

365-
func mergeKNNDocumentMatches(req *SearchRequest, knnHits []*search.DocumentMatch) []*search.DocumentMatch {
366-
kArray := make([]int64, len(req.KNN))
367-
for i, knnReq := range req.KNN {
368-
kArray[i] = knnReq.K
369-
}
370-
knnStore := collector.GetNewKNNCollectorStore(kArray)
371-
for _, hit := range knnHits {
372-
knnStore.AddDocument(hit)
373-
}
374-
// passing nil as the document fixup function, because we don't need to
375-
// fixup the document, since this was already done in the first phase.
376-
// hence error is always nil.
377-
mergedKNNhits, _ := knnStore.Final(nil)
378-
return finalizeKNNResults(req, mergedKNNhits, len(req.KNN))
379-
}
380-
381365
// when we are setting KNN hits in the preSearchData, we need to make sure that
382366
// the KNN hit goes to the right index. This is because the KNN hits are
383367
// collected from all the indexes in the alias, but the preSearchData is
@@ -433,7 +417,7 @@ func requestHasKNN(req *SearchRequest) bool {
433417
}
434418

435419
// returns true if the search request contains a KNN request that can be
436-
// satisfied by just performing a presearch, completely bypassing the
420+
// satisfied by just performing a preSearch, completely bypassing the
437421
// actual search.
438422
func isKNNrequestSatisfiedByPreSearch(req *SearchRequest) bool {
439423
// if req.Query is not match_none => then we need to go to phase 2
@@ -456,7 +440,7 @@ func isKNNrequestSatisfiedByPreSearch(req *SearchRequest) bool {
456440
return true
457441
}
458442

459-
func constructKnnPresearchData(mergedOut map[string]map[string]interface{}, preSearchResult *SearchResult,
443+
func constructKnnPreSearchData(mergedOut map[string]map[string]interface{}, preSearchResult *SearchResult,
460444
indexes []Index) (map[string]map[string]interface{}, error) {
461445

462446
distributedHits, err := validateAndDistributeKNNHits([]*search.DocumentMatch(preSearchResult.Hits), indexes)
@@ -511,3 +495,30 @@ func redistributeKNNPreSearchData(req *SearchRequest, indexes []Index) (map[stri
511495
}
512496
return rv, nil
513497
}
498+
499+
func newKnnPreSearchResultProcessor(req *SearchRequest) *knnPreSearchResultProcessor {
500+
kArray := make([]int64, len(req.KNN))
501+
for i, knnReq := range req.KNN {
502+
kArray[i] = knnReq.K
503+
}
504+
knnStore := collector.GetNewKNNCollectorStore(kArray)
505+
return &knnPreSearchResultProcessor{
506+
addFn: func(sr *SearchResult, indexName string) {
507+
for _, hit := range sr.Hits {
508+
// tag the hit with the index name, so that when the
509+
// final search result is constructed, the hit will have
510+
// a valid path to follow along the alias tree to reach
511+
// the index.
512+
hit.IndexNames = append(hit.IndexNames, indexName)
513+
knnStore.AddDocument(hit)
514+
}
515+
},
516+
finalizeFn: func(sr *SearchResult) {
517+
// passing nil as the document fixup function, because we don't need to
518+
// fixup the document, since this was already done in the first phase,
519+
// hence error is always nil.
520+
// the merged knn hits are finalized and set in the search result.
521+
sr.Hits, _ = knnStore.Final(nil)
522+
},
523+
}
524+
}

0 commit comments

Comments
 (0)