# (scrape metadata removed: "538 lines, 21 KiB, Python")
from pathlib import Path
|
|
import stat, os
|
|
import zlib, struct
|
|
from datetime import datetime, timezone, timedelta
|
|
import hashlib
|
|
from objects import *
|
|
|
|
def parse_user(user_line: str | None):
    """Parse a Git identity line ('Name <email> epoch ±HHMM') into a GitUser.

    Returns None when *user_line* is None. Raises ValueError on input that
    does not carry an epoch and timezone field.
    """
    if user_line is None:
        return None

    name, _, rest = user_line.partition("<")
    email, _, rest = rest.partition(">")

    epoch, tz = rest.split(maxsplit=1)
    tz = tz.strip()

    # The timezone is a sign followed by HHMM, e.g. "+0530".
    delta = timedelta(hours=int(tz[1:3]), minutes=int(tz[3:]))
    if tz.startswith("-"):
        delta = -delta

    when = datetime.fromtimestamp(int(epoch), timezone(delta))
    return GitUser(name.strip(), email, when)
def deparse_user(user: GitUser) -> str:
    """Serialize a GitUser back into Git's identity format: 'Name <email> epoch ±HHMM'."""
    epoch = int(user.time.timestamp())

    delta = user.time.utcoffset()
    if delta is None:
        zone = "+0000"  # naive datetimes are written as UTC
    else:
        seconds = int(delta.total_seconds())
        sign = "-" if seconds < 0 else "+"
        magnitude = abs(seconds)
        zone = f"{sign}{magnitude // 3600:02d}{(magnitude % 3600) // 60:02d}"

    return f"{user.name} <{user.email}> {epoch} {zone}"
class Git:
    """Minimal interface to a bare Git repository on disk (loose and packed objects)."""

    def __init__(self, path: Path | str) -> None:
        # Root of the bare repository (the directory holding HEAD, refs/, objects/).
        self.path = Path(path)
