-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvideo.py
executable file
·122 lines (102 loc) · 3.38 KB
/
video.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#
#Copyright (C) 2020-2021 ISTI-CNR
#Licensed under the BSD 3-Clause Clear License (see license.txt)
#
import os
import sys
import numpy as np
import cv2
import torch
from torchvision.transforms.functional import to_tensor
from PIL import Image
import threading
from util import *
#
#
#
class Video:
    """Frame source backed by either a video file or a folder of image files.

    When the (lower-cased) format extension is one of ``.mov``, ``.mp4``,
    ``.avi`` or ``.asf`` the path is opened with ``cv2.VideoCapture``;
    otherwise ``video_path`` is treated as a directory and every file whose
    name ends with ``video_fmt`` becomes one frame, in sorted name order.

    Attributes:
        bVideo: True when frames come from a ``cv2.VideoCapture``.
        video_path: path given to the constructor.
        video_fmt: format extension (as given, or derived from the path).
        total_names: sorted image file names (image-folder mode only).
        v: the ``cv2.VideoCapture`` object, or ``[]`` in image-folder mode.
        lock: guards the seek+read pair in ``getNextFrameWithIndex``.
        counter: current frame position used by sequential reads/seeks.
        n: number of frames reported by the source.
    """

    # Lower-case extensions that are opened through cv2.VideoCapture.
    _VIDEO_EXTENSIONS = ('.mov', '.mp4', '.avi', '.asf')

    def __init__(self, video_path, video_fmt = ''):
        """Open ``video_path`` as a video file or as a folder of images.

        Args:
            video_path: path to a video file or to a directory of images.
            video_fmt: extension including the dot (e.g. ``'.mp4'``,
                ``'.png'``); when empty it is taken from ``video_path``.
        """
        if video_fmt == '':
            video_fmt = os.path.splitext(video_path)[1]

        self.bVideo = video_fmt.lower() in self._VIDEO_EXTENSIONS
        self.video_path = video_path
        self.video_fmt = video_fmt
        self.total_names = []
        self.v = []  # placeholder; replaced by a VideoCapture in video mode
        self.lock = threading.Lock()
        self.counter = 0

        if self.bVideo:
            self.v = cv2.VideoCapture(self.video_path)
            self.n = int(self.v.get(cv2.CAP_PROP_FRAME_COUNT))
        else:
            # NOTE: the suffix match is case-sensitive, unlike the video check.
            self.total_names = sorted(
                f for f in os.listdir(video_path) if f.endswith(video_fmt)
            )
            self.n = len(self.total_names)

    def release(self):
        """Release the underlying capture; a no-op in image-folder mode."""
        # In image-folder mode self.v is the [] placeholder and has no
        # release() method, so only forward the call for real captures.
        if self.bVideo:
            self.v.release()

    def getNumFrames(self):
        """Return the number of frames reported by the source."""
        return self.n

    def setFrame(self, frame):
        """Move the current position to ``frame`` (wrapped modulo the length).

        Args:
            frame: requested frame index; values outside ``[0, n)`` wrap.
        """
        if self.n <= 0:
            # Nothing to seek into; avoid a modulo-by-zero.
            self.counter = 0
            return

        self.counter = frame % self.n
        # Only a real capture can be seeked; image mode just tracks counter.
        if self.bVideo:
            self.v.set(cv2.CAP_PROP_POS_FRAMES, self.counter)

    def getNextFrame(self, bBGR=True):
        """Read the next frame in sequence.

        Args:
            bBGR: forwarded to ``fromVideoFrameToNP`` in video mode
                (presumably a channel-order switch - confirm in ``util``);
                unused in image-folder mode.

        Returns:
            ``(success, frame)``. On failure ``frame`` is ``[]``. In video
            mode the frame is the result of ``fromNPtoPIL``; in image-folder
            mode it is whatever ``imread`` from ``util`` returns.
        """
        if self.bVideo:
            success, frame_cv = self.v.read()
            if success:
                frame = fromNPtoPIL(fromVideoFrameToNP(frame_cv, bBGR))
            else:
                frame = []
            return success, frame

        if self.n <= 0:
            # Empty folder: report failure the same way a failed read does.
            return False, []

        path = os.path.join(self.video_path, self.total_names[self.counter])
        frame = imread(path)
        # Sequential reads over an image folder wrap around at the end.
        self.counter = (self.counter + 1) % self.n
        return True, frame

    #
    #
    #
    def getNextFrameWithIndex(self, index, bBGR=True, bPIL = True):
        """Read the frame at ``index`` (wrapped modulo the length).

        Args:
            index: frame index; values outside ``[0, n)`` wrap.
            bBGR: forwarded to ``fromVideoFrameToNP`` in video mode
                (presumably a channel-order switch - confirm in ``util``);
                unused in image-folder mode.
            bPIL: when True the frame is returned as a PIL image, otherwise
                in the array form produced by the ``util`` converters.

        Returns:
            ``(success, frame)``. On failure ``frame`` is ``[]``.
        """
        if self.n <= 0:
            # No frames at all; avoid a modulo-by-zero below.
            return False, []

        index = index % self.n

        if self.bVideo:
            # Seek and read must happen as one unit on the shared capture;
            # the context manager guarantees the lock is released even if
            # either call raises.
            with self.lock:
                self.setFrame(index)
                success, frame_cv = self.v.read()

            if not success:
                return success, []

            frame = fromVideoFrameToNP(frame_cv, bBGR)
            if bPIL:
                frame = fromNPtoPIL(frame)
            return success, frame

        path = os.path.join(self.video_path, self.total_names[index])
        frame = Image.open(path)
        if not bPIL:
            frame = fromPILtoNP(frame)
        return True, frame

    #
    #
    #
    def readBlockFromVideo(self, index, use_transform, differential, fps = 30):
        """Read ``fps`` consecutive frames starting at ``index`` as one tensor.

        Args:
            index: index of the first frame of the block.
            use_transform: callable applied to each frame; when None,
                ``to_tensor`` is used instead.
            differential: when equal to 1, return the differences between
                consecutive frames instead of the frames themselves.
            fps: number of frames to read for the block.

        Returns:
            The frames (or frame differences) stacked along a new first
            dimension with ``torch.stack``. Frames that fail to read are
            skipped, so the result may hold fewer entries than requested.

        Raises:
            RuntimeError: if there is nothing left to stack (no frame could
                be read, or fewer than two frames in differential mode).
        """
        X = []
        for i in range(fps):
            success, frame = self.getNextFrameWithIndex(index + i)
            if not success:
                continue
            if use_transform is not None:
                frame = use_transform(frame)
            else:
                frame = to_tensor(frame)
            X.append(frame)

        if differential == 1:
            # Pair each frame with its successor; built from the frames that
            # were actually read so a failed read cannot index past the end.
            X = [nxt - cur for cur, nxt in zip(X, X[1:])]

        if not X:
            raise RuntimeError(
                'readBlockFromVideo: not enough frames could be read to build a block'
            )

        return torch.stack(X, dim=0)

    def getName(self, index):
        """Return a name for frame ``index``.

        In video mode this is ``'frame_<index>'``; in image-folder mode it is
        the file name stored at ``index`` in ``total_names``.
        """
        if self.bVideo:
            return 'frame_' + str(index)
        return self.total_names[index]