diff --git a/shadowtube/history.py b/shadowtube/history.py index fbc2766..3f3eb77 100644 --- a/shadowtube/history.py +++ b/shadowtube/history.py @@ -1,18 +1,81 @@ -class History: #Abstract class - def __init__(self, filename): - self.history = list() - - def __size__(self): - return len(history) - - def parse_history(self, filename): - return history +import json +from typing import Iterator +from .types import Video +from abc import ABC, abstractmethod - def is_this_type(self, filename): # bool function, - return false # returns false if Youtube history - def get_video(self, index): - return history[index] +class History(ABC): # Abstract class + @abstractmethod + def __len__(self) -> int: + pass + + @staticmethod + def parse_history(filename: str) -> "History": + return FreeTubeHistory(filename) + + @abstractmethod + def is_this_type(self, filename: str) -> bool: + pass + + @abstractmethod + def get_video(self, index: int) -> Video: + pass + + @abstractmethod + def __iter__(self) -> Iterator[Video]: + pass + + +class FreeTubeHistory(History): + def __init__(self, filename: str) -> None: + parsed_data = [] + + with open(filename, "r", encoding="utf-8") as file: + for line in file: + line = line.strip() + if not line: + continue + try: + parsed_data.append(json.loads(line)) + except json.JSONDecodeError: + fixed_line = FreeTubeHistory._fix_unquoted_values(line) + parsed_data.append(json.loads(fixed_line)) + + self._parsed_data = parsed_data + + @staticmethod + def _fix_unquoted_values(line: str) -> str: + """Attempts to fix unquoted values by adding quotes around them.""" + import re + + def replacer(match): + key, value = match.groups() + if not (value.startswith('"') and value.endswith('"')): + value = f'"{value}"' # Add quotes around the value + return f'"{key}":{value}' + + fixed_line = re.sub(r'"(\w+)":(\w+)', replacer, line) + return fixed_line + + @staticmethod + def _to_video(entry) -> Video: + return Video( + id=entry["videoId"], + title=entry["title"], + description=entry["description"], + watch_time=entry["timeWatched"], + watch_progress=entry["watchProgress"], + ) + + def __len__(self): + return len(self._parsed_data) + + def is_this_type(self, filename: str) -> bool: + raise NotImplementedError() + + def get_video(self, index: int) -> Video: + return FreeTubeHistory._to_video(self._parsed_data[index]) def __iter__(self): - return iter(history) + for entry in self._parsed_data: + yield FreeTubeHistory._to_video(entry)