7
7
from typing import Dict
8
8
from typing import Optional
9
9
10
+ from ..utils import _apply_grouping
11
+
10
12
11
13
logger = getLogger (__name__ )
12
14
@@ -21,11 +23,12 @@ def extract(
21
23
Args:
22
24
self (InvoiceTemplate): The current instance of the class. # noqa: DOC103
23
25
content (str): The content of the invoice.
24
- output (Dict[str, Any]): A dictionary to store the extracted data.
26
+ output (Dict[str, Any]): A dictionary to store the extracted data; it is
27
+ updated in place with the fields extracted from each table.
25
28
26
29
Returns:
27
- Optional[Dict[str, Any]]: The updated output dictionary with extracted
28
- data, or None if date parsing fails .
30
+ Optional[Dict[str, Any]]: The output dictionary updated with the data extracted from the tables.
31
+ A table whose lines fail to parse is skipped.
29
32
"""
30
33
for i , table in enumerate (self ["tables" ]):
31
34
logger .debug ("Testing Rules set #%s" , i )
@@ -41,8 +44,18 @@ def extract(
41
44
continue
42
45
43
46
# Process table lines
44
- if not _process_table_lines (self , table , table_body , output ):
45
- return None # Return None if date parsing fails
47
+ table_data = _process_table_lines (self , table , table_body )
48
+ if table_data is None :
49
+ continue
50
+
51
+ # Apply grouping to individual fields within table_data
52
+ for field , field_settings in table .get ("fields" , {}).items ():
53
+ if "group" in field_settings :
54
+ grouped_value = _apply_grouping (field_settings , table_data .get (field ))
55
+ if grouped_value is not None :
56
+ table_data [field ] = grouped_value
57
+
58
+ output .update (table_data )
46
59
47
60
return output
48
61
@@ -104,29 +117,28 @@ def _process_table_lines(
104
117
self : "OrderedDict[str, Any]" ,
105
118
table : Dict [str , Any ],
106
119
table_body : str ,
107
- output : Dict [str , Any ],
108
- ) -> bool :
120
+ ) -> Optional [Dict [str , Any ]]:
109
121
"""Process the lines within the table body.
110
122
111
123
Args:
112
124
self (InvoiceTemplate): The current instance of the class. # noqa: DOC103
113
125
table (Dict[str, Any]): The validated table settings.
114
126
table_body (str): The extracted table body.
115
- output (Dict[str, Any]): A dictionary to store the extracted data.
116
127
117
128
Returns:
118
- bool: True if processing is successful, False if date parsing fails.
129
+ Optional[Dict[str, Any]]: A dictionary mapping each captured field to its parsed
130
+ value (a list when the field is captured on several lines), or None if a line fails to parse.
119
131
"""
120
132
types = table .get ("types" , {})
121
133
no_match_found = True
122
-
134
+ line_output : Dict [ str , Any ] = {}
123
135
for line in re .split (table ["line_separator" ], table_body ):
124
136
if not line .strip ("" ).strip ("\n " ) or line .isspace ():
125
137
continue
126
138
127
139
# Correct the function call and return logic
128
- if not _process_table_line (self , table , line , types , output ):
129
- return False # Return False immediately if date parsing fails
140
+ if not _process_table_line (self , table , line , types , line_output ):
141
+ return None # Return None immediately if line parsing fails
130
142
else :
131
143
no_match_found = (
132
144
False # Update no_match_found only if line processing is successful
@@ -137,10 +149,11 @@ def _process_table_lines(
137
149
"\033 [1;43mWarning\033 [0m regex=\033 [91m*%s*\033 [0m doesn't match anything!" ,
138
150
table ["body" ],
139
151
)
140
- return True
141
152
153
+ return line_output
142
154
143
- def _process_table_line (
155
+
156
+ def _process_table_line ( # noqa: C901
144
157
self : "OrderedDict[str, Any]" ,
145
158
table : Dict [str , Any ],
146
159
line : str ,
@@ -162,9 +175,6 @@ def _process_table_line(
162
175
match = re .search (table ["body" ], line )
163
176
if match :
164
177
for field , value in match .groupdict ().items ():
165
- if field in output :
166
- continue
167
-
168
178
logger .debug (
169
179
(
170
180
"field=\033 [1m\033 [93m%s\033 [0m |"
@@ -177,18 +187,32 @@ def _process_table_line(
177
187
)
178
188
179
189
if field .startswith ("date" ) or field .endswith ("date" ):
180
- output [ field ] = self .parse_date (value ) # type: ignore[attr-defined]
181
- if not output [ field ] :
190
+ value = self .parse_date (value ) # type: ignore[attr-defined]
191
+ if not value :
182
192
logger .error ("Date parsing failed on date *%s*" , value )
183
193
return False
184
194
elif field .startswith ("amount" ):
185
- output [ field ] = self .parse_number (value ) # type: ignore[attr-defined]
195
+ value = self .parse_number (value ) # type: ignore[attr-defined]
186
196
elif field in types :
187
- # Access types as a dictionary
188
- output [field ] = self .coerce_type (value , types [field ]) # type: ignore[attr-defined]
197
+ value = self .coerce_type (value , types [field ]) # type: ignore[attr-defined]
198
+ elif table .get ("fields" ):
199
+ # Writing templates is hard, so we also support the following format
200
+ # in case someone mixes up the syntax:
201
+ # fields:
202
+ # example_field:
203
+ # type: float
204
+ # group: sum
205
+ field_set = table ["fields" ].get (field , {})
206
+ if "type" in field_set :
207
+ value = self .coerce_type (value , field_set .get ("type" )) # type: ignore[attr-defined]
208
+
209
+ if field in output :
210
+ # Ensure output[field] is a list before appending
211
+ if not isinstance (output [field ], list ):
212
+ output [field ] = [output [field ]]
213
+ output [field ].append (value )
189
214
else :
190
215
output [field ] = value
191
-
192
216
# Return True if a match is found and processed successfully
193
217
return True
194
218
else :
0 commit comments