@staticmethod
|
|
def init(path: Path | str, initial_branch: str = "main", description: str = "Unnamed repo") -> "Git":
|
|
path = Path(path)
|
|
|
|
path.mkdir(exist_ok=True)
|
|
|
|
config_file = Path(path, "config")
|
|
config = "[core]\n"
|
|
config += f"\trepositoryformatversion = 0\n"
|
|
config += f"\tbare = true\n"
|
|
if os.name == "nt": config += f"\tignorecase = true\n"
|
|
else: config += f"\tfilemode = true\n"
|
|
config_file.touch(exist_ok=False)
|
|
config_file.write_text(config + "\n")
|
|
|
|
Path(path, "hooks").mkdir()
|
|
|
|
Path(path, "info").mkdir()
|
|
Path(path, "info", "exclude").touch()
|
|
|
|
Path(path, "objects").mkdir()
|
|
Path(path, "objects", "info").mkdir()
|
|
Path(path, "objects", "pack").mkdir()
|
|
|
|
Path(path, "refs").mkdir()
|
|
Path(path, "refs", "heads").mkdir()
|
|
Path(path, "refs", "tags").mkdir()
|
|
|
|
description_file = Path(path, "description")
|
|
description_file.touch()
|
|
description_file.write_text(description + "\n")
|
|
|
|
head_file = Path(path, "HEAD")
|
|
head_file.touch()
|
|
head_file.write_text(f"ref: refs/heads/{initial_branch}\n")
|
|
|
|
return Git(path)
|
|
|
|
def add_hook(self, name: str, hook: str) -> Path:
|
|
path = Path(self.path, 'hooks', name)
|
|
path.write_text(hook)
|
|
if os.name != "nt":
|
|
st = path.stat()
|
|
path.chmod(st.st_mode | stat.S_IEXEC)
|
|
return path
|
|
|
|
def get_loose_heads(self):
|
|
heads_dir = Path(self.path, "refs", "heads")
|
|
if not heads_dir.exists(): return []
|
|
out = {}
|
|
for item in heads_dir.glob("*"):
|
|
if item.is_file(): out[str(item.relative_to(heads_dir))] = item.read_text().strip()
|
|
return out
|
|
|
|
def get_loose_tags(self):
|
|
heads_dir = Path(self.path, "refs", "tags")
|
|
if not heads_dir.exists(): return []
|
|
out = {}
|
|
for item in heads_dir.glob("*"):
|
|
if item.is_file(): out[str(item.relative_to(heads_dir))] = item.read_text().strip()
|
|
return out
|
|
|
|
def get_head(self, follow_ref: bool = True):
|
|
head_file = Path(self.path, "HEAD")
|
|
if not head_file.exists(): return None, False
|
|
|
|
data = head_file.read_text().strip()
|
|
|
|
if not data.startswith("ref: "): return data, True
|
|
|
|
ref_path = data.removeprefix("ref: ").strip()
|
|
if not follow_ref: return ref_path.split("/")[-1], False
|
|
|
|
ref_file = Path(self.path, ref_path)
|
|
if ref_file.exists(): return ref_file.read_text().strip(), False
|
|
|
|
packed_refs = Path(self.path, "packed-refs")
|
|
if packed_refs.exists():
|
|
with packed_refs.open("r", encoding="utf-8") as f:
|
|
for line in f:
|
|
if not line or line.startswith("#") or line.startswith("^"): continue
|
|
try: hash_str, name = line.strip().split(maxsplit=1)
|
|
except ValueError: continue
|
|
if name == ref_path: return hash_str, False
|
|
|
|
return None, False
|
|
|
|
    def _parse_object(self, sha1: str | None, object: GitObjectType, data: bytes, parent_limit: int = 0, load_data: bool = True):
        """Turn a decompressed object payload into a Commit/Tree/Blob/Tag instance.

        sha1:         object id the payload was loaded under (None -> return None).
        object:       which Git object type the payload encodes.
        data:         raw payload bytes, without the 'type size\\0' header.
        parent_limit: how many commit parents to follow recursively; 0 loads none.
        load_data:    when False, tree entries carry only hashes, not loaded objects.
        """
        if sha1 is None: return None
        match object:
            case GitObjectType.COMMIT:
                # Header lines and the free-form message are separated by a blank line.
                datamsgsplit = data.decode().split("\n\n", maxsplit=1)
                message, options = datamsgsplit[1].strip(), datamsgsplit[0].splitlines()
                option_dict = {}
                for option in options:
                    if option.strip():
                        key, value = option.split(maxsplit=1)
                        # NOTE(review): repeated keys (e.g. multiple 'parent' lines in a
                        # merge commit) overwrite earlier ones — only the last survives.
                        option_dict[key] = value
                # parent_limit counts down through the recursive get_object calls.
                parent = self.get_object(option_dict.get("parent"), parent_limit-1, load_data) if parent_limit != 0 else None
                if not isinstance(parent, Commit): parent = None

                return Commit(sha1, self.get_object(option_dict.get("tree"), parent_limit-1, load_data), parent, parse_user(option_dict.get("author")), parse_user(option_dict.get("committer")), message, data)
            case GitObjectType.TREE:
                # Tree payloads are a sequence of '<octal mode> <name>\0<20-byte sha>'
                # entries; walk them byte by byte with a small state machine.
                files: list[GitFile] = []
                parser_mode = 0  # 0 = reading mode, 1 = reading filename, 2 = reading sha
                temp = b""
                temp_file = GitFile(0, "", "", b"")
                for byte in data:
                    if parser_mode == 0:
                        # Mode digits end at the first space.
                        if byte != ord(" "): temp += bytes([byte])
                        else:
                            temp_file.mode = int(temp.decode(), 8)
                            temp = b""
                            parser_mode = 1
                            continue
                    elif parser_mode == 1:
                        # filename, terminated by a NUL byte
                        if byte != 0x0: temp += bytes([byte])
                        else:
                            temp_file.filename = temp.decode()
                            temp = b""
                            parser_mode = 2
                            continue
                    elif parser_mode == 2:
                        # The sha is exactly 20 raw bytes; no terminator.
                        temp += bytes([byte])
                        if len(temp) == 20:
                            temp_file.data_hash = temp.hex()
                            temp_file.data = self.get_object(temp.hex(), parent_limit, load_data) if load_data else None # type: ignore
                            temp = b""
                            files.append(temp_file)
                            temp_file = GitFile(0, "", "", b"")
                            parser_mode = 0
                return Tree(sha1, files, data)
            case GitObjectType.BLOB:
                # Blobs are opaque: the payload *is* the content.
                return Blob(sha1, data)
            case GitObjectType.TAG:
                # Annotated tags share the commit layout: headers, blank line, message.
                datamsgsplit = data.decode().split("\n\n", maxsplit=1)
                message, options = datamsgsplit[1].strip(), datamsgsplit[0].splitlines()
                option_dict = {}
                for option in options:
                    if option.strip():
                        key, value = option.split(maxsplit=1)
                        option_dict[key] = value
                return Tag(sha1, option_dict.get("tag"), self.get_object(option_dict.get("object"), parent_limit, load_data), parse_user(option_dict.get("tagger")), message, data)
