1
- import os , pickle
1
+ import os
2
+ import pickle
2
3
from bisect import bisect_left , bisect_right
3
4
from moviepy .video .io .ffmpeg_tools import ffmpeg_extract_subclip
4
- from utils import *
5
+ import cv2
6
+ from utils import frame_to_string , prepareOCR , extract_coordinates
7
+ import unittest
8
+
5
9
6
10
class VideoSplitter :
7
11
"""
@@ -20,8 +24,8 @@ class VideoSplitter:
20
24
frame_to_string (optional): function used to extract caption from frame
21
25
extract_coordinates (optional): function used to extract coordinates from caption
22
26
"""
23
- def __init__ (self , fname , captions = None ,
24
- acceptCloseMatches = True , max_frames = 200 ,
27
+ def __init__ (self , fname , captions = None ,
28
+ acceptCloseMatches = True , max_frames = 200 ,
25
29
frame_to_string = frame_to_string ,
26
30
img_preprocessing = prepareOCR ,
27
31
extract_coordinates = extract_coordinates ):
@@ -36,8 +40,8 @@ def __init__(self, fname, captions = None,
36
40
self .Nframes = int (self .video .get (cv2 .CAP_PROP_FRAME_COUNT ))
37
41
self .fps = self .video .get (cv2 .CAP_PROP_FPS )
38
42
39
- self .captions = {} # (frame_index, caption)
40
- self .coordinates = {} # (frame_index, camera position in x,y,z) N.B.: strings, not float
43
+ self .captions = {} # (frame_index, caption)
44
+ self .coordinates = {} # (frame_index, camera position in x,y,z) N.B.: strings, not float
41
45
# (coordinates, first frame where coordinates appeared):
42
46
# represents a sorted list of frames, used for defining the start and end of sequences by bisection
43
47
self .seqID = {}
@@ -69,7 +73,7 @@ def loadFrame(self, frame_index):
69
73
success , frame = self .video .read ()
70
74
return frame if success else None
71
75
72
- def processFrame (self , frame_index , ignore_caption = False ):
76
+ def processFrame (self , frame_index , ignore_caption = False ):
73
77
"""
74
78
Extract and process the caption from the given frame, storing it in
75
79
self.caption if not present, the coordinates in self.coordinates,
@@ -93,7 +97,7 @@ def processFrame(self, frame_index, ignore_caption = False):
93
97
try :
94
98
pm = self .seqID if self .acceptCloseMatches else []
95
99
coordinates = self .extract_coordinates (caption , possible_matches = pm )
96
- except ValueError as error :
100
+ except ValueError :
97
101
coordinates = ('EXTRACTION FAILED' , frame_index , caption )
98
102
99
103
self .coordinates [frame_index ] = coordinates
@@ -112,28 +116,30 @@ def getCoordinates(self, frame_index):
112
116
113
117
def printCaptions(self):
    """
    Print every stored caption, one line per frame, ordered by frame index.

    Reads self.captions (frame_index -> caption) and writes a header line
    followed by one tab-separated "frame, caption" line per entry to stdout.
    Nothing is returned.
    """
    print('Frame \t Caption')
    # The captions live on the instance: the previous code iterated over an
    # undefined name `x`, which raised NameError when the method was called.
    for frame_index, caption in sorted(self.captions.items()):
        print(f'{frame_index} \t {caption}')
117
122
118
123
def __getitem__ (self , item ):
119
124
"""
120
125
Return the first frame that revealed the sequence to which the given frame belongs.
121
126
This method is called by bisect to find the boundaries of the sequence
122
127
"""
123
- return self .seqID [ self .getCoordinates (item ) ]
128
+ return self .seqID [self .getCoordinates (item )]
124
129
125
130
def findSequences (self ):
126
131
"""
127
132
Fill dictionary self.sequences with coordinates, (first frame, last frame) for each sequence
128
133
"""
129
134
if not self .coordinates :
130
135
# fill coordinates with first and last values if empty
131
- self [0 ], self [self .Nframes - 1 ]
136
+ self [0 ], self [self .Nframes - 1 ]
132
137
while True :
133
138
# Coordinates not yet analysed
134
- missing = dict ((frame , coord ) for (frame , coord ) in self .coordinates .items () \
139
+ missing = dict ((frame , coord ) for (frame , coord ) in self .coordinates .items ()
135
140
if coord not in self .sequences )
136
- if not missing : return
141
+ if not missing :
142
+ return
137
143
# Find the start and end of each sequence corresponding to each set of coordinates
138
144
for (frame , coord ) in missing .items ():
139
145
self .sequences [coord ] = bisect_left (self , self [frame ]), bisect_right (self , self [frame ]) - 1
@@ -144,10 +150,9 @@ def printSequences(self):
144
150
"""
145
151
if not self .sequences :
146
152
return
147
- print ('Frames \t Coordinates' )
148
- for v ,k in sorted ((v ,k ) for k ,v in self .sequences .items ()):
149
- print (f'{ v } \t { k } ' )
150
-
153
+ print ('Frames \t Coordinates' )
154
+ for v , k in sorted ((v , k ) for k , v in self .sequences .items ()):
155
+ print (f'{ v } \t { k } ' )
151
156
152
157
def writeSequences (self , outputdir , min_frames = 10 ):
153
158
"""
@@ -164,8 +169,8 @@ def writeSequences(self, outputdir, min_frames=10):
164
169
valid_sequences = filter (lambda x : x [1 ] - x [0 ] >= min_frames , self .sequences .values ())
165
170
for (fmin , fmax ) in sorted (valid_sequences ):
166
171
basename , ext = os .path .splitext (os .path .basename (self .fname ))
167
- fname = os .path .join (outputdir , f'{ basename } _seq{ fmin } _{ fmax } { ext } ' )
168
- ffmpeg_extract_subclip (self .fname , fmin / self .fps , fmax / self .fps , fname )
172
+ fname = os .path .join (outputdir , f'{ basename } _seq{ fmin } _{ fmax } { ext } ' )
173
+ ffmpeg_extract_subclip (self .fname , fmin / self .fps , fmax / self .fps , fname )
169
174
170
175
def writeInfo (self , outputdir ):
171
176
"""
@@ -185,18 +190,18 @@ def writeInfo(self, outputdir):
185
190
with open (fname , 'wb' ) as pickleFile :
186
191
pickle .dump (v , pickleFile )
187
192
188
-
189
193
def __len__ (self ):
190
194
return self .Nframes
191
195
192
- import unittest
196
+
193
197
def setupTester (cls ):
194
198
"""
195
199
Prepare tester class for VideoSplitter
196
200
"""
197
- import urllib , yaml
201
+ import urllib
202
+ import yaml
198
203
# Test parameters
199
- url = 'https://gist.githubusercontent.com/blenzi/82746e11119cb88a67603944869e29e2/raw' # noqa: E501
204
+ url = 'https://gist.githubusercontent.com/blenzi/82746e11119cb88a67603944869e29e2/raw'
200
205
cls .ref = eval (urllib .request .urlopen (url ).read ())
201
206
202
207
# Stream
@@ -223,9 +228,9 @@ def setUpClass(cls):
223
228
"Setup only once for all tests"
224
229
setupTester (cls )
225
230
cls .splitter = VideoSplitter (cls .fname )
226
- cls .testFindSequences = False # skip finding sequences (takes about 30s)
231
+ cls .testFindSequences = False # skip finding sequences (takes about 30s)
227
232
228
- def a_test_loadFrame (self ): # call it a_ as they are executed in alphabetical order
233
+ def a_test_loadFrame (self ): # call it a_ as they are executed in alphabetical order
229
234
frame = self .splitter .loadFrame (self .ref ['extract' ]['frame' ])
230
235
self .assertEqual (len (frame .shape ), 3 )
231
236
@@ -243,7 +248,7 @@ def test_findSequences(self):
243
248
self .maxDiff = None
244
249
self .splitter .findSequences ()
245
250
seqs = self .splitter .sequences
246
- inv_seqs = dict (map (reversed , seqs .items ())) # invert keys and values
251
+ inv_seqs = dict (map (reversed , seqs .items ())) # invert keys and values
247
252
self .assertEqual (inv_seqs .keys (), self .ref ['sequences' ].keys ())
248
253
249
254
def test_writeSequences (self ):
@@ -256,7 +261,7 @@ def test_writeSequences(self):
256
261
basename , ext = os .path .splitext (os .path .basename (self .splitter .fname ))
257
262
for fmin , fmax in self .splitter .sequences .values ():
258
263
fname = os .path .join (tmpdirname , f'{ basename } _seq{ fmin } _{ fmax } { ext } ' )
259
- self .assertTrue ( os .path .exists (fname ) )
264
+ self .assertTrue (os .path .exists (fname ))
260
265
261
266
def test_writeInfo (self ):
262
267
"Test writing dictionaries with captions, sequences, ..."
@@ -273,6 +278,7 @@ def test_writeInfo(self):
273
278
dSaved = pickle .load (pickleFile )
274
279
self .assertEqual (d , dSaved )
275
280
281
+
276
282
class VideoTesterWithCaptions (VideoTester ):
277
283
"""
278
284
Test VideoSplitter with captions loaded externally
0 commit comments