Added old code

BritishTeapot 2025-02-17 20:08:35 +01:00
parent bb3d40304d
commit f9253e4f50
8 changed files with 472 additions and 0 deletions

1
.gitignore vendored

@@ -87,6 +87,7 @@ ipython_config.py
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
.env/
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.

21
examples/inner_test.py Normal file

@@ -0,0 +1,21 @@
import shadowtube.preprocess as prep
import shadowtube.recommend as rec

raw_history = prep.parse_database("./short.db")

# print(raw_history[0]["title"] + ": " + str(prep.relevancy(raw_history[0], raw_history)))
# print(
#     raw_history[28]["title"] + ": " + str(prep.relevancy(raw_history[28], raw_history))
# )
# for i in range(0, 10):
#     print(prep.get_similarity(raw_history[i], raw_history))

history = prep.sort_history(raw_history)
print(len(history))

# print(recommend(history))
recommendations = rec.recommend(history, verbose=False)
print(recommendations)


@@ -0,0 +1,82 @@
import requests
from io import BytesIO
from PIL import Image, ImageTk
import tkinter as tk
import yt_dlp
import webbrowser

import shadowtube.recommend as rec
import shadowtube.preprocess as prep

# Eight recommended YouTube video IDs, generated from the FreeTube watch history
video_ids = rec.recommend(
    prep.sort_history(
        prep.parse_database(
            "/home/fedir/.var/app/io.freetubeapp.FreeTube/config/FreeTube/history.db"
        )
    ),
    verbose=True,
    count=8,
)
print(video_ids)


def get_video_info(video_id):
    """Fetch the title and thumbnail URL of a YouTube video using yt_dlp."""
    ydl_opts = {"quiet": True, "no_warnings": True, "extract_flat": True}
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(
            f"https://www.youtube.com/watch?v={video_id}", download=False
        )
    return info.get("title", "Title Not Found"), info.get("thumbnail", "")


def open_video(event, video_id):
    """Open the YouTube video in the default web browser when clicked."""
    webbrowser.open(f"https://www.youtube.com/watch?v={video_id}")


def show_video_preview(video_id, row, col):
    """Fetch and display a video's title and thumbnail in a grid layout."""
    title, thumbnail_url = get_video_info(video_id)

    # Fetch the thumbnail; fall back to a text-only cell on failure
    response = requests.get(thumbnail_url)
    if response.status_code == 200:
        image_data = Image.open(BytesIO(response.content))
        image_data = image_data.resize((200, 112))  # Resize to fit the grid
        photo = ImageTk.PhotoImage(image_data)
    else:
        photo = None

    # Create thumbnail label (clickable)
    thumbnail_label = tk.Label(root, image=photo, cursor="hand2")
    thumbnail_label.image = photo  # Keep a reference so the image is not garbage-collected
    thumbnail_label.grid(row=row, column=col, padx=10, pady=10)
    thumbnail_label.bind("<Button-1>", lambda event, v=video_id: open_video(event, v))

    # Create title label (wrapped text)
    title_label = tk.Label(
        root, text=title, font=("Arial", 10, "bold"), wraplength=200, justify="center"
    )
    title_label.grid(row=row + 1, column=col, padx=10, pady=5)


# Create the Tkinter window
root = tk.Tk()
root.title("YouTube Video Previews")

# Lay out all 8 videos in a 2x4 grid: thumbnails on even rows, titles on odd rows
for index, video_id in enumerate(video_ids):
    row = (index // 4) * 2  # Every second row holds the titles
    col = index % 4
    show_video_preview(video_id, row, col)

root.mainloop()

2
shadowtube/__init__.py Normal file

@@ -0,0 +1,2 @@
from .preprocess import *
from .recommend import *
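
With both submodules star-imported, the package's public names are reachable from the root; note that the star import rebinds the name recommend from the submodule to the function of the same name. A minimal usage sketch, assuming shadowtube is on the import path and that "./short.db" (the history file used in examples/inner_test.py) exists:

import shadowtube

# parse_database and sort_history come from preprocess, recommend from recommend
history = shadowtube.sort_history(shadowtube.parse_database("./short.db"))
print(shadowtube.recommend(history))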

BIN
shadowtube/cache.db Normal file

Binary file not shown.

Binary file not shown.

211
shadowtube/preprocess.py Normal file

@@ -0,0 +1,211 @@
import json
import math
import re
import sqlite3
from typing import List

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

model = None
cache = sqlite3.connect("cache_similarity.db")
cursor = cache.cursor()


def parse_database(filename):
    """Parse a FreeTube history file: one JSON object per line."""
    parsed_data = []
    with open(filename, "r", encoding="utf-8") as file:
        for line in file:
            line = line.strip()
            if not line:
                continue
            try:
                parsed_data.append(json.loads(line))
            except json.JSONDecodeError:
                # Handle unquoted values by fixing the line
                fixed_line = fix_unquoted_values(line)
                parsed_data.append(json.loads(fixed_line))
    return parsed_data


def get_embedding(text: str):
    """Embed `text`, lazily loading the sentence-transformer model on first use."""
    global model
    if model is None:
        model = SentenceTransformer("all-MiniLM-L6-v2", backend="openvino")
    return model.encode(text, normalize_embeddings=True)


def compute_similarity(entry_embedding, database_embeddings):
    similarities = cosine_similarity([entry_embedding], database_embeddings)[0]
    return similarities


def get_cached_similarity_reflexive(entry1, entry2, verbose=False):
    """Look the pair up in the cache under both key orders."""
    response = []
    response.extend(get_cached_similarity(entry1, entry2, verbose=verbose))
    response.extend(get_cached_similarity(entry2, entry1, verbose=verbose))
    if len(response) > 1:
        print("WARN: duplicate pairs!")
    return response


def get_cached_similarity(entry1, entry2, verbose=False):
    pair_id = entry1["videoId"] + "/" + entry2["videoId"]
    if verbose:
        print("INFO: querying cache for " + pair_id + " (get_cached_similarity)")
    # Parameterized query: the ids come from external data
    cursor.execute("SELECT factor FROM similarity WHERE id = ?", (pair_id,))
    response = cursor.fetchall()
    if verbose:
        print("INFO: response was: " + str(response) + " (get_cached_similarity)")
    formatted_response = []
    for tup in response:
        formatted_response.append(float(tup[0]))
    return formatted_response


def set_cached_similarity(ids, similarities: List[float], verbose=False):
    pair_insert = []
    for i in range(0, len(ids)):
        pair_insert.append((ids[i], float(similarities[i])))
    cursor.executemany(
        "INSERT INTO similarity VALUES(?, ?)",
        pair_insert,
    )
    cache.commit()


def get_similarity(entry1, entry2, use_cache=True):
    """Similarity of two history entries, served from the cache when possible."""
    similarity = []
    if use_cache:
        similarity = get_cached_similarity_reflexive(entry1, entry2)
        if len(similarity) == 0:
            entry1_embedding = get_embedding(
                entry1["title"] + " " + entry1.get("description", "")
            )
            entry2_embedding = get_embedding(
                entry2["title"] + " " + entry2.get("description", "")
            )
            similarity.append(compute_similarity(entry1_embedding, [entry2_embedding]))
            set_cached_similarity(
                [entry1["videoId"] + "/" + entry2["videoId"]], similarity[0]
            )
    else:
        entry1_embedding = get_embedding(
            entry1["title"] + " " + entry1.get("description", "")
        )
        entry2_embedding = get_embedding(
            entry2["title"] + " " + entry2.get("description", "")
        )
        similarity.append(compute_similarity(entry1_embedding, [entry2_embedding]))
    return similarity[0]


def get_global_similarity(entry, database, k=10, use_cache=True, verbose=False):
    """Mean similarity of `entry` to its k most similar entries in `database`."""
    entry_text = entry["title"] + " " + entry.get("description", "")
    similarities: List[float] = []

    # Collect cached scores, and the texts of pairs that still need embedding
    database_texts = []
    text_keys = []
    for e in database:
        if entry["videoId"] != e["videoId"]:
            cached = get_cached_similarity_reflexive(entry, e, verbose=verbose)
            if len(cached) == 0:
                text_keys.append(entry["videoId"] + "/" + e["videoId"])
                if "description" not in e:
                    # Some history entries carry no description; fall back to the title
                    database_texts.append(e["title"])
                else:
                    database_texts.append(e["title"] + " " + e["description"])
            else:
                similarities.append(cached[0])

    # Compute and cache the similarities that were missing
    if len(text_keys) > 0:
        entry_embedding = get_embedding(entry_text)
        database_embeddings = np.array([get_embedding(text) for text in database_texts])
        computed_similarities: List[float] = compute_similarity(
            entry_embedding, database_embeddings
        )
        set_cached_similarity(text_keys, computed_similarities)
        similarities.extend(computed_similarities)

    # Average the top k scores, dropping the single highest (a likely near-duplicate)
    similarities_sorted = np.sort(similarities)[-k:-1]
    return float(np.mean(similarities_sorted))


def sort_history(history):
    """Returns the same history, with entries scored and sorted by watch time and watch progress."""
    sorted_history = history
    max_time_watched = -math.inf
    min_time_watched = math.inf
    max_watch_progress = -math.inf
    min_watch_progress = math.inf
    for entry in history:
        if "timeWatched" in entry:
            if int(entry["timeWatched"]) > max_time_watched:
                max_time_watched = int(entry["timeWatched"])
            if int(entry["timeWatched"]) < min_time_watched:
                min_time_watched = int(entry["timeWatched"])
        if "watchProgress" in entry:
            if int(entry["watchProgress"]) > max_watch_progress:
                max_watch_progress = int(entry["watchProgress"])
            if int(entry["watchProgress"]) < min_watch_progress:
                min_watch_progress = int(entry["watchProgress"])

    wp_factor = max_watch_progress - min_watch_progress
    if wp_factor == 0:
        wp_factor = 1  # avoid division by zero when all values are equal
    wp_offset = min_watch_progress
    tw_factor = max_time_watched - min_time_watched
    if tw_factor == 0:
        tw_factor = 1  # avoid division by zero when all values are equal
    tw_offset = min_time_watched

    def quality(entry):
        q = 0
        if "timeWatched" in entry:
            q += (entry["timeWatched"] - tw_offset) / tw_factor
        else:
            q += 0.5
        if "watchProgress" in entry:
            q += (entry["watchProgress"] - wp_offset) / wp_factor
        else:
            q += 0.5
        # EXPERIMENTAL!!! WILL MAKE COMPUTER EXPLODE!!!
        # q += get_similarity(entry, history)
        return (2 - q) / 2

    for entry in sorted_history:
        entry["quality"] = quality(entry)
    sorted_history.sort(key=lambda x: x["quality"])
    return sorted_history


def fix_unquoted_values(line):
    """Attempts to fix unquoted JSON values by adding quotes around them."""

    def replacer(match):
        key, value = match.groups()
        if not (value.startswith('"') and value.endswith('"')):
            value = f'"{value}"'  # Add quotes around the value
        return f'"{key}":{value}'

    fixed_line = re.sub(r'"(\w+)":(\w+)', replacer, line)
    return fixed_line
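
The cache lookups above read from a similarity table that nothing in this commit creates. A minimal bootstrap sketch for cache_similarity.db, with the schema inferred from the SELECT factor FROM similarity WHERE id = ? and INSERT INTO similarity VALUES(?, ?) calls; the UNIQUE constraint is an assumption, suggested by the duplicate-pair warning in get_cached_similarity_reflexive:

import sqlite3

# Schema inferred from preprocess.py's queries, not taken from the commit itself
cache = sqlite3.connect("cache_similarity.db")
cache.execute(
    """CREATE TABLE IF NOT EXISTS similarity(
        id TEXT UNIQUE,  -- "videoId1/videoId2" pair key (UNIQUE is an assumption)
        factor REAL      -- cached cosine similarity
    )"""
)
cache.commit()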

155
shadowtube/recommend.py Normal file

@@ -0,0 +1,155 @@
import sqlite3
from random import sample

from innertube.clients import InnerTube

client = InnerTube("WEB")
cache = sqlite3.connect("cache.db")
cursor = cache.cursor()

# def recommend(history):
#     recommendations = []
#     for video_id in history:
#         video = client.next(video_id)
#         recommendations += fetch_recommended(video)
#     return recommendations

# data = client.search(query="distrotube")
# video = client.next("BV1O7RR-VoA")


def fetch_recommended(video, verbose=False):
    """Extract recommended video ids from an InnerTube `next` response."""
    recommended = []
    try:
        recommended.append(
            video["contents"]["twoColumnWatchNextResults"]["autoplay"]["autoplay"][
                "sets"
            ][0]["autoplayVideo"]["watchEndpoint"]["videoId"]
        )
    except (KeyError, IndexError, TypeError):
        if verbose:
            print("ERR: no autoplay (fetch_recommended)")
    try:
        reccount = len(
            video["contents"]["twoColumnWatchNextResults"]["secondaryResults"][
                "secondaryResults"
            ]["results"]
        )
        if verbose:
            print(
                "INFO: Recommendation count is "
                + str(reccount)
                + " (fetch_recommended)"
            )
    except (KeyError, TypeError):
        if verbose:
            print("ERR: no recommendations (fetch_recommended)")
    else:
        for rec in video["contents"]["twoColumnWatchNextResults"]["secondaryResults"][
            "secondaryResults"
        ]["results"]:
            if "compactVideoRenderer" in rec:
                recommended.append(rec["compactVideoRenderer"]["videoId"])
            else:
                if verbose:
                    print("WARN: invalid format (fetch_recommended)")
    if len(recommended) == 0:
        # Sentinel: an empty string marks "no recommendations found"
        recommended.append("")
    return recommended


def fetch_from_database(video_id, verbose=False):
    if verbose:
        print("INFO: querying cache for recommendations of " + video_id)
    # Parameterized query: video ids come from external data
    cursor.execute("SELECT vid2 FROM recommends WHERE vid1 = ?", (video_id,))
    response = cursor.fetchall()
    if verbose:
        print("INFO: response was: " + str(response))
    formatted_response = []
    for tup in response:
        formatted_response.append(tup[0])
    return formatted_response


def insert_to_database(video_id, recommended):
    pair_insert = []
    for vid in recommended:
        pair_insert.append((video_id, vid))
    cursor.executemany(
        "INSERT INTO recommends VALUES(?, ?)",
        pair_insert,
    )
    cache.commit()


def recommend_by_video(video_id, use_cache=True, verbose=False):
    """Recommendations for a single video, served from the cache when possible."""
    recommended = []
    if use_cache:
        response = fetch_from_database(video_id, verbose=verbose)
        if len(response) == 0:
            recommended_now = fetch_recommended(client.next(video_id), verbose=verbose)
            if len(recommended_now) != 0 and recommended_now[0] != "":
                insert_to_database(video_id, recommended_now)
            recommended = recommended_now
        else:
            recommended = response
    else:
        recommended = fetch_recommended(client.next(video_id), verbose=verbose)
    return recommended


# minp cuts off all videos that have a worse quality score than `minp`. Set to 0 to disable.
def sampler(history, count, minp: float = 0):
    """Selects `count` random videos from history."""
    videos = []
    for i in range(0, count):
        entry = sample(history, 1)[0]
        if "quality" in entry:
            # Re-draw until the sampled entry clears the quality threshold
            while entry["quality"] < minp:
                entry = sample(history, 1)[0]
        videos.append(entry)
    return videos


def remove_viewed(history, videos):
    """Drop recommendations that already appear in the watch history."""
    # Removing items from a list while iterating over it skips elements,
    # so build a new list instead
    watched = set(entry["videoId"] for entry in history)
    return [video for video in videos if video not in watched]


def recommend(history, count=8, verbose=False, use_cache=True):
    recommended = []
    initial_videos = sampler(history, count, minp=0.8)
    for entry in initial_videos:
        video_id = entry["videoId"]
        title = entry["title"]
        if verbose:
            print("INFO: recommending by " + video_id + ': "' + title + '"')
        rec = remove_viewed(
            history, recommend_by_video(video_id, use_cache=use_cache, verbose=verbose)
        )
        if len(rec) > 0 and rec[0] != "":
            recommended.append(sample(rec, k=1)[0])
    return recommended


def id_to_link(id_list):
    link_list = []
    for vid in id_list:
        link_list.append("https://youtu.be/" + vid)
    return link_list