diff --git a/shadowtube/history.py b/shadowtube/history.py index 84b0702..3f3eb77 100644 --- a/shadowtube/history.py +++ b/shadowtube/history.py @@ -1,15 +1,33 @@ import json +from typing import Iterator +from .types import Video +from abc import ABC, abstractmethod -class History: #Abstract class - def __init__(self, filename: str): - #self.history = list() - pass - - def __size__(self): + +class History(ABC): # Abstract class + @abstractmethod + def __len__(self) -> int: pass - @staticmethod - def parse_history(self, filename: str): + @staticmethod + def parse_history(filename: str) -> "History": + return FreeTubeHistory(filename) + + @abstractmethod + def is_this_type(self, filename: str) -> bool: + pass + + @abstractmethod + def get_video(self, index: int) -> Video: + pass + + @abstractmethod + def __iter__(self) -> Iterator[Video]: + pass + + +class FreeTubeHistory(History): + def __init__(self, filename: str) -> None: parsed_data = [] with open(filename, "r", encoding="utf-8") as file: @@ -20,17 +38,44 @@ class History: #Abstract class try: parsed_data.append(json.loads(line)) except json.JSONDecodeError: - fixed_line = fix_unquoted_values(line) + fixed_line = FreeTubeHistory._fix_unquoted_values(line) parsed_data.append(json.loads(fixed_line)) - return parsed_data + self._parsed_data = parsed_data - def is_this_type(self, filename: str): # bool function, - pass # returns false if Youtube history + @staticmethod + def _fix_unquoted_values(line: str) -> str: + """Attempts to fix unquoted values by adding quotes around them.""" + import re - def get_video(self, index: int): - pass + def replacer(match): + key, value = match.groups() + if not (value.startswith('"') and value.endswith('"')): + value = f'"{value}"' # Add quotes around the value + return f'"{key}":{value}' + + fixed_line = re.sub(r'"(\w+)":(\w+)', replacer, line) + return fixed_line + + @staticmethod + def _to_video(entry) -> Video: + return Video( + id=entry["videoId"], + title=entry["title"], + description=entry["description"], + watch_time=entry["timeWatched"], + watch_progress=entry["watchProgress"], + ) + + def __len__(self): + return len(self._parsed_data) + + def is_this_type(self, filename: str) -> bool: + raise NotImplementedError() + + def get_video(self, index: int) -> Video: + return FreeTubeHistory._to_video(self._parsed_data[index]) def __iter__(self): - pass - + for entry in self._parsed_data: + yield FreeTubeHistory._to_video(entry)