A coding implementation to explore and analyze the TaskTrove dataset, with streaming parsing, visualization, and verifier detection

by ai-intensify
0 comments
A coding implementation to explore and analyze the TaskTrove dataset with streaming parsing visualization and validator detection

# Stream the first 200 tasks and tally archive structure:
# which filenames appear inside the task archives, and which
# top-level keys the embedded *.json files carry.
filename_counter: Counter = Counter()
all_json_keys: Counter = Counter()
samples_for_show: List = []  # keep up to two (path, parsed) pairs for display


for i, row in enumerate(tqdm(ds_test, desc="inspecting structure", total=200)):
    if i >= 200:
        break
    p = parse_task(row["task_binary"])
    if p["format"] in ("tar", "zip"):
        for name, body in p["files"].items():
            filename_counter[name] += 1
            if name.endswith(".json") and isinstance(body, str):
                try:
                    obj = json.loads(body)
                    if isinstance(obj, dict):
                        # count each top-level key once per file
                        for k in obj:
                            all_json_keys[k] += 1
                except ValueError:
                    # best-effort: skip files that are not valid JSON
                    pass
        if len(samples_for_show) < 2:
            samples_for_show.append((row["path"], p))


# Report the tallies gathered above, then dump the file listing
# of one sample task so the archive layout is visible.
print("\nMost common filenames inside task archives:")
for name, n in filename_counter.most_common(15):
    print(f"  {n:>4}  {name}")


print("\nMost common top-level JSON keys (across any *.json):")
for k, n in all_json_keys.most_common(20):
    print(f"  {n:>4}  {k}")


if samples_for_show:
    print(f"\nFull file listing for one sample task ({samples_for_show[0][0]}):")
    for name, body in samples_for_show[0][1]["files"].items():
        # body may be text or raw bytes; anything else has no meaningful size
        sz = len(body) if isinstance(body, (str, bytes)) else 0
        print(f"  {name}  ({sz:,} B)")




# Substrings that mark a filename as verifier-related.
VERIFIER_FILE_PATTERNS = ("verifier", "verify", "grader", "judge", "score", "eval")
# Top-level JSON keys that mark a task config as carrying a verifier.
VERIFIER_JSON_KEYS     = ("verifier", "verifier_config", "judge", "grader",
                          "rubric", "test_patch", "FAIL_TO_PASS", "tests")




def has_verifier(parsed: Dict[str, Any]) -> bool:
    """Detect verifiers via filename, JSON content, or both.

    Args:
        parsed: A parsed task dict with at least a ``"format"`` key and,
            for tar/zip archives, a ``"files"`` mapping of name -> body.

    Returns:
        True if any heuristic (filename substring, JSON key, or raw
        text mention) indicates the task ships a verifier.
    """
    # Non-archive tasks: only a dict "content" payload can carry verifier keys.
    if parsed["format"] not in ("tar", "zip"):
        c = parsed.get("content")
        if isinstance(c, dict):
            return any(k in c for k in VERIFIER_JSON_KEYS)
        return False


    files = parsed["files"]


    # Pass 1: filename heuristic.
    for name in files:
        low = name.lower()
        if any(pat in low for pat in VERIFIER_FILE_PATTERNS):
            return True


    # Pass 2: config-content heuristic on JSON/YAML files.
    for name, body in files.items():
        if name.endswith((".json", ".yaml", ".yml")) and isinstance(body, str):
            try:
                obj = json.loads(body)
                if isinstance(obj, dict) and any(k in obj for k in VERIFIER_JSON_KEYS):
                    return True
            except ValueError:
                # not valid JSON (e.g. YAML) — fall through to the text scan
                pass
            low = body.lower()
            if "verifier" in low or "test_patch" in low:
                return True


    return False




