Enhance logging and configuration for YouTube Video Classifier

- Added fallback model configuration in config.ini
- Updated requirements.txt to include rich for enhanced logging
- Refactored script.py to implement rich logging for better visibility
- Modified functions to include channel link extraction and updated CSV saving logic
- Improved error handling and user feedback throughout the script
This commit is contained in:
2025-07-12 03:03:38 +00:00
parent 8c4177dca0
commit 89314f9c74
3 changed files with 377 additions and 168 deletions

View File

@@ -4,6 +4,7 @@
# Ollama settings # Ollama settings
ollama_host = http://localhost:11434 ollama_host = http://localhost:11434
ollama_model = qwen2.5vl:7b ollama_model = qwen2.5vl:7b
ollama_fallback_model = gemma2:2b
# File paths # File paths
classifications_csv = video_classifications.csv classifications_csv = video_classifications.csv
@@ -22,4 +23,8 @@ restart_tab_frequency = 90
enable_delete = false enable_delete = false
enable_playlist_creation = false enable_playlist_creation = false
# LLM timeout settings (in seconds)
llm_primary_timeout = 60
llm_fallback_timeout = 60

View File

@@ -11,3 +11,4 @@ pyperclip==1.8.2
pytesseract==0.3.10 pytesseract==0.3.10
selenium==4.15.2 selenium==4.15.2
webdriver-manager==4.0.1 webdriver-manager==4.0.1
rich==13.8.0

533
script.py
View File

@@ -24,6 +24,17 @@ from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import TimeoutException, NoSuchElementException from selenium.common.exceptions import TimeoutException, NoSuchElementException
from selenium.webdriver.common.keys import Keys from selenium.webdriver.common.keys import Keys
# Rich logging imports
from rich.console import Console
from rich import print as rprint
from rich.panel import Panel
from rich.text import Text
from rich.progress import Progress, SpinnerColumn, TextColumn
from rich.table import Table
# Initialize rich console
console = Console()
# Load configuration # Load configuration
config = configparser.ConfigParser() config = configparser.ConfigParser()
config.read('config.ini') config.read('config.ini')
@@ -34,6 +45,11 @@ OLLAMA_HOST = config.get(
'ollama_host', 'ollama_host',
fallback='http://localhost:11434') fallback='http://localhost:11434')
OLLAMA_MODEL = config.get('DEFAULT', 'ollama_model', fallback='qwen2.5vl:7b') OLLAMA_MODEL = config.get('DEFAULT', 'ollama_model', fallback='qwen2.5vl:7b')
# Second model name, read from the DEFAULT section of config.ini.
OLLAMA_FALLBACK_MODEL = config.get('DEFAULT', 'ollama_fallback_model', fallback='gemma2:2b')
# Timeout settings (in seconds), one per model
LLM_PRIMARY_TIMEOUT = config.getint('DEFAULT', 'llm_primary_timeout', fallback=60)
# NOTE(review): the in-code default here is 10, while the config.ini shipped
# with this change sets llm_fallback_timeout = 60 — confirm which is intended.
LLM_FALLBACK_TIMEOUT = config.getint('DEFAULT', 'llm_fallback_timeout', fallback=10)
CLASSIFICATION_PROMPT = """ CLASSIFICATION_PROMPT = """
Please classify this YouTube video based on its title and thumbnail. Please classify this YouTube video based on its title and thumbnail.
@@ -96,6 +112,51 @@ quit = False
driver = None # Global driver variable driver = None # Global driver variable
def log(message, style="", emoji="", add_newline_before=False):
    """Print *message* through the shared rich console.

    The message is stripped of surrounding whitespace. A non-empty *emoji*
    is prepended, separated from the text by one space (two spaces for the
    emoji listed in ``double_padded``). When *add_newline_before* is true an
    empty line is printed first. *style* is forwarded to ``console.print``
    only when it is non-empty.
    """
    # Emoji that are followed by two spaces instead of one.
    # NOTE(review): presumably these render narrower in terminals — confirm.
    double_padded = ("⚠️", "⏱️", "🏷️", "🗣️", "🗑️", "⌨️")

    text = message.strip()
    if emoji:
        gap = "  " if emoji in double_padded else " "
        text = f"{emoji}{gap}{text}"

    if add_newline_before:
        console.print()

    print_kwargs = {"style": style} if style else {}
    console.print(text, **print_kwargs)
def log_success(message, emoji="", add_newline_before=False):
    """Emit *message* through ``log`` using the "bold green" style."""
    log(message, "bold green", emoji, add_newline_before)
def log_warning(message, emoji="⚠️", add_newline_before=False):
    """Emit *message* through ``log`` using the "bold yellow" style."""
    log(message, "bold yellow", emoji, add_newline_before)
def log_error(message, emoji="", add_newline_before=False):
    """Log error messages in red.

    Args:
        message: Text to print; forwarded to ``log``.
        emoji: Optional emoji prefix.
        add_newline_before: When true, ``log`` prints an empty line first.
            Added (defaulting to False) so this helper accepts the same
            arguments as the other ``log_*`` helpers.
    """
    log(message, style="bold red", emoji=emoji, add_newline_before=add_newline_before)
def log_info(message, emoji="", add_newline_before=False):
    """Emit *message* through ``log`` using the "bold blue" style."""
    log(message, "bold blue", emoji, add_newline_before)