def get_object(self, sha1: str | None, parent_limit: int = 0, load_data: bool = True, look_in_packed: bool = True):
|
|
if sha1 is None: return None
|
|
|
|
try: data = Path(self.path, "objects", sha1[:2], sha1[2:]).read_bytes()
|
|
except FileNotFoundError:
|
|
if look_in_packed: return self.get_packed_object(sha1, parent_limit, load_data, False)
|
|
else: raise
|
|
data = zlib.decompress(data)
|
|
if hashlib.sha1(data).hexdigest() != sha1: raise Exception("Corrupted object (sha1 discrepancy)")
|
|
prefix, data = data.split(b"\x00", maxsplit=1)
|
|
object_type, data_size = prefix.decode().split(maxsplit=1)
|
|
if len(data) != int(data_size): raise Exception("Corrupted object (size incorrect)")
|
|
|
|
return self._parse_object(sha1, GitObjectType(object_type), data, parent_limit, load_data)
|
|
|
|
def create_blob(self, data: bytes) -> str:
|
|
file_content = b"blob " + str(len(data)).encode() + b"\x00" + data
|
|
|
|
hash = hashlib.sha1(file_content).hexdigest()
|
|
|
|
object_file = Path(self.path, "objects", hash[:2], hash[2:])
|
|
object_file.parent.mkdir(exist_ok=True) # make sure that the lower object dir exists
|
|
|
|
if object_file.exists(): return hash
|
|
object_file.write_bytes(zlib.compress(file_content))
|
|
return hash
|
|
|
|
def create_tree(self, files: list[GitFile]) -> str:
|
|
tree_data = b""
|
|
for file in sorted(files, key=lambda f: f.filename): # Git sorts entries
|
|
tree_data += f"{file.mode:o} ".encode() # octal mode
|
|
tree_data += file.filename.encode() + b"\x00"
|
|
tree_data += bytes.fromhex(file.data_hash)
|
|
|
|
file_content = b"tree " + str(len(tree_data)).encode() + b"\x00" + tree_data
|
|
hash = hashlib.sha1(file_content).hexdigest()
|
|
|
|
object_file = Path(self.path, "objects", hash[:2], hash[2:])
|
|
object_file.parent.mkdir(exist_ok=True)
|
|
if object_file.exists(): return hash
|
|
object_file.write_bytes(zlib.compress(file_content))
|
|
return hash
|
|
|
|
def create_commit(self, message: str, tree: Tree, author: GitUser, committer: GitUser, parent_commit: Commit | None):
|
|
out = f"tree {tree.hash}\n"
|
|
if parent_commit: out += f"parent {parent_commit.hash}\n"
|
|
out += f"author {deparse_user(author)}\n"
|
|
out += f"committer {deparse_user(committer)}\n"
|
|
out += f"\n{message}\n"
|
|
file_content = b"commit " + str(len(out.encode())).encode() + b"\x00" + out.encode()
|
|
hash = hashlib.sha1(file_content).hexdigest()
|
|
|
|
object_file = Path(self.path, "objects", hash[:2], hash[2:])
|
|
object_file.parent.mkdir(exist_ok=True)
|
|
if object_file.exists(): return hash
|
|
object_file.write_bytes(zlib.compress(file_content))
|
|
return hash
|
|
|
|
def create_annotated_tag(self, message: str, object: Commit | Tree | Tag | Blob | None, name: str, tagger: GitUser):
|
|
if object is None: return None
|
|
|
|
out = f"object {object.hash}\n"
|
|
out += f"type {object.__class__.__name__.lower().strip()}\n"
|
|
out += f"tag {name}\n"
|
|
out += f"tagger {deparse_user(tagger)}\n"
|
|
out += f"\n{message}\n"
|
|
file_content = b"tag " + str(len(out.encode())).encode() + b"\x00" + out.encode()
|
|
hash = hashlib.sha1(file_content).hexdigest()
|
|
|
|
object_file = Path(self.path, "objects", hash[:2], hash[2:])
|
|
object_file.parent.mkdir(exist_ok=True)
|
|
if object_file.exists(): return hash
|
|
object_file.write_bytes(zlib.compress(file_content))
|
|
return hash
|
|
|
|
def set_unpacked_tag(self, name: str, hash: str):
|
|
tag_file = Path(self.path, "refs", "tags", name)
|
|
tag_file.parent.mkdir(parents=True, exist_ok=True)
|
|
tag_file.write_text(hash + "\n")
|
|
|
|
def remove_unpacked_tag(self, name: str, annotated: bool = False):
|
|
tag_file = Path(self.path, "refs", "tags", name)
|
|
if not tag_file.exists():
|
|
raise Exception(f"Tag '{name}' does not exist")
|
|
|
|
if annotated:
|
|
tag_hash = tag_file.read_text().strip()
|
|
obj_path = Path(self.path, "objects", tag_hash[:2], tag_hash[2:])
|
|
if obj_path.exists():
|
|
obj_path.unlink()
|
|
|
|
tag_file.unlink()
|
|
|
|
def set_unpacked_ref(self, name: str, commit_hash: str):
|
|
branch_file = Path(self.path, "refs", "heads", name)
|
|
branch_file.parent.mkdir(parents=True, exist_ok=True)
|
|
branch_file.write_text(commit_hash + "\n")
|
|
|
|
def remove_unpacked_ref(self, name: str):
|
|
head, detached_head = self.get_head(follow_ref=False)
|
|
if not detached_head and head == name: raise Exception(f"Cannot delete ref '{name}' while it is as HEAD")
|
|
|
|
branch_file = Path(self.path, "refs", "heads", name)
|
|
if not branch_file.exists(): raise Exception(f"Branch '{name}' does not exist")
|
|
|
|
branch_file.unlink()
|
|
|
|
def set_head(self, ref_or_commit: str | Commit):
|
|
head_file = Path(self.path, "HEAD")
|
|
if isinstance(ref_or_commit, Commit): head_file.write_text(ref_or_commit.hash + "\n")
|
|
else: head_file.write_text(f"ref: refs/heads/{ref_or_commit}\n")
|
|
|
|
def get_packed_object(self, sha1: str, parent_limit: int = 0, load_data: bool = True, look_in_unpacked: bool = True):
|
|
pack_dir = Path(self.path, "objects", "pack")
|
|
idx_file = next(pack_dir.glob("*.idx"), None)
|
|
pack_file = next(pack_dir.glob("*.pack"), None)
|
|
if not idx_file or not pack_file: return None
|
|
|
|
data = idx_file.read_bytes()
|
|
if data[:4] == b'\xfftOc':
|
|
version = struct.unpack(">I", data[4:8])[0]
|
|
if version != 2: raise Exception(f"Unsupported .idx version {version}")
|
|
data = memoryview(data[8:])
|
|
else:
|
|
raise Exception("Unsupported idx format (v1 not handled)")
|
|
|
|
fanout = struct.unpack(">256I", data[:1024])
|
|
data = data[1024:]
|
|
|
|
nobjects = fanout[-1]
|
|
|
|
sha_list = [data[i*20:(i+1)*20].tobytes() for i in range(nobjects)]
|
|
data = data[nobjects * 20:]
|
|
data = data[nobjects * 4:]
|
|
offsets_32 = list(struct.unpack(f">{nobjects}I", data[:nobjects * 4]))
|
|
data = data[nobjects * 4:]
|
|
large_indices = [i for i, off in enumerate(offsets_32) if off & 0x80000000]
|
|
if large_indices:
|
|
num_large = len(large_indices)
|
|
if len(data) < num_large * 8: raise Exception("Corrupted idx (missing large offsets table)")
|
|
large_offsets = list(struct.unpack(f">{num_large}Q", data[:num_large * 8]))
|
|
data = data[num_large * 8:]
|
|
|
|
li = 0
|
|
for i, off in enumerate(offsets_32):
|
|
if off & 0x80000000:
|
|
offsets_32[i] = large_offsets[li]
|
|
li += 1
|
|
offsets = offsets_32
|
|
data = data[nobjects * 4:]
|
|
|
|
sha_bytes = bytes.fromhex(sha1)
|
|
try: index = sha_list.index(sha_bytes)
|
|
except ValueError:
|
|
if look_in_unpacked: return self.get_object(sha1, parent_limit, load_data, False)
|
|
else: raise Exception("Object not found")
|
|
|
|
offset = offsets[index]
|
|
pack_data = memoryview(pack_file.read_bytes())
|
|
|
|
return self._read_pack_object(pack_data, offset, sha1, sha_list, offsets,
|
|
parent_limit, load_data, look_in_unpacked)
|
|
|
|
    def _read_pack_object(self, pack_data: memoryview, offset, expected_sha1, sha_list, offsets,
                          parent_limit, load_data, look_in_unpacked):
        """Decode the object stored at *offset* inside a whole .pack byte buffer.

        Handles plain deflated objects as well as ref_delta / ofs_delta entries,
        recursively materialising the delta base first. *expected_sha1* may be
        None for bases reached by offset; when given, the reconstructed object
        is re-hashed and verified against it before parsing.
        """
        if pack_data[:4] != b"PACK":
            raise Exception("Corrupted .pack (missing PACK header)")
        # The trailing 20 bytes are the sha1 of everything before them.
        if pack_data[-20:].hex() != hashlib.sha1(pack_data[:-20]).hexdigest():
            raise Exception("Corrupted .pack (wrong checksum)")

        # Entry header: type in bits 4-6 of the first byte; the uncompressed
        # size is a little-endian base-128 varint spread over following bytes.
        pos = offset
        c = pack_data[pos]
        pos += 1
        obj_type = (c >> 4) & 0x7
        size = c & 0x0F
        shift = 4
        while c & 0x80:
            c = pack_data[pos]
            pos += 1
            size |= (c & 0x7F) << shift
            shift += 7

        obj_type_str = OBJ_TYPES.get(obj_type, f"unknown({obj_type})")
        base_ref = None
        base_offset = None

        if obj_type_str == "ref_delta":
            # The base object is identified by a full 20-byte sha.
            base_ref = bytes(pack_data[pos:pos + 20]).hex()
            pos += 20
        elif obj_type_str == "ofs_delta":
            # The base lives earlier in the same pack; the backward distance is
            # a big-endian varint with a +1 bias per continuation byte.
            c = pack_data[pos]
            pos += 1
            base_offset = c & 0x7F
            while c & 0x80:
                c = pack_data[pos]
                pos += 1
                base_offset = ((base_offset + 1) << 7) | (c & 0x7F)
            base_offset = offset - base_offset

        # Inflate the (possibly delta) payload one byte at a time until zlib
        # reports end-of-stream, then back up over any over-consumed bytes.
        d = zlib.decompressobj()
        raw = b""
        while True:
            chunk = pack_data[pos:pos+1]
            pos += 1
            if not chunk: break
            raw += d.decompress(chunk)
            if d.eof:
                pos -= len(d.unused_data)
                break

        if obj_type_str == "ref_delta":
            assert base_ref
            base_obj = self.get_packed_object(base_ref, 0, True, look_in_unpacked)
            if not base_obj:
                raise Exception(f"Base object {base_ref} not found for delta")
            # The delta result has the same type as its base.
            obj_type_str = base_obj.__class__.__name__.lower()
            raw = self._apply_delta(base_obj.bytes, raw)
        elif obj_type_str == "ofs_delta":
            base_obj = self._read_pack_object(pack_data, base_offset, None, sha_list,
                                              offsets, 0, True, look_in_unpacked)
            obj_type_str = base_obj.__class__.__name__.lower()
            raw = self._apply_delta(base_obj.bytes, raw)

        if expected_sha1:
            # Re-hash with the canonical loose-object header to verify identity.
            computed_hash = hashlib.sha1(f"{obj_type_str} {len(raw)}\0".encode() + raw).hexdigest()
            if computed_hash != expected_sha1:
                raise Exception("Hash doesn't match")
        else: computed_hash = None

        return self._parse_object(expected_sha1 or computed_hash, GitObjectType(obj_type_str), raw, parent_limit, load_data)
