Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a version check before calling http.socket_timeout #32

Merged
merged 8 commits into from
Jan 21, 2019
233 changes: 121 additions & 112 deletions hetio/neo4j.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,28 @@
from operator import or_
from functools import reduce

import py2neo
import py2neo.packages.httpstream
import pandas
from tqdm import tqdm

import hetio.hetnet

# Get py2neo version
PY2NEO_VER = int(py2neo.__version__[0])

# Avoid SocketError
py2neo.packages.httpstream.http.socket_timeout = 1e8
def import_py2neo():
    """
    Import the py2neo library, check its version, and configure it if needed.

    For py2neo versions before 4, raise the bundled httpstream socket timeout
    so long-running queries do not fail with a SocketError.

    Returns
    -------
    module
        The imported ``py2neo`` module.
    """
    import py2neo
    # Compare full integer release tuples rather than int(__version__[0]) so
    # the check stays correct if the major version reaches double digits.
    # Non-numeric components (e.g. "0b1" in "4.0b1") are skipped.
    version_tuple = tuple(
        int(part) for part in py2neo.__version__.split('.') if part.isdigit()
    )
    if version_tuple < (4,):
        # py2neo < 4 bundles httpstream; avoid SocketError on long queries.
        import py2neo.packages.httpstream
        py2neo.packages.httpstream.http.socket_timeout = 1e8
    return py2neo

def export_neo4j(graph, uri, node_queue=200, edge_queue=5, show_progress=False):
"""Export hetnet to neo4j"""
py2neo = import_py2neo()

if isinstance(uri, py2neo.Graph):
db_graph = uri
Expand Down Expand Up @@ -89,6 +96,10 @@ def append(self, x):
self.create()

def create(self):
import py2neo

PY2NEO_VER = int(py2neo.__version__[0])

if not self:
return

Expand Down Expand Up @@ -162,43 +173,16 @@ def cypher_path(metarels):
q += '{dir0}[:{rel_type}]{dir1}(n{i}{target_label})'.format(**kwargs)
return q

def construct_dwpc_query(metarels, property='name', join_hint='midpoint', index_hint=False, unique_nodes=True):
def construct_degree_clause(metarels):
"""
Create a cypher query for computing the *DWPC* for a type of path.
Create a Cypher query clause that calculates the degree of each node

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to just save the whitespace changes for the other PR that modifies the PDP / DWPC functions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing

Parameters
----------
metarels : a metarels or MetaPath object
the metapath (path type) to create a query for
property : str
which property to use for source and target node lookup
join_hint : 'midpoint', bool, or int
whether to add a join hint to tell neo4j to traverse from both ends of
the path and join at a specific index. `'midpoint'` or `True` specifies
joining at the middle node in the path (rounded down if an even number
of nodes). `False` specifies not to add a join hint. An int specifies
the node to join on.
index_hint : bool
whether to add index hints which specifies the properties of the source
and target nodes to use for lookup. Enabling both `index_hint` and
`join_hint` can cause the query to fail.
unique_nodes : bool or str
whether to exclude paths with duplicate nodes. To not enforce node
uniqueness, use `False`. Methods for enforcing node uniqueness are:
`nested` the path-length independent query (`ALL (x IN nodes(path) WHERE size(filter(z IN nodes(path) WHERE z = x)) = 1)`)
`expanded` for the combinatorial and path-length dependent form (`NOT (n0=n1 OR n0=n2 OR n0=n3 OR n1=n2 OR n1=n3 OR n2=n3)`).
`labeled` to perform an intelligent version of `expanded` where only
nodes with the same label are checked for duplication. Specifying `True`,
which is the default, uses the `labeled` method.
the metapath to create the clause for
"""
# Convert metapath to metarels
if isinstance(metarels, hetio.hetnet.MetaPath):
metarels = metapath_to_metarels(metarels)

# create cypher path query
metapath_query = cypher_path(metarels)

# create cypher path degree query
degree_strs = list()
for i, (source_label, target_label, rel_type, direction) in enumerate(metarels):
kwargs = {
Expand All @@ -217,6 +201,26 @@ def construct_dwpc_query(metarels, property='name', join_hint='midpoint', index_
).format(**kwargs))
degree_query = ',\n'.join(degree_strs)

return degree_query

