mirror of
https://github.com/FranP-code/classify_saved_videos_yt.git
synced 2025-10-13 00:32:25 +00:00
Update requirements and refactor script for video classification automation
- Updated pynput version in requirements.txt - Refactored script.py to enhance video classification functionality using Ollama - Added methods for video information extraction, classification, and CSV handling - Improved browser initialization and error handling
This commit is contained in:
@@ -2,4 +2,12 @@
|
||||
opencv-python==4.11.0.86
|
||||
pillow==11.3.0
|
||||
PyAutoGUI==0.9.54
|
||||
pynput==0.13.5
|
||||
pynput==1.8.1
|
||||
requests==2.31.0
|
||||
pandas~=2.3.1
|
||||
ollama==0.2.1
|
||||
configparser==6.0.0
|
||||
pyperclip==1.8.2
|
||||
pytesseract==0.3.10
|
||||
selenium==4.15.2
|
||||
webdriver-manager==4.0.1
|
||||
|
||||
873
script.py
873
script.py
@@ -2,25 +2,666 @@ import pyautogui as pgui
|
||||
import time
|
||||
from pynput import keyboard as kb
|
||||
import sys
|
||||
import csv
|
||||
import os
|
||||
import requests
|
||||
import pandas as pd
|
||||
import base64
|
||||
from io import BytesIO
|
||||
from PIL import Image
|
||||
import json
|
||||
import configparser
|
||||
import pyperclip
|
||||
import pytesseract
|
||||
import platform # Add this import for OS detection
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.chrome.options import Options
|
||||
from webdriver_manager.chrome import ChromeDriverManager
|
||||
from selenium.webdriver.chrome.service import Service
|
||||
from selenium.common.exceptions import TimeoutException, NoSuchElementException
|
||||
|
||||
# Load configuration
|
||||
config = configparser.ConfigParser()
|
||||
config.read('config.ini')
|
||||
|
||||
playlist_url = 'https://www.youtube.com/playlist?list=WL'
|
||||
# Configuration variables
|
||||
OLLAMA_HOST = config.get(
|
||||
'DEFAULT',
|
||||
'ollama_host',
|
||||
fallback='http://localhost:11434')
|
||||
OLLAMA_MODEL = config.get('DEFAULT', 'ollama_model', fallback='qwen2.5vl:7b')
|
||||
|
||||
CLASSIFICATION_PROMPT = """
|
||||
Please classify this YouTube video based on its title and thumbnail.
|
||||
|
||||
Video Title: {video_title}
|
||||
|
||||
Existing Classifications: {existing_categories}
|
||||
|
||||
Instructions:
|
||||
1. If the video fits into one of the existing classifications, use that exact classification name.
|
||||
2. If the video doesn't fit any existing classification, create a new appropriate classification name.
|
||||
3. Classification names should be concise (1-3 words) and descriptive.
|
||||
4. Examples of good classifications: "Tech Reviews", "Cooking", "Gaming", "Education", "Music", "Comedy", etc.
|
||||
5. Respond with ONLY the classification name, nothing else.
|
||||
"""
|
||||
|
||||
playlist_url = config.get(
|
||||
'DEFAULT',
|
||||
'playlist_url',
|
||||
fallback='https://www.youtube.com/playlist?list=WL')
|
||||
classifications_csv = config.get(
|
||||
'DEFAULT',
|
||||
'classifications_csv',
|
||||
fallback='video_classifications.csv')
|
||||
|
||||
quit = False
|
||||
driver = None # Global driver variable
|
||||
|
||||
half_left = (
|
||||
0,
|
||||
0,
|
||||
pgui.size().width // 2,
|
||||
pgui.size().height
|
||||
)
|
||||
|
||||
half_right = (
|
||||
pgui.size().width // 2,
|
||||
0,
|
||||
pgui.size().width,
|
||||
pgui.size().height
|
||||
)
|
||||
def get_chrome_binary_path():
|
||||
"""Get the Chrome binary path based on the operating system."""
|
||||
system = platform.system().lower()
|
||||
|
||||
if system == "linux":
|
||||
# Common paths for Google Chrome on Linux
|
||||
chrome_paths = [
|
||||
"/usr/bin/google-chrome",
|
||||
"/usr/bin/google-chrome-stable",
|
||||
"/opt/google/chrome/google-chrome",
|
||||
"/usr/local/bin/google-chrome",
|
||||
"/usr/bin/chromium-browser", # Add chromium as fallback
|
||||
"/snap/bin/chromium"
|
||||
]
|
||||
elif system == "darwin": # macOS
|
||||
chrome_paths = [
|
||||
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
|
||||
"/Applications/Chromium.app/Contents/MacOS/Chromium"
|
||||
]
|
||||
elif system == "windows":
|
||||
chrome_paths = [
|
||||
"C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe",
|
||||
"C:\\Program Files (x86)\\Google\\Chrome\\Application\\chrome.exe",
|
||||
os.path.expanduser("~\\AppData\\Local\\Google\\Chrome\\Application\\chrome.exe")]
|
||||
else:
|
||||
chrome_paths = []
|
||||
|
||||
# Find the first existing Chrome binary
|
||||
for path in chrome_paths:
|
||||
if os.path.exists(path):
|
||||
return path
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_system_chromedriver_path():
|
||||
"""Get the system chromedriver path based on the operating system."""
|
||||
system = platform.system().lower()
|
||||
|
||||
if system == "linux":
|
||||
chromedriver_paths = [
|
||||
"/usr/bin/chromedriver",
|
||||
"/usr/local/bin/chromedriver",
|
||||
"/snap/bin/chromedriver"
|
||||
]
|
||||
elif system == "darwin": # macOS
|
||||
chromedriver_paths = [
|
||||
"/usr/local/bin/chromedriver",
|
||||
"/opt/homebrew/bin/chromedriver"
|
||||
]
|
||||
elif system == "windows":
|
||||
chromedriver_paths = [
|
||||
"C:\\Program Files\\Google\\Chrome\\Application\\chromedriver.exe",
|
||||
"C:\\Windows\\System32\\chromedriver.exe"
|
||||
]
|
||||
else:
|
||||
chromedriver_paths = []
|
||||
|
||||
# Find the first existing chromedriver
|
||||
for path in chromedriver_paths:
|
||||
if os.path.exists(path):
|
||||
return path
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def init_browser():
|
||||
"""Initialize Chrome browser with options for automation."""
|
||||
global driver
|
||||
try:
|
||||
# Setup Chrome options
|
||||
chrome_options = Options()
|
||||
chrome_options.add_argument("--no-sandbox")
|
||||
chrome_options.add_argument("--disable-dev-shm-usage")
|
||||
chrome_options.add_argument(
|
||||
"--disable-blink-features=AutomationControlled")
|
||||
chrome_options.add_experimental_option(
|
||||
"excludeSwitches", ["enable-automation"])
|
||||
chrome_options.add_experimental_option('useAutomationExtension', False)
|
||||
|
||||
# Remove headless mode to make browser visible
|
||||
# chrome_options.add_argument("--headless=new") # Commented out to
|
||||
# show browser
|
||||
chrome_options.add_argument("--disable-gpu")
|
||||
chrome_options.add_argument("--window-size=1920,1080")
|
||||
|
||||
# Add options for dev container to display browser on host
|
||||
chrome_options.add_argument("--disable-extensions")
|
||||
chrome_options.add_argument("--disable-plugins")
|
||||
chrome_options.add_argument("--disable-background-timer-throttling")
|
||||
chrome_options.add_argument("--disable-backgrounding-occluded-windows")
|
||||
chrome_options.add_argument("--disable-renderer-backgrounding")
|
||||
|
||||
# Get the appropriate Chrome binary path
|
||||
chrome_binary = get_chrome_binary_path()
|
||||
if chrome_binary:
|
||||
chrome_options.binary_location = chrome_binary
|
||||
print(f"Using Chrome binary: {chrome_binary}")
|
||||
else:
|
||||
print("Warning: Could not find Chrome binary, using system default")
|
||||
|
||||
# Initialize the driver with various fallback options
|
||||
driver_initialized = False
|
||||
|
||||
# Try 1: System chromedriver with detected Chrome binary
|
||||
if not driver_initialized:
|
||||
try:
|
||||
chromedriver_path = get_system_chromedriver_path()
|
||||
if chromedriver_path:
|
||||
service = Service(chromedriver_path)
|
||||
driver = webdriver.Chrome(
|
||||
service=service, options=chrome_options)
|
||||
driver_initialized = True
|
||||
print(
|
||||
f"Initialized with system chromedriver: {chromedriver_path}")
|
||||
except Exception as e:
|
||||
print(f"System chromedriver failed: {e}")
|
||||
|
||||
# Try 2: WebDriverManager with explicit OS detection
|
||||
if not driver_initialized:
|
||||
try:
|
||||
# Force the correct OS for WebDriverManager
|
||||
import tempfile
|
||||
import shutil
|
||||
|
||||
# Clear any cached drivers that might have wrong architecture
|
||||
wdm_cache_dir = os.path.expanduser("~/.wdm")
|
||||
if os.path.exists(wdm_cache_dir):
|
||||
print(
|
||||
"Clearing WebDriverManager cache to avoid architecture conflicts...")
|
||||
shutil.rmtree(wdm_cache_dir, ignore_errors=True)
|
||||
|
||||
# Initialize ChromeDriverManager with explicit OS detection
|
||||
from webdriver_manager.core.utils import ChromeType
|
||||
manager = ChromeDriverManager(
|
||||
chrome_type=ChromeType.CHROMIUM if "chromium" in (
|
||||
chrome_binary or "").lower() else ChromeType.GOOGLE)
|
||||
service = Service(manager.install())
|
||||
driver = webdriver.Chrome(
|
||||
service=service, options=chrome_options)
|
||||
driver_initialized = True
|
||||
print("Initialized with WebDriverManager")
|
||||
except Exception as e:
|
||||
print(f"WebDriverManager failed: {e}")
|
||||
|
||||
# Try 3: Default Chrome without specifying service
|
||||
if not driver_initialized:
|
||||
try:
|
||||
driver = webdriver.Chrome(options=chrome_options)
|
||||
driver_initialized = True
|
||||
print("Initialized with default Chrome")
|
||||
except Exception as e:
|
||||
print(f"Default Chrome initialization failed: {e}")
|
||||
|
||||
# Try 4: Fallback with minimal options (still visible)
|
||||
if not driver_initialized:
|
||||
try:
|
||||
chrome_options_fallback = Options()
|
||||
chrome_options_fallback.add_argument("--no-sandbox")
|
||||
chrome_options_fallback.add_argument("--disable-dev-shm-usage")
|
||||
chrome_options_fallback.add_argument("--disable-extensions")
|
||||
chrome_options_fallback.add_argument("--disable-plugins")
|
||||
chrome_options_fallback.add_argument("--window-size=1920,1080")
|
||||
|
||||
# Try without custom binary location
|
||||
driver = webdriver.Chrome(options=chrome_options_fallback)
|
||||
driver_initialized = True
|
||||
print("Initialized with fallback Chrome options")
|
||||
except Exception as e:
|
||||
print(f"Fallback Chrome initialization failed: {e}")
|
||||
|
||||
# Try 5: Last resort - use system command to find chromedriver
|
||||
if not driver_initialized:
|
||||
try:
|
||||
import subprocess
|
||||
result = subprocess.run(
|
||||
['which', 'chromedriver'], capture_output=True, text=True)
|
||||
if result.returncode == 0:
|
||||
chromedriver_path = result.stdout.strip()
|
||||
service = Service(chromedriver_path)
|
||||
chrome_options_minimal = Options()
|
||||
chrome_options_minimal.add_argument("--no-sandbox")
|
||||
chrome_options_minimal.add_argument(
|
||||
"--window-size=1920,1080")
|
||||
|
||||
driver = webdriver.Chrome(
|
||||
service=service, options=chrome_options_minimal)
|
||||
driver_initialized = True
|
||||
print(
|
||||
f"Initialized with system-found chromedriver: {chromedriver_path}")
|
||||
except Exception as e:
|
||||
print(f"System command fallback failed: {e}")
|
||||
|
||||
if not driver_initialized:
|
||||
print("All browser initialization attempts failed")
|
||||
return False
|
||||
|
||||
# Remove automation indicators
|
||||
try:
|
||||
driver.execute_script(
|
||||
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
|
||||
except BaseException:
|
||||
pass # Ignore if this fails
|
||||
|
||||
print("Browser initialized successfully!")
|
||||
|
||||
# Navigate to the playlist URL
|
||||
driver.get(playlist_url)
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("SETUP INSTRUCTIONS:")
|
||||
print("1. The browser should now be visible on your screen")
|
||||
print("2. Please log in to your YouTube account in the browser")
|
||||
print("3. Navigate to your playlist if not already there")
|
||||
print("4. Ensure the playlist URL in config.ini is correct")
|
||||
print("5. The script will process videos automatically")
|
||||
print("6. Press 'q' to quit at any time")
|
||||
print("7. If you're in a dev container, use the command below to open the browser:")
|
||||
print(f' "$BROWSER" {playlist_url}')
|
||||
input("Press Enter to continue after logging in...")
|
||||
print("=" * 60)
|
||||
|
||||
print(f"Current browser navigated to: {playlist_url}")
|
||||
|
||||
# Wait a moment for any redirects
|
||||
time.sleep(5)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error initializing browser: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def get_video_info_web():
|
||||
"""Extract video information using web scraping from playlist items."""
|
||||
global driver
|
||||
try:
|
||||
# Wait for playlist elements to be present
|
||||
wait = WebDriverWait(driver, 10)
|
||||
|
||||
# Find the first video in the playlist
|
||||
video_selectors = [
|
||||
"ytd-playlist-video-renderer",
|
||||
".ytd-playlist-video-renderer",
|
||||
"#contents ytd-playlist-video-renderer:first-child"
|
||||
]
|
||||
|
||||
video_element = None
|
||||
for selector in video_selectors:
|
||||
try:
|
||||
video_element = wait.until(
|
||||
EC.presence_of_element_located(
|
||||
(By.CSS_SELECTOR, selector)))
|
||||
if video_element:
|
||||
break
|
||||
except BaseException:
|
||||
continue
|
||||
|
||||
if not video_element:
|
||||
print("Could not find video element in playlist")
|
||||
return None, None, None
|
||||
|
||||
# Extract video title
|
||||
video_title = None
|
||||
title_selectors = [
|
||||
"a#video-title",
|
||||
".ytd-playlist-video-renderer #video-title",
|
||||
"h3 a#video-title"
|
||||
]
|
||||
|
||||
for selector in title_selectors:
|
||||
try:
|
||||
title_element = video_element.find_element(
|
||||
By.CSS_SELECTOR, selector)
|
||||
video_title = title_element.get_attribute(
|
||||
"title") or title_element.text.strip()
|
||||
if video_title:
|
||||
break
|
||||
except BaseException:
|
||||
continue
|
||||
|
||||
if not video_title:
|
||||
video_title = f"Video_{int(time.time())}"
|
||||
|
||||
print(f"Extracted video title: {video_title}")
|
||||
|
||||
# Get video URL using share functionality
|
||||
video_url = None
|
||||
try:
|
||||
# Find and click the options button for this video
|
||||
options_button = video_element.find_element(
|
||||
By.CSS_SELECTOR, "#button.style-scope.yt-icon-button")
|
||||
driver.execute_script("arguments[0].click();", options_button)
|
||||
time.sleep(1)
|
||||
|
||||
# Find and click the share button
|
||||
share_button = wait.until(
|
||||
EC.element_to_be_clickable(
|
||||
(By.CSS_SELECTOR,
|
||||
"#items > ytd-menu-service-item-renderer:nth-child(6) > tp-yt-paper-item > yt-formatted-string")))
|
||||
driver.execute_script("arguments[0].click();", share_button)
|
||||
time.sleep(1)
|
||||
|
||||
# Wait for share modal to appear and get the URL
|
||||
try:
|
||||
url_input = wait.until(
|
||||
EC.presence_of_element_located(
|
||||
(By.CSS_SELECTOR, "input[readonly]")))
|
||||
video_url = url_input.get_attribute("value")
|
||||
|
||||
# If that doesn't work, try alternative selectors
|
||||
if not video_url:
|
||||
url_selectors = [
|
||||
"#share-url",
|
||||
"input[type='text'][readonly]",
|
||||
".style-scope.ytd-copy-link-renderer input"
|
||||
]
|
||||
for selector in url_selectors:
|
||||
try:
|
||||
url_element = driver.find_element(
|
||||
By.CSS_SELECTOR, selector)
|
||||
video_url = url_element.get_attribute("value")
|
||||
if video_url:
|
||||
break
|
||||
except BaseException:
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error getting URL from share modal: {e}")
|
||||
|
||||
# Close the share modal
|
||||
try:
|
||||
close_button = driver.find_element(
|
||||
By.CSS_SELECTOR, "#button.style-scope.yt-icon-button > yt-icon[icon='close']")
|
||||
driver.execute_script("arguments[0].click();", close_button)
|
||||
time.sleep(0.5)
|
||||
except BaseException:
|
||||
# Try alternative close methods
|
||||
try:
|
||||
close_button = driver.find_element(
|
||||
By.CSS_SELECTOR, "yt-icon-button[aria-label*='Close']")
|
||||
driver.execute_script(
|
||||
"arguments[0].click();", close_button)
|
||||
time.sleep(0.5)
|
||||
except BaseException:
|
||||
# Press Escape key as fallback
|
||||
driver.find_element(
|
||||
By.TAG_NAME, "body").send_keys(
|
||||
Keys.ESCAPE)
|
||||
time.sleep(0.5)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error extracting video URL: {e}")
|
||||
# Fallback: try to get href from video title link
|
||||
try:
|
||||
title_link = video_element.find_element(
|
||||
By.CSS_SELECTOR, "a#video-title")
|
||||
href = title_link.get_attribute("href")
|
||||
if href:
|
||||
video_url = href
|
||||
except BaseException:
|
||||
video_url = f"https://youtube.com/watch?v=unknown_{int(time.time())}"
|
||||
|
||||
print(f"Extracted video URL: {video_url}")
|
||||
|
||||
# Take screenshot of video thumbnail
|
||||
try:
|
||||
thumbnail_path = "temp_thumbnail.png"
|
||||
|
||||
# Try to find the video thumbnail image
|
||||
thumbnail_selectors = [
|
||||
"img#img",
|
||||
".ytd-thumbnail img",
|
||||
"ytd-thumbnail img",
|
||||
"#thumbnail img"
|
||||
]
|
||||
|
||||
screenshot_taken = False
|
||||
for selector in thumbnail_selectors:
|
||||
try:
|
||||
thumbnail_element = video_element.find_element(
|
||||
By.CSS_SELECTOR, selector)
|
||||
thumbnail_element.screenshot(thumbnail_path)
|
||||
screenshot_taken = True
|
||||
break
|
||||
except BaseException:
|
||||
continue
|
||||
|
||||
if not screenshot_taken:
|
||||
# Take screenshot of the entire video element
|
||||
video_element.screenshot(thumbnail_path)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error taking thumbnail screenshot: {e}")
|
||||
# Create a placeholder image
|
||||
try:
|
||||
img = Image.new('RGB', (320, 180), color='lightgray')
|
||||
img.save(thumbnail_path)
|
||||
except BaseException:
|
||||
thumbnail_path = None
|
||||
|
||||
return video_title, video_url, thumbnail_path
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error extracting video info: {e}")
|
||||
return None, None, None
|
||||
|
||||
|
||||
def navigate_to_next_video():
|
||||
"""Navigate to the next video in the playlist by removing the current one."""
|
||||
global driver
|
||||
try:
|
||||
# The next video becomes the first video after removal
|
||||
# So we don't need to navigate, just wait for the page to update
|
||||
time.sleep(2)
|
||||
|
||||
# Check if there are still videos in the playlist
|
||||
try:
|
||||
wait = WebDriverWait(driver, 5)
|
||||
next_video = wait.until(EC.presence_of_element_located(
|
||||
(By.CSS_SELECTOR, "ytd-playlist-video-renderer")))
|
||||
return True
|
||||
except BaseException:
|
||||
print("No more videos in playlist")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error checking for next video: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def remove_video_from_playlist():
|
||||
"""Remove the current video from the playlist."""
|
||||
global driver
|
||||
try:
|
||||
wait = WebDriverWait(driver, 10)
|
||||
|
||||
# Find the first video in the playlist
|
||||
video_element = wait.until(
|
||||
EC.presence_of_element_located(
|
||||
(By.CSS_SELECTOR, "ytd-playlist-video-renderer")))
|
||||
|
||||
# Find and click the options button
|
||||
options_button = video_element.find_element(
|
||||
By.CSS_SELECTOR, "#button.style-scope.yt-icon-button")
|
||||
driver.execute_script("arguments[0].click();", options_button)
|
||||
time.sleep(1)
|
||||
|
||||
# Find and click the remove button
|
||||
remove_selectors = [
|
||||
"ytd-menu-service-item-renderer tp-yt-paper-item[role='menuitem']",
|
||||
"#items > ytd-menu-service-item-renderer tp-yt-paper-item",
|
||||
"tp-yt-paper-item[role='menuitem']"
|
||||
]
|
||||
|
||||
removed = False
|
||||
for selector in remove_selectors:
|
||||
try:
|
||||
menu_items = driver.find_elements(By.CSS_SELECTOR, selector)
|
||||
for item in menu_items:
|
||||
item_text = item.text.strip().lower()
|
||||
if "remove" in item_text or "delete" in item_text:
|
||||
driver.execute_script("arguments[0].click();", item)
|
||||
print("Video removed from playlist")
|
||||
removed = True
|
||||
break
|
||||
if removed:
|
||||
break
|
||||
except BaseException:
|
||||
continue
|
||||
|
||||
if not removed:
|
||||
print("Could not find remove button")
|
||||
# Try to close the menu
|
||||
try:
|
||||
driver.find_element(By.TAG_NAME, "body").send_keys(Keys.ESCAPE)
|
||||
except BaseException:
|
||||
pass
|
||||
return False
|
||||
|
||||
time.sleep(2) # Wait for removal to complete
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error removing video: {e}")
|
||||
return False
|
||||
|
||||
# Initialize CSV file if it doesn't exist
|
||||
|
||||
|
||||
def init_csv():
|
||||
"""Initialize the CSV file with headers if it doesn't exist."""
|
||||
if not os.path.exists(classifications_csv):
|
||||
with open(classifications_csv, 'w', newline='', encoding='utf-8') as file:
|
||||
writer = csv.writer(file)
|
||||
writer.writerow(['video_title', 'video_url',
|
||||
'thumbnail_url', 'classification', 'timestamp'])
|
||||
print(f"Created {classifications_csv}")
|
||||
|
||||
|
||||
def load_existing_classifications():
|
||||
"""Load existing classifications from CSV."""
|
||||
try:
|
||||
if os.path.exists(classifications_csv):
|
||||
df = pd.read_csv(classifications_csv)
|
||||
return set(df['classification'].unique()
|
||||
) if not df.empty else set()
|
||||
return set()
|
||||
except Exception as e:
|
||||
print(f"Error loading classifications: {e}")
|
||||
return set()
|
||||
|
||||
|
||||
def save_classification(video_title, video_url, thumbnail_url, classification):
|
||||
"""Save a video classification to CSV."""
|
||||
try:
|
||||
with open(classifications_csv, 'a', newline='', encoding='utf-8') as file:
|
||||
writer = csv.writer(file)
|
||||
writer.writerow([video_title,
|
||||
video_url,
|
||||
thumbnail_url,
|
||||
classification,
|
||||
time.strftime('%Y-%m-%d %H:%M:%S')])
|
||||
print(f"Saved classification: {video_title} -> {classification}")
|
||||
except Exception as e:
|
||||
print(f"Error saving classification: {e}")
|
||||
|
||||
|
||||
def get_video_info():
|
||||
"""Extract video information using web scraping (replaces old GUI method)."""
|
||||
return get_video_info_web()
|
||||
|
||||
|
||||
def classify_video_with_ollama(
|
||||
video_title,
|
||||
thumbnail_path,
|
||||
existing_classifications):
|
||||
"""Use Ollama with Qwen2.5-VL to classify the video."""
|
||||
try:
|
||||
# Convert image to base64
|
||||
with open(thumbnail_path, "rb") as image_file:
|
||||
image_data = base64.b64encode(image_file.read()).decode('utf-8')
|
||||
|
||||
# Prepare existing classifications string
|
||||
existing_cats = ", ".join(
|
||||
existing_classifications) if existing_classifications else "None"
|
||||
|
||||
# Prepare the prompt
|
||||
prompt = CLASSIFICATION_PROMPT.format(
|
||||
video_title=video_title,
|
||||
existing_categories=existing_cats)
|
||||
|
||||
# Make request to Ollama
|
||||
response = requests.post(
|
||||
f'{OLLAMA_HOST}/api/generate',
|
||||
json={
|
||||
'model': OLLAMA_MODEL,
|
||||
'prompt': prompt,
|
||||
'images': [image_data],
|
||||
'stream': False
|
||||
}
|
||||
)
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
classification = result['response'].strip()
|
||||
return classification
|
||||
else:
|
||||
print(f"Error from Ollama: {response.status_code}")
|
||||
return "Uncategorized"
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error classifying video: {e}")
|
||||
return "Uncategorized"
|
||||
|
||||
|
||||
def create_playlist_and_add_video(classification, video_url):
|
||||
"""Create a playlist based on classification and add video to it."""
|
||||
# This functionality would need to be implemented with Selenium
|
||||
# For now, it's commented out
|
||||
"""
|
||||
try:
|
||||
# Use Selenium to navigate to YouTube playlist creation
|
||||
# This would require additional implementation
|
||||
print(f"Would create/add to playlist: {classification}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error creating playlist/adding video: {e}")
|
||||
"""
|
||||
print(f"[TODO] Would add video to playlist: {classification}")
|
||||
|
||||
|
||||
def delete_video():
|
||||
"""Delete functionality - now handled by remove_video_from_playlist()."""
|
||||
print("[DEPRECATED] Use remove_video_from_playlist() instead")
|
||||
return True
|
||||
|
||||
# Keyboard listener for quitting
|
||||
|
||||
|
||||
def on_press_to_quit(key):
|
||||
"""Quit program if 'q' is pressed."""
|
||||
@@ -33,123 +674,109 @@ def on_press_to_quit(key):
|
||||
pass
|
||||
|
||||
|
||||
def locate_img(
|
||||
image: str,
|
||||
sleep_time:float=None,
|
||||
search_time:float=0,
|
||||
confidence:float=1.0,
|
||||
gray_scale:bool=False,
|
||||
region:tuple[int,int,int,int]=None
|
||||
):
|
||||
"""Locate and click on an image. Returns True if successful, False otherwise."""
|
||||
try:
|
||||
opt_loc = pgui.locateCenterOnScreen(
|
||||
f'img/{image}',
|
||||
confidence=confidence,
|
||||
minSearchTime=search_time,
|
||||
grayscale=gray_scale,
|
||||
region=region
|
||||
)
|
||||
|
||||
if opt_loc:
|
||||
pgui.click(opt_loc)
|
||||
|
||||
if sleep_time:
|
||||
time.sleep(sleep_time)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error locating '{image}': {e}")
|
||||
return False
|
||||
|
||||
|
||||
def change_to_not_available():
|
||||
try:
|
||||
|
||||
success = locate_img('pl_opt.png', sleep_time=0.2, search_time=0.5, confidence=0.8, gray_scale=True, region=half_left)
|
||||
|
||||
if not success: return False
|
||||
|
||||
success = locate_img('not-available.png', search_time=0.5, confidence=0.8)
|
||||
|
||||
if not success: return False
|
||||
|
||||
window = pgui.size()
|
||||
x_pos = 3 * (window.width / 4)
|
||||
y_pos = window.height / 2
|
||||
pgui.moveTo(x_pos, y_pos)
|
||||
time.sleep(5)
|
||||
|
||||
success = locate_img('options.png', sleep_time=0.2, confidence=0.8, gray_scale=True, region=half_right)
|
||||
|
||||
if not success: return False
|
||||
|
||||
return True
|
||||
|
||||
except:
|
||||
print(f"Critical error in change_to_not_available")
|
||||
return False
|
||||
|
||||
|
||||
listener = kb.Listener(on_press=on_press_to_quit)
|
||||
listener.start()
|
||||
|
||||
# Open browser
|
||||
browser_img = 'brave.png'
|
||||
|
||||
try:
|
||||
locate_img(browser_img, confidence=0.8, sleep_time=1)
|
||||
except Exception as e:
|
||||
print(f"Error opening browser, you must have your browser pinned in the taskbar")
|
||||
sys.exit('Closing script.')
|
||||
|
||||
# Open YouTube
|
||||
pgui.hotkey('ctrl', 't')
|
||||
pgui.write(playlist_url)
|
||||
pgui.press("enter")
|
||||
time.sleep(5)
|
||||
def cleanup_browser():
|
||||
"""Clean up browser resources."""
|
||||
global driver
|
||||
if driver:
|
||||
try:
|
||||
driver.quit()
|
||||
print("Browser closed.")
|
||||
except BaseException:
|
||||
pass
|
||||
|
||||
|
||||
# Updated main execution logic
|
||||
if __name__ == '__main__':
|
||||
counter = 0
|
||||
try:
|
||||
# Initialize CSV file
|
||||
init_csv()
|
||||
|
||||
while not quit:
|
||||
# restart tab to free up resources
|
||||
if counter > 0 and (counter % 90) == 0:
|
||||
pgui.hotkey('ctrl', 'w')
|
||||
pgui.hotkey('ctrl', 't')
|
||||
pgui.write(playlist_url)
|
||||
pgui.press("enter")
|
||||
time.sleep(8)
|
||||
|
||||
# Load existing classifications
|
||||
existing_classifications = load_existing_classifications()
|
||||
print(
|
||||
f"Loaded {
|
||||
len(existing_classifications)} existing classifications: {existing_classifications}")
|
||||
|
||||
try:
|
||||
success = locate_img('options.png', sleep_time=0.2, confidence=0.8, gray_scale=True, region=half_right)
|
||||
# Initialize browser
|
||||
if not init_browser():
|
||||
print("Failed to initialize browser. Exiting.")
|
||||
sys.exit(1)
|
||||
|
||||
if not success:
|
||||
print('changing plans to enable videos...')
|
||||
second_success = change_to_not_available()
|
||||
counter = 0
|
||||
|
||||
if not second_success:
|
||||
# Set up keyboard listener for quitting
|
||||
listener = kb.Listener(on_press=on_press_to_quit)
|
||||
listener.start()
|
||||
|
||||
print("\nStarting video processing...")
|
||||
print("Press 'q' to quit at any time.")
|
||||
|
||||
while not quit:
|
||||
try:
|
||||
# Extract video information using web scraping
|
||||
video_title, video_url, thumbnail_path = get_video_info()
|
||||
|
||||
if video_title and video_url and thumbnail_path:
|
||||
# Classify video using Ollama
|
||||
print(f"\nClassifying video: {video_title}")
|
||||
classification = classify_video_with_ollama(
|
||||
video_title, thumbnail_path, existing_classifications)
|
||||
print(f"Classification result: {classification}")
|
||||
|
||||
# Save classification to CSV
|
||||
save_classification(
|
||||
video_title, video_url, thumbnail_path, classification)
|
||||
|
||||
# Update existing classifications set
|
||||
existing_classifications.add(classification)
|
||||
|
||||
# Create playlist and add video (commented for testing)
|
||||
create_playlist_and_add_video(classification, video_url)
|
||||
|
||||
# Remove video from current playlist
|
||||
remove_success = remove_video_from_playlist()
|
||||
|
||||
if remove_success:
|
||||
counter += 1
|
||||
print(f"Processed {counter} videos.")
|
||||
|
||||
# Navigate to next video
|
||||
if not navigate_to_next_video():
|
||||
print("No more videos to process.")
|
||||
break
|
||||
else:
|
||||
# If can't remove, try to navigate to next video anyway
|
||||
if not navigate_to_next_video():
|
||||
print("Could not navigate to next video.")
|
||||
break
|
||||
counter += 1
|
||||
print(f"Processed {counter} videos (removal failed).")
|
||||
|
||||
# Clean up temporary thumbnail
|
||||
if os.path.exists(thumbnail_path):
|
||||
os.remove(thumbnail_path)
|
||||
|
||||
time.sleep(2) # Brief pause between videos
|
||||
|
||||
else:
|
||||
print("Could not extract video information")
|
||||
# Try to navigate to next video anyway
|
||||
if not navigate_to_next_video():
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error in main processing loop: {e}")
|
||||
# Try to continue with next video
|
||||
if not navigate_to_next_video():
|
||||
break
|
||||
except:
|
||||
print('Error finding options image')
|
||||
break
|
||||
|
||||
try:
|
||||
success = locate_img('delete.png', confidence=0.8)
|
||||
if success:
|
||||
counter += 1
|
||||
print(f"Deleted {counter} videos.")
|
||||
time.sleep(0.2)
|
||||
listener.stop()
|
||||
cleanup_browser()
|
||||
print("Script finished.")
|
||||
|
||||
else: break
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error in delete step: {e}")
|
||||
break
|
||||
|
||||
|
||||
listener.stop()
|
||||
print("Script finished.")
|
||||
except KeyboardInterrupt:
|
||||
print("\nScript interrupted by user.")
|
||||
cleanup_browser()
|
||||
except Exception as e:
|
||||
print(f"Unexpected error: {e}")
|
||||
cleanup_browser()
|
||||
|
||||
Reference in New Issue
Block a user