@@ -11,12 +11,11 @@
 import requests
 from django.conf import settings
 from django.core.management.base import BaseCommand
-from django.db import connection, transaction
+from django.db import connection, reset_queries, transaction
 from lxml import etree
-from raven.contrib.django.raven_compat.models import client

 from datahub.company.models import CompaniesHouseCompany
-from datahub.core.utils import log_and_ignore_exceptions, slice_iterable_into_chunks, stream_to_file_pointer
+from datahub.core.utils import slice_iterable_into_chunks, stream_to_file_pointer


 logger = getLogger(__name__)
@@ -73,6 +72,7 @@ def open_ch_zipped_csv(fp):

 def iter_ch_csv_from_url(url, tmp_file_creator):
     """Fetch & cache CH zipped CSV, and then iterate through contents."""
+    logger.info('Loading CSV from URL: %s', url)
     with tmp_file_creator() as tf:
         stream_to_file_pointer(url, tf)
         tf.seek(0, 0)
@@ -90,18 +90,32 @@ def sync_ch(tmp_file_creator, endpoint=None, truncate_first=False):
     https://github.com/django/django/blob/master/django/db/models/query.py#L420
     this would create a list with millions of objects that it would then try to save in batches in a single transaction
     """
+    logger.info('Starting CH load...')
+    count = 0
     endpoint = endpoint or settings.CH_DOWNLOAD_URL
     ch_csv_urls = get_ch_latest_dump_file_list(endpoint)
+    logger.info('Found the following Companies House CSV URLs: %s', ch_csv_urls)
     if truncate_first:
         truncate_ch_companies_table()
     for csv_url in ch_csv_urls:
         ch_company_rows = iter_ch_csv_from_url(csv_url, tmp_file_creator)
-        for batchiter in slice_iterable_into_chunks(ch_company_rows, settings.BULK_CREATE_BATCH_SIZE):
-            objects = [CompaniesHouseCompany(**ch_company_row) for ch_company_row in batchiter if ch_company_row]
+
+        batch_iter = slice_iterable_into_chunks(
+            ch_company_rows, settings.BULK_CREATE_BATCH_SIZE, _create_ch_company
+        )
+        for batch in batch_iter:
             CompaniesHouseCompany.objects.bulk_create(
-                objs=objects,
+                objs=batch,
                 batch_size=settings.BULK_CREATE_BATCH_SIZE
             )
+            count += len(batch)
+            logger.info('%d Companies House records loaded...', count)
+            # In debug mode, Django keeps a record of every SQL statement
+            # executed, which eventually leads to memory exhaustion.
+            # This clears that history.
+            reset_queries()
+
+    logger.info('Companies House load complete, %s records loaded', count)


 @transaction.atomic
@@ -112,20 +126,18 @@ def truncate_ch_companies_table():
     """
     cursor = connection.cursor()
     table_name = CompaniesHouseCompany._meta.db_table
+    logger.info('Truncating the %s table', table_name)
     query = f'truncate {table_name};'
     cursor.execute(query)


+def _create_ch_company(row_dict):
+    return CompaniesHouseCompany(**row_dict)
+
+
 class Command(BaseCommand):
     """Companies House sync command."""

     def handle(self, *args, **options):
         """Handle."""
-        try:
-            sync_ch(tmp_file_creator=tempfile.TemporaryFile, truncate_first=True)
-        except Exception as e:
-            with log_and_ignore_exceptions():
-                client.captureException()
-
-            logger.exception('Failed to sync from ES')
-            self.stderr.write(e)
+        sync_ch(tmp_file_creator=tempfile.TemporaryFile, truncate_first=True)
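
Note: the refactored loop passes a new third argument, _create_ch_company, to slice_iterable_into_chunks. The real helper lives in datahub.core.utils and is not shown in this diff; the sketch below is only an assumption about its shape, illustrating how a lazy, transforming chunker keeps just one batch of model instances in memory at a time.

from itertools import islice


def slice_iterable_into_chunks(iterable, batch_size, transform=None):
    """Yield lists of up to batch_size items, optionally transformed.

    Illustrative sketch only; the actual datahub.core.utils
    implementation may differ.
    """
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        if transform is not None:
            # Mirrors the old list comprehension's falsy-row filter.
            chunk = [transform(item) for item in chunk if item]
        yield chunk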
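
The reset_queries() call addresses a real Django behaviour: when DEBUG is True, every executed statement is appended to connection.queries, so a multi-million-row import would grow that list without bound. A minimal standalone illustration (the helper name and debug logging are hypothetical, not part of the diff):

from logging import getLogger

from django.db import connection, reset_queries

logger = getLogger(__name__)


def clear_query_log():
    # connection.queries holds one dict per executed SQL statement when
    # DEBUG is True; clearing it after each batch keeps memory flat.
    logger.debug('%d queries tracked since the last reset', len(connection.queries))
    reset_queries()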