From 4d7d59ee9356fae9d8a209a0182d4d2587c8540f Mon Sep 17 00:00:00 2001 From: KubaPro010 <132459354+KubaPro010@users.noreply.github.com> Date: Fri, 14 Nov 2025 22:47:39 +0100 Subject: [PATCH] some improvements --- gitapi.py | 119 +++++++++++++++++++++--------------------------------- test2.py | 7 ++++ 2 files changed, 54 insertions(+), 72 deletions(-) create mode 100644 test2.py diff --git a/gitapi.py b/gitapi.py index 41fd085..691803e 100644 --- a/gitapi.py +++ b/gitapi.py @@ -7,10 +7,8 @@ from objects import * def parse_user(user_line: str | None): if user_line is None: return None - name, user_line = user_line.split("<", maxsplit=1) email, user_line = user_line.split(">", maxsplit=1) - epoch, user_line = user_line.split(maxsplit=1) zone = user_line.strip() hours = int(zone[1:3]) @@ -25,18 +23,22 @@ def deparse_user(user: GitUser) -> str: timestamp = int(user.time.timestamp()) offset = user.time.utcoffset() - if offset is None: tz_str = "+0000" - else: - total_seconds = int(offset.total_seconds()) - hours = abs(total_seconds) // 3600 - minutes = (abs(total_seconds) % 3600) // 60 - sign = '+' if total_seconds >= 0 else '-' - tz_str = f"{sign}{hours:02d}{minutes:02d}" + if user.time.tzinfo is None or user.time.utcoffset() is None: + local_tz = datetime.now().astimezone().tzinfo + user.time = user.time.replace(tzinfo=local_tz) + offset = user.time.utcoffset() + assert offset + total_seconds = int(offset.total_seconds()) + hours = abs(total_seconds) // 3600 + minutes = (abs(total_seconds) % 3600) // 60 + sign = '+' if total_seconds >= 0 else '-' + + tz_str = f"{sign}{hours:02d}{minutes:02d}" + timestamp = int(user.time.timestamp()) return f"{user.name} <{user.email}> {timestamp} {tz_str}" class Git: - def __init__(self, path: Path | str) -> None: - self.path = Path(path) + def __init__(self, path: Path | str) -> None: self.path = Path(path) @staticmethod def init(path: Path | str, initial_branch: str = "main", description: str = "Unnamed repo") -> "Git": @@ -79,9 +81,7 @@ class Git: def add_hook(self, name: str, hook: str) -> Path: path = Path(self.path, 'hooks', name) path.write_text(hook) - if os.name != "nt": - st = path.stat() - path.chmod(st.st_mode | stat.S_IEXEC) + if os.name != "nt": path.chmod(path.stat().st_mode | stat.S_IEXEC) return path def get_loose_heads(self): @@ -91,6 +91,24 @@ class Git: for item in heads_dir.glob("*"): if item.is_file(): out[str(item.relative_to(heads_dir))] = item.read_text().strip() return out + def read_packed_refs(self) -> dict[str, str]: + packed_refs = Path(self.path, "packed-refs") + if not packed_refs.exists(): return {} + refs: dict[str, str] = {} + last_ref = None + with packed_refs.open("r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): continue + if line.startswith("^"): + peeled = line[1:] + if last_ref: refs[last_ref + "^{}"] = peeled + continue + try: hash_str, refname = line.split(maxsplit=1) + except ValueError: continue + refs[refname] = hash_str + last_ref = refname + return refs def get_loose_tags(self): heads_dir = Path(self.path, "refs", "tags") @@ -183,13 +201,12 @@ class Git: option_dict[key] = value return Tag(sha1, option_dict.get("tag"), self.get_object(option_dict.get("object"), parent_limit, load_data), parse_user(option_dict.get("tagger")), message, data) - def get_object(self, sha1: str | None, parent_limit: int = 0, load_data: bool = True, look_in_packed: bool = True): + def get_object(self, sha1: str | None, parent_limit: int = 0, load_data: bool = True): if sha1 is None: return None try: data = Path(self.path, "objects", sha1[:2], sha1[2:]).read_bytes() except FileNotFoundError: - if look_in_packed: return self.get_packed_object(sha1, parent_limit, load_data, False) - else: raise + return self._get_packed_object(sha1, parent_limit, load_data) data = zlib.decompress(data) if hashlib.sha1(data).hexdigest() != sha1: raise Exception("Corrupted object (sha1 discrepancy)") prefix, data = data.split(b"\x00", maxsplit=1) @@ -295,7 +312,7 @@ class Git: if isinstance(ref_or_commit, Commit): head_file.write_text(ref_or_commit.hash + "\n") else: head_file.write_text(f"ref: refs/heads/{ref_or_commit}\n") - def get_packed_object(self, sha1: str, parent_limit: int = 0, load_data: bool = True, look_in_unpacked: bool = True): + def _get_packed_object(self, sha1: str, parent_limit: int = 0, load_data: bool = True): pack_dir = Path(self.path, "objects", "pack") idx_file = next(pack_dir.glob("*.idx"), None) pack_file = next(pack_dir.glob("*.pack"), None) @@ -336,22 +353,13 @@ class Git: sha_bytes = bytes.fromhex(sha1) try: index = sha_list.index(sha_bytes) - except ValueError: - if look_in_unpacked: return self.get_object(sha1, parent_limit, load_data, False) - else: raise Exception("Object not found") + except ValueError: raise Exception("Object not found") - offset = offsets[index] - pack_data = memoryview(pack_file.read_bytes()) + return self._read_pack_object(memoryview(pack_file.read_bytes()), offsets[index], sha1, sha_list, offsets, parent_limit, load_data) - return self._read_pack_object(pack_data, offset, sha1, sha_list, offsets, - parent_limit, load_data, look_in_unpacked) - - def _read_pack_object(self, pack_data: memoryview, offset, expected_sha1, sha_list, offsets, - parent_limit, load_data, look_in_unpacked): - if pack_data[:4] != b"PACK": - raise Exception("Corrupted .pack (missing PACK header)") - if pack_data[-20:].hex() != hashlib.sha1(pack_data[:-20]).hexdigest(): - raise Exception("Corrupted .pack (wrong checksum)") + def _read_pack_object(self, pack_data: memoryview, offset, expected_sha1, sha_list, offsets, parent_limit, load_data): + if pack_data[:4] != b"PACK": raise Exception("Corrupted .pack (missing PACK header)") + if pack_data[-20:].hex() != hashlib.sha1(pack_data[:-20]).hexdigest(): raise Exception("Corrupted .pack (wrong checksum)") pos = offset c = pack_data[pos] @@ -395,31 +403,25 @@ class Git: if obj_type_str == "ref_delta": assert base_ref - base_obj = self.get_packed_object(base_ref, 0, True, look_in_unpacked) - if not base_obj: - raise Exception(f"Base object {base_ref} not found for delta") + base_obj = self.get_object(base_ref, 0, False) + if not base_obj: raise Exception(f"Base object {base_ref} not found for delta") obj_type_str = base_obj.__class__.__name__.lower() raw = self._apply_delta(base_obj.bytes, raw) elif obj_type_str == "ofs_delta": - base_obj = self._read_pack_object(pack_data, base_offset, None, sha_list, - offsets, 0, True, look_in_unpacked) + base_obj = self._read_pack_object(pack_data, base_offset, None, sha_list, offsets, 0, True) obj_type_str = base_obj.__class__.__name__.lower() raw = self._apply_delta(base_obj.bytes, raw) if expected_sha1: computed_hash = hashlib.sha1(f"{obj_type_str} {len(raw)}\0".encode() + raw).hexdigest() - if computed_hash != expected_sha1: - raise Exception("Hash doesn't match") + if computed_hash != expected_sha1: raise Exception("Hash doesn't match") else: computed_hash = None return self._parse_object(expected_sha1 or computed_hash, GitObjectType(obj_type_str), raw, parent_limit, load_data) def _apply_delta(self, base_data: bytes, delta_data: bytes) -> bytes: - """Apply delta instructions to reconstruct an object from its base.""" - pos = 0 + pos = base_size = shift = result_size = 0 - base_size = 0 - shift = 0 while True: c = delta_data[pos] pos += 1 @@ -427,7 +429,6 @@ class Git: if not (c & 0x80): break shift += 7 - result_size = 0 shift = 0 while True: c = delta_data[pos] @@ -442,8 +443,7 @@ class Git: pos += 1 if cmd & 0x80: - copy_offset = 0 - copy_size = 0 + copy_offset = copy_size = 0 if cmd & 0x01: copy_offset = delta_data[pos] @@ -468,7 +468,6 @@ class Git: copy_size |= delta_data[pos] << 16 pos += 1 - # Size 0 means 0x10000 if copy_size == 0: copy_size = 0x10000 result.extend(base_data[copy_offset:copy_offset + copy_size]) else: @@ -477,37 +476,14 @@ class Git: pos += cmd if len(result) != result_size: raise Exception(f"Delta application size mismatch: expected {result_size}, got {len(result)}") - return bytes(result) - def read_packed_refs(self) -> dict[str, str]: - packed_refs = Path(self.path, "packed-refs") - if not packed_refs.exists(): return {} - refs: dict[str, str] = {} - last_ref = None - with packed_refs.open("r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if not line or line.startswith("#"): - continue - if line.startswith("^"): - peeled = line[1:] - if last_ref: - refs[last_ref + "^{}"] = peeled - continue - - try: hash_str, refname = line.split(maxsplit=1) - except ValueError: continue - refs[refname] = hash_str - last_ref = refname - return refs def write_packed_ref(self, refname: str, hash_str: str, peeled: str | None = None) -> None: packed_refs = Path(self.path, "packed-refs") refs = self.read_packed_refs() refs[refname] = hash_str - if peeled: - refs[refname + "^{}"] = peeled + if peeled: refs[refname + "^{}"] = peeled lines = ["# pack-refs with: peeled fully-peeled"] for name, value in refs.items(): @@ -515,7 +491,6 @@ class Git: lines.append(f"{value} {name}") peeled_hash = refs.get(name + "^{}") if peeled_hash: lines.append(f"^{peeled_hash}") - packed_refs.write_text("\n".join(lines) + "\n", encoding="utf-8") def remove_packed_ref(self, refname: str) -> None: diff --git a/test2.py b/test2.py new file mode 100644 index 0000000..8ec69d0 --- /dev/null +++ b/test2.py @@ -0,0 +1,7 @@ +from gitapi import * + +repo = Git(r"C:\Users\Kuba\AppData\Local\Temp\git\new") +head, _ = repo.get_head() +print(head) +commit = repo.get_object(head, load_data=True) +print(commit.tree.files) \ No newline at end of file