class TaskTroveExplorer:
    """High-level interface to the open-thoughts/TaskTrove dataset.

    Rows are streamed lazily, so no full dataset download is required.
    """


    def __init__(self, split: str = "test", dataset_id: str = DATASET_ID):
        self.dataset_id = dataset_id
        self.split = split
        # streaming=True keeps memory flat and avoids a full download
        self._ds = load_dataset(dataset_id, split=split, streaming=True)


    def iter(self, limit: Optional[int] = None,
             source_filter: Optional[str] = None) -> Iterator[Dict[str, Any]]:
        """Yield raw dataset rows.

        Args:
            limit: Stop after yielding this many rows (None = unbounded).
            source_filter: Regex matched (search) against each row's source.
        """
        rx = re.compile(source_filter) if source_filter else None
        n = 0
        for row in self._ds:
            if rx and not rx.search(source_of(row["path"])):
                continue
            yield row
            n += 1
            if limit is not None and n >= limit:
                return


    def sample(self, n: int = 5,
               source_filter: Optional[str] = None) -> List[Dict[str, Any]]:
        """Return up to *n* parsed tasks, each annotated with path and source."""
        out: List[Dict[str, Any]] = []
        for row in self.iter(limit=n, source_filter=source_filter):
            parsed = parse_task(row["task_binary"])
            parsed["path"] = row["path"]
            parsed["source"] = source_of(row["path"])
            out.append(parsed)
        return out


    def summary(self, limit: int = 1000,
                source_filter: Optional[str] = None) -> pd.DataFrame:
        """Aggregate per-source size/format/verifier statistics.

        Returns an empty DataFrame when no rows match the filter.
        """
        rows = []
        for row in self.iter(limit=limit, source_filter=source_filter):
            parsed = parse_task(row["task_binary"])
            rows.append({
                "source": source_of(row["path"]),
                "compressed": parsed["compressed_size"],
                "raw": parsed["raw_size"],
                "format": parsed["format"],
                "n_files": len(parsed.get("files", {})),
                "has_verifier": has_verifier(parsed),
            })
        df = pd.DataFrame(rows)
        if df.empty:
            return df
        return (df.groupby("source")
                  .agg(n=("compressed", "count"),
                       mean_compressed_kb=("compressed", lambda s: s.mean() / 1024),
                       mean_raw_kb=("raw", lambda s: s.mean() / 1024),
                       mean_n_files=("n_files", "mean"),
                       verifier_rate=("has_verifier", "mean"))
                  .round(2)
                  .sort_values("n", ascending=False))


    @staticmethod
    def has_verifier(parsed: Dict[str, Any]) -> bool:
        """Convenience alias for the module-level has_verifier()."""
        return has_verifier(parsed)


    def export(self, output_dir: Union[str, Path], n: int = 10,
               source_filter: Optional[str] = None) -> Path:
        """Write up to *n* sampled tasks to disk, one directory per task.

        Archive members are written under a slugified task path; non-archive
        payloads become task.json / task.txt / task.bin by content type.
        """
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        for parsed in self.sample(n=n, source_filter=source_filter):
            # "/" would create nested dirs out of the dataset path itself
            slug = parsed["path"].replace("/", "_")
            tdir = output_dir / slug
            tdir.mkdir(exist_ok=True)
            if parsed["format"] in ("tar", "zip"):
                for name, body in parsed["files"].items():
                    out = tdir / name
                    out.parent.mkdir(parents=True, exist_ok=True)
                    if isinstance(body, str):
                        out.write_text(body, encoding="utf-8")
                    else:
                        out.write_bytes(body)
            else:
                content = parsed.get("content", b"")
                if isinstance(content, (dict, list)):
                    (tdir / "task.json").write_text(json.dumps(content, indent=2))
                elif isinstance(content, str):
                    (tdir / "task.txt").write_text(content)
                else:
                    (tdir / "task.bin").write_bytes(content)
        print(f"✓ exported tasks to {output_dir.resolve()}")
        return output_dir




# Demo: stream three tasks and print a one-line summary of each.
explorer = TaskTroveExplorer(split="test")


print("\nSample of 3 parsed tasks:")
for s in explorer.sample(n=3):
    print(f"path: {s['path']} | source: {s['source']} | format: {s['format']} | "
          f"files: {len(s.get('files', {}))} | verifier: {has_verifier(s)}")

Related Articles

Leave a Comment