1
def normalize(data):
    """Return the payload's text trimmed and lower-cased.

    Args:
        data: Mapping that may carry a "text" entry. A missing or falsy
            value is treated as the empty string.

    Returns:
        A dict with a single "text" key holding the normalized string.
    """
    raw = data.get("text")
    if not raw:
        raw = ""
    trimmed = raw.strip()
    return {"text": trimmed.lower()}
def tokenize(data):
    """Split the payload's text into alphanumeric tokens.

    Every character that is neither alphanumeric nor whitespace is replaced
    by a space, and the result is split on runs of whitespace.

    Args:
        data: Mapping that may carry a "text" string. A missing or ``None``
            value is treated as the empty string.

    Returns:
        A dict with "tokens" (a list of strings) and "count" (its length).
    """
    text = data.get("text") or ""
    cleaned = "".join(c if (c.isalnum() or c.isspace()) else " " for c in text)
    # Must be a real list: len() needs a sized object, and a list can be
    # iterated repeatedly by whoever receives the result. str.split() with no
    # arguments never yields empty strings, so no extra filtering is needed.
    tokens = cleaned.split()
    return {"tokens": tokens, "count": len(tokens)}
def sentiment(data):
    """Score tokens against the module-level POSITIVE and NEGATIVE collections.

    Args:
        data: Mapping with an optional "tokens" iterable. A missing or
            ``None`` value is treated as no tokens.

    Returns:
        A dict with "pos" and "neg" (how many tokens are members of POSITIVE
        and NEGATIVE respectively), "score" (pos - neg) and "label"
        ("positive", "negative" or "neutral" according to the sign of score).
    """
    # Materialize once: the tokens are scanned twice below, and a one-shot
    # iterator would already be exhausted on the second pass.
    toks = tuple(data.get("tokens") or ())
    pos = sum(t in POSITIVE for t in toks)
    neg = sum(t in NEGATIVE for t in toks)
    score = pos - neg
    if score > 0:
        label = "positive"
    elif score < 0:
        label = "negative"
    else:
        label = "neutral"
    return {"label": label, "score": score, "pos": pos, "neg": neg}
# Count token frequencies, skipping stop words, and return the most frequent.
# Reads data["tokens"] (default: empty) and data["top_n"] (default: 5).
# Returns {"keywords": ...} holding the result of freq.most_common(top_n);
# presumably Counter is collections.Counter, making that a list of
# (token, count) pairs — confirm against the file's imports.
def keywords(data):
toks = data.get("tokens", ())
# Words that are never counted as keywords.
stop = {"the","a","an","is","it","to","of","and","in","for","on","how"}
# NOTE(review): the next line is cut off mid-expression — `and len` is never
# completed and the opening parenthesis of Counter(...) is never closed.
# It looks like a token-length filter was intended; restore the exact
# condition from the original source before running this function.
freq = Counter(t for t in toks if t not in stop and len
return {"keywords": freq.most_common(data.get("top_n", 5))}
def analyze(data):
    """Run the text pipeline on one document and update the shared counters.

    Triggers "text::normalize", "text::tokenize", "text::sentiment" and
    "text::keywords" through the worker, in that order, then records the
    document in the module-level _STATE while holding _LOCK.

    Args:
        data: Mapping with optional "text" (default "") and "top_n"
            (default 5, forwarded to the keywords step).

    Returns:
        A dict with "tokens" (the token count), "sentiment" (the sentiment
        step's result), "keywords" (the keywords step's list) and
        "docs_analyzed" (the counter value after this document).
    """
    norm = worker.trigger({
        "function_id": "text::normalize",
        "payload": {"text": data.get("text", "")},
    })
    toks = worker.trigger({"function_id": "text::tokenize", "payload": norm})
    sent = worker.trigger({"function_id": "text::sentiment", "payload": toks})
    keys = worker.trigger({
        "function_id": "text::keywords",
        "payload": {**toks, "top_n": data.get("top_n", 5)},
    })
    with _LOCK:
        # Step results and _STATE are subscripted, not called: augmented
        # assignment to a call expression is a syntax error.
        _STATE["docs_analyzed"] += 1
        # Assumes _STATE["keyword_totals"] supplies 0 for unseen keys
        # (e.g. a Counter) — TODO confirm where _STATE is initialized.
        for k, c in keys["keywords"]:
            _STATE["keyword_totals"][k] += c
        n = _STATE["docs_analyzed"]
    return {
        "tokens": toks["count"],
        "sentiment": sent,
        "keywords": keys["keywords"],
        "docs_analyzed": n,
    }
def report(data):
    """Return a snapshot of the shared statistics.

    Args:
        data: Unused; accepted so the function matches the handler signature
            used by the other registered functions.

    Returns:
        A dict with "docs_analyzed", "heartbeats" and "top_keywords_all_docs"
        (the five most common entries of the accumulated keyword totals).
    """
    # Read every field under the lock so the snapshot is taken in one go.
    with _LOCK:
        # _STATE is indexed by key; calling it like a function would fail.
        return {
            "docs_analyzed": _STATE["docs_analyzed"],
            "heartbeats": _STATE["heartbeats"],
            "top_keywords_all_docs": _STATE["keyword_totals"].most_common(5),
        }
def http_analyze(data):
    """Handle an HTTP request by delegating its body to "pipeline::analyze".

    Args:
        data: Request mapping. Its "body" entry is forwarded as the payload;
            a missing or falsy body becomes an empty dict.

    Returns:
        A response dict with status code 200, the pipeline result as "body"
        and a JSON content-type header.
    """
    payload = data.get("body")
    if not payload:
        payload = {}
    analysis = worker.trigger({
        "function_id": "pipeline::analyze",
        "payload": payload,
    })
    response = {
        "status_code": 200,
        "body": analysis,
        "headers": {"Content-Type": "application/json"},
    }
    return response
def heartbeat(data):
    """Record one heartbeat in the shared state.

    Args:
        data: Unused; accepted so the function matches the handler signature
            used by the other registered functions.

    Returns:
        ``{"ok": True}``.
    """
    with _LOCK:
        # Subscript, not call: `_STATE(...) += 1` is a syntax error.
        _STATE["heartbeats"] += 1
    return {"ok": True}
# Register every handler with the worker under its namespaced function id.
# Dict literals keep insertion order, so registration happens top to bottom.
for fid, fn in {
    "text::normalize": normalize,
    "text::tokenize": tokenize,
    "text::sentiment": sentiment,
    "text::keywords": keywords,
    "pipeline::analyze": analyze,
    "stats::report": report,
    "http::analyze": http_analyze,
    "cron::heartbeat": heartbeat,
}.items():
    worker.register_function(fid, fn)