星级打分
平均分: N/A 参与人数: 0 我的评分: 未评
我发个我自己修改的用的版本吧。
因为我的是 Python 3.10 版本以及相关库的环境,所以如果出错,请大家自行解决。
首先是FANExtractor.py
import os
import traceback
from pathlib import Path
import cv2
import numpy as np
from numpy import linalg as npla
from facelib import FaceType, LandmarksProcessor
from core.leras import nn
"""
ported from https://github.com/1adrianb/face-alignment
"""
class FANExtractor(object):
    """2D/3D facial-landmark extractor (FAN).

    Ported from https://github.com/1adrianb/face-alignment, extended with
    adaptive, GPU-memory-aware batch processing.

    NOTE(review): depends on the project-local ``nn`` wrapper (core.leras)
    for layers/weights, on ``cv2`` for image ops, and on ``LandmarksProcessor``
    for the optional second-pass refinement.
    """

    def __init__(self, landmarks_3D=False, place_model_on_cpu=False):
        # Weight files live next to this source file.
        model_path = Path(__file__).parent / ("2DFAN.npy" if not landmarks_3D else "3DFAN.npy")
        if not model_path.exists():
            raise Exception("Unable to load FANExtractor model")

        nn.initialize(data_format="NHWC")
        tf = nn.tf

        class ConvBlock(nn.ModelBase):
            # Residual block: three stacked 3x3 convs whose outputs are
            # concatenated (out/2 + out/4 + out/4 channels), with a 1x1
            # projection shortcut when the channel counts differ.
            def on_build(self, in_planes, out_planes):
                self.in_planes = in_planes
                self.out_planes = out_planes

                self.bn1 = nn.BatchNorm2D(in_planes)
                self.conv1 = nn.Conv2D(in_planes, out_planes // 2, kernel_size=3, strides=1, padding='SAME', use_bias=False)

                self.bn2 = nn.BatchNorm2D(out_planes // 2)
                self.conv2 = nn.Conv2D(out_planes // 2, out_planes // 4, kernel_size=3, strides=1, padding='SAME', use_bias=False)

                self.bn3 = nn.BatchNorm2D(out_planes // 4)
                self.conv3 = nn.Conv2D(out_planes // 4, out_planes // 4, kernel_size=3, strides=1, padding='SAME', use_bias=False)

                if self.in_planes != self.out_planes:
                    self.down_bn1 = nn.BatchNorm2D(in_planes)
                    self.down_conv1 = nn.Conv2D(in_planes, out_planes, kernel_size=1, strides=1, padding='VALID', use_bias=False)
                else:
                    self.down_bn1 = None
                    self.down_conv1 = None

            def forward(self, input):
                x = input
                x = self.bn1(x)
                x = tf.nn.relu(x)
                x = out1 = self.conv1(x)

                x = self.bn2(x)
                x = tf.nn.relu(x)
                x = out2 = self.conv2(x)

                x = self.bn3(x)
                x = tf.nn.relu(x)
                x = out3 = self.conv3(x)

                x = tf.concat([out1, out2, out3], axis=-1)

                if self.in_planes != self.out_planes:
                    downsample = self.down_bn1(input)
                    downsample = tf.nn.relu(downsample)
                    downsample = self.down_conv1(downsample)
                    x = x + downsample
                else:
                    x = x + input
                return x

        class HourGlass(nn.ModelBase):
            # Recursive hourglass: encoder/decoder with skip connection
            # (up1) and a 2x avg-pool / upsample bottleneck path.
            def on_build(self, in_planes, depth):
                self.b1 = ConvBlock(in_planes, 256)
                self.b2 = ConvBlock(in_planes, 256)

                if depth > 1:
                    self.b2_plus = HourGlass(256, depth - 1)
                else:
                    self.b2_plus = ConvBlock(256, 256)

                self.b3 = ConvBlock(256, 256)

            def forward(self, input):
                up1 = self.b1(input)

                low1 = tf.nn.avg_pool(input, [1, 2, 2, 1], [1, 2, 2, 1], 'VALID')
                low1 = self.b2(low1)

                low2 = self.b2_plus(low1)
                low3 = self.b3(low2)
                up2 = nn.upsample2d(low3)
                return up1 + up2

        class FAN(nn.ModelBase):
            """Stacked (4x) hourglass network producing 68 heatmaps."""

            def __init__(self):
                super().__init__(name='FAN')

            def on_build(self):
                self.conv1 = nn.Conv2D(3, 64, kernel_size=7, strides=2, padding='SAME')
                self.bn1 = nn.BatchNorm2D(64)
                self.conv2 = ConvBlock(64, 128)
                self.conv3 = ConvBlock(128, 128)
                self.conv4 = ConvBlock(128, 256)

                self.m = []
                self.top_m = []
                self.conv_last = []
                self.bn_end = []
                self.l = []
                self.bl = []
                self.al = []
                for i in range(4):
                    self.m += [HourGlass(256, 4)]
                    self.top_m += [ConvBlock(256, 256)]
                    self.conv_last += [nn.Conv2D(256, 256, kernel_size=1, strides=1, padding='VALID')]
                    self.bn_end += [nn.BatchNorm2D(256)]
                    self.l += [nn.Conv2D(256, 68, kernel_size=1, strides=1, padding='VALID')]
                    # Intermediate-supervision re-injection layers exist for
                    # every stack except the last one.
                    if i < 4 - 1:
                        self.bl += [nn.Conv2D(256, 256, kernel_size=1, strides=1, padding='VALID')]
                        self.al += [nn.Conv2D(68, 256, kernel_size=1, strides=1, padding='VALID')]

            def forward(self, inp):
                x, = inp
                x = self.conv1(x)
                x = self.bn1(x)
                x = tf.nn.relu(x)

                x = self.conv2(x)
                x = tf.nn.avg_pool(x, [1, 2, 2, 1], [1, 2, 2, 1], 'VALID')
                x = self.conv3(x)
                x = self.conv4(x)

                outputs = []
                previous = x
                for i in range(4):
                    ll = self.m[i](previous)
                    ll = self.top_m[i](ll)
                    ll = self.conv_last[i](ll)
                    ll = self.bn_end[i](ll)
                    ll = tf.nn.relu(ll)
                    tmp_out = self.l[i](ll)
                    outputs.append(tmp_out)
                    if i < 4 - 1:
                        ll = self.bl[i](ll)
                        previous = previous + ll + self.al[i](tmp_out)

                # Only the last stack's heatmaps are used; NHWC -> NCHW so
                # get_pts_from_predict() can index (channel, h, w).
                x = outputs[-1]
                x = tf.transpose(x, (0, 3, 1, 2))
                return x

        # Optionally pin model variables to CPU (manual context handling so
        # the device scope only wraps construction + weight loading).
        e = None
        if place_model_on_cpu:
            e = tf.device("/CPU:0")

        if e is not None:
            e.__enter__()
        try:
            self.model = FAN()
            self.model.load_weights(str(model_path))
        finally:
            if e is not None:
                e.__exit__(None, None, None)

        self.model.build_for_run([(tf.float32, (None, 256, 256, 3))])

        self.gpu_memory_limit = self._get_gpu_memory_limit()
        self.current_memory_usage = 0

    def _get_gpu_memory_limit(self):
        """Return total GPU memory in MB, or a conservative 8192 MB default."""
        import tensorflow as tf
        try:
            gpus = tf.config.experimental.list_physical_devices('GPU')
            if gpus:
                details = tf.config.experimental.get_device_details(gpus[0])
                if 'device_limitations' in details and 'total_memory' in details['device_limitations']:
                    return details['device_limitations']['total_memory'] // (1024 ** 2)
                else:
                    return 8192
        except Exception:
            # Fixed: was a bare `except:` that also swallowed SystemExit /
            # KeyboardInterrupt. Keep the best-effort default on any TF error.
            return 8192
        return 8192

    def _estimate_memory_usage(self, batch_size, avg_image_size):
        """Rough MB estimate for processing `batch_size` images of
        `avg_image_size` pixels (float32 RGB data + per-image model overhead)."""
        memory_per_image = (avg_image_size * 3 * 4) / (1024 ** 2)  # image data (float32)
        model_overhead = 150  # larger overhead for FAN activations / intermediate layers
        return batch_size * (memory_per_image + model_overhead)

    def _get_current_gpu_memory_usage(self):
        """Current GPU memory usage in MB via pynvml, falling back to the
        internal running estimate when pynvml / a GPU is unavailable."""
        try:
            import tensorflow as tf
            gpus = tf.config.experimental.list_physical_devices('GPU')
            if gpus:
                try:
                    import pynvml
                    pynvml.nvmlInit()
                    handle = pynvml.nvmlDeviceGetHandleByIndex(0)
                    info = pynvml.nvmlDeviceGetMemoryInfo(handle)
                    return info.used // (1024 ** 2)  # convert to MB
                except ImportError:
                    return self.current_memory_usage
            return 0
        except Exception:
            # Fixed: was a bare `except:`.
            return self.current_memory_usage

    def extract_batch(self, input_images, all_rects, second_pass_extractor=None,
                      is_bgr=True, multi_sample=False, initial_batch_size=4, memory_safety_margin=0.7):
        """Extract landmarks for many images with adaptive batch sizing.

        Returns one entry per input image: a list of landmark arrays
        (one per face rect), or None placeholders when extraction failed.
        On OOM-looking errors the batch size is halved up to 3 times.
        """
        if not input_images or not all_rects or len(input_images) != len(all_rects):
            return [[] for _ in range(len(input_images))]

        all_results = []
        avg_image_size = np.mean([img.shape[0] * img.shape[1] for img in input_images])
        current_batch_size = initial_batch_size
        max_retries = 3
        retry_count = 0
        i = 0
        while i < len(input_images):
            current_batch_size = self._adjust_batch_size(
                current_batch_size, avg_image_size, memory_safety_margin
            )
            end_idx = min(i + current_batch_size, len(input_images))
            batch_images = input_images[i:end_idx]
            batch_rects = all_rects[i:end_idx]
            batch_landmarks = []
            try:
                for img, rects in zip(batch_images, batch_rects):
                    landmarks = self._extract_single_optimized(img, rects, second_pass_extractor, is_bgr, multi_sample)
                    batch_landmarks.append(landmarks)
                    # NOTE(review): running estimate only grows (~100 MB per
                    # image) and is never reset; it is only a fallback when
                    # pynvml is missing — confirm intended.
                    self.current_memory_usage += 100
                all_results.extend(batch_landmarks)
                i = end_idx
                retry_count = 0  # reset retry counter after a successful batch
            except Exception as e:
                if "out of memory" in str(e).lower() or "memory" in str(e).lower():
                    if retry_count < max_retries:
                        current_batch_size = max(1, current_batch_size // 2)
                        retry_count += 1
                        print(f"FANExtractor: 内存不足,减少批大小至 {current_batch_size}")
                        continue
                    else:
                        print(f"FANExtractor: 无法处理图像,跳过。错误: {e}")
                        for _ in range(len(batch_images)):
                            all_results.append(None)
                        i = end_idx
                        retry_count = 0
                else:
                    print(f"FANExtractor: 处理图像时发生错误: {e}")
                    for _ in range(len(batch_images)):
                        all_results.append(None)
                    i = end_idx
                    retry_count = 0
        return all_results

    def _adjust_batch_size(self, current_batch_size, avg_image_size, safety_margin):
        """Shrink the batch size if the estimated memory need exceeds the
        available GPU budget; never grows the batch, never returns < 1."""
        if self.gpu_memory_limit == 0:  # CPU mode
            return min(current_batch_size, 2)

        current_usage = self._get_current_gpu_memory_usage()
        available_memory = self.gpu_memory_limit * safety_margin - current_usage
        required_memory = self._estimate_memory_usage(current_batch_size, avg_image_size)

        if required_memory > available_memory and current_batch_size > 1:
            adjusted_batch_size = int(current_batch_size * (available_memory / required_memory))
            adjusted_batch_size = max(1, adjusted_batch_size)
            return min(current_batch_size, adjusted_batch_size)
        return current_batch_size

    def _process_batch(self, input_images, all_rects, second_pass_extractor=None, is_bgr=True, multi_sample=False):
        """Sequentially extract landmarks for a prepared batch (no retry logic)."""
        batch_results = []
        for img, rects in zip(input_images, all_rects):
            landmarks = self._extract_single_optimized(img, rects, second_pass_extractor, is_bgr, multi_sample)
            batch_results.append(landmarks)
        return batch_results

    def _extract_single_optimized(self, input_image, rects, second_pass_extractor=None, is_bgr=True, multi_sample=False):
        """Extract 68-point landmarks for each face rect in one image.

        rects: iterable of (left, top, right, bottom). Returns a list with one
        landmark array (or None on failure) per rect. With multi_sample, the
        crop center is jittered by +-1 px and predictions are averaged.
        """
        if len(rects) == 0:
            return []

        if is_bgr:
            input_image = input_image[:, :, ::-1]
            is_bgr = False

        (h, w, ch) = input_image.shape

        landmarks = []
        for (left, top, right, bottom) in rects:
            # Scale/center convention from the original face-alignment repo.
            scale = (right - left + bottom - top) / 195.0
            center = np.array([(left + right) / 2.0, (top + bottom) / 2.0])
            centers = [center]

            if multi_sample:
                centers += [center + [-1, -1],
                            center + [1, -1],
                            center + [1, 1],
                            center + [-1, 1],
                            ]

            images = []
            ptss = []
            try:
                for c in centers:
                    images += [self.crop(input_image, c, scale)]

                images = np.stack(images)
                images = images.astype(np.float32) / 255.0

                predicted = []
                for i in range(len(images)):
                    pred_result = self.model.run([images[i][None, ...]])
                    predicted.append(pred_result[0])

                for i, pred in enumerate(predicted):
                    ptss += [self.get_pts_from_predict(pred, centers[i], scale)]
                pts_img = np.mean(np.array(ptss), 0)

                landmarks.append(pts_img)
            except Exception as e:
                print(f"Error in FANExtractor.extract: {str(e)}")
                landmarks.append(None)

        if second_pass_extractor is not None and any(lmrks is not None for lmrks in landmarks):
            for i, lmrks in enumerate(landmarks):
                if lmrks is not None:
                    try:
                        # Re-align the face to 256x256, re-detect and re-extract
                        # for higher accuracy, then map back to image space.
                        image_to_face_mat = LandmarksProcessor.get_transform_mat(lmrks, 256, FaceType.FULL)
                        face_image = cv2.warpAffine(input_image, image_to_face_mat, (256, 256), cv2.INTER_CUBIC)

                        rects2 = second_pass_extractor.extract(face_image, is_bgr=is_bgr)
                        if len(rects2) == 1:  # only refine when exactly one face is re-detected
                            lmrks2 = self.extract(face_image, [rects2[0]], is_bgr=is_bgr, multi_sample=True)[0]
                            landmarks[i] = LandmarksProcessor.transform_points(lmrks2, image_to_face_mat, True)
                    except Exception as e:
                        print(f"Error in second pass extraction: {str(e)}")
                        pass

        return landmarks

    def extract(self, input_image, rects, second_pass_extractor=None, is_bgr=True, multi_sample=False):
        """Single-image convenience wrapper around _extract_single_optimized."""
        return self._extract_single_optimized(input_image, rects, second_pass_extractor, is_bgr, multi_sample)

    def transform(self, point, center, scale, resolution):
        """Map `point` from crop space back to original-image coordinates
        (inverse of the crop transform defined by center/scale/resolution)."""
        pt = np.array([point[0], point[1], 1.0])
        h = 200.0 * scale
        m = np.eye(3)
        m[0, 0] = resolution / h
        m[1, 1] = resolution / h
        m[0, 2] = resolution * (-center[0] / h + 0.5)
        m[1, 2] = resolution * (-center[1] / h + 0.5)
        m = np.linalg.inv(m)
        return np.matmul(m, pt)[0:2]

    def crop(self, image, center, scale, resolution=256.0):
        """Crop a square region around `center` (scaled by `scale`), padding
        with zeros where it falls outside the image, resized to resolution."""
        ul = self.transform([1, 1], center, scale, resolution).astype(np.int32)
        br = self.transform([resolution, resolution], center, scale, resolution).astype(np.int32)

        ht, wd = image.shape[0], image.shape[1]
        # Clipped destination (new*) and source (old*) ranges, 1-based as in
        # the original Lua/torch port.
        newX = np.array([max(1, -ul[0] + 1), min(br[0], wd) - ul[0]], dtype=np.int32)
        newY = np.array([max(1, -ul[1] + 1), min(br[1], ht) - ul[1]], dtype=np.int32)
        oldX = np.array([max(1, ul[0] + 1), min(br[0], wd)], dtype=np.int32)
        oldY = np.array([max(1, ul[1] + 1), min(br[1], ht)], dtype=np.int32)

        if image.ndim > 2:
            newDim = np.array([br[1] - ul[1], br[0] - ul[0], image.shape[2]], dtype=np.int32)
            newImg = np.zeros(newDim, dtype=np.uint8)
            newImg[newY[0] - 1:newY[1], newX[0] - 1:newX[1]] = image[oldY[0] - 1:oldY[1], oldX[0] - 1:oldX[1], :]
        else:
            newDim = np.array([br[1] - ul[1], br[0] - ul[0]], dtype=np.int32)
            newImg = np.zeros(newDim, dtype=np.uint8)
            # Bug fix: the grayscale path previously indexed the source with a
            # third `:` axis, raising IndexError for 2-D images.
            newImg[newY[0] - 1:newY[1], newX[0] - 1:newX[1]] = image[oldY[0] - 1:oldY[1], oldX[0] - 1:oldX[1]]

        newImg = cv2.resize(newImg, dsize=(int(resolution), int(resolution)), interpolation=cv2.INTER_LINEAR)
        return newImg

    def get_pts_from_predict(self, a, center, scale):
        """Convert heatmaps `a` of shape (68, H, W) to landmark coordinates
        in original-image space: argmax per channel, quarter-pixel refinement
        toward the gradient, then the inverse crop transform."""
        a_ch, a_h, a_w = a.shape

        b = a.reshape((a_ch, a_h * a_w))
        c = b.argmax(1).reshape((a_ch, 1)).repeat(2, axis=1).astype(np.float32)
        # Flat argmax -> (x, y) grid coordinates.
        c[:, 0] %= a_w
        c[:, 1] = np.floor(c[:, 1] / a_w)

        for i in range(a_ch):
            pX, pY = int(c[i, 0]), int(c[i, 1])
            if 0 < pX < a_w - 1 and 0 < pY < a_h - 1:
                diff = np.array([a[i, pY, pX + 1] - a[i, pY, pX - 1],
                                 a[i, pY + 1, pX] - a[i, pY - 1, pX]])
                c[i] += np.sign(diff) * 0.25

        c += 0.5

        return np.array([self.transform(c[i], center, scale, a_w) for i in range(a_ch)])
(以上为 FANExtractor.py 的完整代码)
第二个文件 S3FDExtractor.py
import operator
from pathlib import Path
import cv2
import numpy as np
from core.leras import nn
class S3FDExtractor(object):
    """S3FD face detector with adaptive, GPU-memory-aware batch processing.

    Returns face rects as [left, top, right, bottom] int lists, sorted by
    area (largest first). NOTE(review): depends on the project-local ``nn``
    wrapper (core.leras) and ``cv2``.
    """

    def __init__(self, place_model_on_cpu=False):
        nn.initialize(data_format="NHWC")
        tf = nn.tf

        model_path = Path(__file__).parent / "S3FD.npy"
        if not model_path.exists():
            raise Exception("Unable to load S3FD.npy")

        class L2Norm(nn.LayerBase):
            # Per-channel L2 normalization with a learned scale weight.
            def __init__(self, n_channels, **kwargs):
                self.n_channels = n_channels
                super().__init__(**kwargs)

            def build_weights(self):
                self.weight = tf.get_variable("weight", (1, 1, 1, self.n_channels), dtype=nn.floatx, initializer=tf.initializers.ones)

            def get_weights(self):
                return [self.weight]

            def __call__(self, inputs):
                x = inputs
                x = x / (tf.sqrt(tf.reduce_sum(tf.pow(x, 2), axis=-1, keepdims=True)) + 1e-10) * self.weight
                return x

        class S3FD(nn.ModelBase):
            """VGG16-based S3FD: six detection heads at strides 4..128."""

            def __init__(self):
                super().__init__(name='S3FD')

            def on_build(self):
                # BGR channel means subtracted from the input.
                self.minus = tf.constant([104, 117, 123], dtype=nn.floatx)
                self.conv1_1 = nn.Conv2D(3, 64, kernel_size=3, strides=1, padding='SAME')
                self.conv1_2 = nn.Conv2D(64, 64, kernel_size=3, strides=1, padding='SAME')

                self.conv2_1 = nn.Conv2D(64, 128, kernel_size=3, strides=1, padding='SAME')
                self.conv2_2 = nn.Conv2D(128, 128, kernel_size=3, strides=1, padding='SAME')

                self.conv3_1 = nn.Conv2D(128, 256, kernel_size=3, strides=1, padding='SAME')
                self.conv3_2 = nn.Conv2D(256, 256, kernel_size=3, strides=1, padding='SAME')
                self.conv3_3 = nn.Conv2D(256, 256, kernel_size=3, strides=1, padding='SAME')

                self.conv4_1 = nn.Conv2D(256, 512, kernel_size=3, strides=1, padding='SAME')
                self.conv4_2 = nn.Conv2D(512, 512, kernel_size=3, strides=1, padding='SAME')
                self.conv4_3 = nn.Conv2D(512, 512, kernel_size=3, strides=1, padding='SAME')

                self.conv5_1 = nn.Conv2D(512, 512, kernel_size=3, strides=1, padding='SAME')
                self.conv5_2 = nn.Conv2D(512, 512, kernel_size=3, strides=1, padding='SAME')
                self.conv5_3 = nn.Conv2D(512, 512, kernel_size=3, strides=1, padding='SAME')

                # fc6 uses an explicit pad of 3 (dilated-VGG style head).
                self.fc6 = nn.Conv2D(512, 1024, kernel_size=3, strides=1, padding=3)
                self.fc7 = nn.Conv2D(1024, 1024, kernel_size=1, strides=1, padding='SAME')

                self.conv6_1 = nn.Conv2D(1024, 256, kernel_size=1, strides=1, padding='SAME')
                self.conv6_2 = nn.Conv2D(256, 512, kernel_size=3, strides=2, padding='SAME')

                self.conv7_1 = nn.Conv2D(512, 128, kernel_size=1, strides=1, padding='SAME')
                self.conv7_2 = nn.Conv2D(128, 256, kernel_size=3, strides=2, padding='SAME')

                self.conv3_3_norm = L2Norm(256)
                self.conv4_3_norm = L2Norm(512)
                self.conv5_3_norm = L2Norm(512)

                # First head has 4 conf channels for the max-out background
                # trick; all others use 2 (background/face).
                self.conv3_3_norm_mbox_conf = nn.Conv2D(256, 4, kernel_size=3, strides=1, padding='SAME')
                self.conv3_3_norm_mbox_loc = nn.Conv2D(256, 4, kernel_size=3, strides=1, padding='SAME')

                self.conv4_3_norm_mbox_conf = nn.Conv2D(512, 2, kernel_size=3, strides=1, padding='SAME')
                self.conv4_3_norm_mbox_loc = nn.Conv2D(512, 4, kernel_size=3, strides=1, padding='SAME')

                self.conv5_3_norm_mbox_conf = nn.Conv2D(512, 2, kernel_size=3, strides=1, padding='SAME')
                self.conv5_3_norm_mbox_loc = nn.Conv2D(512, 4, kernel_size=3, strides=1, padding='SAME')

                self.fc7_mbox_conf = nn.Conv2D(1024, 2, kernel_size=3, strides=1, padding='SAME')
                self.fc7_mbox_loc = nn.Conv2D(1024, 4, kernel_size=3, strides=1, padding='SAME')

                self.conv6_2_mbox_conf = nn.Conv2D(512, 2, kernel_size=3, strides=1, padding='SAME')
                self.conv6_2_mbox_loc = nn.Conv2D(512, 4, kernel_size=3, strides=1, padding='SAME')

                self.conv7_2_mbox_conf = nn.Conv2D(256, 2, kernel_size=3, strides=1, padding='SAME')
                self.conv7_2_mbox_loc = nn.Conv2D(256, 4, kernel_size=3, strides=1, padding='SAME')

            def forward(self, inp):
                x, = inp
                x = x - self.minus
                x = tf.nn.relu(self.conv1_1(x))
                x = tf.nn.relu(self.conv1_2(x))
                x = tf.nn.max_pool(x, [1, 2, 2, 1], [1, 2, 2, 1], "VALID")

                x = tf.nn.relu(self.conv2_1(x))
                x = tf.nn.relu(self.conv2_2(x))
                x = tf.nn.max_pool(x, [1, 2, 2, 1], [1, 2, 2, 1], "VALID")

                x = tf.nn.relu(self.conv3_1(x))
                x = tf.nn.relu(self.conv3_2(x))
                x = tf.nn.relu(self.conv3_3(x))
                f3_3 = x
                x = tf.nn.max_pool(x, [1, 2, 2, 1], [1, 2, 2, 1], "VALID")

                x = tf.nn.relu(self.conv4_1(x))
                x = tf.nn.relu(self.conv4_2(x))
                x = tf.nn.relu(self.conv4_3(x))
                f4_3 = x
                x = tf.nn.max_pool(x, [1, 2, 2, 1], [1, 2, 2, 1], "VALID")

                x = tf.nn.relu(self.conv5_1(x))
                x = tf.nn.relu(self.conv5_2(x))
                x = tf.nn.relu(self.conv5_3(x))
                f5_3 = x
                x = tf.nn.max_pool(x, [1, 2, 2, 1], [1, 2, 2, 1], "VALID")

                x = tf.nn.relu(self.fc6(x))
                x = tf.nn.relu(self.fc7(x))
                ffc7 = x

                x = tf.nn.relu(self.conv6_1(x))
                x = tf.nn.relu(self.conv6_2(x))
                f6_2 = x

                x = tf.nn.relu(self.conv7_1(x))
                x = tf.nn.relu(self.conv7_2(x))
                f7_2 = x

                f3_3 = self.conv3_3_norm(f3_3)
                f4_3 = self.conv4_3_norm(f4_3)
                f5_3 = self.conv5_3_norm(f5_3)

                cls1 = self.conv3_3_norm_mbox_conf(f3_3)
                reg1 = self.conv3_3_norm_mbox_loc(f3_3)

                cls2 = tf.nn.softmax(self.conv4_3_norm_mbox_conf(f4_3))
                reg2 = self.conv4_3_norm_mbox_loc(f4_3)

                cls3 = tf.nn.softmax(self.conv5_3_norm_mbox_conf(f5_3))
                reg3 = self.conv5_3_norm_mbox_loc(f5_3)

                cls4 = tf.nn.softmax(self.fc7_mbox_conf(ffc7))
                reg4 = self.fc7_mbox_loc(ffc7)

                cls5 = tf.nn.softmax(self.conv6_2_mbox_conf(f6_2))
                reg5 = self.conv6_2_mbox_loc(f6_2)

                cls6 = tf.nn.softmax(self.conv7_2_mbox_conf(f7_2))
                reg6 = self.conv7_2_mbox_loc(f7_2)

                # max-out background label: take the strongest of the three
                # background channels before the softmax of the first head.
                bmax = tf.maximum(tf.maximum(cls1[:, :, :, 0:1], cls1[:, :, :, 1:2]), cls1[:, :, :, 2:3])
                cls1 = tf.concat([bmax, cls1[:, :, :, 3:4]], axis=-1)
                cls1 = tf.nn.softmax(cls1)

                return [cls1, reg1, cls2, reg2, cls3, reg3, cls4, reg4, cls5, reg5, cls6, reg6]

        # Optionally pin model variables to CPU (manual context handling so
        # the device scope only wraps construction + weight loading).
        e = None
        if place_model_on_cpu:
            e = tf.device("/CPU:0")

        if e is not None:
            e.__enter__()
        try:
            self.model = S3FD()
            self.model.load_weights(model_path)
        finally:
            if e is not None:
                e.__exit__(None, None, None)

        self.model.build_for_run([(tf.float32, nn.get4Dshape(None, None, 3))])

        self.gpu_memory_limit = self._get_gpu_memory_limit()
        self.current_memory_usage = 0

    def _get_gpu_memory_limit(self):
        """Return total GPU memory in MB, or a conservative 8192 MB default."""
        import tensorflow as tf
        try:
            gpus = tf.config.experimental.list_physical_devices('GPU')
            if gpus:
                details = tf.config.experimental.get_device_details(gpus[0])
                if 'device_limitations' in details and 'total_memory' in details['device_limitations']:
                    return details['device_limitations']['total_memory'] // (1024 ** 2)
                else:
                    return 8192
        except Exception:
            # Fixed: was a bare `except:` that also swallowed SystemExit /
            # KeyboardInterrupt. Keep the best-effort default on any TF error.
            return 8192
        return 8192

    def _estimate_memory_usage(self, batch_size, avg_image_size):
        """Rough MB estimate for processing `batch_size` images of
        `avg_image_size` pixels (float32 RGB data + per-image model overhead)."""
        memory_per_image = (avg_image_size * 3 * 4) / (1024 ** 2)  # image data (float32)
        model_overhead = 50  # overhead for activations / intermediate layers
        return batch_size * (memory_per_image + model_overhead)

    def _get_current_gpu_memory_usage(self):
        """Current GPU memory usage in MB via pynvml, falling back to the
        internal running estimate when pynvml / a GPU is unavailable."""
        try:
            import tensorflow as tf
            gpus = tf.config.experimental.list_physical_devices('GPU')
            if gpus:
                try:
                    import pynvml
                    pynvml.nvmlInit()
                    handle = pynvml.nvmlDeviceGetHandleByIndex(0)
                    info = pynvml.nvmlDeviceGetMemoryInfo(handle)
                    return info.used // (1024 ** 2)  # convert to MB
                except ImportError:
                    return self.current_memory_usage
            return 0
        except Exception:
            # Fixed: was a bare `except:`.
            return self.current_memory_usage

    def extract_batch(self, input_images, is_bgr=True, is_remove_intersects=False,
                      initial_batch_size=8, memory_safety_margin=0.8):
        """Detect faces in many images with adaptive batch sizing.

        Returns one list of face rects per input image ([] on failure).
        On OOM-looking errors the batch size is halved up to 3 times.
        NOTE(review): np.stack below requires all images in one batch to
        resize to the same shape — confirm callers feed uniform sizes.
        """
        if not input_images:
            return [[] for _ in range(len(input_images))]

        all_results = []
        avg_image_size = np.mean([img.shape[0] * img.shape[1] for img in input_images])
        current_batch_size = initial_batch_size
        max_retries = 3
        retry_count = 0
        i = 0
        while i < len(input_images):
            current_batch_size = self._adjust_batch_size(
                current_batch_size, avg_image_size, memory_safety_margin
            )
            end_idx = min(i + current_batch_size, len(input_images))
            batch = input_images[i:end_idx]
            try:
                processed_batch = []
                scales = []
                for img in batch:
                    if is_bgr:
                        img = img[:, :, ::-1]
                    h, w, ch = img.shape
                    # Downscale so the longest side is at most 640 (or half
                    # the original for smaller inputs), never below 64.
                    d = max(w, h)
                    scale_to = 640 if d >= 1280 else d / 2
                    scale_to = max(64, scale_to)
                    input_scale = d / scale_to
                    resized_img = cv2.resize(img, (int(w / input_scale), int(h / input_scale)),
                                             interpolation=cv2.INTER_LINEAR)
                    processed_batch.append(resized_img)
                    scales.append(input_scale)

                batch_tensor = np.stack(processed_batch).astype(np.float32)
                # NOTE(review): running estimate only grows (~50 MB per image)
                # and is never reset; it is only a fallback when pynvml is
                # missing — confirm intended.
                self.current_memory_usage += len(batch) * 50

                olist_batch = self.model.run([batch_tensor])

                for idx in range(len(batch)):
                    single_olist = [tensor[idx:idx + 1] for tensor in olist_batch]
                    detected_faces = []
                    for ltrb in self.refine(single_olist):
                        l, t, r, b = [x * scales[idx] for x in ltrb]
                        bt = b - t
                        if min(r - l, bt) < 40:  # filter faces < 40 px on any side
                            continue
                        b += bt * 0.1  # enlarge bottom line a bit for 2DFAN-4
                        detected_faces.append([int(x) for x in (l, t, r, b)])

                    # Sort by largest area first.
                    detected_faces = [[(l, t, r, b), (r - l) * (b - t)] for (l, t, r, b) in detected_faces]
                    detected_faces = sorted(detected_faces, key=operator.itemgetter(1), reverse=True)
                    detected_faces = [x[0] for x in detected_faces]

                    if is_remove_intersects:
                        detected_faces = self._remove_intersecting_faces_optimized(detected_faces)

                    all_results.append(detected_faces)

                i = end_idx
                retry_count = 0  # reset retry counter after a successful batch
            except Exception as e:
                if "out of memory" in str(e).lower() or "memory" in str(e).lower():
                    if retry_count < max_retries:
                        current_batch_size = max(1, current_batch_size // 2)
                        retry_count += 1
                        print(f"S3FDExtractor: 内存不足,减少批大小至 {current_batch_size}")
                        continue
                    else:
                        print(f"S3FDExtractor: 无法处理图像,跳过。错误: {e}")
                        for _ in range(len(batch)):
                            all_results.append([])
                        i = end_idx
                        retry_count = 0
                else:
                    print(f"S3FDExtractor: 处理图像时发生错误: {e}")
                    for _ in range(len(batch)):
                        all_results.append([])
                    i = end_idx
                    retry_count = 0
        return all_results

    def _adjust_batch_size(self, current_batch_size, avg_image_size, safety_margin):
        """Shrink the batch size if the estimated memory need exceeds the
        available GPU budget; never grows the batch, never returns < 1."""
        if self.gpu_memory_limit == 0:  # CPU mode
            return min(current_batch_size, 4)

        current_usage = self._get_current_gpu_memory_usage()
        available_memory = self.gpu_memory_limit * safety_margin - current_usage
        required_memory = self._estimate_memory_usage(current_batch_size, avg_image_size)

        if required_memory > available_memory and current_batch_size > 1:
            adjusted_batch_size = int(current_batch_size * (available_memory / required_memory))
            adjusted_batch_size = max(1, adjusted_batch_size)
            return min(current_batch_size, adjusted_batch_size)
        return current_batch_size

    def _process_batch(self, input_images, is_bgr=True, is_remove_intersects=False):
        """Sequentially detect faces in a prepared batch (no retry logic)."""
        batch_results = []
        for img in input_images:
            result = self.extract_single(img, is_bgr, is_remove_intersects)
            batch_results.append(result)
        return batch_results

    def extract_single(self, input_image, is_bgr=True, is_remove_intersects=False):
        """Detect faces in one image; returns rects sorted by area (desc)."""
        if is_bgr:
            input_image = input_image[:, :, ::-1]
            is_bgr = False

        (h, w, ch) = input_image.shape

        d = max(w, h)
        scale_to = 640 if d >= 1280 else d / 2
        scale_to = max(64, scale_to)

        input_scale = d / scale_to
        input_image = cv2.resize(input_image, (int(w / input_scale), int(h / input_scale)), interpolation=cv2.INTER_LINEAR)

        olist = self.model.run([input_image[None, ...]])

        detected_faces = []
        for ltrb in self.refine(olist):
            l, t, r, b = [x * input_scale for x in ltrb]
            bt = b - t
            if min(r - l, bt) < 40:  # filtering faces < 40pix by any side
                continue
            b += bt * 0.1  # enlarging bottom line a bit for 2DFAN-4

            detected_faces.append([int(x) for x in (l, t, r, b)])

        # sort by largest area first
        detected_faces = [[(l, t, r, b), (r - l) * (b - t)] for (l, t, r, b) in detected_faces]
        detected_faces = sorted(detected_faces, key=operator.itemgetter(1), reverse=True)
        detected_faces = [x[0] for x in detected_faces]

        if is_remove_intersects:
            detected_faces = self._remove_intersecting_faces_optimized(detected_faces)

        return detected_faces

    def _remove_intersecting_faces_optimized(self, detected_faces):
        """Vectorized pairwise-IoU suppression: for any pair with IoU > 0.3
        the smaller box is dropped. Mutates and returns `detected_faces`."""
        if len(detected_faces) <= 1:
            return detected_faces

        bboxes = np.array(detected_faces)
        areas = (bboxes[:, 2] - bboxes[:, 0]) * (bboxes[:, 3] - bboxes[:, 1])

        x1, y1, x2, y2 = bboxes[:, 0], bboxes[:, 1], bboxes[:, 2], bboxes[:, 3]
        xx1 = np.maximum(x1[:, None], x1[None, :])
        yy1 = np.maximum(y1[:, None], y1[None, :])
        xx2 = np.minimum(x2[:, None], x2[None, :])
        yy2 = np.minimum(y2[:, None], y2[None, :])

        w = np.maximum(0, xx2 - xx1)
        h = np.maximum(0, yy2 - yy1)
        inter = w * h
        union = areas[:, None] + areas[None, :] - inter
        iou = inter / union

        to_remove = set()
        for i in range(len(bboxes)):
            if i in to_remove:
                continue
            overlap_indices = np.where((iou[i, :] > 0.3) & (np.arange(len(bboxes)) != i))[0]
            for j in overlap_indices:
                if j in to_remove:
                    continue
                if areas[i] < areas[j]:
                    to_remove.add(i)
                    break
                else:
                    to_remove.add(j)

        for idx in sorted(to_remove, reverse=True):
            if idx < len(detected_faces):
                detected_faces.pop(idx)

        return detected_faces

    def extract(self, input_image, is_bgr=True, is_remove_intersects=False):
        """Single-image convenience wrapper around extract_single."""
        return self.extract_single(input_image, is_bgr, is_remove_intersects)

    def refine(self, olist):
        """Decode the 6 (cls, reg) head outputs into boxes, NMS at 0.3 IoU,
        and keep detections with score >= 0.5 as int32 [l, t, r, b]."""
        bboxlist = []
        for i, ((ocls,), (oreg,)) in enumerate(zip(olist[::2], olist[1::2])):
            stride = 2 ** (i + 2)  # 4, 8, 16, 32, 64, 128
            s_d2 = stride / 2
            s_m4 = stride * 4

            scores = ocls[..., 1]
            # Only decode anchors with face score above a loose threshold.
            high_score_indices = np.where(scores > 0.05)
            for hindex, windex in zip(*high_score_indices):
                score = scores[hindex, windex]
                loc = oreg[hindex, windex, :]
                # Prior: anchor center at this cell, size 4*stride; decode
                # with the standard SSD variances (0.1 center, 0.2 size).
                priors = np.array([windex * stride + s_d2, hindex * stride + s_d2, s_m4, s_m4])
                priors_2p = priors[2:]
                box = np.concatenate((priors[:2] + loc[:2] * 0.1 * priors_2p,
                                      priors_2p * np.exp(loc[2:] * 0.2)))
                # (cx, cy, w, h) -> (l, t, r, b)
                box[:2] -= box[2:] / 2
                box[2:] += box[:2]

                bboxlist.append([*box, score])

        bboxlist = np.array(bboxlist)
        if len(bboxlist) == 0:
            bboxlist = np.zeros((1, 5))

        bboxlist = bboxlist[self.refine_nms(bboxlist, 0.3), :]
        bboxlist = [x[:-1].astype(np.int32) for x in bboxlist if x[-1] >= 0.5]
        return bboxlist

    def refine_nms(self, dets, thresh):
        """Greedy non-maximum suppression over [l, t, r, b, score] rows;
        returns the indices of the detections to keep."""
        if len(dets) == 0:
            return []
        dets = np.asarray(dets)
        x1 = dets[:, 0]
        y1 = dets[:, 1]
        x2 = dets[:, 2]
        y2 = dets[:, 3]
        scores = dets[:, 4]

        areas = (x2 - x1 + 1) * (y2 - y1 + 1)
        order = scores.argsort()[::-1]

        keep = []
        while order.size > 0:
            i = order[0]
            keep.append(i)
            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])

            w = np.maximum(0.0, xx2 - xx1 + 1)
            h = np.maximum(0.0, yy2 - yy1 + 1)
            inter = w * h
            ovr = inter / (areas[i] + areas[order[1:]] - inter)

            inds = np.where(ovr <= thresh)[0]
            order = order[inds + 1]
        return keep
(以上为 S3FDExtractor.py 的完整代码)
评分
查看全部评分