def log_process(message, emoji="🔄", add_newline_before=False):
    """Emit *message* through ``log`` using the "bold cyan" style."""
    log(message, "bold cyan", emoji, add_newline_before)
def get_chrome_binary_path(): def get_chrome_binary_path():
"""Get the Chrome binary path based on the operating system.""" """Get the Chrome binary path based on the operating system."""
system = platform.system().lower() system = platform.system().lower()
@@ -193,9 +254,9 @@ def init_browser():
chrome_binary = get_chrome_binary_path() chrome_binary = get_chrome_binary_path()
if chrome_binary: if chrome_binary:
chrome_options.binary_location = chrome_binary chrome_options.binary_location = chrome_binary
print(f"Using Chrome binary: {chrome_binary}") log_info(f"Using Chrome binary: {chrome_binary}", "🔧")
else: else:
print("Warning: Could not find Chrome binary, using system default") log_warning("Could not find Chrome binary, using system default")
# Initialize the driver with various fallback options # Initialize the driver with various fallback options
driver_initialized = False driver_initialized = False
@@ -209,10 +270,9 @@ def init_browser():
driver = webdriver.Chrome( driver = webdriver.Chrome(
service=service, options=chrome_options) service=service, options=chrome_options)
driver_initialized = True driver_initialized = True
print( log_success(f"Initialized with system chromedriver: {chromedriver_path}")
f"Initialized with system chromedriver: {chromedriver_path}")
except Exception as e: except Exception as e:
print(f"System chromedriver failed: {e}") log_error(f"System chromedriver failed: {e}")
# Try 2: WebDriverManager with explicit OS detection # Try 2: WebDriverManager with explicit OS detection
if not driver_initialized: if not driver_initialized:
@@ -224,8 +284,7 @@ def init_browser():
# Clear any cached drivers that might have wrong architecture # Clear any cached drivers that might have wrong architecture
wdm_cache_dir = os.path.expanduser("~/.wdm") wdm_cache_dir = os.path.expanduser("~/.wdm")
if os.path.exists(wdm_cache_dir): if os.path.exists(wdm_cache_dir):
print( log_process("Clearing WebDriverManager cache to avoid architecture conflicts...", "🧹")
"Clearing WebDriverManager cache to avoid architecture conflicts...")
shutil.rmtree(wdm_cache_dir, ignore_errors=True) shutil.rmtree(wdm_cache_dir, ignore_errors=True)
# Initialize ChromeDriverManager with explicit OS detection # Initialize ChromeDriverManager with explicit OS detection
@@ -237,18 +296,18 @@ def init_browser():
driver = webdriver.Chrome( driver = webdriver.Chrome(
service=service, options=chrome_options) service=service, options=chrome_options)
driver_initialized = True driver_initialized = True
print("Initialized with WebDriverManager") log_success("Initialized with WebDriverManager")
except Exception as e: except Exception as e:
print(f"WebDriverManager failed: {e}") log_error(f"WebDriverManager failed: {e}")
# Try 3: Default Chrome without specifying service # Try 3: Default Chrome without specifying service
if not driver_initialized: if not driver_initialized:
try: try:
driver = webdriver.Chrome(options=chrome_options) driver = webdriver.Chrome(options=chrome_options)
driver_initialized = True driver_initialized = True
print("Initialized with default Chrome") log_success("Initialized with default Chrome")
except Exception as e: except Exception as e:
print(f"Default Chrome initialization failed: {e}") log_error(f"Default Chrome initialization failed: {e}")
# Try 4: Fallback with minimal options (still visible) # Try 4: Fallback with minimal options (still visible)
if not driver_initialized: if not driver_initialized:
@@ -263,9 +322,9 @@ def init_browser():
# Try without custom binary location # Try without custom binary location
driver = webdriver.Chrome(options=chrome_options_fallback) driver = webdriver.Chrome(options=chrome_options_fallback)
driver_initialized = True driver_initialized = True
print("Initialized with fallback Chrome options") log_success("Initialized with fallback Chrome options")
except Exception as e: except Exception as e:
print(f"Fallback Chrome initialization failed: {e}") log_error(f"Fallback Chrome initialization failed: {e}")
# Try 5: Last resort - use system command to find chromedriver # Try 5: Last resort - use system command to find chromedriver
if not driver_initialized: if not driver_initialized:
@@ -284,13 +343,12 @@ def init_browser():
driver = webdriver.Chrome( driver = webdriver.Chrome(
service=service, options=chrome_options_minimal) service=service, options=chrome_options_minimal)
driver_initialized = True driver_initialized = True
print( log_success(f"Initialized with system-found chromedriver: {chromedriver_path}")
f"Initialized with system-found chromedriver: {chromedriver_path}")
except Exception as e: except Exception as e:
print(f"System command fallback failed: {e}") log_error(f"System command fallback failed: {e}")
if not driver_initialized: if not driver_initialized:
print("All browser initialization attempts failed") log_error("All browser initialization attempts failed")
return False return False
# Remove automation indicators # Remove automation indicators
@@ -300,25 +358,31 @@ def init_browser():
except BaseException: except BaseException:
pass # Ignore if this fails pass # Ignore if this fails
print("Browser initialized successfully!") log_success("Browser initialized successfully!", "🎉")
# Navigate to the playlist URL # Navigate to the playlist URL
driver.get(playlist_url) driver.get(playlist_url)
log_info(f"Current browser navigated to: {playlist_url}", "🌐")
print("\n" + "=" * 60) # Create a beautiful setup panel
print("SETUP INSTRUCTIONS:") setup_panel = Panel.fit(
print("1. The browser should now be visible on your screen") """[bold cyan]SETUP INSTRUCTIONS:[/bold cyan]
print("2. Please log in to your YouTube account in the browser")
print("3. Navigate to your playlist if not already there")
print("4. Ensure the playlist URL in config.ini is correct")
print("5. The script will process videos automatically")
print("6. Press 'q' to quit at any time")
print("7. If you're in a dev container, use the command below to open the browser:")
print(f' "$BROWSER" {playlist_url}')
input("Press Enter to continue after logging in...")
print("=" * 60)
print(f"Current browser navigated to: {playlist_url}") 1. The browser should now be visible on your screen
2. If not already logged in, please log in to your YouTube account
3. Navigate to your playlist if not already there
4. Ensure the playlist URL in config.ini is correct
5. The script will process videos automatically
6. Press 'q' to quit at any time""",
title="[bold magenta]🚀 YouTube Video Classifier Setup[/bold magenta]",
border_style="bright_blue"
)
console.print()
console.print(setup_panel)
console.print()
input("Press Enter to continue after confirming login status...")
# Wait a moment for any redirects # Wait a moment for any redirects
time.sleep(5) time.sleep(5)
@@ -326,10 +390,115 @@ def init_browser():
return True return True
except Exception as e: except Exception as e:
print(f"Error initializing browser: {e}") log_error(f"Error initializing browser: {e}")
return False return False
def check_ollama_models():
    """Check whether the configured Ollama models are available.

    Requests ``{OLLAMA_HOST}/api/tags`` (5 second timeout) and looks for
    ``OLLAMA_MODEL`` and ``OLLAMA_FALLBACK_MODEL`` among the reported model
    names. A configured name counts as available when it is a substring of
    a reported name. For each missing model a warning and an
    ``ollama pull`` hint are logged.

    Returns:
        bool: True when at least one of the two models is available. False
        when neither is, when the server answers with a non-200 status, or
        when the request or response parsing fails.
    """
    try:
        response = requests.get(f'{OLLAMA_HOST}/api/tags', timeout=5)

        # The server answered, so this is not a connection problem: report
        # the actual status instead of claiming the host is unreachable.
        if response.status_code != 200:
            log_error(f"Ollama at {OLLAMA_HOST} returned status {response.status_code}")
            return False

        payload = response.json()
        # `or []` also covers a JSON null for the "models" field.
        available_models = [entry['name'] for entry in payload.get('models') or []]

        primary_available = any(OLLAMA_MODEL in name for name in available_models)
        fallback_available = any(OLLAMA_FALLBACK_MODEL in name for name in available_models)

        log_info(f"Available models: {available_models}", "🤖")

        if not primary_available:
            log_warning(f"Primary model '{OLLAMA_MODEL}' not found!")
            log_info(f"Run: ollama pull {OLLAMA_MODEL}", "💡")

        if not fallback_available:
            log_warning(f"Fallback model '{OLLAMA_FALLBACK_MODEL}' not found!")
            log_info(f"Run: ollama pull {OLLAMA_FALLBACK_MODEL}", "💡")

        if not primary_available and not fallback_available:
            log_error("No required models available! Please install at least one.")
            return False

        return True

    except requests.exceptions.ConnectionError:
        # Genuine connectivity failure (server down, wrong host/port).
        log_error(f"Cannot connect to Ollama at {OLLAMA_HOST}")
        return False
    except Exception as e:
        log_error(f"Error checking Ollama models: {e}")
        return False
