From 89314f9c7459c3d004a2edf9ddc5c3da6e7b188c Mon Sep 17 00:00:00 2001 From: Francisco Pessano Date: Sat, 12 Jul 2025 03:03:38 +0000 Subject: [PATCH] Enhance logging and configuration for YouTube Video Classifier - Added fallback model configuration in config.ini - Updated requirements.txt to include rich for enhanced logging - Refactored script.py to implement rich logging for better visibility - Modified functions to include channel link extraction and updated CSV saving logic - Improved error handling and user feedback throughout the script --- config.ini | 5 + requirements.txt | 1 + script.py | 539 ++++++++++++++++++++++++++++++++--------------- 3 files changed, 377 insertions(+), 168 deletions(-) diff --git a/config.ini b/config.ini index ad9ea5e..1d01a9f 100644 --- a/config.ini +++ b/config.ini @@ -4,6 +4,7 @@ # Ollama settings ollama_host = http://localhost:11434 ollama_model = qwen2.5vl:7b +ollama_fallback_model = gemma2:2b # File paths classifications_csv = video_classifications.csv @@ -22,4 +23,8 @@ restart_tab_frequency = 90 enable_delete = false enable_playlist_creation = false +# LLM timeout settings (in seconds) +llm_primary_timeout = 60 +llm_fallback_timeout = 60 + diff --git a/requirements.txt b/requirements.txt index cc1d7d1..69add1c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ pyperclip==1.8.2 pytesseract==0.3.10 selenium==4.15.2 webdriver-manager==4.0.1 +rich==13.8.0 diff --git a/script.py b/script.py index 4f363be..52af106 100644 --- a/script.py +++ b/script.py @@ -24,6 +24,17 @@ from selenium.webdriver.chrome.service import Service from selenium.common.exceptions import TimeoutException, NoSuchElementException from selenium.webdriver.common.keys import Keys +# Rich logging imports +from rich.console import Console +from rich import print as rprint +from rich.panel import Panel +from rich.text import Text +from rich.progress import Progress, SpinnerColumn, TextColumn +from rich.table import Table + +# Initialize rich console +console = Console() + # Load configuration config = configparser.ConfigParser() config.read('config.ini') @@ -34,6 +45,11 @@ OLLAMA_HOST = config.get( 'ollama_host', fallback='http://localhost:11434') OLLAMA_MODEL = config.get('DEFAULT', 'ollama_model', fallback='qwen2.5vl:7b') +OLLAMA_FALLBACK_MODEL = config.get('DEFAULT', 'ollama_fallback_model', fallback='gemma2:2b') + +# Timeout settings +LLM_PRIMARY_TIMEOUT = config.getint('DEFAULT', 'llm_primary_timeout', fallback=60) +LLM_FALLBACK_TIMEOUT = config.getint('DEFAULT', 'llm_fallback_timeout', fallback=10) CLASSIFICATION_PROMPT = """ Please classify this YouTube video based on its title and thumbnail. @@ -96,6 +112,51 @@ quit = False driver = None # Global driver variable +def log(message, style="", emoji="", add_newline_before=False): + """Enhanced logging function with rich formatting.""" + if emoji: + width = 1 + if (emoji == "โš ๏ธ") or (emoji == "โฑ๏ธ") or (emoji == "๐Ÿท๏ธ") or (emoji == "๐Ÿ—ฃ๏ธ") or (emoji == "๐Ÿ—‘๏ธ") or (emoji == "โŒจ๏ธ"): + width = 2 + pad = " " * width + formatted_message = f"{emoji}{pad}{message.strip()}" + else: + formatted_message = message.strip() + + if add_newline_before: + console.print() + + if style: + console.print(formatted_message, style=style) + else: + console.print(formatted_message) + + +def log_success(message, emoji="โœ…", add_newline_before=False): + """Log success messages in green.""" + log(message, style="bold green", emoji=emoji, add_newline_before=add_newline_before) + + +def log_warning(message, emoji="โš ๏ธ", add_newline_before=False): + """Log warning messages in yellow.""" + log(message, style="bold yellow", emoji=emoji, add_newline_before=add_newline_before) + + +def log_error(message, emoji="โŒ"): + """Log error messages in red.""" + log(message, style="bold red", emoji=emoji) + + +def log_info(message, emoji="โ„น๏ธ", add_newline_before=False): + """Log info messages in blue.""" + log(message, style="bold blue", emoji=emoji, add_newline_before=add_newline_before) + + +def log_process(message, emoji="๐Ÿ”„", add_newline_before=False): + """Log process messages in cyan.""" + log(message, style="bold cyan", emoji=emoji, add_newline_before=add_newline_before) + + def get_chrome_binary_path(): """Get the Chrome binary path based on the operating system.""" system = platform.system().lower() @@ -193,9 +254,9 @@ def init_browser(): chrome_binary = get_chrome_binary_path() if chrome_binary: chrome_options.binary_location = chrome_binary - print(f"Using Chrome binary: {chrome_binary}") + log_info(f"Using Chrome binary: {chrome_binary}", "๐Ÿ”ง") else: - print("Warning: Could not find Chrome binary, using system default") + log_warning("Could not find Chrome binary, using system default") # Initialize the driver with various fallback options driver_initialized = False @@ -209,10 +270,9 @@ def init_browser(): driver = webdriver.Chrome( service=service, options=chrome_options) driver_initialized = True - print( - f"Initialized with system chromedriver: {chromedriver_path}") + log_success(f"Initialized with system chromedriver: {chromedriver_path}") except Exception as e: - print(f"System chromedriver failed: {e}") + log_error(f"System chromedriver failed: {e}") # Try 2: WebDriverManager with explicit OS detection if not driver_initialized: @@ -224,8 +284,7 @@ def init_browser(): # Clear any cached drivers that might have wrong architecture wdm_cache_dir = os.path.expanduser("~/.wdm") if os.path.exists(wdm_cache_dir): - print( - "Clearing WebDriverManager cache to avoid architecture conflicts...") + log_process("Clearing WebDriverManager cache to avoid architecture conflicts...", "๐Ÿงน") shutil.rmtree(wdm_cache_dir, ignore_errors=True) # Initialize ChromeDriverManager with explicit OS detection @@ -237,18 +296,18 @@ def init_browser(): driver = webdriver.Chrome( service=service, options=chrome_options) driver_initialized = True - print("Initialized with WebDriverManager") + log_success("Initialized with WebDriverManager") except Exception as e: - print(f"WebDriverManager failed: {e}") + log_error(f"WebDriverManager failed: {e}") # Try 3: Default Chrome without specifying service if not driver_initialized: try: driver = webdriver.Chrome(options=chrome_options) driver_initialized = True - print("Initialized with default Chrome") + log_success("Initialized with default Chrome") except Exception as e: - print(f"Default Chrome initialization failed: {e}") + log_error(f"Default Chrome initialization failed: {e}") # Try 4: Fallback with minimal options (still visible) if not driver_initialized: @@ -263,9 +322,9 @@ def init_browser(): # Try without custom binary location driver = webdriver.Chrome(options=chrome_options_fallback) driver_initialized = True - print("Initialized with fallback Chrome options") + log_success("Initialized with fallback Chrome options") except Exception as e: - print(f"Fallback Chrome initialization failed: {e}") + log_error(f"Fallback Chrome initialization failed: {e}") # Try 5: Last resort - use system command to find chromedriver if not driver_initialized: @@ -284,13 +343,12 @@ def init_browser(): driver = webdriver.Chrome( service=service, options=chrome_options_minimal) driver_initialized = True - print( - f"Initialized with system-found chromedriver: {chromedriver_path}") + log_success(f"Initialized with system-found chromedriver: {chromedriver_path}") except Exception as e: - print(f"System command fallback failed: {e}") + log_error(f"System command fallback failed: {e}") if not driver_initialized: - print("All browser initialization attempts failed") + log_error("All browser initialization attempts failed") return False # Remove automation indicators @@ -300,25 +358,31 @@ def init_browser(): except BaseException: pass # Ignore if this fails - print("Browser initialized successfully!") + log_success("Browser initialized successfully!", "๐ŸŽ‰") # Navigate to the playlist URL driver.get(playlist_url) + log_info(f"Current browser navigated to: {playlist_url}", "๐ŸŒ") - print("\n" + "=" * 60) - print("SETUP INSTRUCTIONS:") - print("1. The browser should now be visible on your screen") - print("2. Please log in to your YouTube account in the browser") - print("3. Navigate to your playlist if not already there") - print("4. Ensure the playlist URL in config.ini is correct") - print("5. The script will process videos automatically") - print("6. Press 'q' to quit at any time") - print("7. If you're in a dev container, use the command below to open the browser:") - print(f' "$BROWSER" {playlist_url}') - input("Press Enter to continue after logging in...") - print("=" * 60) + # Create a beautiful setup panel + setup_panel = Panel.fit( + """[bold cyan]SETUP INSTRUCTIONS:[/bold cyan] - print(f"Current browser navigated to: {playlist_url}") +1. The browser should now be visible on your screen +2. If not already logged in, please log in to your YouTube account +3. Navigate to your playlist if not already there +4. Ensure the playlist URL in config.ini is correct +5. The script will process videos automatically +6. Press 'q' to quit at any time""", + title="[bold magenta]๐Ÿš€ YouTube Video Classifier Setup[/bold magenta]", + border_style="bright_blue" + ) + + console.print() + console.print(setup_panel) + console.print() + + input("Press Enter to continue after confirming login status...") # Wait a moment for any redirects time.sleep(5) @@ -326,10 +390,115 @@ def init_browser(): return True except Exception as e: - print(f"Error initializing browser: {e}") + log_error(f"Error initializing browser: {e}") return False +def check_ollama_models(): + """Check if required Ollama models are available.""" + try: + response = requests.get(f'{OLLAMA_HOST}/api/tags', timeout=5) + if response.status_code == 200: + models = response.json() + available_models = [model['name'] for model in models.get('models', [])] + + primary_available = any(OLLAMA_MODEL in model for model in available_models) + fallback_available = any(OLLAMA_FALLBACK_MODEL in model for model in available_models) + + log_info(f"Available models: {available_models}", "๐Ÿค–") + + if not primary_available: + log_warning(f"Primary model '{OLLAMA_MODEL}' not found!") + log_info(f"Run: ollama pull {OLLAMA_MODEL}", "๐Ÿ’ก") + + if not fallback_available: + log_warning(f"Fallback model '{OLLAMA_FALLBACK_MODEL}' not found!") + log_info(f"Run: ollama pull {OLLAMA_FALLBACK_MODEL}", "๐Ÿ’ก") + + if not primary_available and not fallback_available: + log_error("No required models available! Please install at least one.") + return False + + return True + else: + log_error(f"Cannot connect to Ollama at {OLLAMA_HOST}") + return False + except Exception as e: + log_error(f"Error checking Ollama models: {e}") + return False + + +def call_ollama_with_fallback(prompt, image_data, purpose="processing"): + """ + Call Ollama with timeout and fallback support using continuous loop. + + Args: + prompt: The text prompt to send + image_data: Base64 encoded image data + purpose: Description of what this call is for (for logging) + + Returns: + str: The response from the LLM, or empty string if manually cancelled + """ + models = [OLLAMA_MODEL, OLLAMA_FALLBACK_MODEL] + base_timeouts = [LLM_PRIMARY_TIMEOUT, LLM_FALLBACK_TIMEOUT] + + attempt = 0 + while not quit: # Continue until we get a response or user quits + for i, model in enumerate(models): + attempt += 1 + # Calculate increasing timeout: base + (10 * number of complete cycles) + cycles_completed = (attempt - 1) // len(models) + current_timeout = base_timeouts[i] + (10 * cycles_completed) + + try: + log_process(f"Attempt {attempt}: Trying {model} for {purpose} (timeout: {current_timeout}s)", "๐Ÿ”„") + + response = requests.post( + f'{OLLAMA_HOST}/api/generate', + json={ + 'model': model, + 'prompt': prompt, + 'images': [image_data], + 'stream': False + }, + timeout=current_timeout + ) + + if response.status_code == 200: + result = response.json() + response_text = result['response'].strip() + if response_text: # Make sure we got actual content + log_success(f"Response received from {model} on attempt {attempt}", "โœ…") + return response_text + else: + log_warning(f"{model} returned empty response") + continue + else: + log_warning(f"{model} returned status {response.status_code}") + + except requests.exceptions.Timeout: + log_warning(f"{model} timed out after {current_timeout}s") + + except requests.exceptions.ConnectionError: + log_error(f"Cannot connect to {model}") + # Wait a bit before trying again if connection failed + time.sleep(2) + + except Exception as e: + log_error(f"Error calling {model}: {e}") + + # After trying both models in this cycle + if attempt >= 2: # After first complete cycle + log_info(f"Completed cycle {cycles_completed + 1}, increasing timeouts by 10s for next cycle", "๐Ÿ”„") + + # Small delay between cycles to prevent overwhelming the server + time.sleep(1) + + log_warning(f"LLM calls cancelled by user for {purpose}") + return "" + + def detect_video_language(video_title, thumbnail_path): """Use Ollama to detect the language of the video.""" try: @@ -340,32 +509,18 @@ def detect_video_language(video_title, thumbnail_path): # Prepare the prompt prompt = LANGUAGE_DETECTION_PROMPT.format(video_title=video_title) - # Make request to Ollama - response = requests.post( - f'{OLLAMA_HOST}/api/generate', - json={ - 'model': OLLAMA_MODEL, - 'prompt': prompt, - 'images': [image_data], - 'stream': False - } - ) - - if response.status_code == 200: - result = response.json() - language = result['response'].strip() - return language - else: - print(f"Error from Ollama for language detection: {response.status_code}") - return "Unknown" + # Call with fallback support + language = call_ollama_with_fallback(prompt, image_data, "language detection") + + return language if language else "Unknown" except Exception as e: - print(f"Error detecting language: {e}") + log_error(f"Error detecting language: {e}") return "Unknown" def extract_channel_name(video_element): - """Extract the channel name from the video element.""" + """Extract the channel name and link from the video element.""" try: channel_selectors = [ "#text > a", @@ -381,7 +536,12 @@ def extract_channel_name(video_element): if "yt-simple-endpoint" in classes and "style-scope" in classes and "yt-formatted-string" in classes: channel_name = channel_element.text.strip() if channel_name: - return channel_name + # Get the resolved absolute URL + channel_link = channel_element.get_attribute("href") + # If it's a relative URL, resolve it + if channel_link and channel_link.startswith("/"): + channel_link = f"https://www.youtube.com{channel_link}" + return channel_name, channel_link or "Unknown Channel Link" except BaseException: continue @@ -391,15 +551,20 @@ def extract_channel_name(video_element): channel_element = video_element.find_element(By.CSS_SELECTOR, selector) channel_name = channel_element.text.strip() if channel_name: - return channel_name + # Get the resolved absolute URL + channel_link = channel_element.get_attribute("href") + # If it's a relative URL, resolve it + if channel_link and channel_link.startswith("/"): + channel_link = f"https://www.youtube.com{channel_link}" + return channel_name, channel_link or "Unknown Channel Link" except BaseException: continue - return "Unknown Channel" + return "Unknown Channel", "Unknown Channel Link" except Exception as e: - print(f"Error extracting channel name: {e}") - return "Unknown Channel" + log_error(f"Error extracting channel name: {e}") + return "Unknown Channel", "Unknown Channel Link" def extract_video_length(video_element): @@ -456,7 +621,7 @@ def extract_video_length(video_element): return 0 # Default if no length found except Exception as e: - print(f"Error extracting video length: {e}") + log_error(f"Error extracting video length: {e}") return 0 @@ -495,7 +660,7 @@ def extract_video_date(video_element): return "Unknown Date" except Exception as e: - print(f"Error extracting video date: {e}") + log_error(f"Error extracting video date: {e}") return "Unknown Date" @@ -538,7 +703,7 @@ def parse_natural_date(date_text): return current_time except Exception as e: - print(f"Error parsing natural date: {e}") + log_error(f"Error parsing natural date: {e}") return datetime.now() @@ -568,7 +733,7 @@ def get_video_info_web(): continue if not video_element: - print("Could not find video element in playlist") + log_error("Could not find video element in playlist") return None, None, None, None, None, None, None # Extract video title @@ -593,19 +758,20 @@ def get_video_info_web(): if not video_title: video_title = f"Video_{int(time.time())}" - print(f"Extracted video title: {video_title}") + log_info(f"Extracted video title: {video_title}", "๐Ÿ“", True) - # Extract channel name - channel_name = extract_channel_name(video_element) - print(f"Extracted channel name: {channel_name}") + # Extract channel name and link + channel_name, channel_link = extract_channel_name(video_element) + log_info(f"Extracted channel name: {channel_name}", "๐Ÿ‘ค") + log_info(f"Extracted channel link: {channel_link}", "๐Ÿ”—") # Extract video length video_length = extract_video_length(video_element) - print(f"Extracted video length: {video_length} seconds") + log_info(f"Extracted video length: {video_length} seconds", "โฑ๏ธ") # Extract video date video_date = extract_video_date(video_element) - print(f"Extracted video date: {video_date}") + log_info(f"Extracted video date: {video_date}", "๐Ÿ“…") # Get video URL using share functionality video_url = None @@ -649,7 +815,7 @@ def get_video_info_web(): continue except Exception as e: - print(f"Error getting URL from share modal: {e}") + log_error(f"Error getting URL from share modal: {e}") # Close the share modal try: @@ -674,7 +840,7 @@ def get_video_info_web(): time.sleep(0.5) except Exception as e: - print(f"Error extracting video URL: {e}") + log_error(f"Error extracting video URL: {e}") # Fallback: try to get href from video title link try: title_link = video_element.find_element( @@ -685,7 +851,7 @@ def get_video_info_web(): except BaseException: video_url = f"https://youtube.com/watch?v=unknown_{int(time.time())}" - print(f"Extracted video URL: {video_url}") + log_info(f"Extracted video URL: {video_url}", "๐Ÿ”—") # Take screenshot of video thumbnail try: @@ -715,7 +881,7 @@ def get_video_info_web(): video_element.screenshot(thumbnail_path) except Exception as e: - print(f"Error taking thumbnail screenshot: {e}") + log_error(f"Error taking thumbnail screenshot: {e}") # Create a placeholder image try: img = Image.new('RGB', (320, 180), color='lightgray') @@ -723,11 +889,11 @@ def get_video_info_web(): except BaseException: thumbnail_path = None - return video_title, video_url, thumbnail_path, channel_name, video_length, video_date + return video_title, video_url, thumbnail_path, channel_name, channel_link, video_length, video_date except Exception as e: - print(f"Error extracting video info: {e}") - return None, None, None, None, None, None + log_error(f"Error extracting video info: {e}") + return None, None, None, None, None, None, None def navigate_to_next_video(): @@ -745,11 +911,11 @@ def navigate_to_next_video(): (By.CSS_SELECTOR, "ytd-playlist-video-renderer"))) return True except BaseException: - print("No more videos in playlist") + log_info("No more videos in playlist", "๐Ÿ“ญ") return False except Exception as e: - print(f"Error checking for next video: {e}") + log_error(f"Error checking for next video: {e}") return False @@ -785,7 +951,7 @@ def remove_video_from_playlist(): item_text = item.text.strip().lower() if "remove" in item_text or "delete" in item_text: driver.execute_script("arguments[0].click();", item) - print("Video removed from playlist") + log_success("Video removed from playlist", "๐Ÿ—‘๏ธ") removed = True break if removed: @@ -794,7 +960,7 @@ def remove_video_from_playlist(): continue if not removed: - print("Could not find remove button") + log_error("Could not find remove button") # Try to close the menu try: driver.find_element(By.TAG_NAME, "body").send_keys(Keys.ESCAPE) @@ -806,10 +972,50 @@ def remove_video_from_playlist(): return True except Exception as e: - print(f"Error removing video: {e}") + log_error(f"Error removing video: {e}") return False -# Initialize CSV file if it doesn't exist + +def extract_playlist_info(): + """Extract playlist name and URL from the current page.""" + global driver + try: + # Extract playlist name + playlist_name = "Unknown Playlist" + playlist_name_selector = "yt-page-header-renderer > yt-page-header-view-model > div.page-header-view-model-wiz__scroll-container > div > div.page-header-view-model-wiz__page-header-headline > div > yt-dynamic-text-view-model > h1 > span" + + try: + playlist_name_element = driver.find_element(By.CSS_SELECTOR, playlist_name_selector) + playlist_name = playlist_name_element.text.strip() + except BaseException: + # Try alternative selectors + alt_selectors = [ + "h1 span", + ".page-header-headline h1 span", + "yt-dynamic-text-view-model h1 span", + "#page-header h1 span" + ] + + for selector in alt_selectors: + try: + playlist_name_element = driver.find_element(By.CSS_SELECTOR, selector) + playlist_name = playlist_name_element.text.strip() + if playlist_name: + break + except BaseException: + continue + + # Get current playlist URL + playlist_link = driver.current_url + + log_info(f"Playlist name: {playlist_name}", "๐Ÿ“‹") + log_info(f"Playlist link: {playlist_link}", "๐Ÿ”—") + + return playlist_name, playlist_link + + except Exception as e: + log_error(f"Error extracting playlist info: {e}") + return "Unknown Playlist", driver.current_url if driver else "Unknown URL" def init_csv(): @@ -818,8 +1024,9 @@ def init_csv(): with open(classifications_csv, 'w', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerow(['video_title', 'video_url', 'thumbnail_url', 'classification', - 'language', 'channel_name', 'video_length_seconds', 'video_date', 'detailed_subtags', 'image_data', 'timestamp']) - print(f"Created {classifications_csv}") + 'language', 'channel_name', 'channel_link', 'video_length_seconds', 'video_date', + 'detailed_subtags', 'playlist_name', 'playlist_link', 'image_data', 'timestamp']) + log_success(f"Created {classifications_csv}", "๐Ÿ“Š") def load_existing_classifications(): @@ -831,21 +1038,21 @@ def load_existing_classifications(): ) if not df.empty else set() return set() except Exception as e: - print(f"Error loading classifications: {e}") + log_error(f"Error loading classifications: {e}") return set() -def save_classification(video_title, video_url, thumbnail_url, classification, language, channel_name, video_length, video_date, detailed_subtags, image_data): +def save_classification(video_title, video_url, thumbnail_url, classification, language, channel_name, channel_link, video_length, video_date, detailed_subtags, playlist_name, playlist_link, image_data): """Save a video classification to CSV.""" try: with open(classifications_csv, 'a', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerow([video_title, video_url, thumbnail_url, classification, - language, channel_name, video_length, video_date, detailed_subtags, image_data, - time.strftime('%Y-%m-%d %H:%M:%S')]) - print(f"Saved classification: {video_title} -> {classification}") + language, channel_name, channel_link, video_length, video_date, detailed_subtags, + playlist_name, playlist_link, image_data, time.strftime('%Y-%m-%d %H:%M:%S')]) + log_success(f"Saved classification: {video_title} -> {classification}", "๐Ÿ’พ") except Exception as e: - print(f"Error saving classification: {e}") + log_error(f"Error saving classification: {e}") def get_video_info(): @@ -872,27 +1079,13 @@ def classify_video_with_ollama( video_title=video_title, existing_categories=existing_cats) - # Make request to Ollama - response = requests.post( - f'{OLLAMA_HOST}/api/generate', - json={ - 'model': OLLAMA_MODEL, - 'prompt': prompt, - 'images': [image_data], - 'stream': False - } - ) - - if response.status_code == 200: - result = response.json() - classification = result['response'].strip() - return classification - else: - print(f"Error from Ollama: {response.status_code}") - return "Uncategorized" + # Call with fallback support + classification = call_ollama_with_fallback(prompt, image_data, "video classification") + + return classification if classification else "Uncategorized" except Exception as e: - print(f"Error classifying video: {e}") + log_error(f"Error classifying video: {e}") return "Uncategorized" @@ -909,43 +1102,19 @@ def generate_detailed_subtags(video_title, thumbnail_path, classification): classification=classification ) - # Make request to Ollama - response = requests.post( - f'{OLLAMA_HOST}/api/generate', - json={ - 'model': OLLAMA_MODEL, - 'prompt': prompt, - 'images': [image_data], - 'stream': False - } - ) - - if response.status_code == 200: - result = response.json() - subtags = result['response'].strip() - return subtags - else: - print(f"Error from Ollama for sub-tags generation: {response.status_code}") - return "" + # Call with fallback support + subtags = call_ollama_with_fallback(prompt, image_data, "sub-tags generation") + + return subtags if subtags else "" except Exception as e: - print(f"Error generating sub-tags: {e}") + log_error(f"Error generating sub-tags: {e}") return "" + def create_playlist_and_add_video(classification, video_url): """Create a playlist based on classification and add video to it.""" - # This functionality would need to be implemented with Selenium - # For now, it's commented out - """ - try: - # Use Selenium to navigate to YouTube playlist creation - # This would require additional implementation - print(f"Would create/add to playlist: {classification}") - - except Exception as e: - print(f"Error creating playlist/adding video: {e}") - """ - print(f"[TODO] Would add video to playlist: {classification}") + log_process(f"[TODO] Would add video to playlist: {classification}", "๐Ÿ“‹") def delete_video(): @@ -961,7 +1130,7 @@ def on_press_to_quit(key): global quit try: if key.char == 'q': - print('Closing program...') + log_warning('Closing program...', "๐Ÿ›‘") quit = True except AttributeError: pass @@ -973,7 +1142,7 @@ def cleanup_browser(): if driver: try: driver.quit() - print("Browser closed.") + log_info("Browser closed.", "๐Ÿšช") except BaseException: pass @@ -981,16 +1150,27 @@ def cleanup_browser(): # Updated main execution logic if __name__ == '__main__': try: + # Create a beautiful title + title = Text("๐ŸŽฌ YouTube Video Classifier", style="bold magenta") + console.print(Panel.fit(title, border_style="bright_magenta")) + console.print() + + # Check Ollama models before starting + log_info("Checking Ollama models availability...", "๐Ÿ”") + if not check_ollama_models(): + log_error("Required models not available. Please install them first.") + sys.exit(1) + # Initialize CSV file init_csv() # Load existing classifications existing_classifications = load_existing_classifications() - print(f"Loaded {len(existing_classifications)} existing classifications: {existing_classifications}") + log_info(f"Loaded {len(existing_classifications)} existing classifications: {existing_classifications}", "๐Ÿ“š") # Initialize browser if not init_browser(): - print("Failed to initialize browser. Exiting.") + log_error("Failed to initialize browser. Exiting.") sys.exit(1) counter = 0 @@ -999,19 +1179,32 @@ if __name__ == '__main__': listener = kb.Listener(on_press=on_press_to_quit) listener.start() - print("\nStarting video processing...") - print("Press 'q' to quit at any time.") + log_success("Starting video processing...", "๐Ÿš€", True) + log_info("Press 'q' to quit at any time.", "โŒจ๏ธ") + log_info(f"Primary model: {OLLAMA_MODEL} (timeout: {LLM_PRIMARY_TIMEOUT}s)", "๐Ÿค–") + log_info(f"Fallback model: {OLLAMA_FALLBACK_MODEL} (timeout: {LLM_FALLBACK_TIMEOUT}s)", "๐Ÿค–") + + playlist_name = "Unknown Playlist" + playlist_link = "Unknown URL" while not quit: try: + # Extract playlist information once per session + if counter == 0: + playlist_name, playlist_link = extract_playlist_info() + console.print() # Add spacing after playlist info + # Extract video information using web scraping video_data = get_video_info() - if len(video_data) == 6: # New format with all data + if len(video_data) == 7: # New format with all data including channel_link + video_title, video_url, thumbnail_path, channel_name, channel_link, video_length, video_date = video_data + elif len(video_data) == 6: # Old format with channel_name but no channel_link video_title, video_url, thumbnail_path, channel_name, video_length, video_date = video_data + channel_link = "Unknown Channel Link" else: # Fallback to old format video_title, video_url, thumbnail_path = video_data[:3] - channel_name, video_length, video_date = "Unknown Channel", 0, "Unknown Date" + channel_name, channel_link, video_length, video_date = "Unknown Channel", "Unknown Channel Link", 0, "Unknown Date" if video_title and video_url and thumbnail_path: # Convert thumbnail to base64 for storage @@ -1019,39 +1212,41 @@ if __name__ == '__main__': try: with open(thumbnail_path, "rb") as image_file: image_data = base64.b64encode(image_file.read()).decode('utf-8') - print(f"Thumbnail converted to base64 ({len(image_data)} characters)") + log_process(f"Thumbnail converted to base64 ({len(image_data)} characters)", "๐Ÿ“ธ", True) except Exception as e: - print(f"Error converting thumbnail to base64: {e}") + log_error(f"Error converting thumbnail to base64: {e}") image_data = "" - # Detect language using Ollama - print(f"\nDetecting language for video: {video_title}") + # Detect language using Ollama with fallback + log_process(f"Detecting language for video: {video_title}", "๐ŸŒ", True) language = detect_video_language(video_title, thumbnail_path) - print(f"Language detected: {language}") + log_success(f"Language detected: {language}", "๐Ÿ—ฃ๏ธ") - # Classify video using Ollama - print(f"Classifying video: {video_title}") + # Classify video using Ollama with fallback + log_process(f"Classifying video: {video_title}", "๐Ÿท๏ธ", True) classification = classify_video_with_ollama( video_title, thumbnail_path, existing_classifications) - print(f"Classification result: {classification}") + log_success(f"Classification result: {classification}", "๐Ÿ“‚") # Let's be gentle with the API time.sleep(1) - # Generate detailed sub-tags using Ollama - print(f"Generating detailed sub-tags for video: {video_title}") + # Generate detailed sub-tags using Ollama with fallback + log_process(f"Generating detailed sub-tags for video: {video_title}", "๐Ÿท๏ธ", True) detailed_subtags = generate_detailed_subtags(video_title, thumbnail_path, classification) - print(f"Detailed sub-tags: {detailed_subtags}") + log_success(f"Detailed sub-tags: {detailed_subtags}", "๐Ÿ”–") - # Save classification to CSV with new data including image data + # Save classification to CSV with new data including channel link save_classification( video_title, video_url, thumbnail_path, classification, - language, channel_name, video_length, video_date, detailed_subtags, image_data) + language, channel_name, channel_link, video_length, video_date, detailed_subtags, + playlist_name, playlist_link, image_data) # Update existing classifications set existing_classifications.add(classification) # Create playlist and add video (commented for testing) + log_process("Actions with playlist:", "๐Ÿ“‹", True) create_playlist_and_add_video(classification, video_url) # Remove video from current playlist @@ -1059,19 +1254,19 @@ if __name__ == '__main__': if remove_success: counter += 1 - print(f"Processed {counter} videos.") + log_success(f"Processed {counter} videos.", "โœ…", True) # Navigate to next video if not navigate_to_next_video(): - print("No more videos to process.") + log_info("No more videos to process.", "๐Ÿ") break else: # If can't remove, try to navigate to next video anyway if not navigate_to_next_video(): - print("Could not navigate to next video.") + log_error("Could not navigate to next video.") break counter += 1 - print(f"Processed {counter} videos (removal failed).") + log_warning(f"Processed {counter} videos (removal failed).", add_newline_before=True) # Clean up temporary thumbnail if os.path.exists(thumbnail_path): @@ -1080,24 +1275,32 @@ if __name__ == '__main__': time.sleep(2) # Brief pause between videos else: - print("Could not extract video information") + log_error("Could not extract video information") # Try to navigate to next video anyway if not navigate_to_next_video(): break except Exception as e: - print(f"Error in main processing loop: {e}") + log_error(f"Error in main processing loop: {e}") # Try to continue with next video if not navigate_to_next_video(): break listener.stop() cleanup_browser() - print("Script finished.") + + # Final summary + completion_panel = Panel.fit( + f"[bold green]โœจ Script completed successfully![/bold green]\n[cyan]Total videos processed: {counter}[/cyan]", + title="[bold blue]๐ŸŽฏ Completion Summary[/bold blue]", + border_style="bright_green" + ) + console.print() + console.print(completion_panel) except KeyboardInterrupt: - print("\nScript interrupted by user.") + log_warning("Script interrupted by user.", add_newline_before=True) cleanup_browser() except Exception as e: - print(f"Unexpected error: {e}") + log_error(f"Unexpected error: {e}", "๐Ÿ’ฅ") cleanup_browser()