6
6
package org .opensearch .knn .index .codec .KNN80Codec ;
7
7
8
8
import com .google .common .collect .ImmutableMap ;
9
- import lombok .NonNull ;
10
9
import lombok .extern .log4j .Log4j2 ;
11
10
import org .apache .lucene .store .ChecksumIndexInput ;
12
11
import org .opensearch .common .StopWatch ;
61
60
* This class writes the KNN docvalues to the segments
62
61
*/
63
62
@ Log4j2
64
- class KNN80DocValuesConsumer extends DocValuesConsumer implements Closeable {
63
+ public class KNN80DocValuesConsumer extends DocValuesConsumer implements Closeable {
65
64
66
65
private final Logger logger = LogManager .getLogger (KNN80DocValuesConsumer .class );
67
66
@@ -90,22 +89,14 @@ public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) th
90
89
}
91
90
92
91
private boolean isKNNBinaryFieldRequired (FieldInfo field ) {
93
- final KNNEngine knnEngine = getKNNEngine (field );
92
+ final KNNEngine knnEngine = KNNCodecUtil . getKNNEngine (field );
94
93
log .debug (String .format ("Read engine [%s] for field [%s]" , knnEngine .getName (), field .getName ()));
95
- return field .attributes ().containsKey (KNNVectorFieldMapper .KNN_FIELD )
94
+ // This value will not be set: field.getVectorDimension()
95
+ return field .getVectorDimension () <= 0
96
+ && field .attributes ().containsKey (KNNVectorFieldMapper .KNN_FIELD )
96
97
&& KNNEngine .getEnginesThatCreateCustomSegmentFiles ().stream ().anyMatch (engine -> engine == knnEngine );
97
98
}
98
99
99
- private KNNEngine getKNNEngine (@ NonNull FieldInfo field ) {
100
- final String modelId = field .attributes ().get (MODEL_ID );
101
- if (modelId != null ) {
102
- var model = ModelCache .getInstance ().get (modelId );
103
- return model .getModelMetadata ().getKnnEngine ();
104
- }
105
- final String engineName = field .attributes ().getOrDefault (KNNConstants .KNN_ENGINE , KNNEngine .DEFAULT .getName ());
106
- return KNNEngine .getEngine (engineName );
107
- }
108
-
109
100
public void addKNNBinaryField (FieldInfo field , DocValuesProducer valuesProducer , boolean isMerge , boolean isRefresh )
110
101
throws IOException {
111
102
// Get values to be indexed
@@ -123,7 +114,18 @@ public void addKNNBinaryField(FieldInfo field, DocValuesProducer valuesProducer,
123
114
}
124
115
// Increment counter for number of graph index requests
125
116
KNNCounter .GRAPH_INDEX_REQUESTS .increment ();
126
- final KNNEngine knnEngine = getKNNEngine (field );
117
+ if (isMerge ) {
118
+ recordMergeStats (pair .docs .length , arraySize );
119
+ }
120
+
121
+ if (isRefresh ) {
122
+ recordRefreshStats ();
123
+ }
124
+ createNativeIndex (state , field , pair );
125
+ }
126
+
127
+ public static void createNativeIndex (SegmentWriteState state , FieldInfo field , KNNCodecUtil .Pair pair ) throws IOException {
128
+ final KNNEngine knnEngine = KNNCodecUtil .getKNNEngine (field );
127
129
final String engineFileName = buildEngineFileName (
128
130
state .segmentInfo .name ,
129
131
knnEngine .getVersion (),
@@ -147,20 +149,12 @@ public void addKNNBinaryField(FieldInfo field, DocValuesProducer valuesProducer,
147
149
indexCreator = () -> createKNNIndexFromScratch (field , pair , knnEngine , indexPath );
148
150
}
149
151
150
- if (isMerge ) {
151
- recordMergeStats (pair .docs .length , arraySize );
152
- }
153
-
154
- if (isRefresh ) {
155
- recordRefreshStats ();
156
- }
157
-
158
152
// This is a bit of a hack. We have to create an output here and then immediately close it to ensure that
159
153
// engineFileName is added to the tracked files by Lucene's TrackingDirectoryWrapper. Otherwise, the file will
160
154
// not be marked as added to the directory.
161
155
state .directory .createOutput (engineFileName , state .context ).close ();
162
156
indexCreator .createIndex ();
163
- writeFooter (indexPath , engineFileName );
157
+ writeFooter (state , indexPath , engineFileName );
164
158
}
165
159
166
160
private void recordMergeStats (int length , long arraySize ) {
@@ -176,7 +170,7 @@ private void recordRefreshStats() {
176
170
KNNGraphValue .REFRESH_TOTAL_OPERATIONS .increment ();
177
171
}
178
172
179
- private void createKNNIndexFromTemplate (byte [] model , KNNCodecUtil .Pair pair , KNNEngine knnEngine , String indexPath ) {
173
+ private static void createKNNIndexFromTemplate (byte [] model , KNNCodecUtil .Pair pair , KNNEngine knnEngine , String indexPath ) {
180
174
Map <String , Object > parameters = ImmutableMap .of (
181
175
KNNConstants .INDEX_THREAD_QTY ,
182
176
KNNSettings .state ().getSettingValue (KNNSettings .KNN_ALGO_PARAM_INDEX_THREAD_QTY )
@@ -195,7 +189,7 @@ private void createKNNIndexFromTemplate(byte[] model, KNNCodecUtil.Pair pair, KN
195
189
});
196
190
}
197
191
198
- private void createKNNIndexFromScratch (FieldInfo fieldInfo , KNNCodecUtil .Pair pair , KNNEngine knnEngine , String indexPath )
192
+ private static void createKNNIndexFromScratch (FieldInfo fieldInfo , KNNCodecUtil .Pair pair , KNNEngine knnEngine , String indexPath )
199
193
throws IOException {
200
194
Map <String , Object > parameters = new HashMap <>();
201
195
Map <String , String > fieldAttributes = fieldInfo .attributes ();
@@ -295,7 +289,7 @@ private interface NativeIndexCreator {
295
289
void createIndex () throws IOException ;
296
290
}
297
291
298
- private void writeFooter (String indexPath , String engineFileName ) throws IOException {
292
+ private static void writeFooter (SegmentWriteState state , String indexPath , String engineFileName ) throws IOException {
299
293
// Opens the engine file that was created and appends a footer to it. The footer consists of
300
294
// 1. A Footer magic number (int - 4 bytes)
301
295
// 2. A checksum algorithm id (int - 4 bytes)
@@ -325,7 +319,7 @@ private void writeFooter(String indexPath, String engineFileName) throws IOExcep
325
319
os .close ();
326
320
}
327
321
328
- private boolean isChecksumValid (long value ) {
322
+ private static boolean isChecksumValid (long value ) {
329
323
// Check pulled from
330
324
// https://github.com/apache/lucene/blob/branch_9_0/lucene/core/src/java/org/apache/lucene/codecs/CodecUtil.java#L644-L647
331
325
return (value & CRC32_CHECKSUM_SANITY ) != 0 ;
0 commit comments