def _apply_delta(self, base_data: bytes, delta_data: bytes) -> bytes:
|
|
"""Apply delta instructions to reconstruct an object from its base."""
|
|
pos = 0
|
|
|
|
base_size = 0
|
|
shift = 0
|
|
while True:
|
|
c = delta_data[pos]
|
|
pos += 1
|
|
base_size |= (c & 0x7F) << shift
|
|
if not (c & 0x80): break
|
|
shift += 7
|
|
|
|
result_size = 0
|
|
shift = 0
|
|
while True:
|
|
c = delta_data[pos]
|
|
pos += 1
|
|
result_size |= (c & 0x7F) << shift
|
|
if not (c & 0x80): break
|
|
shift += 7
|
|
|
|
result = bytearray()
|
|
while pos < len(delta_data):
|
|
cmd = delta_data[pos]
|
|
pos += 1
|
|
|
|
if cmd & 0x80:
|
|
copy_offset = 0
|
|
copy_size = 0
|
|
|
|
if cmd & 0x01:
|
|
copy_offset = delta_data[pos]
|
|
pos += 1
|
|
if cmd & 0x02:
|
|
copy_offset |= delta_data[pos] << 8
|
|
pos += 1
|
|
if cmd & 0x04:
|
|
copy_offset |= delta_data[pos] << 16
|
|
pos += 1
|
|
if cmd & 0x08:
|
|
copy_offset |= delta_data[pos] << 24
|
|
pos += 1
|
|
|
|
if cmd & 0x10:
|
|
copy_size = delta_data[pos]
|
|
pos += 1
|
|
if cmd & 0x20:
|
|
copy_size |= delta_data[pos] << 8
|
|
pos += 1
|
|
if cmd & 0x40:
|
|
copy_size |= delta_data[pos] << 16
|
|
pos += 1
|
|
|
|
# Size 0 means 0x10000
|
|
if copy_size == 0: copy_size = 0x10000
|
|
result.extend(base_data[copy_offset:copy_offset + copy_size])
|
|
else:
|
|
if cmd == 0: raise Exception("Invalid delta instruction")
|
|
result.extend(delta_data[pos:pos + cmd])
|
|
pos += cmd
|
|
|
|
if len(result) != result_size: raise Exception(f"Delta application size mismatch: expected {result_size}, got {len(result)}")
|
|
|
|
return bytes(result)
|
|
def read_packed_refs(self) -> dict[str, str]:
|
|
packed_refs = Path(self.path, "packed-refs")
|
|
if not packed_refs.exists(): return {}
|
|
refs: dict[str, str] = {}
|
|
last_ref = None
|
|
with packed_refs.open("r", encoding="utf-8") as f:
|
|
for line in f:
|
|
line = line.strip()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
if line.startswith("^"):
|
|
peeled = line[1:]
|
|
if last_ref:
|
|
refs[last_ref + "^{}"] = peeled
|
|
continue
|
|
|
|
try: hash_str, refname = line.split(maxsplit=1)
|
|
except ValueError: continue
|
|
refs[refname] = hash_str
|
|
last_ref = refname
|
|
return refs
|
|
|
|
def write_packed_ref(self, refname: str, hash_str: str, peeled: str | None = None) -> None:
|
|
packed_refs = Path(self.path, "packed-refs")
|
|
|
|
refs = self.read_packed_refs()
|
|
refs[refname] = hash_str
|
|
if peeled:
|
|
refs[refname + "^{}"] = peeled
|
|
|
|
lines = ["# pack-refs with: peeled fully-peeled"]
|
|
for name, value in refs.items():
|
|
if name.endswith("^{}"): continue
|
|
lines.append(f"{value} {name}")
|
|
peeled_hash = refs.get(name + "^{}")
|
|
if peeled_hash: lines.append(f"^{peeled_hash}")
|
|
|
|
packed_refs.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
|
|
def remove_packed_ref(self, refname: str) -> None:
|
|
packed_refs = Path(self.path, "packed-refs")
|
|
if not packed_refs.exists():
|
|
raise Exception("No packed-refs file found")
|
|
|
|
refs = self.read_packed_refs()
|
|
if refname not in refs: raise Exception(f"Ref '{refname}' not found in packed-refs")
|
|
|
|
refs.pop(refname, None)
|
|
refs.pop(refname + "^{}", None)
|
|
|
|
lines = ["# pack-refs with: peeled fully-peeled"]
|
|
for name, value in refs.items():
|
|
if name.endswith("^{}"): continue
|
|
lines.append(f"{value} {name}")
|
|
peeled_hash = refs.get(name + "^{}")
|
|
if peeled_hash: lines.append(f"^{peeled_hash}")
|
|
packed_refs.write_text("\n".join(lines) + "\n", encoding="utf-8") |