Skip to content

Commit

Permalink
parse twitter csv files; try new max global depth for quadtree
Browse files Browse the repository at this point in the history
  • Loading branch information
davemfish committed Oct 11, 2023
1 parent c65a20e commit d154633
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/natcap/invest/recreation/out_of_core_quadtree.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ class OutOfCoreQuadTree(object):
shapely_prepared_polygon.contains(shapely_point)):
result_deque.append(point)
return result_deque
# DF: here's where we dive into the tree:
elif shapely_polygon.intersects(bounding_polygon):
# combine results of children
result_deque = collections.deque()
Expand Down
32 changes: 20 additions & 12 deletions src/natcap/invest/recreation/recmodel_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import threading
import collections
import logging
import psutil
import queue
from io import BytesIO, StringIO

Expand Down Expand Up @@ -104,7 +105,8 @@ class RecModel(object):
@_try_except_wrapper("RecModel construction exited while multiprocessing.")
def __init__(
self, raw_csv_filename, min_year, max_year, cache_workspace,
max_points_per_node=GLOBAL_MAX_POINTS_PER_NODE):
max_points_per_node=GLOBAL_MAX_POINTS_PER_NODE,
max_depth=GLOBAL_DEPTH, dataset_name='flickr'):
"""Initialize RecModel object.
Args:
Expand Down Expand Up @@ -133,8 +135,8 @@ def __init__(
"max_year is less than min_year, must be greater or "
"equal to")
self.qt_pickle_filename = construct_userday_quadtree(
initial_bounding_box, raw_csv_filename, cache_workspace,
max_points_per_node)
initial_bounding_box, raw_csv_filename, dataset_name, cache_workspace,
max_points_per_node, max_depth)
self.cache_workspace = cache_workspace
self.min_year = min_year
self.max_year = max_year
Expand Down Expand Up @@ -221,7 +223,7 @@ def calc_photo_user_days_in_aoi(
numpy_date_range = (
numpy.datetime64(date_range[0]),
numpy.datetime64(date_range[1]))
base_pud_aoi_path, monthly_table_path = (
base_pud_aoi_path, monthly_table_path, _ = (
self._calc_aggregated_points_in_aoi(
aoi_path, workspace_path, numpy_date_range,
out_vector_filename))
Expand Down Expand Up @@ -469,11 +471,11 @@ def _calc_aggregated_points_in_aoi(
polytest_process.join()

LOGGER.info('returning out shapefile path')
return out_aoi_pud_path, monthly_table_path
return out_aoi_pud_path, monthly_table_path, len(local_points)


def _parse_input_csv(
block_offset_size_queue, csv_filepath, numpy_array_queue):
block_offset_size_queue, csv_filepath, numpy_array_queue, dataset_name):
"""Parse CSV file lines to (datetime64[d], userhash, lat, lng) tuples.
Args:
Expand All @@ -500,9 +502,14 @@ def _parse_input_csv(
# 8568090486,48344648@N00,2013-03-17 16:27:27,42.383841,-71.138378,16
# this pattern matches the above style of line and only parses valid
# dates to handle some cases where there are weird dates in the input
pattern = r"[^,]+,([^,]+),(19|20\d\d-(?:0[1-9]|1[012])-(?:0[1-9]|[12][0-9]|3[01])) [^,]+,([^,]+),([^,]+),[^\n]" # pylint: disable=line-too-long
flickr_pattern = r"[^,]+,([^,]+),(19|20\d\d-(?:0[1-9]|1[012])-(?:0[1-9]|[12][0-9]|3[01])) [^,]+,([^,]+),([^,]+),[^\n]" # pylint: disable=line-too-long
twittr_pattern = r"[^,]+,([^,]+),(19|20\d\d-(?:0[1-9]|1[012])-(?:0[1-9]|[12][0-9]|3[01])),([^,]+),([^,]+)\n" # pylint: disable=line-too-long
patterns = {
'flickr': flickr_pattern,
'twitter': twittr_pattern
}
result = numpy.fromregex(
StringIO(chunk_string), pattern,
StringIO(chunk_string), patterns[dataset_name],
[('user', 'S40'), ('date', 'datetime64[D]'), ('lat', 'f4'),
('lng', 'f4')])

Expand Down Expand Up @@ -544,8 +551,8 @@ def _file_len(file_path):


def construct_userday_quadtree(
initial_bounding_box, raw_photo_csv_table, cache_dir,
max_points_per_node):
initial_bounding_box, raw_photo_csv_table, dataset_name, cache_dir,
max_points_per_node, max_depth):
"""Construct a spatial quadtree for fast querying of userday points.
Args:
Expand Down Expand Up @@ -575,12 +582,13 @@ def construct_userday_quadtree(
total_lines = _file_len(raw_photo_csv_table)
LOGGER.info('%d lines', total_lines)
ooc_qt = out_of_core_quadtree.OutOfCoreQuadTree(
initial_bounding_box, max_points_per_node, GLOBAL_DEPTH,
initial_bounding_box, max_points_per_node, max_depth,
cache_dir, pickle_filename=ooc_qt_picklefilename)

n_parse_processes = multiprocessing.cpu_count() - 1
if n_parse_processes < 1:
n_parse_processes = 1
n_parse_processes = 1 # TODO: remove after debugging

block_offset_size_queue = multiprocessing.Queue(n_parse_processes * 2)
numpy_array_queue = multiprocessing.Queue(n_parse_processes * 2)
Expand All @@ -590,7 +598,7 @@ def construct_userday_quadtree(
parse_input_csv_process = multiprocessing.Process(
target=_parse_input_csv, args=(
block_offset_size_queue, raw_photo_csv_table,
numpy_array_queue))
numpy_array_queue, dataset_name))
parse_input_csv_process.deamon = True
parse_input_csv_process.start()

Expand Down

0 comments on commit d154633

Please sign in to comment.