def construct_using_clause(metarels, join_hint, index_hint):
"""
Create a Cypher query clause that gives the planner hints to speed up the query
Parameters
----------
metarels : a metarels or MetaPath object
the metapath to create the clause for
join_hint : 'midpoint', bool, or int
whether to add a join hint to tell neo4j to traverse from both ends of
the path and join at a specific index. `'midpoint'` or `True` specifies
joining at the middle node in the path (rounded down if an even number
of nodes). `False` specifies not to add a join hint. An int specifies
the node to join on.
index_hint : bool
whether to add index hints which specifies the properties of the source
and target nodes to use for lookup. Enabling both `index_hint` and
`join_hint` can cause the query to fail.
"""
using_query = ''
# Specify index hint for node lookup
if index_hint:
Expand All @@ -239,7 +243,24 @@ def construct_dwpc_query(metarels, property='name', join_hint='midpoint', index_
assert join_hint <= len(metarels)
using_query += "\nUSING JOIN ON n{}".format(join_hint)

# Unique node constraint (prevent paths with duplicate nodes)
return using_query

def construct_unique_nodes_clause(metarels, unique_nodes):
"""
Create a Cypher query clause that excludes paths with duplicate nodes
Parameters
----------
metarels : a metarels or MetaPath object
the metapath to create the clause for
unique_nodes : bool or str
whether to exclude paths with duplicate nodes. To not enforce node
uniqueness, use `False`. Methods for enforcing node uniqueness are:
`nested` the path-length independent query (`ALL (x IN nodes(path) WHERE size(filter(z IN nodes(path) WHERE z = x)) = 1)`)
`expanded` for the combinatorial and path-length dependent form (`NOT (n0=n1 OR n0=n2 OR n0=n3 OR n1=n2 OR n1=n3 OR n2=n3)`).
`labeled` to perform an intelligent version of `expanded` where only
nodes with the same label are checked for duplication. Specifying `True`,
which is the default, uses the `labeled` method.
"""
if unique_nodes == 'nested':
unique_nodes_query = '\nAND ALL (x IN nodes(path) WHERE size(filter(z IN nodes(path) WHERE z = x)) = 1)'
elif unique_nodes == 'expanded':
Expand All @@ -259,6 +280,52 @@ def construct_dwpc_query(metarels, property='name', join_hint='midpoint', index_
assert unique_nodes is False
unique_nodes_query = ''

return unique_nodes_query

def construct_dwpc_query(metarels, property='name', join_hint='midpoint', index_hint=False, unique_nodes=True):
"""
Create a cypher query for computing the *DWPC* for a type of path.

Parameters
----------
metarels : a metarels or MetaPath object
the metapath (path type) to create a query for
property : str
which property to use for source and target node lookup
join_hint : 'midpoint', bool, or int
whether to add a join hint to tell neo4j to traverse from both ends of
the path and join at a specific index. `'midpoint'` or `True` specifies
joining at the middle node in the path (rounded down if an even number
of nodes). `False` specifies not to add a join hint. An int specifies
the node to join on.
index_hint : bool
whether to add index hints which specifies the properties of the source
and target nodes to use for lookup. Enabling both `index_hint` and
`join_hint` can cause the query to fail.
unique_nodes : bool or str
whether to exclude paths with duplicate nodes. To not enforce node
uniqueness, use `False`. Methods for enforcing node uniqueness are:
`nested` the path-length independent query (`ALL (x IN nodes(path) WHERE size(filter(z IN nodes(path) WHERE z = x)) = 1)`)
`expanded` for the combinatorial and path-length dependent form (`NOT (n0=n1 OR n0=n2 OR n0=n3 OR n1=n2 OR n1=n3 OR n2=n3)`).
`labeled` to perform an intelligent version of `expanded` where only
nodes with the same label are checked for duplication. Specifying `True`,
which is the default, uses the `labeled` method.
"""
# Convert metapath to metarels
if isinstance(metarels, hetio.hetnet.MetaPath):
metarels = metapath_to_metarels(metarels)

# create cypher path query
metapath_query = cypher_path(metarels)

# create cypher path degree query
degree_query = construct_degree_clause(metarels)

using_query = construct_using_clause(metarels, join_hint, index_hint)

# Unique node constraint (prevent paths with duplicate nodes)
unique_nodes_query = construct_unique_nodes_clause(metarels, unique_nodes)

# combine cypher fragments into a single query and add DWPC logic
query = textwrap.dedent('''\
MATCH path = {metapath_query}{using_query}
Expand Down Expand Up @@ -323,65 +390,12 @@ def construct_pdp_query(metarels, dwpc=None, property='name', join_hint='midpoin
metapath_query = cypher_path(metarels)

# create cypher path degree query
degree_strs = list()
for i, (source_label, target_label, rel_type, direction) in enumerate(metarels):
kwargs = {
'i0': i,
'i1': i + 1,
'source_label': source_label,
'target_label': target_label,
'rel_type': rel_type,
'dir0': '<-' if direction == 'backward' else '-',
'dir1': '->' if direction == 'forward' else '-',
}
degree_strs.append(textwrap.dedent(
'''\
size((n{i0}){dir0}[:{rel_type}]{dir1}()),
size((){dir0}[:{rel_type}]{dir1}(n{i1}))'''
).format(**kwargs))
degree_query = ',\n'.join(degree_strs)

using_query = ''
# Specify index hint for node lookup
if index_hint:
using_query = '\n' + textwrap.dedent('''\
USING INDEX n0:{source_label}({property})
USING INDEX n{length}:{target_label}({property})
''').rstrip().format(
property = property,
source_label = metarels[0][0],
target_label = metarels[-1][1],
length = len(metarels)
)
degree_query = construct_degree_clause(metarels)

# Specify join hint with node to join on
if join_hint is not False:
if join_hint is True or join_hint == 'midpoint':
join_hint = len(metarels) // 2
join_hint = int(join_hint)
assert join_hint >= 0
assert join_hint <= len(metarels)
using_query += "\nUSING JOIN ON n{}".format(join_hint)
using_query = construct_using_clause(metarels, join_hint, index_hint)

# Unique node constraint (prevent paths with duplicate nodes)
if unique_nodes == 'nested':
unique_nodes_query = '\nAND ALL (x IN nodes(path) WHERE size(filter(z IN nodes(path) WHERE z = x)) = 1)'
elif unique_nodes == 'expanded':
pairs = itertools.combinations(range(len(metarels) + 1), 2)
unique_nodes_query = format_expanded_clause(pairs)
elif unique_nodes == 'labeled' or unique_nodes is True:
labels = [metarel[0] for metarel in metarels]
labels.append(metarels[-1][1])
label_to_nodes = dict()
for i, label in enumerate(labels):
label_to_nodes.setdefault(label, list()).append(i)
pairs = list()
for nodes in label_to_nodes.values():
pairs.extend(itertools.combinations(nodes, 2))
unique_nodes_query = format_expanded_clause(pairs)
else:
assert unique_nodes is False
unique_nodes_query = ''
unique_nodes_query = construct_unique_nodes_clause(metarels, unique_nodes)

# combine cypher fragments into a single query and add PDP logic
query = ''
Expand All @@ -394,9 +408,9 @@ def construct_pdp_query(metarels, dwpc=None, property='name', join_hint='midpoin
[
{degree_query}
] AS degrees, path
WITH path, reduce(pdp = 1.0, d in degrees| pdp * d ^ -{{ w }}) as PDP
WITH path, reduce(pdp = 1.0, d in degrees| pdp * d ^ -{{ w }}) AS PDP
RETURN
path,
substring(reduce(s = '', node IN nodes(path)| s + '–' + node.name), 1) AS path,
PDP,
100 * (PDP / {dwpc}) AS PERCENT_OF_DWPC
ORDER BY PERCENT_OF_DWPC DESC
Expand All @@ -408,9 +422,8 @@ def construct_pdp_query(metarels, dwpc=None, property='name', join_hint='midpoin
length=len(metarels),
property=property,
dwpc = dwpc)
# If the dwpc isn't provided, we'll have to calculate it before the PDP.
# Doing so roughly doubles the query execution time, as it effectively
# runs the query twice returning different degrees of aggregation.

# https://stackoverflow.com/questions/54245415/
else:
query = textwrap.dedent('''\
MATCH path = {metapath_query}{using_query}
Expand All @@ -420,20 +433,16 @@ def construct_pdp_query(metarels, dwpc=None, property='name', join_hint='midpoin
[
{degree_query}
] AS degrees, path
WITH sum(reduce(pdp = 1.0, d in degrees| pdp * d ^ -{{ w }})) as DWPC

MATCH path = {metapath_query}{using_query}
WHERE n0.{property} = {{ source }}
AND n{length}.{property} = {{ target }}{unique_nodes_query}
WITH
[
{degree_query}
] AS degrees, path, DWPC
WITH path, DWPC, reduce(pdp = 1.0, d in degrees| pdp * d ^ -{{ w }}) as PDP
WITH path, reduce(pdp = 1.0, d in degrees| pdp * d ^ -{{ w }}) AS PDP
WITH path, collect(PDP) AS pdps, PDP
WITH collect({{path: path, pdps: pdps}}) AS allData, sum(PDP) AS DWPC
UNWIND allData AS data
UNWIND data.pdps AS PDP
WITH data.path AS path, PDP, DWPC
RETURN
path,
PDP,
100 * (PDP / DWPC) AS PERCENT_OF_DWPC
substring(reduce(s = '', node IN nodes(path)| s + '–' + node.name), 1) AS path,
PDP,
100 * (PDP / DWPC) AS PERCENT_OF_DWPC
ORDER BY PERCENT_OF_DWPC DESC
''').rstrip().format(
metapath_query = metapath_query,
Expand All @@ -443,7 +452,6 @@ def construct_pdp_query(metarels, dwpc=None, property='name', join_hint='midpoin
length=len(metarels),
property=property)


return query

def format_expanded_clause(pairs):
Expand Down Expand Up @@ -487,6 +495,7 @@ def permute_rel_type(uri, rel_type, nswap=None, max_tries=None, nswap_mult=10, m
Randomization Techniques for Graphs. SIAM International Conference on
Data Mining. https://doi.org/10.1137/1.9781611972795.67
"""
py2neo = import_py2neo()

neo = py2neo.Graph(uri)

Expand Down
Loading