mirror of
https://github.com/mikeswanson/WallGet.git
synced 2025-04-18 14:28:20 +02:00
213 lines
6.4 KiB
Python
213 lines
6.4 KiB
Python
import http.client
|
|
import json
|
|
import os
|
|
import plistlib
|
|
import shutil
|
|
import ssl
|
|
import time
|
|
import urllib.parse
|
|
from multiprocessing.pool import ThreadPool
|
|
from typing import Tuple
|
|
|
|
IDLEASSETSD_PATH = "/Library/Application Support/com.apple.idleassetsd"
|
|
STRINGS_PATH = f"{IDLEASSETSD_PATH}/Customer/TVIdleScreenStrings.bundle/en.lproj/Localizable.nocache.strings"
|
|
ENTRIES_PATH = f"{IDLEASSETSD_PATH}/Customer/entries.json"
|
|
VIDEO_PATH = f"{IDLEASSETSD_PATH}/Customer/4KSDR240FPS"
|
|
|
|
|
|
def main():
|
|
# Check if running as admin
|
|
if os.geteuid() != 0:
|
|
print(f'Please run as admin: sudo python3 "{__file__}"')
|
|
exit()
|
|
|
|
print("WallGet Live Wallpaper Download/Delete Script")
|
|
print("---------------------------------------------\n")
|
|
|
|
# Validate environment
|
|
if not os.path.isdir(IDLEASSETSD_PATH):
|
|
print("Unable to find idleassetsd path.")
|
|
exit()
|
|
if not os.path.isfile(STRINGS_PATH):
|
|
print("Unable to find localizable strings file.")
|
|
exit()
|
|
if not os.path.isfile(ENTRIES_PATH):
|
|
print("Unable to find entries.json file.")
|
|
exit()
|
|
if not os.path.isdir(VIDEO_PATH):
|
|
print("Unable to find video path.")
|
|
exit()
|
|
|
|
# Read localizable strings
|
|
with open(STRINGS_PATH, "rb") as fp:
|
|
strings = plistlib.load(fp)
|
|
|
|
# Read asset entries
|
|
asset_entries = json.load(open(ENTRIES_PATH))
|
|
|
|
# Show categories
|
|
item = 0
|
|
categories = asset_entries.get("categories", [])
|
|
for category in categories:
|
|
name = strings.get(category.get("localizedNameKey", ""), "")
|
|
item += 1
|
|
print(f"{item}. {name}")
|
|
print(f"{item + 1}. All")
|
|
|
|
# Select category
|
|
category_index = as_int(input("\nCategory number? "))
|
|
if category_index < 1 or category_index > item + 1:
|
|
print("\nNo category selected.")
|
|
exit()
|
|
category_id = (
|
|
categories[int(category_index) - 1]["id"] if category_index <= item else None
|
|
)
|
|
|
|
# Download or delete?
|
|
action = input("\n(d)Download or (x)delete? (d/x) ").strip().lower()
|
|
if action != "d" and action != "x":
|
|
print("\nNo action selected.")
|
|
exit()
|
|
action_text = "download" if action == "d" else "delete"
|
|
|
|
# Determine items
|
|
print(f"\nDetermining {action_text} size...", end="")
|
|
items = []
|
|
total_bytes = 0
|
|
for asset in asset_entries.get("assets", []):
|
|
if category_id and category_id not in asset.get("categories", []):
|
|
continue
|
|
|
|
label = strings.get(asset.get("localizedNameKey", ""), "")
|
|
id = asset.get("id", "")
|
|
|
|
# NOTE: May need to update this key logic if other formats are added
|
|
url = asset.get("url-4K-SDR-240FPS", "")
|
|
|
|
# Valid asset?
|
|
if not label or not id or not url:
|
|
continue
|
|
|
|
path = urllib.parse.urlparse(url).path
|
|
ext = os.path.splitext(path)[1]
|
|
file_path = f"{VIDEO_PATH}/{id}{ext}"
|
|
|
|
# Download if file doesn't exist or is the wrong size
|
|
file_exists = os.path.isfile(file_path)
|
|
file_size = os.path.getsize(file_path) if file_exists else 0
|
|
if action == "d":
|
|
content_length = get_content_length(url)
|
|
print(".", end="", flush=True)
|
|
if not file_exists or file_size != content_length:
|
|
items.append((label, url, file_path))
|
|
total_bytes += content_length
|
|
elif action == "x" and file_exists:
|
|
items.append((label, url, file_path))
|
|
total_bytes += file_size
|
|
|
|
print("done.\n")
|
|
|
|
# Anything to process?
|
|
if not items:
|
|
print(f"Nothing to {action_text}.")
|
|
exit()
|
|
|
|
# Disk space check
|
|
free_space = shutil.disk_usage("/").free
|
|
print(f"Available space: {format_bytes(free_space)}")
|
|
print(f"Files to {action_text} ({len(items)}): {format_bytes(total_bytes)}")
|
|
if action == "d" and total_bytes > free_space:
|
|
print("Not enough disk space to download all files.")
|
|
exit()
|
|
|
|
proceed = input(f"{action_text.capitalize()} files? (y/n) ").strip().lower()
|
|
if proceed != "y":
|
|
exit()
|
|
|
|
if action == "d":
|
|
start_time = time.time()
|
|
print("\nDownloading...")
|
|
results = ThreadPool().imap_unordered(download_file, items)
|
|
for result in results:
|
|
print(f" Downloaded '{result}'")
|
|
print(f"\nDownloaded {len(items)} files in {time.time() - start_time:.1f}s.")
|
|
elif action == "x":
|
|
print("\nDeleting...")
|
|
for item in items:
|
|
label, _, file_path = item
|
|
os.remove(file_path)
|
|
print(f" Deleted '{label}'")
|
|
print(f"\nDeleted {len(items)} files.")
|
|
|
|
# Optionally kill idleassetsd to update wallpaper status
|
|
should_kill = (
|
|
input("\nKill idleassetsd to update download status in Settings? (y/n) ")
|
|
.strip()
|
|
.lower()
|
|
)
|
|
if should_kill == "y":
|
|
os.system("killall idleassetsd")
|
|
print("Killed idleassetsd.")
|
|
|
|
print("\nDone.")
|
|
|
|
|
|
def as_int(s: str) -> int:
|
|
try:
|
|
return int(s)
|
|
except ValueError:
|
|
return -1
|
|
|
|
|
|
def format_bytes(bytes: int) -> str:
|
|
units = (
|
|
(1 << 50, "PB"),
|
|
(1 << 40, "TB"),
|
|
(1 << 30, "GB"),
|
|
(1 << 20, "MB"),
|
|
(1 << 10, "KB"),
|
|
(1, "bytes"),
|
|
)
|
|
if bytes == 1:
|
|
return "1 byte"
|
|
for factor, suffix in units:
|
|
if bytes >= factor:
|
|
break
|
|
return f"{bytes / factor:.2f} {suffix}"
|
|
|
|
|
|
def connect(parsed_url: urllib.parse.ParseResult) -> http.client.HTTPConnection:
|
|
context = ssl._create_unverified_context()
|
|
conn = (
|
|
http.client.HTTPSConnection(parsed_url.netloc, context=context)
|
|
if parsed_url.scheme == "https"
|
|
else http.client.HTTPConnection(parsed_url.netloc)
|
|
)
|
|
return conn
|
|
|
|
|
|
def get_content_length(url: str) -> int:
|
|
parsed_url = urllib.parse.urlparse(url)
|
|
conn = connect(parsed_url)
|
|
conn.request("HEAD", parsed_url.path)
|
|
r = conn.getresponse()
|
|
content_length = int(r.getheader("Content-Length", -1))
|
|
conn.close()
|
|
return content_length
|
|
|
|
|
|
def download_file(download: Tuple[str, str, str]) -> str:
|
|
label, url, file_path = download
|
|
parsed_url = urllib.parse.urlparse(url)
|
|
conn = connect(parsed_url)
|
|
conn.request("GET", parsed_url.path)
|
|
r = conn.getresponse()
|
|
if r.status == 200:
|
|
with open(file_path, "wb") as f:
|
|
shutil.copyfileobj(r, f)
|
|
conn.close()
|
|
return label
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|