
Commit bfb7ecd

move prefixing with subgraph to neo4j creation stage
1 parent 583a2f8 commit bfb7ecd

File tree

4 files changed, +34 -23 lines changed

02_assign_ids/grebi_assign_ids/src/main.rs

+7-4
@@ -118,15 +118,18 @@ fn main() {
             }
         }

+        writer.write_all("{\"grebi:nodeId\":\"".as_bytes()).unwrap();
+
         let group = id_to_group.get(id.unwrap());
-        if !group.is_some() {
-            panic!("could not find identifier group for id: {}", String::from_utf8(id.unwrap().to_vec()).unwrap());
+        if group.is_some() {
+            writer.write_all(group.unwrap().as_slice()).unwrap();
+        } else {
+            writer.write_all(id.unwrap()).unwrap();
         }

-        writer.write_all("{\"grebi:nodeId\":\"".as_bytes()).unwrap();
-        writer.write_all(group.unwrap().as_slice()).unwrap();
         writer.write_all("\"".as_bytes()).unwrap();

+
         json.rewind();
         while json.peek().kind != JsonTokenType::EndObject {

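The practical effect of this change: grebi_assign_ids no longer panics when an identifier has no equivalence group; the identifier itself is written as the grebi:nodeId. A minimal sketch of that fallback, assuming a plain HashMap<Vec<u8>, Vec<u8>> for id_to_group and an invented write_node_id helper (the real code does this inline while re-emitting the JSON record):

use std::collections::HashMap;
use std::io::Write;

// Sketch only: ids that never received an equivalence group are now emitted
// under their own id instead of triggering a panic.
fn write_node_id(writer: &mut impl Write, id_to_group: &HashMap<Vec<u8>, Vec<u8>>, id: &[u8]) {
    writer.write_all("{\"grebi:nodeId\":\"".as_bytes()).unwrap();
    match id_to_group.get(id) {
        Some(group) => writer.write_all(group).unwrap(), // id belongs to an equivalence group
        None => writer.write_all(id).unwrap(),           // singleton id, kept as-is
    }
    writer.write_all("\"".as_bytes()).unwrap();
}

fn main() {
    let mut out = Vec::new();
    write_node_id(&mut out, &HashMap::new(), b"CHEBI:15377");
    println!("{}", String::from_utf8(out).unwrap()); // {"grebi:nodeId":"CHEBI:15377"
}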
02_assign_ids/grebi_identifiers2groups/src/main.rs

+8-8
@@ -18,20 +18,15 @@ struct Args {
     #[arg(long)]
     add_group: Vec<String>,

-    #[arg(long)]
-    add_prefix: String, // used to prepend the subgraph name like hra_kg:g:
-
-    #[arg(long)]
-    prealloc_size: usize
 }


 fn main() {

     let args = Args::parse();

-    let mut group_to_entities:HashMap<u64, HashSet<Vec<u8>>> = HashMap::with_capacity(args.prealloc_size);
-    let mut entity_to_group:HashMap<Vec<u8>, u64> = HashMap::with_capacity(args.prealloc_size);
+    let mut group_to_entities:HashMap<u64, HashSet<Vec<u8>>> = HashMap::new();
+    let mut entity_to_group:HashMap<Vec<u8>, u64> = HashMap::new();

     let mut next_group_id:u64 = 1;

@@ -127,6 +122,12 @@ fn main() {

     for group in group_to_entities {

+        if group.1.len() == 1 {
+            // this is a unique id with no equivalences, no need to
+            // write it as a group.
+            continue;
+        }
+
         n2 = n2 + 1;

         // writer.write_all("group_".as_bytes()).unwrap();
@@ -140,7 +141,6 @@ fn main() {

         for entity in sorted_ids {
             if is_first_value {
-                writer.write_all(&args.add_prefix.as_bytes()).unwrap();
                 writer.write_all(entity.as_slice()).unwrap();
                 writer.write_all("\t".as_bytes()).unwrap();
                 is_first_value = false;

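With the prefix and preallocation options gone, grebi_identifiers2groups only has to emit genuine equivalence groups: a group with a single member is skipped, and that id falls back to itself downstream as shown above. A rough sketch of the filtering, with illustrative example identifiers:

use std::collections::{HashMap, HashSet};

// Sketch only (example ids are illustrative): groups holding a single identifier are
// not written out, since an id with no equivalences can serve directly as its own
// node id later in the pipeline.
fn main() {
    let mut group_to_entities: HashMap<u64, HashSet<Vec<u8>>> = HashMap::new();
    group_to_entities.entry(1).or_default().insert(b"MONDO:0005015".to_vec()); // singleton
    let water = group_to_entities.entry(2).or_default();
    water.insert(b"CHEBI:15377".to_vec());
    water.insert(b"CAS:7732-18-5".to_vec());

    for (group_id, entities) in &group_to_entities {
        if entities.len() == 1 {
            continue; // unique id with no equivalences, not written as a group
        }
        println!("group {}: {} equivalent ids", group_id, entities.len());
    }
}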
06_prepare_db_import/grebi_make_neo_csv/src/main.rs

+16-6
@@ -40,12 +40,17 @@ struct Args {

     #[arg(long)]
     out_id_edges_csv_path: String,
+
+    #[arg(long)]
+    add_prefix: String, // used to prepend the subgraph name like hra_kg:g:
 }

 fn main() -> std::io::Result<()> {

     let args = Args::parse();

+    let add_prefix = args.add_prefix.as_bytes();
+
     let start_time = std::time::Instant::now();


@@ -123,7 +128,7 @@ fn main() -> std::io::Result<()> {

         let sliced = SlicedEntity::from_json(&line);

-        write_node(&line, &sliced, &all_entity_props, &mut nodes_writer, &mut id_edges_writer);
+        write_node(&line, &sliced, &all_entity_props, &mut nodes_writer, &mut id_edges_writer, &add_prefix);

         n_nodes = n_nodes + 1;
         if n_nodes % 1000000 == 0 {
@@ -146,7 +151,7 @@ fn main() -> std::io::Result<()> {

         let sliced = SlicedEdge::from_json(&line);

-        write_edge(&line, sliced, &all_edge_props, &mut edges_writer);
+        write_edge(&line, sliced, &all_edge_props, &mut edges_writer, &add_prefix);

         n_edges = n_edges + 1;
         if n_edges % 1000000 == 0 {
@@ -165,12 +170,13 @@ fn main() -> std::io::Result<()> {
     Ok(())
 }

-fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&HashSet<String>, nodes_writer:&mut BufWriter<&File>, id_edges_writer:&mut BufWriter<&File>) {
+fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&HashSet<String>, nodes_writer:&mut BufWriter<&File>, id_edges_writer:&mut BufWriter<&File>, add_prefix:&[u8]) {

     let refs:Map<String,Value> = serde_json::from_slice(entity._refs.unwrap()).unwrap();

     // grebi:nodeId
     nodes_writer.write_all(b"\"").unwrap();
+    nodes_writer.write_all(&add_prefix).unwrap();
     write_escaped_value(entity.id, nodes_writer);
     nodes_writer.write_all(b"\",\"").unwrap();

@@ -215,7 +221,7 @@ fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&HashSet<Stri
             if header_prop.as_bytes() == row_prop.key {
                 if row_prop.key == "id".as_bytes() {
                     for val in row_prop.values.iter() {
-                        write_id_row(val, id_edges_writer, &entity.id);
+                        write_id_row(val, id_edges_writer, &entity.id, &add_prefix);
                     }
                 }
                 for val in row_prop.values.iter() {
@@ -246,17 +252,20 @@ fn write_node(src_line:&[u8], entity:&SlicedEntity, all_node_props:&HashSet<Stri
     nodes_writer.write_all(b"\n").unwrap();
 }

-fn write_edge(src_line:&[u8], edge:SlicedEdge, all_edge_props:&HashSet<String>, edges_writer: &mut BufWriter<&File>) {
+fn write_edge(src_line:&[u8], edge:SlicedEdge, all_edge_props:&HashSet<String>, edges_writer: &mut BufWriter<&File>, add_prefix:&[u8]) {

     let refs:Map<String,Value> = serde_json::from_slice(edge._refs.unwrap()).unwrap();

     edges_writer.write_all(b"\"").unwrap();
+    write_escaped_value(&add_prefix, edges_writer);
     write_escaped_value(edge.from, edges_writer);
     edges_writer.write_all(b"\",\"").unwrap();
     write_escaped_value(edge.edge_type, edges_writer);
     edges_writer.write_all(b"\",\"").unwrap();
+    write_escaped_value(&add_prefix, edges_writer);
     write_escaped_value(edge.to, edges_writer);
     edges_writer.write_all(b"\",\"").unwrap();
+    write_escaped_value(&add_prefix, edges_writer);
     write_escaped_value(edge.edge_id, edges_writer);
     edges_writer.write_all(b"\",\"").unwrap();

@@ -340,7 +349,7 @@ fn parse_json_and_write(buf:&[u8], refs:&Map<String,Value>, writer:&mut BufWrite
     }
 }

-fn write_id_row(val:&SlicedPropertyValue, id_edges_writer:&mut BufWriter<&File>, grebi_node_id:&[u8]) {
+fn write_id_row(val:&SlicedPropertyValue, id_edges_writer:&mut BufWriter<&File>, grebi_node_id:&[u8], add_prefix:&[u8]) {

     let actual_id = {
         if val.kind == JsonTokenType::StartObject {
@@ -356,6 +365,7 @@ fn write_id_row(val:&SlicedPropertyValue, id_edges_writer:&mut BufWriter<&File>,
     };

     id_edges_writer.write_all(b"\"").unwrap();
+    write_escaped_value(&add_prefix, id_edges_writer);
     write_escaped_value(grebi_node_id, id_edges_writer);
     id_edges_writer.write_all(b"\",\"").unwrap();
     write_escaped_value(b"id", id_edges_writer);

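Taken together, the subgraph prefix is now applied exactly once, when the Neo4j CSV rows are written: node ids, both edge endpoints, edge ids, and the id-edge rows all receive it. An illustrative sketch of the idea; write_prefixed_field and the "hra_kg:" value are invented for this example, while the real code goes through write_escaped_value and its own CSV layout:

use std::io::Write;

// Sketch only: prepend the subgraph prefix to an id as it is written into a CSV field.
fn write_prefixed_field(out: &mut impl Write, add_prefix: &[u8], value: &[u8]) {
    out.write_all(b"\"").unwrap();
    out.write_all(add_prefix).unwrap(); // e.g. b"hra_kg:"
    out.write_all(value).unwrap();      // node id or edge endpoint (escaping omitted here)
    out.write_all(b"\",").unwrap();
}

fn main() {
    let mut row = Vec::new();
    write_prefixed_field(&mut row, b"hra_kg:", b"n123");
    write_prefixed_field(&mut row, b"hra_kg:", b"n456");
    println!("{}", String::from_utf8(row).unwrap()); // "hra_kg:n123","hra_kg:n456",
}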
nextflow/01_create_subgraph.nf

+3-5
@@ -10,7 +10,6 @@ params.config = "$GREBI_CONFIG"
 params.subgraph = "$GREBI_SUBGRAPH"
 params.timestamp = "$GREBI_TIMESTAMP"
 params.is_ebi = "$GREBI_IS_EBI"
-params.max_entities = "$GREBI_MAX_ENTITIES"

 workflow {

@@ -128,8 +127,6 @@ process build_equiv_groups {
     set -Eeuo pipefail
     cat ${identifiers_tsv} \
         | ${params.home}/target/release/grebi_identifiers2groups \
-            --add-prefix ${params.subgraph}:g: \
-            --prealloc-size ${params.max_entities} \
             ${buildAddEquivGroupArgs(additional_equivalence_groups)} \
             > groups.txt
     """
@@ -302,7 +299,8 @@ process prepare_neo {
         --in-edges-jsonl ${edges_jsonl} \
         --out-nodes-csv-path neo_nodes_${params.subgraph}_${task.index}.csv \
         --out-edges-csv-path neo_edges_${params.subgraph}_${task.index}.csv \
-        --out-id-edges-csv-path neo_edges_ids_${params.subgraph}_${task.index}.csv
+        --out-id-edges-csv-path neo_edges_ids_${params.subgraph}_${task.index}.csv \
+        --add-prefix ${params.subgraph}:
     """
 }

@@ -406,7 +404,7 @@ process create_solr_nodes_core {
 process create_solr_edges_core {
     cache "lenient"
     memory "64 GB"
-    time "3d"
+    time "23h"
     cpus "16"

     publishDir "${params.tmp}/${params.config}/${params.subgraph}/solr_cores", overwrite: true, saveAs: { filename -> filename.replace("solr/data/", "") }

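On the pipeline side, --add-prefix now reaches grebi_make_neo_csv from the prepare_neo step with the value ${params.subgraph}:, the max_entities/--prealloc-size plumbing is dropped, and the Solr edges core time limit comes down from 3d to 23h. A rough sketch of how the new flag is picked up on the Rust side, mirroring the #[arg(long)] field added above (the doc comment and printout are illustrative):

use clap::Parser;

// Sketch only: clap exposes --add-prefix, which prepare_neo passes as "${params.subgraph}:".
#[derive(Parser)]
struct Args {
    /// Subgraph prefix prepended to every node and edge id, e.g. "hra_kg:"
    #[arg(long)]
    add_prefix: String,
}

fn main() {
    let args = Args::parse();
    let add_prefix = args.add_prefix.as_bytes(); // reused for every CSV row written
    println!("prefixing ids with {}", String::from_utf8_lossy(add_prefix));
}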