def call_ollama_with_fallback(prompt, image_data, purpose="processing"):
    """
    Call Ollama with timeout and fallback support using a continuous loop.

    The primary model and then the fallback model are tried in turn. After
    every complete pass over both models the per-request timeout of each
    model grows by 10 seconds. The loop ends only when a model returns a
    non-empty response or the module-level ``quit`` flag becomes true.

    Args:
        prompt: The text prompt to send
        image_data: Base64 encoded image data
        purpose: Description of what this call is for (for logging)

    Returns:
        str: The stripped response from the LLM, or empty string if
        manually cancelled
    """
    candidates = (
        (OLLAMA_MODEL, LLM_PRIMARY_TIMEOUT),
        (OLLAMA_FALLBACK_MODEL, LLM_FALLBACK_TIMEOUT),
    )
    attempt = 0
    cycles_completed = 0  # number of full passes over all models so far

    while not quit:  # Continue until we get a response or user quits
        for model, base_timeout in candidates:
            # Re-check the flag before every request so that a quit issued
            # while one model was running does not start another request.
            if quit:
                break

            attempt += 1
            current_timeout = base_timeout + (10 * cycles_completed)

            try:
                log_process(f"Attempt {attempt}: Trying {model} for {purpose} (timeout: {current_timeout}s)", "🔄")

                response = requests.post(
                    f'{OLLAMA_HOST}/api/generate',
                    json={
                        'model': model,
                        'prompt': prompt,
                        'images': [image_data],
                        'stream': False
                    },
                    timeout=current_timeout
                )

                if response.status_code == 200:
                    result = response.json()
                    # A missing/null "response" field is treated like an
                    # empty answer rather than as an error.
                    response_text = (result.get('response') or '').strip()
                    if response_text:  # Make sure we got actual content
                        log_success(f"Response received from {model} on attempt {attempt}", "")
                        return response_text
                    log_warning(f"{model} returned empty response")
                else:
                    log_warning(f"{model} returned status {response.status_code}")

            except requests.exceptions.Timeout:
                log_warning(f"{model} timed out after {current_timeout}s")
            except requests.exceptions.ConnectionError:
                log_error(f"Cannot connect to {model}")
                # Wait a bit before trying again if connection failed
                time.sleep(2)
            except Exception as e:
                log_error(f"Error calling {model}: {e}")
        else:
            # Reached only when the pass over all models was not interrupted.
            cycles_completed += 1
            log_info(f"Completed cycle {cycles_completed}, increasing timeouts by 10s for next cycle", "🔄")
            # Small delay between cycles to prevent overwhelming the server
            time.sleep(1)

    log_warning(f"LLM calls cancelled by user for {purpose}")
    return ""
