diff --git a/.gitignore b/.gitignore
index 5d381cc..1e62605 100644
--- a/.gitignore
+++ b/.gitignore
@@ -87,6 +87,7 @@ ipython_config.py
 # For a library or package, you might want to ignore these files since the code is
 # intended to run in multiple environments; otherwise, check them in:
 # .python-version
+.env/
 
 # pipenv
 # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
diff --git a/examples/inner_test.py b/examples/inner_test.py
new file mode 100644
index 0000000..d5e0f6b
--- /dev/null
+++ b/examples/inner_test.py
@@ -0,0 +1,21 @@
+import shadowtube.preprocess as prep
+import shadowtube.recommend as rec
+
+raw_history = prep.parse_database("./short.db")
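+# Each line of the .db file is one JSON object. The fields shadowtube reads
+# look roughly like this (illustrative sketch: field names are taken from the
+# code, the values are invented):
+# {"videoId": "...", "title": "...", "description": "...", "timeWatched": 1700000000, "watchProgress": 120}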
+
+# print(raw_history[0]["title"] + ": " + str(prep.relevancy(raw_history[0], raw_history)))
+# print(
+#     raw_history[28]["title"] + ": " + str(prep.relevancy(raw_history[28], raw_history))
+# )
+
+# for i in range(0, 10):
+#     print(prep.get_similarity(raw_history[i], raw_history))
+
+history = prep.sort_history(raw_history)
+print(len(history))
+
+# print(recommend(history))
+
+recommendations = rec.recommend(history, verbose=False)
+
+print(recommendations)
diff --git a/examples/tkinter_preview.py b/examples/tkinter_preview.py
new file mode 100644
index 0000000..cda311d
--- /dev/null
+++ b/examples/tkinter_preview.py
@@ -0,0 +1,82 @@
+import requests
+from io import BytesIO
+from PIL import Image, ImageTk
+import tkinter as tk
+import yt_dlp
+import webbrowser
+
+import shadowtube.recommend as rec
+import shadowtube.preprocess as prep
+
+# Build the list of 8 recommended YouTube video IDs from the FreeTube history
+video_ids = rec.recommend(
+    prep.sort_history(
+        prep.parse_database(
+            "/home/fedir/.var/app/io.freetubeapp.FreeTube/config/FreeTube/history.db"
+        )
+    ),
+    verbose=True,
+    count=8,
+)
+
+print(video_ids)
+
+
+def get_video_info(video_id):
+    """Fetch the title and thumbnail URL of a YouTube video using yt_dlp."""
+    ydl_opts = {"quiet": True, "no_warnings": True, "extract_flat": True}
+
+    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
+        info = ydl.extract_info(
+            f"https://www.youtube.com/watch?v={video_id}", download=False
+        )
+        return info.get("title", "Title Not Found"), info.get("thumbnail", "")
+
+
+def open_video(event, video_id):
+    """Open the YouTube video in the default web browser when clicked."""
+    webbrowser.open(f"https://www.youtube.com/watch?v={video_id}")
+
+
+def show_video_preview(video_id, row, col):
+    """Fetch and display a video's title and thumbnail in a grid layout."""
+    title, thumbnail_url = get_video_info(video_id)
+
+    # Fetch the thumbnail; fall back to a text-only cell if it is unavailable
+    photo = None
+    if thumbnail_url:
+        response = requests.get(thumbnail_url)
+        if response.status_code == 200:
+            image_data = Image.open(BytesIO(response.content))
+            image_data = image_data.resize((200, 112))  # Resize for grid
+            photo = ImageTk.PhotoImage(image_data)
+
+    # Create thumbnail label (clickable)
+    thumbnail_label = tk.Label(root, image=photo, cursor="hand2")
+    thumbnail_label.image = photo  # Keep a reference so it is not garbage-collected
+    thumbnail_label.grid(row=row, column=col, padx=10, pady=10)
+    thumbnail_label.bind("<Button-1>", lambda event, v=video_id: open_video(event, v))
+
+    # Create title label (wrapped text)
+    title_label = tk.Label(
+        root, text=title, font=("Arial", 10, "bold"), wraplength=200, justify="center"
+    )
+    title_label.grid(row=row + 1, column=col, padx=10, pady=5)
+
+
+# Create Tkinter window
+root = tk.Tk()
+root.title("YouTube Video Previews")
+
+# Add all 8 videos in a 2x4 grid
+for index, video_id in enumerate(video_ids):
+    row = (index // 4) * 2  # Every second row holds the title
+    col = index % 4
+    show_video_preview(video_id, row, col)
+
+root.mainloop()
diff --git a/shadowtube/__init__.py b/shadowtube/__init__.py
new file mode 100644
index 0000000..4607170
--- /dev/null
+++ b/shadowtube/__init__.py
@@ -0,0 +1,2 @@
+from .preprocess import *
+from .recommend import *
diff --git a/shadowtube/cache.db b/shadowtube/cache.db
new file mode 100644
index 0000000..249af3d
Binary files /dev/null and b/shadowtube/cache.db differ
diff --git a/shadowtube/cache_similarity.db b/shadowtube/cache_similarity.db
new file mode 100644
index 0000000..53926c4
Binary files /dev/null and b/shadowtube/cache_similarity.db differ
diff --git a/shadowtube/preprocess.py b/shadowtube/preprocess.py
new file mode 100644
index 0000000..5f2cd88
--- /dev/null
+++ b/shadowtube/preprocess.py
@@ -0,0 +1,211 @@
+import json
+import math
+import re
+import sqlite3
+from typing import List
+
+import numpy as np
+from sentence_transformers import SentenceTransformer
+from sklearn.metrics.pairwise import cosine_similarity
+
+# The embedding model is loaded lazily on first use (see get_embedding)
+model = None
+
+cache = sqlite3.connect("cache_similarity.db")
+cursor = cache.cursor()
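+
+# Not in the original code: the queries below assume a `similarity` table
+# already exists in cache_similarity.db. This schema is inferred from those
+# queries (an assumption, not the author's stated DDL), and the statement is
+# idempotent, so it is a no-op against the cache shipped with the repo:
+cursor.execute("CREATE TABLE IF NOT EXISTS similarity(id TEXT, factor REAL)")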
+ [entry1["videoId"] + "/" + entry2["videoId"]], similarity[0] + ) + else: + entry1_embedding = get_embedding(entry1["title"] + " " + entry1["description"]) + entry2_embedding = get_embedding(entry2["title"] + " " + entry2["description"]) + similarity.append(compute_similarity(entry1_embedding, [entry2_embedding])) + + return similarity[0] + + +def get_global_similarity(entry, database, k=10, use_cache=True, verbose=False): + entry_text = entry["title"] + " " + entry["description"] + + similarities: List[float] = [] + + # Get all embeddings in the database + database_texts = [] + text_keys = [] + for e in database: + if entry["videoId"] != e["videoId"]: + cached = get_cached_similarity_reflexive(entry, e, verbose=verbose) + if len(cached) == 0: + text_keys.append(entry["videoId"] + "/" + e["videoId"]) + if "description" not in e: + print(e["title"]) + database_texts.append(e["title"]) + else: + database_texts.append(e["title"] + " " + e["description"]) + else: + similarities.append(cached[0]) + + # Compute similarity + # print(len(text_keys)) + if len(text_keys) > 0: + entry_embedding = get_embedding(entry_text) + database_embeddings = np.array([get_embedding(text) for text in database_texts]) + computed_similarities: List[float] = compute_similarity( + entry_embedding, database_embeddings + ) + set_cached_similarity(text_keys, computed_similarities) + similarities.extend(computed_similarities) + + # print(similarities) + + # Exclude self-similarity + similarities_sorted = np.sort(similarities)[-k:-1] + + # Normalize score to [0, 1] + return float(np.mean(similarities_sorted)) + + +def sort_history(history): + """returns the same database, but with values sorted by the watch time and freshness.""" + sorted_history = history + + max_time_watched = -math.inf + min_time_watched = math.inf + max_watch_progress = -math.inf + min_watch_progress = math.inf + for entry in history: + if "timeWatched" in entry: + if int(entry["timeWatched"]) > max_time_watched: + max_time_watched = int(entry["timeWatched"]) + if int(entry["timeWatched"]) < min_time_watched: + min_time_watched = int(entry["timeWatched"]) + if "watchProgress" in entry: + if int(entry["watchProgress"]) > max_watch_progress: + max_watch_progress = int(entry["watchProgress"]) + if int(entry["watchProgress"]) < min_watch_progress: + min_watch_progress = int(entry["watchProgress"]) + + wp_factor = max_watch_progress - min_watch_progress + wp_offset = min_watch_progress + + tw_factor = max_time_watched - min_time_watched + tw_offset = min_time_watched + + def quality(entry): + q = 0 + if "timeWatched" in entry: + q += (entry["timeWatched"] - tw_offset) / tw_factor + else: + q += 0.5 + + if "watchProgress" in entry: + q += (entry["watchProgress"] - wp_offset) / wp_factor + else: + q += 0.5 + + # EXPERIMENTAL!!! WILL MAKE COMPUTER EXPLODE!!! 
diff --git a/shadowtube/recommend.py b/shadowtube/recommend.py
new file mode 100644
index 0000000..7fcdbed
--- /dev/null
+++ b/shadowtube/recommend.py
@@ -0,0 +1,155 @@
+import sqlite3
+from random import sample
+
+from innertube.clients import InnerTube
+
+client = InnerTube("WEB")
+cache = sqlite3.connect("cache.db")
+cursor = cache.cursor()
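+
+# Not in the original code: the queries below assume a `recommends` table in
+# cache.db mapping a source video ID (vid1) to a recommended one (vid2). This
+# schema is inferred from those queries (an assumption, not the author's DDL),
+# and the statement is idempotent against the cache shipped with the repo:
+cursor.execute("CREATE TABLE IF NOT EXISTS recommends(vid1 TEXT, vid2 TEXT)")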
+
+
+# def recommend(history):
+#     recommendations = []
+#     for video_id in history:
+#         video = client.next(video_id)
+#         recommendations += fetch_recommended(video)
+#     return recommendations
+
+
+# data = client.search(query="distrotube")
+# video = client.next("BV1O7RR-VoA")
+
+
+def fetch_recommended(video, verbose=False):
+    """Extract recommended video IDs from an InnerTube `next` response."""
+    recommended = []
+
+    try:
+        recommended.append(
+            video["contents"]["twoColumnWatchNextResults"]["autoplay"]["autoplay"][
+                "sets"
+            ][0]["autoplayVideo"]["watchEndpoint"]["videoId"]
+        )
+    except (KeyError, IndexError):
+        if verbose:
+            print("ERR: no autoplay (fetch_recommended)")
+
+    try:
+        reccount = len(
+            video["contents"]["twoColumnWatchNextResults"]["secondaryResults"][
+                "secondaryResults"
+            ]["results"]
+        )
+
+        if verbose:
+            print(
+                "INFO: Recommendation count is "
+                + str(reccount)
+                + " (fetch_recommended)"
+            )
+    except KeyError:
+        if verbose:
+            print("ERR: no recommendations (fetch_recommended)")
+    else:
+        for rec in video["contents"]["twoColumnWatchNextResults"]["secondaryResults"][
+            "secondaryResults"
+        ]["results"]:
+            if "compactVideoRenderer" in rec:
+                recommended.append(rec["compactVideoRenderer"]["videoId"])
+            else:
+                if verbose:
+                    print("WARN: invalid format (fetch_recommended)")
+
+    if len(recommended) == 0:
+        recommended.append("")
+
+    return recommended
+
+
+def fetch_from_database(video_id, verbose=False):
+    if verbose:
+        print("INFO: querying cache for " + video_id + " (fetch_from_database)")
+    # Parameterized query; video IDs must never be concatenated into SQL
+    cursor.execute("SELECT vid2 FROM recommends WHERE vid1=?", (video_id,))
+    response = cursor.fetchall()
+    if verbose:
+        print("INFO: response was: " + str(response))
+    formatted_response = []
+    for tup in response:
+        formatted_response.append(tup[0])
+    return formatted_response
+
+
+def insert_to_database(video_id, recommended):
+    pair_insert = []
+    for vid in recommended:
+        pair_insert.append((video_id, vid))
+    cursor.executemany(
+        "INSERT INTO recommends VALUES(?, ?)",
+        pair_insert,
+    )
+    cache.commit()
+
+
+def recommend_by_video(video_id, use_cache=True, verbose=False):
+    recommended = []
+    if use_cache:
+        response = fetch_from_database(video_id, verbose=verbose)
+        if len(response) == 0:
+            recommended_now = fetch_recommended(client.next(video_id), verbose=verbose)
+            if len(recommended_now) != 0 and recommended_now[0] != "":
+                insert_to_database(video_id, recommended_now)
+            recommended = recommended_now
+        else:
+            recommended = response
+    else:
+        recommended = fetch_recommended(client.next(video_id), verbose=verbose)
+    return recommended
+
+
+def sampler(history, count, minp: float = 0):
+    """Select `count` random videos from history.
+
+    `minp` skips any entry whose quality score is below `minp`;
+    set it to 0 to disable the filter.
+    """
+    videos = []
+    for i in range(0, count):
+        entry = sample(history, 1)[0]
+        # Entries without a quality score pass the filter unchanged
+        while entry.get("quality", minp) < minp:
+            entry = sample(history, 1)[0]
+        videos.append(entry)
+
+    return videos
+
+
+def remove_viewed(history, videos):
+    """Drop every recommended ID that already appears in the watch history."""
+    watched = {entry["videoId"] for entry in history}
+    return [video for video in videos if video not in watched]
+
+
+def recommend(history, count=8, verbose=False, use_cache=True):
+    recommended = []
+    initial_videos = sampler(history, count, minp=0.8)
+    for entry in initial_videos:
+        video_id = entry["videoId"]
+        title = entry["title"]
+        if verbose:
+            print("INFO: recommending by " + video_id + ': "' + title + '"')
+        rec = remove_viewed(
+            history, recommend_by_video(video_id, use_cache=use_cache, verbose=verbose)
+        )
+        if rec and rec[0] != "":
+            recommended.append(sample(rec, k=1)[0])
+
+    return recommended
+
+
+def id_to_link(id_list):
+    link_list = []
+    for video_id in id_list:
+        link_list.append("https://youtu.be/" + video_id)
+    return link_list
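+
+
+# Illustrative end-to-end usage (mirrors examples/inner_test.py; the path is
+# a placeholder, not from the source):
+#
+#   import shadowtube.preprocess as prep
+#   import shadowtube.recommend as rec
+#
+#   history = prep.sort_history(prep.parse_database("history.db"))
+#   print(rec.id_to_link(rec.recommend(history, count=8)))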