commit c3df0f26a9d07d7aea662cd68c7a88ec13aadee1 Author: KubaPro010 <132459354+KubaPro010@users.noreply.github.com> Date: Fri Nov 14 22:22:34 2025 +0100 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7bfcd7a --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +*.pyc \ No newline at end of file diff --git a/gitapi.py b/gitapi.py new file mode 100644 index 0000000..41fd085 --- /dev/null +++ b/gitapi.py @@ -0,0 +1,538 @@ +from pathlib import Path +import stat, os +import zlib, struct +from datetime import datetime, timezone, timedelta +import hashlib +from objects import * + +def parse_user(user_line: str | None): + if user_line is None: return None + + name, user_line = user_line.split("<", maxsplit=1) + email, user_line = user_line.split(">", maxsplit=1) + + epoch, user_line = user_line.split(maxsplit=1) + zone = user_line.strip() + hours = int(zone[1:3]) + minutes = int(zone[3:]) + + offset = timedelta(hours=hours, minutes=minutes) + if zone[0] == "-": offset = -offset + return GitUser(name.strip(), email, datetime.fromtimestamp(int(epoch), timezone(offset))) + +def deparse_user(user: GitUser) -> str: + """Convert GitUser back to Git format: 'Name timestamp timezone'""" + timestamp = int(user.time.timestamp()) + + offset = user.time.utcoffset() + if offset is None: tz_str = "+0000" + else: + total_seconds = int(offset.total_seconds()) + hours = abs(total_seconds) // 3600 + minutes = (abs(total_seconds) % 3600) // 60 + sign = '+' if total_seconds >= 0 else '-' + tz_str = f"{sign}{hours:02d}{minutes:02d}" + return f"{user.name} <{user.email}> {timestamp} {tz_str}" + +class Git: + def __init__(self, path: Path | str) -> None: + self.path = Path(path) + + @staticmethod + def init(path: Path | str, initial_branch: str = "main", description: str = "Unnamed repo") -> "Git": + path = Path(path) + + path.mkdir(exist_ok=True) + + config_file = Path(path, "config") + config = "[core]\n" + config += f"\trepositoryformatversion = 0\n" + config += f"\tbare = true\n" + if os.name == "nt": config += f"\tignorecase = true\n" + else: config += f"\tfilemode = true\n" + config_file.touch(exist_ok=False) + config_file.write_text(config + "\n") + + Path(path, "hooks").mkdir() + + Path(path, "info").mkdir() + Path(path, "info", "exclude").touch() + + Path(path, "objects").mkdir() + Path(path, "objects", "info").mkdir() + Path(path, "objects", "pack").mkdir() + + Path(path, "refs").mkdir() + Path(path, "refs", "heads").mkdir() + Path(path, "refs", "tags").mkdir() + + description_file = Path(path, "description") + description_file.touch() + description_file.write_text(description + "\n") + + head_file = Path(path, "HEAD") + head_file.touch() + head_file.write_text(f"ref: refs/heads/{initial_branch}\n") + + return Git(path) + + def add_hook(self, name: str, hook: str) -> Path: + path = Path(self.path, 'hooks', name) + path.write_text(hook) + if os.name != "nt": + st = path.stat() + path.chmod(st.st_mode | stat.S_IEXEC) + return path + + def get_loose_heads(self): + heads_dir = Path(self.path, "refs", "heads") + if not heads_dir.exists(): return [] + out = {} + for item in heads_dir.glob("*"): + if item.is_file(): out[str(item.relative_to(heads_dir))] = item.read_text().strip() + return out + + def get_loose_tags(self): + heads_dir = Path(self.path, "refs", "tags") + if not heads_dir.exists(): return [] + out = {} + for item in heads_dir.glob("*"): + if item.is_file(): out[str(item.relative_to(heads_dir))] = item.read_text().strip() + return out + + def get_head(self, follow_ref: bool = True): + head_file = Path(self.path, "HEAD") + if not head_file.exists(): return None, False + + data = head_file.read_text().strip() + + if not data.startswith("ref: "): return data, True + + ref_path = data.removeprefix("ref: ").strip() + if not follow_ref: return ref_path.split("/")[-1], False + + ref_file = Path(self.path, ref_path) + if ref_file.exists(): return ref_file.read_text().strip(), False + + packed_refs = Path(self.path, "packed-refs") + if packed_refs.exists(): + with packed_refs.open("r", encoding="utf-8") as f: + for line in f: + if not line or line.startswith("#") or line.startswith("^"): continue + try: hash_str, name = line.strip().split(maxsplit=1) + except ValueError: continue + if name == ref_path: return hash_str, False + + return None, False + + def _parse_object(self, sha1: str | None, object: GitObjectType, data: bytes, parent_limit: int = 0, load_data: bool = True): + if sha1 is None: return None + match object: + case GitObjectType.COMMIT: + datamsgsplit = data.decode().split("\n\n", maxsplit=1) + message, options = datamsgsplit[1].strip(), datamsgsplit[0].splitlines() + option_dict = {} + for option in options: + if option.strip(): + key, value = option.split(maxsplit=1) + option_dict[key] = value + parent = self.get_object(option_dict.get("parent"), parent_limit-1, load_data) if parent_limit != 0 else None + if not isinstance(parent, Commit): parent = None + + return Commit(sha1, self.get_object(option_dict.get("tree"), parent_limit-1, load_data), parent, parse_user(option_dict.get("author")), parse_user(option_dict.get("committer")), message, data) + case GitObjectType.TREE: + files: list[GitFile] = [] + parser_mode = 0 + temp = b"" + temp_file = GitFile(0, "", "", b"") + for byte in data: + if parser_mode == 0: + if byte != ord(" "): temp += bytes([byte]) + else: + temp_file.mode = int(temp.decode(), 8) + temp = b"" + parser_mode = 1 + continue + elif parser_mode == 1: + # filename + if byte != 0x0: temp += bytes([byte]) + else: + temp_file.filename = temp.decode() + temp = b"" + parser_mode = 2 + continue + elif parser_mode == 2: + temp += bytes([byte]) + if len(temp) == 20: + temp_file.data_hash = temp.hex() + temp_file.data = self.get_object(temp.hex(), parent_limit, load_data) if load_data else None # type: ignore + temp = b"" + files.append(temp_file) + temp_file = GitFile(0, "", "", b"") + parser_mode = 0 + return Tree(sha1, files, data) + case GitObjectType.BLOB: + return Blob(sha1, data) + case GitObjectType.TAG: + datamsgsplit = data.decode().split("\n\n", maxsplit=1) + message, options = datamsgsplit[1].strip(), datamsgsplit[0].splitlines() + option_dict = {} + for option in options: + if option.strip(): + key, value = option.split(maxsplit=1) + option_dict[key] = value + return Tag(sha1, option_dict.get("tag"), self.get_object(option_dict.get("object"), parent_limit, load_data), parse_user(option_dict.get("tagger")), message, data) + + def get_object(self, sha1: str | None, parent_limit: int = 0, load_data: bool = True, look_in_packed: bool = True): + if sha1 is None: return None + + try: data = Path(self.path, "objects", sha1[:2], sha1[2:]).read_bytes() + except FileNotFoundError: + if look_in_packed: return self.get_packed_object(sha1, parent_limit, load_data, False) + else: raise + data = zlib.decompress(data) + if hashlib.sha1(data).hexdigest() != sha1: raise Exception("Corrupted object (sha1 discrepancy)") + prefix, data = data.split(b"\x00", maxsplit=1) + object_type, data_size = prefix.decode().split(maxsplit=1) + if len(data) != int(data_size): raise Exception("Corrupted object (size incorrect)") + + return self._parse_object(sha1, GitObjectType(object_type), data, parent_limit, load_data) + + def create_blob(self, data: bytes) -> str: + file_content = b"blob " + str(len(data)).encode() + b"\x00" + data + + hash = hashlib.sha1(file_content).hexdigest() + + object_file = Path(self.path, "objects", hash[:2], hash[2:]) + object_file.parent.mkdir(exist_ok=True) # make sure that the lower object dir exists + + if object_file.exists(): return hash + object_file.write_bytes(zlib.compress(file_content)) + return hash + + def create_tree(self, files: list[GitFile]) -> str: + tree_data = b"" + for file in sorted(files, key=lambda f: f.filename): # Git sorts entries + tree_data += f"{file.mode:o} ".encode() # octal mode + tree_data += file.filename.encode() + b"\x00" + tree_data += bytes.fromhex(file.data_hash) + + file_content = b"tree " + str(len(tree_data)).encode() + b"\x00" + tree_data + hash = hashlib.sha1(file_content).hexdigest() + + object_file = Path(self.path, "objects", hash[:2], hash[2:]) + object_file.parent.mkdir(exist_ok=True) + if object_file.exists(): return hash + object_file.write_bytes(zlib.compress(file_content)) + return hash + + def create_commit(self, message: str, tree: Tree, author: GitUser, committer: GitUser, parent_commit: Commit | None): + out = f"tree {tree.hash}\n" + if parent_commit: out += f"parent {parent_commit.hash}\n" + out += f"author {deparse_user(author)}\n" + out += f"committer {deparse_user(committer)}\n" + out += f"\n{message}\n" + file_content = b"commit " + str(len(out.encode())).encode() + b"\x00" + out.encode() + hash = hashlib.sha1(file_content).hexdigest() + + object_file = Path(self.path, "objects", hash[:2], hash[2:]) + object_file.parent.mkdir(exist_ok=True) + if object_file.exists(): return hash + object_file.write_bytes(zlib.compress(file_content)) + return hash + + def create_annotated_tag(self, message: str, object: Commit | Tree | Tag | Blob | None, name: str, tagger: GitUser): + if object is None: return None + + out = f"object {object.hash}\n" + out += f"type {object.__class__.__name__.lower().strip()}\n" + out += f"tag {name}\n" + out += f"tagger {deparse_user(tagger)}\n" + out += f"\n{message}\n" + file_content = b"tag " + str(len(out.encode())).encode() + b"\x00" + out.encode() + hash = hashlib.sha1(file_content).hexdigest() + + object_file = Path(self.path, "objects", hash[:2], hash[2:]) + object_file.parent.mkdir(exist_ok=True) + if object_file.exists(): return hash + object_file.write_bytes(zlib.compress(file_content)) + return hash + + def set_unpacked_tag(self, name: str, hash: str): + tag_file = Path(self.path, "refs", "tags", name) + tag_file.parent.mkdir(parents=True, exist_ok=True) + tag_file.write_text(hash + "\n") + + def remove_unpacked_tag(self, name: str, annotated: bool = False): + tag_file = Path(self.path, "refs", "tags", name) + if not tag_file.exists(): + raise Exception(f"Tag '{name}' does not exist") + + if annotated: + tag_hash = tag_file.read_text().strip() + obj_path = Path(self.path, "objects", tag_hash[:2], tag_hash[2:]) + if obj_path.exists(): + obj_path.unlink() + + tag_file.unlink() + + def set_unpacked_ref(self, name: str, commit_hash: str): + branch_file = Path(self.path, "refs", "heads", name) + branch_file.parent.mkdir(parents=True, exist_ok=True) + branch_file.write_text(commit_hash + "\n") + + def remove_unpacked_ref(self, name: str): + head, detached_head = self.get_head(follow_ref=False) + if not detached_head and head == name: raise Exception(f"Cannot delete ref '{name}' while it is as HEAD") + + branch_file = Path(self.path, "refs", "heads", name) + if not branch_file.exists(): raise Exception(f"Branch '{name}' does not exist") + + branch_file.unlink() + + def set_head(self, ref_or_commit: str | Commit): + head_file = Path(self.path, "HEAD") + if isinstance(ref_or_commit, Commit): head_file.write_text(ref_or_commit.hash + "\n") + else: head_file.write_text(f"ref: refs/heads/{ref_or_commit}\n") + + def get_packed_object(self, sha1: str, parent_limit: int = 0, load_data: bool = True, look_in_unpacked: bool = True): + pack_dir = Path(self.path, "objects", "pack") + idx_file = next(pack_dir.glob("*.idx"), None) + pack_file = next(pack_dir.glob("*.pack"), None) + if not idx_file or not pack_file: return None + + data = idx_file.read_bytes() + if data[:4] == b'\xfftOc': + version = struct.unpack(">I", data[4:8])[0] + if version != 2: raise Exception(f"Unsupported .idx version {version}") + data = memoryview(data[8:]) + else: + raise Exception("Unsupported idx format (v1 not handled)") + + fanout = struct.unpack(">256I", data[:1024]) + data = data[1024:] + + nobjects = fanout[-1] + + sha_list = [data[i*20:(i+1)*20].tobytes() for i in range(nobjects)] + data = data[nobjects * 20:] + data = data[nobjects * 4:] + offsets_32 = list(struct.unpack(f">{nobjects}I", data[:nobjects * 4])) + data = data[nobjects * 4:] + large_indices = [i for i, off in enumerate(offsets_32) if off & 0x80000000] + if large_indices: + num_large = len(large_indices) + if len(data) < num_large * 8: raise Exception("Corrupted idx (missing large offsets table)") + large_offsets = list(struct.unpack(f">{num_large}Q", data[:num_large * 8])) + data = data[num_large * 8:] + + li = 0 + for i, off in enumerate(offsets_32): + if off & 0x80000000: + offsets_32[i] = large_offsets[li] + li += 1 + offsets = offsets_32 + data = data[nobjects * 4:] + + sha_bytes = bytes.fromhex(sha1) + try: index = sha_list.index(sha_bytes) + except ValueError: + if look_in_unpacked: return self.get_object(sha1, parent_limit, load_data, False) + else: raise Exception("Object not found") + + offset = offsets[index] + pack_data = memoryview(pack_file.read_bytes()) + + return self._read_pack_object(pack_data, offset, sha1, sha_list, offsets, + parent_limit, load_data, look_in_unpacked) + + def _read_pack_object(self, pack_data: memoryview, offset, expected_sha1, sha_list, offsets, + parent_limit, load_data, look_in_unpacked): + if pack_data[:4] != b"PACK": + raise Exception("Corrupted .pack (missing PACK header)") + if pack_data[-20:].hex() != hashlib.sha1(pack_data[:-20]).hexdigest(): + raise Exception("Corrupted .pack (wrong checksum)") + + pos = offset + c = pack_data[pos] + pos += 1 + obj_type = (c >> 4) & 0x7 + size = c & 0x0F + shift = 4 + while c & 0x80: + c = pack_data[pos] + pos += 1 + size |= (c & 0x7F) << shift + shift += 7 + + obj_type_str = OBJ_TYPES.get(obj_type, f"unknown({obj_type})") + base_ref = None + base_offset = None + + if obj_type_str == "ref_delta": + base_ref = bytes(pack_data[pos:pos + 20]).hex() + pos += 20 + elif obj_type_str == "ofs_delta": + c = pack_data[pos] + pos += 1 + base_offset = c & 0x7F + while c & 0x80: + c = pack_data[pos] + pos += 1 + base_offset = ((base_offset + 1) << 7) | (c & 0x7F) + base_offset = offset - base_offset + + d = zlib.decompressobj() + raw = b"" + while True: + chunk = pack_data[pos:pos+1] + pos += 1 + if not chunk: break + raw += d.decompress(chunk) + if d.eof: + pos -= len(d.unused_data) + break + + if obj_type_str == "ref_delta": + assert base_ref + base_obj = self.get_packed_object(base_ref, 0, True, look_in_unpacked) + if not base_obj: + raise Exception(f"Base object {base_ref} not found for delta") + obj_type_str = base_obj.__class__.__name__.lower() + raw = self._apply_delta(base_obj.bytes, raw) + elif obj_type_str == "ofs_delta": + base_obj = self._read_pack_object(pack_data, base_offset, None, sha_list, + offsets, 0, True, look_in_unpacked) + obj_type_str = base_obj.__class__.__name__.lower() + raw = self._apply_delta(base_obj.bytes, raw) + + if expected_sha1: + computed_hash = hashlib.sha1(f"{obj_type_str} {len(raw)}\0".encode() + raw).hexdigest() + if computed_hash != expected_sha1: + raise Exception("Hash doesn't match") + else: computed_hash = None + + return self._parse_object(expected_sha1 or computed_hash, GitObjectType(obj_type_str), raw, parent_limit, load_data) + + def _apply_delta(self, base_data: bytes, delta_data: bytes) -> bytes: + """Apply delta instructions to reconstruct an object from its base.""" + pos = 0 + + base_size = 0 + shift = 0 + while True: + c = delta_data[pos] + pos += 1 + base_size |= (c & 0x7F) << shift + if not (c & 0x80): break + shift += 7 + + result_size = 0 + shift = 0 + while True: + c = delta_data[pos] + pos += 1 + result_size |= (c & 0x7F) << shift + if not (c & 0x80): break + shift += 7 + + result = bytearray() + while pos < len(delta_data): + cmd = delta_data[pos] + pos += 1 + + if cmd & 0x80: + copy_offset = 0 + copy_size = 0 + + if cmd & 0x01: + copy_offset = delta_data[pos] + pos += 1 + if cmd & 0x02: + copy_offset |= delta_data[pos] << 8 + pos += 1 + if cmd & 0x04: + copy_offset |= delta_data[pos] << 16 + pos += 1 + if cmd & 0x08: + copy_offset |= delta_data[pos] << 24 + pos += 1 + + if cmd & 0x10: + copy_size = delta_data[pos] + pos += 1 + if cmd & 0x20: + copy_size |= delta_data[pos] << 8 + pos += 1 + if cmd & 0x40: + copy_size |= delta_data[pos] << 16 + pos += 1 + + # Size 0 means 0x10000 + if copy_size == 0: copy_size = 0x10000 + result.extend(base_data[copy_offset:copy_offset + copy_size]) + else: + if cmd == 0: raise Exception("Invalid delta instruction") + result.extend(delta_data[pos:pos + cmd]) + pos += cmd + + if len(result) != result_size: raise Exception(f"Delta application size mismatch: expected {result_size}, got {len(result)}") + + return bytes(result) + def read_packed_refs(self) -> dict[str, str]: + packed_refs = Path(self.path, "packed-refs") + if not packed_refs.exists(): return {} + refs: dict[str, str] = {} + last_ref = None + with packed_refs.open("r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + if line.startswith("^"): + peeled = line[1:] + if last_ref: + refs[last_ref + "^{}"] = peeled + continue + + try: hash_str, refname = line.split(maxsplit=1) + except ValueError: continue + refs[refname] = hash_str + last_ref = refname + return refs + + def write_packed_ref(self, refname: str, hash_str: str, peeled: str | None = None) -> None: + packed_refs = Path(self.path, "packed-refs") + + refs = self.read_packed_refs() + refs[refname] = hash_str + if peeled: + refs[refname + "^{}"] = peeled + + lines = ["# pack-refs with: peeled fully-peeled"] + for name, value in refs.items(): + if name.endswith("^{}"): continue + lines.append(f"{value} {name}") + peeled_hash = refs.get(name + "^{}") + if peeled_hash: lines.append(f"^{peeled_hash}") + + packed_refs.write_text("\n".join(lines) + "\n", encoding="utf-8") + + def remove_packed_ref(self, refname: str) -> None: + packed_refs = Path(self.path, "packed-refs") + if not packed_refs.exists(): + raise Exception("No packed-refs file found") + + refs = self.read_packed_refs() + if refname not in refs: raise Exception(f"Ref '{refname}' not found in packed-refs") + + refs.pop(refname, None) + refs.pop(refname + "^{}", None) + + lines = ["# pack-refs with: peeled fully-peeled"] + for name, value in refs.items(): + if name.endswith("^{}"): continue + lines.append(f"{value} {name}") + peeled_hash = refs.get(name + "^{}") + if peeled_hash: lines.append(f"^{peeled_hash}") + packed_refs.write_text("\n".join(lines) + "\n", encoding="utf-8") \ No newline at end of file diff --git a/objects.py b/objects.py new file mode 100644 index 0000000..05ec140 --- /dev/null +++ b/objects.py @@ -0,0 +1,61 @@ +from datetime import datetime +from dataclasses import dataclass +from enum import StrEnum + +OBJ_TYPES = { + 1: "commit", + 2: "tree", + 3: "blob", + 4: "tag", + 6: "ofs_delta", + 7: "ref_delta" +} + +@dataclass +class GitUser: + name: str + email: str + time: datetime + +class GitObjectType(StrEnum): + COMMIT = "commit" + TREE = "tree" + BLOB = "blob" + TAG = "tag" + +@dataclass +class GitFile: + mode: int + filename: str + data_hash: str + data: bytes | None = None + +@dataclass +class Tree: + hash: str + files: list[GitFile] + bytes: "bytes | None" = None + +@dataclass +class Commit: + hash: str + tree: Tree | None + parent: "Commit | None" + author: GitUser | None + committer: GitUser | None + message: str + bytes: "bytes" + +@dataclass +class Blob: + hash: str + bytes: "bytes" + +@dataclass +class Tag: + hash: str + name: str | None + object: "Commit | Tree | bytes | Tag | None" + tagger: GitUser | None + message: str + bytes: "bytes" \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..d7cf4bd --- /dev/null +++ b/test.py @@ -0,0 +1,12 @@ +from gitapi import * + +repo = Git.init(r"C:\Users\Kuba\AppData\Local\Temp\git\new") +blob_hash = repo.create_blob(b"testcontent") +tree = [GitFile(33188, "testfile", blob_hash)] +tree_hash = repo.create_tree(tree) +user = GitUser("Hacker123", "hacker123@example.com", datetime.now()) +commit_hash = repo.create_commit("testcommit", Tree(tree_hash, tree), user, user, None) +repo.set_unpacked_ref("main", commit_hash) +tag_hash = repo.create_annotated_tag("Test tag", repo.get_object(commit_hash), "test", user) +if tag_hash: repo.set_unpacked_tag("test", tag_hash) +print(blob_hash, tree_hash, commit_hash, tag_hash)