def detect_video_language(video_title, thumbnail_path): def detect_video_language(video_title, thumbnail_path):
"""Use Ollama to detect the language of the video.""" """Use Ollama to detect the language of the video."""
try: try:
@@ -340,32 +509,18 @@ def detect_video_language(video_title, thumbnail_path):
# Prepare the prompt # Prepare the prompt
prompt = LANGUAGE_DETECTION_PROMPT.format(video_title=video_title) prompt = LANGUAGE_DETECTION_PROMPT.format(video_title=video_title)
# Make request to Ollama # Call with fallback support
response = requests.post( language = call_ollama_with_fallback(prompt, image_data, "language detection")
f'{OLLAMA_HOST}/api/generate',
json={
'model': OLLAMA_MODEL,
'prompt': prompt,
'images': [image_data],
'stream': False
}
)
if response.status_code == 200: return language if language else "Unknown"
result = response.json()
language = result['response'].strip()
return language
else:
print(f"Error from Ollama for language detection: {response.status_code}")
return "Unknown"
except Exception as e: except Exception as e:
print(f"Error detecting language: {e}") log_error(f"Error detecting language: {e}")
return "Unknown" return "Unknown"
def extract_channel_name(video_element): def extract_channel_name(video_element):
"""Extract the channel name from the video element.""" """Extract the channel name and link from the video element."""
try: try:
channel_selectors = [ channel_selectors = [
"#text > a", "#text > a",
@@ -381,7 +536,12 @@ def extract_channel_name(video_element):
if "yt-simple-endpoint" in classes and "style-scope" in classes and "yt-formatted-string" in classes: if "yt-simple-endpoint" in classes and "style-scope" in classes and "yt-formatted-string" in classes:
channel_name = channel_element.text.strip() channel_name = channel_element.text.strip()
if channel_name: if channel_name:
return channel_name # Get the resolved absolute URL
channel_link = channel_element.get_attribute("href")
# If it's a relative URL, resolve it
if channel_link and channel_link.startswith("/"):
channel_link = f"https://www.youtube.com{channel_link}"
return channel_name, channel_link or "Unknown Channel Link"
except BaseException: except BaseException:
continue continue
@@ -391,15 +551,20 @@ def extract_channel_name(video_element):
channel_element = video_element.find_element(By.CSS_SELECTOR, selector) channel_element = video_element.find_element(By.CSS_SELECTOR, selector)
channel_name = channel_element.text.strip() channel_name = channel_element.text.strip()
if channel_name: if channel_name:
return channel_name # Get the resolved absolute URL
channel_link = channel_element.get_attribute("href")
# If it's a relative URL, resolve it
if channel_link and channel_link.startswith("/"):
channel_link = f"https://www.youtube.com{channel_link}"
return channel_name, channel_link or "Unknown Channel Link"
except BaseException: except BaseException:
continue continue
return "Unknown Channel" return "Unknown Channel", "Unknown Channel Link"
except Exception as e: except Exception as e:
print(f"Error extracting channel name: {e}") log_error(f"Error extracting channel name: {e}")
return "Unknown Channel" return "Unknown Channel", "Unknown Channel Link"
def extract_video_length(video_element): def extract_video_length(video_element):
@@ -456,7 +621,7 @@ def extract_video_length(video_element):
return 0 # Default if no length found return 0 # Default if no length found
except Exception as e: except Exception as e:
print(f"Error extracting video length: {e}") log_error(f"Error extracting video length: {e}")
return 0 return 0
@@ -495,7 +660,7 @@ def extract_video_date(video_element):
return "Unknown Date" return "Unknown Date"
except Exception as e: except Exception as e:
print(f"Error extracting video date: {e}") log_error(f"Error extracting video date: {e}")
return "Unknown Date" return "Unknown Date"
@@ -538,7 +703,7 @@ def parse_natural_date(date_text):
return current_time return current_time
except Exception as e: except Exception as e:
print(f"Error parsing natural date: {e}") log_error(f"Error parsing natural date: {e}")
return datetime.now() return datetime.now()
@@ -568,7 +733,7 @@ def get_video_info_web():
continue continue
if not video_element: if not video_element:
print("Could not find video element in playlist") log_error("Could not find video element in playlist")
return None, None, None, None, None, None, None return None, None, None, None, None, None, None
# Extract video title # Extract video title
@@ -593,19 +758,20 @@ def get_video_info_web():
if not video_title: if not video_title:
video_title = f"Video_{int(time.time())}" video_title = f"Video_{int(time.time())}"
print(f"Extracted video title: {video_title}") log_info(f"Extracted video title: {video_title}", "📝", True)
# Extract channel name # Extract channel name and link
channel_name = extract_channel_name(video_element) channel_name, channel_link = extract_channel_name(video_element)
print(f"Extracted channel name: {channel_name}") log_info(f"Extracted channel name: {channel_name}", "👤")
log_info(f"Extracted channel link: {channel_link}", "🔗")
# Extract video length # Extract video length
video_length = extract_video_length(video_element) video_length = extract_video_length(video_element)
print(f"Extracted video length: {video_length} seconds") log_info(f"Extracted video length: {video_length} seconds", "⏱️")
# Extract video date # Extract video date
video_date = extract_video_date(video_element) video_date = extract_video_date(video_element)
print(f"Extracted video date: {video_date}") log_info(f"Extracted video date: {video_date}", "📅")
# Get video URL using share functionality # Get video URL using share functionality
video_url = None video_url = None
@@ -649,7 +815,7 @@ def get_video_info_web():
continue continue
except Exception as e: except Exception as e:
print(f"Error getting URL from share modal: {e}") log_error(f"Error getting URL from share modal: {e}")
# Close the share modal # Close the share modal
try: try:
@@ -674,7 +840,7 @@ def get_video_info_web():
time.sleep(0.5) time.sleep(0.5)
except Exception as e: except Exception as e:
print(f"Error extracting video URL: {e}") log_error(f"Error extracting video URL: {e}")
# Fallback: try to get href from video title link # Fallback: try to get href from video title link
try: try:
title_link = video_element.find_element( title_link = video_element.find_element(
@@ -685,7 +851,7 @@ def get_video_info_web():
except BaseException: except BaseException:
video_url = f"https://youtube.com/watch?v=unknown_{int(time.time())}" video_url = f"https://youtube.com/watch?v=unknown_{int(time.time())}"
print(f"Extracted video URL: {video_url}") log_info(f"Extracted video URL: {video_url}", "🔗")
# Take screenshot of video thumbnail # Take screenshot of video thumbnail
try: try:
@@ -715,7 +881,7 @@ def get_video_info_web():
video_element.screenshot(thumbnail_path) video_element.screenshot(thumbnail_path)
except Exception as e: except Exception as e:
print(f"Error taking thumbnail screenshot: {e}") log_error(f"Error taking thumbnail screenshot: {e}")
# Create a placeholder image # Create a placeholder image
try: try:
img = Image.new('RGB', (320, 180), color='lightgray') img = Image.new('RGB', (320, 180), color='lightgray')
@@ -723,11 +889,11 @@ def get_video_info_web():
except BaseException: except BaseException:
thumbnail_path = None thumbnail_path = None
return video_title, video_url, thumbnail_path, channel_name, video_length, video_date return video_title, video_url, thumbnail_path, channel_name, channel_link, video_length, video_date
except Exception as e: except Exception as e:
print(f"Error extracting video info: {e}") log_error(f"Error extracting video info: {e}")
return None, None, None, None, None, None return None, None, None, None, None, None, None
def navigate_to_next_video(): def navigate_to_next_video():
@@ -745,11 +911,11 @@ def navigate_to_next_video():
(By.CSS_SELECTOR, "ytd-playlist-video-renderer"))) (By.CSS_SELECTOR, "ytd-playlist-video-renderer")))
return True return True
except BaseException: except BaseException:
print("No more videos in playlist") log_info("No more videos in playlist", "📭")
return False return False
except Exception as e: except Exception as e:
print(f"Error checking for next video: {e}") log_error(f"Error checking for next video: {e}")
return False return False
@@ -785,7 +951,7 @@ def remove_video_from_playlist():
item_text = item.text.strip().lower() item_text = item.text.strip().lower()
if "remove" in item_text or "delete" in item_text: if "remove" in item_text or "delete" in item_text:
driver.execute_script("arguments[0].click();", item) driver.execute_script("arguments[0].click();", item)
print("Video removed from playlist") log_success("Video removed from playlist", "🗑️")
removed = True removed = True
break break
if removed: if removed:
@@ -794,7 +960,7 @@ def remove_video_from_playlist():
continue continue
if not removed: if not removed:
print("Could not find remove button") log_error("Could not find remove button")
# Try to close the menu # Try to close the menu
try: try:
driver.find_element(By.TAG_NAME, "body").send_keys(Keys.ESCAPE) driver.find_element(By.TAG_NAME, "body").send_keys(Keys.ESCAPE)
@@ -806,10 +972,50 @@ def remove_video_from_playlist():
return True return True
except Exception as e: except Exception as e:
print(f"Error removing video: {e}") log_error(f"Error removing video: {e}")
return False return False
# Initialize CSV file if it doesn't exist
def extract_playlist_info():
    """Extract playlist name and URL from the current page.

    CSS selectors are tried in order (the specific page-header selector
    first, then more general alternatives). The first one that yields
    non-empty text provides the playlist name; if none does, the name stays
    "Unknown Playlist". The playlist link is the driver's current URL.

    Returns:
        tuple: ``(playlist_name, playlist_link)``. On an unexpected error
        the name is "Unknown Playlist" and the link is the current URL when
        it can still be read, otherwise "Unknown URL".
    """
    name_selectors = (
        "yt-page-header-renderer > yt-page-header-view-model > div.page-header-view-model-wiz__scroll-container > div > div.page-header-view-model-wiz__page-header-headline > div > yt-dynamic-text-view-model > h1 > span",
        "h1 span",
        ".page-header-headline h1 span",
        "yt-dynamic-text-view-model h1 span",
        "#page-header h1 span",
    )

    try:
        playlist_name = "Unknown Playlist"
        for selector in name_selectors:
            try:
                candidate = driver.find_element(By.CSS_SELECTOR, selector).text.strip()
            except Exception:
                # Selector did not match (or the lookup failed); try the next.
                continue
            # Only accept real text so an empty element cannot overwrite
            # the "Unknown Playlist" default.
            if candidate:
                playlist_name = candidate
                break

        # Get current playlist URL
        playlist_link = driver.current_url

        log_info(f"Playlist name: {playlist_name}", "📋")
        log_info(f"Playlist link: {playlist_link}", "🔗")

        return playlist_name, playlist_link

    except Exception as e:
        log_error(f"Error extracting playlist info: {e}")
        # Reading the URL can fail too (e.g. browser gone); never let the
        # error handler itself raise.
        try:
            fallback_link = driver.current_url if driver else "Unknown URL"
        except Exception:
            fallback_link = "Unknown URL"
        return "Unknown Playlist", fallback_link
