some improvements
This commit is contained in:
109
gitapi.py
109
gitapi.py
@@ -7,10 +7,8 @@ from objects import *
|
|||||||
|
|
||||||
def parse_user(user_line: str | None):
|
def parse_user(user_line: str | None):
|
||||||
if user_line is None: return None
|
if user_line is None: return None
|
||||||
|
|
||||||
name, user_line = user_line.split("<", maxsplit=1)
|
name, user_line = user_line.split("<", maxsplit=1)
|
||||||
email, user_line = user_line.split(">", maxsplit=1)
|
email, user_line = user_line.split(">", maxsplit=1)
|
||||||
|
|
||||||
epoch, user_line = user_line.split(maxsplit=1)
|
epoch, user_line = user_line.split(maxsplit=1)
|
||||||
zone = user_line.strip()
|
zone = user_line.strip()
|
||||||
hours = int(zone[1:3])
|
hours = int(zone[1:3])
|
||||||
@@ -25,18 +23,22 @@ def deparse_user(user: GitUser) -> str:
|
|||||||
timestamp = int(user.time.timestamp())
|
timestamp = int(user.time.timestamp())
|
||||||
|
|
||||||
offset = user.time.utcoffset()
|
offset = user.time.utcoffset()
|
||||||
if offset is None: tz_str = "+0000"
|
if user.time.tzinfo is None or user.time.utcoffset() is None:
|
||||||
else:
|
local_tz = datetime.now().astimezone().tzinfo
|
||||||
|
user.time = user.time.replace(tzinfo=local_tz)
|
||||||
|
offset = user.time.utcoffset()
|
||||||
|
assert offset
|
||||||
total_seconds = int(offset.total_seconds())
|
total_seconds = int(offset.total_seconds())
|
||||||
hours = abs(total_seconds) // 3600
|
hours = abs(total_seconds) // 3600
|
||||||
minutes = (abs(total_seconds) % 3600) // 60
|
minutes = (abs(total_seconds) % 3600) // 60
|
||||||
sign = '+' if total_seconds >= 0 else '-'
|
sign = '+' if total_seconds >= 0 else '-'
|
||||||
|
|
||||||
tz_str = f"{sign}{hours:02d}{minutes:02d}"
|
tz_str = f"{sign}{hours:02d}{minutes:02d}"
|
||||||
|
timestamp = int(user.time.timestamp())
|
||||||
return f"{user.name} <{user.email}> {timestamp} {tz_str}"
|
return f"{user.name} <{user.email}> {timestamp} {tz_str}"
|
||||||
|
|
||||||
class Git:
|
class Git:
|
||||||
def __init__(self, path: Path | str) -> None:
|
def __init__(self, path: Path | str) -> None: self.path = Path(path)
|
||||||
self.path = Path(path)
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def init(path: Path | str, initial_branch: str = "main", description: str = "Unnamed repo") -> "Git":
|
def init(path: Path | str, initial_branch: str = "main", description: str = "Unnamed repo") -> "Git":
|
||||||
@@ -79,9 +81,7 @@ class Git:
|
|||||||
def add_hook(self, name: str, hook: str) -> Path:
|
def add_hook(self, name: str, hook: str) -> Path:
|
||||||
path = Path(self.path, 'hooks', name)
|
path = Path(self.path, 'hooks', name)
|
||||||
path.write_text(hook)
|
path.write_text(hook)
|
||||||
if os.name != "nt":
|
if os.name != "nt": path.chmod(path.stat().st_mode | stat.S_IEXEC)
|
||||||
st = path.stat()
|
|
||||||
path.chmod(st.st_mode | stat.S_IEXEC)
|
|
||||||
return path
|
return path
|
||||||
|
|
||||||
def get_loose_heads(self):
|
def get_loose_heads(self):
|
||||||
@@ -91,6 +91,24 @@ class Git:
|
|||||||
for item in heads_dir.glob("*"):
|
for item in heads_dir.glob("*"):
|
||||||
if item.is_file(): out[str(item.relative_to(heads_dir))] = item.read_text().strip()
|
if item.is_file(): out[str(item.relative_to(heads_dir))] = item.read_text().strip()
|
||||||
return out
|
return out
|
||||||
|
def read_packed_refs(self) -> dict[str, str]:
|
||||||
|
packed_refs = Path(self.path, "packed-refs")
|
||||||
|
if not packed_refs.exists(): return {}
|
||||||
|
refs: dict[str, str] = {}
|
||||||
|
last_ref = None
|
||||||
|
with packed_refs.open("r", encoding="utf-8") as f:
|
||||||
|
for line in f:
|
||||||
|
line = line.strip()
|
||||||
|
if not line or line.startswith("#"): continue
|
||||||
|
if line.startswith("^"):
|
||||||
|
peeled = line[1:]
|
||||||
|
if last_ref: refs[last_ref + "^{}"] = peeled
|
||||||
|
continue
|
||||||
|
try: hash_str, refname = line.split(maxsplit=1)
|
||||||
|
except ValueError: continue
|
||||||
|
refs[refname] = hash_str
|
||||||
|
last_ref = refname
|
||||||
|
return refs
|
||||||
|
|
||||||
def get_loose_tags(self):
|
def get_loose_tags(self):
|
||||||
heads_dir = Path(self.path, "refs", "tags")
|
heads_dir = Path(self.path, "refs", "tags")
|
||||||
@@ -183,13 +201,12 @@ class Git:
|
|||||||
option_dict[key] = value
|
option_dict[key] = value
|
||||||
return Tag(sha1, option_dict.get("tag"), self.get_object(option_dict.get("object"), parent_limit, load_data), parse_user(option_dict.get("tagger")), message, data)
|
return Tag(sha1, option_dict.get("tag"), self.get_object(option_dict.get("object"), parent_limit, load_data), parse_user(option_dict.get("tagger")), message, data)
|
||||||
|
|
||||||
def get_object(self, sha1: str | None, parent_limit: int = 0, load_data: bool = True, look_in_packed: bool = True):
|
def get_object(self, sha1: str | None, parent_limit: int = 0, load_data: bool = True):
|
||||||
if sha1 is None: return None
|
if sha1 is None: return None
|
||||||
|
|
||||||
try: data = Path(self.path, "objects", sha1[:2], sha1[2:]).read_bytes()
|
try: data = Path(self.path, "objects", sha1[:2], sha1[2:]).read_bytes()
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
if look_in_packed: return self.get_packed_object(sha1, parent_limit, load_data, False)
|
return self._get_packed_object(sha1, parent_limit, load_data)
|
||||||
else: raise
|
|
||||||
data = zlib.decompress(data)
|
data = zlib.decompress(data)
|
||||||
if hashlib.sha1(data).hexdigest() != sha1: raise Exception("Corrupted object (sha1 discrepancy)")
|
if hashlib.sha1(data).hexdigest() != sha1: raise Exception("Corrupted object (sha1 discrepancy)")
|
||||||
prefix, data = data.split(b"\x00", maxsplit=1)
|
prefix, data = data.split(b"\x00", maxsplit=1)
|
||||||
@@ -295,7 +312,7 @@ class Git:
|
|||||||
if isinstance(ref_or_commit, Commit): head_file.write_text(ref_or_commit.hash + "\n")
|
if isinstance(ref_or_commit, Commit): head_file.write_text(ref_or_commit.hash + "\n")
|
||||||
else: head_file.write_text(f"ref: refs/heads/{ref_or_commit}\n")
|
else: head_file.write_text(f"ref: refs/heads/{ref_or_commit}\n")
|
||||||
|
|
||||||
def get_packed_object(self, sha1: str, parent_limit: int = 0, load_data: bool = True, look_in_unpacked: bool = True):
|
def _get_packed_object(self, sha1: str, parent_limit: int = 0, load_data: bool = True):
|
||||||
pack_dir = Path(self.path, "objects", "pack")
|
pack_dir = Path(self.path, "objects", "pack")
|
||||||
idx_file = next(pack_dir.glob("*.idx"), None)
|
idx_file = next(pack_dir.glob("*.idx"), None)
|
||||||
pack_file = next(pack_dir.glob("*.pack"), None)
|
pack_file = next(pack_dir.glob("*.pack"), None)
|
||||||
@@ -336,22 +353,13 @@ class Git:
|
|||||||
|
|
||||||
sha_bytes = bytes.fromhex(sha1)
|
sha_bytes = bytes.fromhex(sha1)
|
||||||
try: index = sha_list.index(sha_bytes)
|
try: index = sha_list.index(sha_bytes)
|
||||||
except ValueError:
|
except ValueError: raise Exception("Object not found")
|
||||||
if look_in_unpacked: return self.get_object(sha1, parent_limit, load_data, False)
|
|
||||||
else: raise Exception("Object not found")
|
|
||||||
|
|
||||||
offset = offsets[index]
|
return self._read_pack_object(memoryview(pack_file.read_bytes()), offsets[index], sha1, sha_list, offsets, parent_limit, load_data)
|
||||||
pack_data = memoryview(pack_file.read_bytes())
|
|
||||||
|
|
||||||
return self._read_pack_object(pack_data, offset, sha1, sha_list, offsets,
|
def _read_pack_object(self, pack_data: memoryview, offset, expected_sha1, sha_list, offsets, parent_limit, load_data):
|
||||||
parent_limit, load_data, look_in_unpacked)
|
if pack_data[:4] != b"PACK": raise Exception("Corrupted .pack (missing PACK header)")
|
||||||
|
if pack_data[-20:].hex() != hashlib.sha1(pack_data[:-20]).hexdigest(): raise Exception("Corrupted .pack (wrong checksum)")
|
||||||
def _read_pack_object(self, pack_data: memoryview, offset, expected_sha1, sha_list, offsets,
|
|
||||||
parent_limit, load_data, look_in_unpacked):
|
|
||||||
if pack_data[:4] != b"PACK":
|
|
||||||
raise Exception("Corrupted .pack (missing PACK header)")
|
|
||||||
if pack_data[-20:].hex() != hashlib.sha1(pack_data[:-20]).hexdigest():
|
|
||||||
raise Exception("Corrupted .pack (wrong checksum)")
|
|
||||||
|
|
||||||
pos = offset
|
pos = offset
|
||||||
c = pack_data[pos]
|
c = pack_data[pos]
|
||||||
@@ -395,31 +403,25 @@ class Git:
|
|||||||
|
|
||||||
if obj_type_str == "ref_delta":
|
if obj_type_str == "ref_delta":
|
||||||
assert base_ref
|
assert base_ref
|
||||||
base_obj = self.get_packed_object(base_ref, 0, True, look_in_unpacked)
|
base_obj = self.get_object(base_ref, 0, False)
|
||||||
if not base_obj:
|
if not base_obj: raise Exception(f"Base object {base_ref} not found for delta")
|
||||||
raise Exception(f"Base object {base_ref} not found for delta")
|
|
||||||
obj_type_str = base_obj.__class__.__name__.lower()
|
obj_type_str = base_obj.__class__.__name__.lower()
|
||||||
raw = self._apply_delta(base_obj.bytes, raw)
|
raw = self._apply_delta(base_obj.bytes, raw)
|
||||||
elif obj_type_str == "ofs_delta":
|
elif obj_type_str == "ofs_delta":
|
||||||
base_obj = self._read_pack_object(pack_data, base_offset, None, sha_list,
|
base_obj = self._read_pack_object(pack_data, base_offset, None, sha_list, offsets, 0, True)
|
||||||
offsets, 0, True, look_in_unpacked)
|
|
||||||
obj_type_str = base_obj.__class__.__name__.lower()
|
obj_type_str = base_obj.__class__.__name__.lower()
|
||||||
raw = self._apply_delta(base_obj.bytes, raw)
|
raw = self._apply_delta(base_obj.bytes, raw)
|
||||||
|
|
||||||
if expected_sha1:
|
if expected_sha1:
|
||||||
computed_hash = hashlib.sha1(f"{obj_type_str} {len(raw)}\0".encode() + raw).hexdigest()
|
computed_hash = hashlib.sha1(f"{obj_type_str} {len(raw)}\0".encode() + raw).hexdigest()
|
||||||
if computed_hash != expected_sha1:
|
if computed_hash != expected_sha1: raise Exception("Hash doesn't match")
|
||||||
raise Exception("Hash doesn't match")
|
|
||||||
else: computed_hash = None
|
else: computed_hash = None
|
||||||
|
|
||||||
return self._parse_object(expected_sha1 or computed_hash, GitObjectType(obj_type_str), raw, parent_limit, load_data)
|
return self._parse_object(expected_sha1 or computed_hash, GitObjectType(obj_type_str), raw, parent_limit, load_data)
|
||||||
|
|
||||||
def _apply_delta(self, base_data: bytes, delta_data: bytes) -> bytes:
|
def _apply_delta(self, base_data: bytes, delta_data: bytes) -> bytes:
|
||||||
"""Apply delta instructions to reconstruct an object from its base."""
|
pos = base_size = shift = result_size = 0
|
||||||
pos = 0
|
|
||||||
|
|
||||||
base_size = 0
|
|
||||||
shift = 0
|
|
||||||
while True:
|
while True:
|
||||||
c = delta_data[pos]
|
c = delta_data[pos]
|
||||||
pos += 1
|
pos += 1
|
||||||
@@ -427,7 +429,6 @@ class Git:
|
|||||||
if not (c & 0x80): break
|
if not (c & 0x80): break
|
||||||
shift += 7
|
shift += 7
|
||||||
|
|
||||||
result_size = 0
|
|
||||||
shift = 0
|
shift = 0
|
||||||
while True:
|
while True:
|
||||||
c = delta_data[pos]
|
c = delta_data[pos]
|
||||||
@@ -442,8 +443,7 @@ class Git:
|
|||||||
pos += 1
|
pos += 1
|
||||||
|
|
||||||
if cmd & 0x80:
|
if cmd & 0x80:
|
||||||
copy_offset = 0
|
copy_offset = copy_size = 0
|
||||||
copy_size = 0
|
|
||||||
|
|
||||||
if cmd & 0x01:
|
if cmd & 0x01:
|
||||||
copy_offset = delta_data[pos]
|
copy_offset = delta_data[pos]
|
||||||
@@ -468,7 +468,6 @@ class Git:
|
|||||||
copy_size |= delta_data[pos] << 16
|
copy_size |= delta_data[pos] << 16
|
||||||
pos += 1
|
pos += 1
|
||||||
|
|
||||||
# Size 0 means 0x10000
|
|
||||||
if copy_size == 0: copy_size = 0x10000
|
if copy_size == 0: copy_size = 0x10000
|
||||||
result.extend(base_data[copy_offset:copy_offset + copy_size])
|
result.extend(base_data[copy_offset:copy_offset + copy_size])
|
||||||
else:
|
else:
|
||||||
@@ -477,37 +476,14 @@ class Git:
|
|||||||
pos += cmd
|
pos += cmd
|
||||||
|
|
||||||
if len(result) != result_size: raise Exception(f"Delta application size mismatch: expected {result_size}, got {len(result)}")
|
if len(result) != result_size: raise Exception(f"Delta application size mismatch: expected {result_size}, got {len(result)}")
|
||||||
|
|
||||||
return bytes(result)
|
return bytes(result)
|
||||||
def read_packed_refs(self) -> dict[str, str]:
|
|
||||||
packed_refs = Path(self.path, "packed-refs")
|
|
||||||
if not packed_refs.exists(): return {}
|
|
||||||
refs: dict[str, str] = {}
|
|
||||||
last_ref = None
|
|
||||||
with packed_refs.open("r", encoding="utf-8") as f:
|
|
||||||
for line in f:
|
|
||||||
line = line.strip()
|
|
||||||
if not line or line.startswith("#"):
|
|
||||||
continue
|
|
||||||
if line.startswith("^"):
|
|
||||||
peeled = line[1:]
|
|
||||||
if last_ref:
|
|
||||||
refs[last_ref + "^{}"] = peeled
|
|
||||||
continue
|
|
||||||
|
|
||||||
try: hash_str, refname = line.split(maxsplit=1)
|
|
||||||
except ValueError: continue
|
|
||||||
refs[refname] = hash_str
|
|
||||||
last_ref = refname
|
|
||||||
return refs
|
|
||||||
|
|
||||||
def write_packed_ref(self, refname: str, hash_str: str, peeled: str | None = None) -> None:
|
def write_packed_ref(self, refname: str, hash_str: str, peeled: str | None = None) -> None:
|
||||||
packed_refs = Path(self.path, "packed-refs")
|
packed_refs = Path(self.path, "packed-refs")
|
||||||
|
|
||||||
refs = self.read_packed_refs()
|
refs = self.read_packed_refs()
|
||||||
refs[refname] = hash_str
|
refs[refname] = hash_str
|
||||||
if peeled:
|
if peeled: refs[refname + "^{}"] = peeled
|
||||||
refs[refname + "^{}"] = peeled
|
|
||||||
|
|
||||||
lines = ["# pack-refs with: peeled fully-peeled"]
|
lines = ["# pack-refs with: peeled fully-peeled"]
|
||||||
for name, value in refs.items():
|
for name, value in refs.items():
|
||||||
@@ -515,7 +491,6 @@ class Git:
|
|||||||
lines.append(f"{value} {name}")
|
lines.append(f"{value} {name}")
|
||||||
peeled_hash = refs.get(name + "^{}")
|
peeled_hash = refs.get(name + "^{}")
|
||||||
if peeled_hash: lines.append(f"^{peeled_hash}")
|
if peeled_hash: lines.append(f"^{peeled_hash}")
|
||||||
|
|
||||||
packed_refs.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
packed_refs.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
||||||
|
|
||||||
def remove_packed_ref(self, refname: str) -> None:
|
def remove_packed_ref(self, refname: str) -> None:
|
||||||
|
|||||||
Reference in New Issue
Block a user