-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy path批量扫描emby视频信息.py
111 lines (91 loc) · 3.24 KB
/
批量扫描emby视频信息.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import concurrent.futures
import functools
import logging
import os

import requests
# Run from the script's own directory so the relative log file path below
# resolves next to the script regardless of where it was launched from.
working_directory = os.path.dirname(os.path.abspath(__file__))
os.chdir(working_directory)

# Create the logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Create the console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
console_format = logging.Formatter(
    "%(asctime)s [%(levelname)s] %(message)s\n", "%Y-%m-%d %H:%M:%S"
)
console_handler.setFormatter(console_format)

# Create the file handler. The encoding is pinned to UTF-8 because the log
# messages contain Chinese text and arbitrary media paths; with the locale
# default encoding, records that cannot be encoded would be dropped.
file_handler = logging.FileHandler("emby_extract.log", encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_format = logging.Formatter(
    "%(asctime)s [%(levelname)s] %(message)s\n", "%Y-%m-%d %H:%M:%S"
)
file_handler.setFormatter(file_format)

# Attach both handlers to the logger
logger.addHandler(console_handler)
logger.addHandler(file_handler)
import time
def measure_time(func):
    """Decorate *func* so that each call logs how long it took.

    The elapsed time is written to the module-level ``logger`` at INFO level
    after *func* returns. If *func* raises, nothing is logged and the
    exception propagates unchanged.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all arguments to *func* and returns its result.
    """

    @functools.wraps(func)  # keep the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measurement cannot be skewed by
        # system clock adjustments the way time.time() can.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        total_time = time.perf_counter() - start_time
        logger.info(f"总耗时: {total_time:.2f} 秒")
        return result

    return wrapper
# Fetch the metadata of every movie and episode
def get_all_movies():
    """Return every Movie and Episode item reported by the Emby server.

    Issues one recursive ``GET {EMBY_SERVER_URL}/emby/Items`` request asking
    for the ``ProviderIds`` and ``Path`` fields of each item.

    Returns:
        The value stored under the ``"Items"`` key of the JSON response, or an
        empty list when the request fails, the body is not valid JSON, or the
        payload has no ``"Items"`` entry. Failures are logged, never raised.
    """
    url = f"{EMBY_SERVER_URL}/emby/Items"
    params = {
        "api_key": API_KEY,
        "IncludeItemTypes": "Movie,Episode",
        "Recursive": True,
        "Fields": "ProviderIds,Path",
    }

    try:
        response = requests.get(url, params=params)
        response.raise_for_status()  # raise on 4xx/5xx responses
    except requests.exceptions.HTTPError as http_err:
        logger.info(f"HTTP error occurred: {http_err}")
        return []
    except requests.exceptions.RequestException as err:
        logger.info(f"Other error occurred: {err}")
        return []

    # Parse separately from the request: the JSON error raised by recent
    # Requests versions is also a RequestException, so handling it in the same
    # try statement would report it as a generic request failure.
    try:
        payload = response.json()
    except ValueError:
        logger.info("Error parsing JSON response")
        # Lazy %-formatting; passing the content without a placeholder makes
        # the logging call fail and lose the content.
        logger.info("Response content: %s", response.content)
        return []

    try:
        return payload["Items"]
    except (KeyError, TypeError):
        logger.info("Response JSON has no 'Items' entry")
        return []
def scan_item(item_id: str) -> bool:
    """Request playback info for one item and report whether it succeeded.

    Sends ``GET {EMBY_SERVER_URL}/emby/Items/{item_id}/PlaybackInfo``.

    Args:
        item_id: Emby item ID substituted into the request URL.

    Returns:
        ``True`` when the server answers with HTTP 200; ``False`` for any other
        status code or when the request itself fails. Failures are logged.
    """
    url = f"{EMBY_SERVER_URL}/emby/Items/{item_id}/PlaybackInfo"
    params = {
        "api_key": API_KEY,
    }
    # NOTE(review): no timeout is passed, so a stalled server blocks this
    # worker indefinitely; consider a timeout once an acceptable upper bound
    # for a single scan is known.
    try:
        response = requests.get(url, params=params)
    except requests.exceptions.RequestException as err:
        # A connection-level failure counts as a failed scan for this item
        # rather than an exception raised out of the worker.
        logger.info(f"Request for item {item_id} failed: {err}")
        return False

    if response.status_code == 200:
        return True
    logger.info(f"Request failed with status code: {response.status_code}")
    return False
# Main entry point
@measure_time
def main():
    """Scan every movie and episode concurrently and log the running total.

    Fetches the item list with ``get_all_movies()``, submits one
    ``scan_item`` call per item to a thread pool of ``num_threads`` workers,
    logs each item's path as its scan completes successfully, and finally
    logs the total number of successful scans.
    """
    total_num = 0
    all_movies = get_all_movies()
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Map each future to its item so the item is found in O(1) when the
        # future completes, instead of a linear list search per completion.
        future_to_item = {
            executor.submit(scan_item, item["Id"]): item for item in all_movies
        }
        for future in concurrent.futures.as_completed(future_to_item):
            item = future_to_item[future]
            try:
                succeeded = future.result()
            except Exception as err:
                # Report the failure and keep going; one bad item must not
                # abort the remaining scans.
                logger.info(f"Scan raised for item {item['Id']}: {err}")
                continue
            if succeeded:
                total_num += 1
                # .get() so an item without a path is still reported.
                logger.info(
                    f"已成功扫描视频信息:{item.get('Path')}\n共扫描{total_num}个视频"
                )
    logger.info(f"总共扫描了 {total_num} 个视频")
if __name__ == "__main__":
    # Emby server settings. Each value can be overridden through an
    # environment variable so the API key does not have to live in source
    # control; the literals below remain as the defaults.
    EMBY_SERVER_URL = os.environ.get("EMBY_SERVER_URL", "http://127.0.0.1:8096")
    API_KEY = os.environ.get("EMBY_API_KEY", "5b2d4062d9124a6ca9e7eab015251f53")
    num_threads = int(os.environ.get("EMBY_SCAN_THREADS", "5"))  # worker thread count
    main()