def init_csv(): def init_csv():
@@ -818,8 +1024,9 @@ def init_csv():
with open(classifications_csv, 'w', newline='', encoding='utf-8') as file: with open(classifications_csv, 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file) writer = csv.writer(file)
writer.writerow(['video_title', 'video_url', 'thumbnail_url', 'classification', writer.writerow(['video_title', 'video_url', 'thumbnail_url', 'classification',
'language', 'channel_name', 'video_length_seconds', 'video_date', 'detailed_subtags', 'image_data', 'timestamp']) 'language', 'channel_name', 'channel_link', 'video_length_seconds', 'video_date',
print(f"Created {classifications_csv}") 'detailed_subtags', 'playlist_name', 'playlist_link', 'image_data', 'timestamp'])
log_success(f"Created {classifications_csv}", "📊")
def load_existing_classifications(): def load_existing_classifications():
@@ -831,21 +1038,21 @@ def load_existing_classifications():
) if not df.empty else set() ) if not df.empty else set()
return set() return set()
except Exception as e: except Exception as e:
print(f"Error loading classifications: {e}") log_error(f"Error loading classifications: {e}")
return set() return set()
def save_classification(video_title, video_url, thumbnail_url, classification, language, channel_name, video_length, video_date, detailed_subtags, image_data): def save_classification(video_title, video_url, thumbnail_url, classification, language, channel_name, channel_link, video_length, video_date, detailed_subtags, playlist_name, playlist_link, image_data):
"""Save a video classification to CSV.""" """Save a video classification to CSV."""
try: try:
with open(classifications_csv, 'a', newline='', encoding='utf-8') as file: with open(classifications_csv, 'a', newline='', encoding='utf-8') as file:
writer = csv.writer(file) writer = csv.writer(file)
writer.writerow([video_title, video_url, thumbnail_url, classification, writer.writerow([video_title, video_url, thumbnail_url, classification,
language, channel_name, video_length, video_date, detailed_subtags, image_data, language, channel_name, channel_link, video_length, video_date, detailed_subtags,
time.strftime('%Y-%m-%d %H:%M:%S')]) playlist_name, playlist_link, image_data, time.strftime('%Y-%m-%d %H:%M:%S')])
print(f"Saved classification: {video_title} -> {classification}") log_success(f"Saved classification: {video_title} -> {classification}", "💾")
except Exception as e: except Exception as e:
print(f"Error saving classification: {e}") log_error(f"Error saving classification: {e}")
def get_video_info(): def get_video_info():
@@ -872,27 +1079,13 @@ def classify_video_with_ollama(
video_title=video_title, video_title=video_title,
existing_categories=existing_cats) existing_categories=existing_cats)
# Make request to Ollama # Call with fallback support
response = requests.post( classification = call_ollama_with_fallback(prompt, image_data, "video classification")
f'{OLLAMA_HOST}/api/generate',
json={
'model': OLLAMA_MODEL,
'prompt': prompt,
'images': [image_data],
'stream': False
}
)
if response.status_code == 200: return classification if classification else "Uncategorized"
result = response.json()
classification = result['response'].strip()
return classification
else:
print(f"Error from Ollama: {response.status_code}")
return "Uncategorized"
except Exception as e: except Exception as e:
print(f"Error classifying video: {e}") log_error(f"Error classifying video: {e}")
return "Uncategorized" return "Uncategorized"
@@ -909,43 +1102,19 @@ def generate_detailed_subtags(video_title, thumbnail_path, classification):
classification=classification classification=classification
) )
# Make request to Ollama # Call with fallback support
response = requests.post( subtags = call_ollama_with_fallback(prompt, image_data, "sub-tags generation")
f'{OLLAMA_HOST}/api/generate',
json={
'model': OLLAMA_MODEL,
'prompt': prompt,
'images': [image_data],
'stream': False
}
)
if response.status_code == 200: return subtags if subtags else ""
result = response.json()
subtags = result['response'].strip()
return subtags
else:
print(f"Error from Ollama for sub-tags generation: {response.status_code}")
return ""
except Exception as e: except Exception as e:
print(f"Error generating sub-tags: {e}") log_error(f"Error generating sub-tags: {e}")
return "" return ""
def create_playlist_and_add_video(classification, video_url): def create_playlist_and_add_video(classification, video_url):
"""Create a playlist based on classification and add video to it.""" """Create a playlist based on classification and add video to it."""
# This functionality would need to be implemented with Selenium log_process(f"[TODO] Would add video to playlist: {classification}", "📋")
# For now, it's commented out
"""
try:
# Use Selenium to navigate to YouTube playlist creation
# This would require additional implementation
print(f"Would create/add to playlist: {classification}")
except Exception as e:
print(f"Error creating playlist/adding video: {e}")
"""
print(f"[TODO] Would add video to playlist: {classification}")
def delete_video(): def delete_video():
@@ -961,7 +1130,7 @@ def on_press_to_quit(key):
global quit global quit
try: try:
if key.char == 'q': if key.char == 'q':
print('Closing program...') log_warning('Closing program...', "🛑")
quit = True quit = True
except AttributeError: except AttributeError:
pass pass
@@ -973,7 +1142,7 @@ def cleanup_browser():
if driver: if driver:
try: try:
driver.quit() driver.quit()
print("Browser closed.") log_info("Browser closed.", "🚪")
except BaseException: except BaseException:
pass pass
@@ -981,16 +1150,27 @@ def cleanup_browser():
# Updated main execution logic # Updated main execution logic
if __name__ == '__main__': if __name__ == '__main__':
try: try:
# Create a beautiful title
title = Text("🎬 YouTube Video Classifier", style="bold magenta")
console.print(Panel.fit(title, border_style="bright_magenta"))
console.print()
# Check Ollama models before starting
log_info("Checking Ollama models availability...", "🔍")
if not check_ollama_models():
log_error("Required models not available. Please install them first.")
sys.exit(1)
# Initialize CSV file # Initialize CSV file
init_csv() init_csv()
# Load existing classifications # Load existing classifications
existing_classifications = load_existing_classifications() existing_classifications = load_existing_classifications()
print(f"Loaded {len(existing_classifications)} existing classifications: {existing_classifications}") log_info(f"Loaded {len(existing_classifications)} existing classifications: {existing_classifications}", "📚")
# Initialize browser # Initialize browser
if not init_browser(): if not init_browser():
print("Failed to initialize browser. Exiting.") log_error("Failed to initialize browser. Exiting.")
sys.exit(1) sys.exit(1)
counter = 0 counter = 0
@@ -999,19 +1179,32 @@ if __name__ == '__main__':
listener = kb.Listener(on_press=on_press_to_quit) listener = kb.Listener(on_press=on_press_to_quit)
listener.start() listener.start()
print("\nStarting video processing...") log_success("Starting video processing...", "🚀", True)
print("Press 'q' to quit at any time.") log_info("Press 'q' to quit at any time.", "⌨️")
log_info(f"Primary model: {OLLAMA_MODEL} (timeout: {LLM_PRIMARY_TIMEOUT}s)", "🤖")
log_info(f"Fallback model: {OLLAMA_FALLBACK_MODEL} (timeout: {LLM_FALLBACK_TIMEOUT}s)", "🤖")
playlist_name = "Unknown Playlist"
playlist_link = "Unknown URL"
while not quit: while not quit:
try: try:
# Extract playlist information once per session
if counter == 0:
playlist_name, playlist_link = extract_playlist_info()
console.print() # Add spacing after playlist info
# Extract video information using web scraping # Extract video information using web scraping
video_data = get_video_info() video_data = get_video_info()
if len(video_data) == 6: # New format with all data if len(video_data) == 7: # New format with all data including channel_link
video_title, video_url, thumbnail_path, channel_name, channel_link, video_length, video_date = video_data
elif len(video_data) == 6: # Old format with channel_name but no channel_link
video_title, video_url, thumbnail_path, channel_name, video_length, video_date = video_data video_title, video_url, thumbnail_path, channel_name, video_length, video_date = video_data
channel_link = "Unknown Channel Link"
else: # Fallback to old format else: # Fallback to old format
video_title, video_url, thumbnail_path = video_data[:3] video_title, video_url, thumbnail_path = video_data[:3]
channel_name, video_length, video_date = "Unknown Channel", 0, "Unknown Date" channel_name, channel_link, video_length, video_date = "Unknown Channel", "Unknown Channel Link", 0, "Unknown Date"
if video_title and video_url and thumbnail_path: if video_title and video_url and thumbnail_path:
# Convert thumbnail to base64 for storage # Convert thumbnail to base64 for storage
@@ -1019,39 +1212,41 @@ if __name__ == '__main__':
try: try:
with open(thumbnail_path, "rb") as image_file: with open(thumbnail_path, "rb") as image_file:
image_data = base64.b64encode(image_file.read()).decode('utf-8') image_data = base64.b64encode(image_file.read()).decode('utf-8')
print(f"Thumbnail converted to base64 ({len(image_data)} characters)") log_process(f"Thumbnail converted to base64 ({len(image_data)} characters)", "📸", True)
except Exception as e: except Exception as e:
print(f"Error converting thumbnail to base64: {e}") log_error(f"Error converting thumbnail to base64: {e}")
image_data = "" image_data = ""
# Detect language using Ollama # Detect language using Ollama with fallback
print(f"\nDetecting language for video: {video_title}") log_process(f"Detecting language for video: {video_title}", "🌐", True)
language = detect_video_language(video_title, thumbnail_path) language = detect_video_language(video_title, thumbnail_path)
print(f"Language detected: {language}") log_success(f"Language detected: {language}", "🗣️")
# Classify video using Ollama # Classify video using Ollama with fallback
print(f"Classifying video: {video_title}") log_process(f"Classifying video: {video_title}", "🏷️", True)
classification = classify_video_with_ollama( classification = classify_video_with_ollama(
video_title, thumbnail_path, existing_classifications) video_title, thumbnail_path, existing_classifications)
print(f"Classification result: {classification}") log_success(f"Classification result: {classification}", "📂")
# Let's be gentle with the API # Let's be gentle with the API
time.sleep(1) time.sleep(1)
# Generate detailed sub-tags using Ollama # Generate detailed sub-tags using Ollama with fallback
print(f"Generating detailed sub-tags for video: {video_title}") log_process(f"Generating detailed sub-tags for video: {video_title}", "🏷️", True)
detailed_subtags = generate_detailed_subtags(video_title, thumbnail_path, classification) detailed_subtags = generate_detailed_subtags(video_title, thumbnail_path, classification)
print(f"Detailed sub-tags: {detailed_subtags}") log_success(f"Detailed sub-tags: {detailed_subtags}", "🔖")
# Save classification to CSV with new data including image data # Save classification to CSV with new data including channel link
save_classification( save_classification(
video_title, video_url, thumbnail_path, classification, video_title, video_url, thumbnail_path, classification,
language, channel_name, video_length, video_date, detailed_subtags, image_data) language, channel_name, channel_link, video_length, video_date, detailed_subtags,
playlist_name, playlist_link, image_data)
# Update existing classifications set # Update existing classifications set
existing_classifications.add(classification) existing_classifications.add(classification)
# Create playlist and add video (commented for testing) # Create playlist and add video (commented for testing)
log_process("Actions with playlist:", "📋", True)
create_playlist_and_add_video(classification, video_url) create_playlist_and_add_video(classification, video_url)
# Remove video from current playlist # Remove video from current playlist
@@ -1059,19 +1254,19 @@ if __name__ == '__main__':
if remove_success: if remove_success:
counter += 1 counter += 1
print(f"Processed {counter} videos.") log_success(f"Processed {counter} videos.", "", True)
# Navigate to next video # Navigate to next video
if not navigate_to_next_video(): if not navigate_to_next_video():
print("No more videos to process.") log_info("No more videos to process.", "🏁")
break break
else: else:
# If can't remove, try to navigate to next video anyway # If can't remove, try to navigate to next video anyway
if not navigate_to_next_video(): if not navigate_to_next_video():
print("Could not navigate to next video.") log_error("Could not navigate to next video.")
break break
counter += 1 counter += 1
print(f"Processed {counter} videos (removal failed).") log_warning(f"Processed {counter} videos (removal failed).", add_newline_before=True)
# Clean up temporary thumbnail # Clean up temporary thumbnail
if os.path.exists(thumbnail_path): if os.path.exists(thumbnail_path):
@@ -1080,24 +1275,32 @@ if __name__ == '__main__':
time.sleep(2) # Brief pause between videos time.sleep(2) # Brief pause between videos
else: else:
print("Could not extract video information") log_error("Could not extract video information")
# Try to navigate to next video anyway # Try to navigate to next video anyway
if not navigate_to_next_video(): if not navigate_to_next_video():
break break
except Exception as e: except Exception as e:
print(f"Error in main processing loop: {e}") log_error(f"Error in main processing loop: {e}")
# Try to continue with next video # Try to continue with next video
if not navigate_to_next_video(): if not navigate_to_next_video():
break break
listener.stop() listener.stop()
cleanup_browser() cleanup_browser()
print("Script finished.")
# Final summary
completion_panel = Panel.fit(
f"[bold green]✨ Script completed successfully![/bold green]\n[cyan]Total videos processed: {counter}[/cyan]",
title="[bold blue]🎯 Completion Summary[/bold blue]",
border_style="bright_green"
)
console.print()
console.print(completion_panel)
except KeyboardInterrupt: except KeyboardInterrupt:
print("\nScript interrupted by user.") log_warning("Script interrupted by user.", add_newline_before=True)
cleanup_browser() cleanup_browser()
except Exception as e: except Exception as e:
print(f"Unexpected error: {e}") log_error(f"Unexpected error: {e}", "💥")
cleanup_browser() cleanup_browser()