import argparse
import json
import logging
import os
import shutil
import sys
from multiprocessing import cpu_count

import torch

now_dir = os.getcwd()
sys.path.append(now_dir)

from infer import VC

# Recreate a clean TEMP directory on every launch.
tmp = os.path.join(now_dir, "TEMP")
shutil.rmtree(tmp, ignore_errors=True)
shutil.rmtree(
    os.path.join(now_dir, "runtime", "Lib", "site-packages", "infer_pack"),
    ignore_errors=True,
)
os.makedirs(tmp, exist_ok=True)
os.environ["TEMP"] = tmp

logger = logging.getLogger(__name__)

version_config_list = [
    "v1/32k.json",
    "v1/40k.json",
    "v1/48k.json",
    "v2/48k.json",
    "v2/32k.json",
]


def singleton_variable(func):
    def wrapper(*args, **kwargs):
        # Compare against None rather than truthiness so a falsy-but-valid
        # instance is not rebuilt on every call.
        if wrapper.instance is None:
            wrapper.instance = func(*args, **kwargs)
        return wrapper.instance

    wrapper.instance = None
    return wrapper


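# Hedged usage sketch: because Config below is wrapped with
# @singleton_variable, repeated "instantiation" returns the same object:
#
#     a = Config()
#     b = Config()
#     assert a is b  # one shared configuration instance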
@singleton_variable
class Config:
    def __init__(self):
        self.device = "cuda:0"
        self.is_half = True
        self.use_jit = False
        self.n_cpu = 0
        self.gpu_name = None
        self.json_config = self.load_config_json()
        self.gpu_mem = None
        (
            self.python_cmd,
            self.listen_port,
            self.iscolab,
            self.noparallel,
            self.noautoopen,
            self.dml,
            self.nocheck,
            self.update,
        ) = self.arg_parse()
        self.instead = ""
        self.preprocess_per = 3.7
        self.x_pad, self.x_query, self.x_center, self.x_max = self.device_config()

    @staticmethod
    def load_config_json() -> dict:
        d = {}
        for config_file in version_config_list:
            p = f"configs/inuse/{config_file}"
            # Seed the editable copy from the pristine config on first run.
            if not os.path.exists(p):
                shutil.copy(f"configs/{config_file}", p)
            with open(p, "r") as f:
                d[config_file] = json.load(f)
        return d

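    # Note (inferred from use_fp32_config below): each of these JSON files
    # is expected to carry a "train" section with an "fp16_run" boolean,
    # e.g. {"train": {"fp16_run": true, ...}, ...}.
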
    @staticmethod
    def arg_parse() -> tuple:
        exe = sys.executable or "python"
        parser = argparse.ArgumentParser()
        parser.add_argument("--port", type=int, default=7865)
        parser.add_argument("--pycmd", type=str, default=exe)
        # These flags are on/off switches, so they need action="store_true";
        # without it argparse would demand a value after each flag.
        parser.add_argument("--colab", action="store_true")
        parser.add_argument("--noparallel", action="store_true")
        parser.add_argument("--noautoopen", action="store_true")
        parser.add_argument("--dml", action="store_true")
        parser.add_argument("--nocheck", action="store_true")
        parser.add_argument("--update", action="store_true")
        cmd_opts = parser.parse_args()

        # Fall back to the default port if an out-of-range value was given.
        cmd_opts.port = cmd_opts.port if 0 <= cmd_opts.port <= 65535 else 7865

        return (
            cmd_opts.pycmd,
            cmd_opts.port,
            cmd_opts.colab,
            cmd_opts.noparallel,
            cmd_opts.noautoopen,
            cmd_opts.dml,
            cmd_opts.nocheck,
            cmd_opts.update,
        )

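    # Hedged example invocation (the entry-point name is illustrative and
    # not defined in this file; the flags are the ones parsed above):
    #
    #     python infer-web.py --port 7865 --noautoopen --dml
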
    @staticmethod
    def has_mps() -> bool:
        if not torch.backends.mps.is_available():
            return False
        try:
            # MPS can still fail at allocation time even when it reports
            # available, so probe with a tiny tensor.
            torch.zeros(1).to(torch.device("mps"))
            return True
        except Exception:
            return False

    @staticmethod
    def has_xpu() -> bool:
        return hasattr(torch, "xpu") and torch.xpu.is_available()

    def use_fp32_config(self):
        for config_file in version_config_list:
            self.json_config[config_file]["train"]["fp16_run"] = False
            # Crude but effective for these configs: flip every "true" in
            # the file to "false" so fp16_run is disabled on disk as well.
            with open(f"configs/inuse/{config_file}", "r") as f:
                strr = f.read().replace("true", "false")
            with open(f"configs/inuse/{config_file}", "w") as f:
                f.write(strr)
            logger.info("Overwrote %s", config_file)
        self.preprocess_per = 3.0
        logger.info("Overwrote preprocess_per to %s", self.preprocess_per)

    def device_config(self) -> tuple:
        if torch.cuda.is_available():
            if self.has_xpu():
                self.device = self.instead = "xpu:0"
                self.is_half = True
            i_device = int(self.device.split(":")[-1])
            self.gpu_name = torch.cuda.get_device_name(i_device)
            # 16-series, 10-series, and P40-class cards handle fp16 poorly,
            # so force fp32 on them.
            if (
                ("16" in self.gpu_name and "V100" not in self.gpu_name.upper())
                or "P40" in self.gpu_name.upper()
                or "P10" in self.gpu_name.upper()
                or "1060" in self.gpu_name
                or "1070" in self.gpu_name
                or "1080" in self.gpu_name
            ):
                logger.info("Found GPU %s, forcing fp32", self.gpu_name)
                self.is_half = False
                self.use_fp32_config()
            else:
                logger.info("Found GPU %s", self.gpu_name)
            self.gpu_mem = int(
                torch.cuda.get_device_properties(i_device).total_memory
                / 1024
                / 1024
                / 1024
                + 0.4
            )
            if self.gpu_mem <= 4:
                self.preprocess_per = 3.0
        elif self.has_mps():
            self.device = self.instead = "mps"
            self.is_half = False
            self.use_fp32_config()
        else:
            self.device = self.instead = "cpu"
            self.is_half = False
            self.use_fp32_config()

        if self.n_cpu == 0:
            self.n_cpu = cpu_count()

        # Inference slicing windows (seconds); fp16 leaves enough VRAM
        # headroom for longer windows.
        if self.is_half:
            x_pad = 3
            x_query = 10
            x_center = 60
            x_max = 65
        else:
            x_pad = 1
            x_query = 6
            x_center = 38
            x_max = 41

        # Tighten the windows further on cards with <= 4 GB of VRAM.
        if self.gpu_mem is not None and self.gpu_mem <= 4:
            x_pad = 1
            x_query = 5
            x_center = 30
            x_max = 32

        # The portable Windows runtime ships both onnxruntime builds; swap
        # the directories so the active one matches the requested backend.
        if self.dml:
            if not os.path.exists(
                r"runtime\Lib\site-packages\onnxruntime\capi\DirectML.dll"
            ):
                try:
                    os.rename(
                        r"runtime\Lib\site-packages\onnxruntime",
                        r"runtime\Lib\site-packages\onnxruntime-cuda",
                    )
                except OSError:
                    pass
                try:
                    os.rename(
                        r"runtime\Lib\site-packages\onnxruntime-dml",
                        r"runtime\Lib\site-packages\onnxruntime",
                    )
                except OSError:
                    pass

            # Lazy import: torch_directml is only needed (and installed)
            # for the DirectML backend.
            import torch_directml

            self.device = torch_directml.device(torch_directml.default_device())
            self.is_half = False
        else:
            if not os.path.exists(
                r"runtime\Lib\site-packages\onnxruntime\capi\onnxruntime_providers_cuda.dll"
            ):
                try:
                    os.rename(
                        r"runtime\Lib\site-packages\onnxruntime",
                        r"runtime\Lib\site-packages\onnxruntime-dml",
                    )
                except OSError:
                    pass
                try:
                    os.rename(
                        r"runtime\Lib\site-packages\onnxruntime-cuda",
                        r"runtime\Lib\site-packages\onnxruntime",
                    )
                except OSError:
                    pass
        logger.info("Device: %s", self.device)
        return x_pad, x_query, x_center, x_max


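# Hedged usage sketch: Config is a singleton, so any module can do
#
#     cfg = Config()
#     print(cfg.device, cfg.is_half, (cfg.x_pad, cfg.x_query, cfg.x_center, cfg.x_max))
#
# and always observe the same resolved device settings.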
config = Config()
vc = VC(config)

weight_root = "assets/weights"
index_root = "logs"

# Collect available model weights (.pth) once at startup.
names = []
for name in os.listdir(weight_root):
    if name.endswith(".pth"):
        names.append(name)

index_paths = []


def lookup_indices(index_root):
    # Appends to the module-level index_paths list, skipping the
    # intermediate "trained" indices.
    for root, dirs, files in os.walk(index_root, topdown=False):
        for name in files:
            if name.endswith(".index") and "trained" not in name:
                index_paths.append("%s/%s" % (root, name))


lookup_indices(index_root)


def change_choices():
    # Rescan weights and indices so UI dropdowns reflect newly added files.
    names = []
    for name in os.listdir(weight_root):
        if name.endswith(".pth"):
            names.append(name)
    index_paths = []
    for root, dirs, files in os.walk(index_root, topdown=False):
        for name in files:
            if name.endswith(".index") and "trained" not in name:
                index_paths.append("%s/%s" % (root, name))
    return {"choices": sorted(names), "__type__": "update"}, {
        "choices": sorted(index_paths),
        "__type__": "update",
    }
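
# Hedged wiring sketch: the two dicts above follow gradio's component-update
# convention, so a refresh control could be hooked up like this (component
# names are illustrative, not defined in this file):
#
#     refresh_button.click(
#         fn=change_choices, inputs=[], outputs=[sid_dropdown, index_dropdown]
#     )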