3
def describe_dataset(dataset_name: str, tool_context: ToolContext) -> dict:
print(f"📊 Describing dataset: {dataset_name}")
df = DATA_STORE.get_dataset(dataset_name)
if df is None:
return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
numeric_cols = df.select_dtypes(include=(np.number)).columns.tolist()
categorical_cols = df.select_dtypes(include=('object', 'category')).columns.tolist()
result = {
"status": "success",
"dataset": dataset_name,
"overview": {
"total_rows": int(len(df)),
"total_columns": int(len(df.columns)),
"numeric_columns": numeric_cols,
"categorical_columns": categorical_cols,
"memory_mb": round(float(df.memory_usage(deep=True).sum() / 1024 / 1024), 2),
"duplicate_rows": int(df.duplicated().sum()),
"missing_total": int(df.isnull().sum().sum())
}
}
if numeric_cols:
stats_dict = {}
for col in numeric_cols:
col_data = df(col).dropna()
if len(col_data) > 0:
stats_dict(col) = {
"count": int(len(col_data)),
"mean": round(float(col_data.mean()), 3),
"std": round(float(col_data.std()), 3),
"min": round(float(col_data.min()), 3),
"25%": round(float(col_data.quantile(0.25)), 3),
"50%": round(float(col_data.median()), 3),
"75%": round(float(col_data.quantile(0.75)), 3),
"max": round(float(col_data.max()), 3),
"skewness": round(float(col_data.skew()), 3),
"missing": int(df(col).isnull().sum())
}
result("numeric_summary") = stats_dict
if categorical_cols:
cat_dict = {}
for col in categorical_cols(:10):
vc = df(col).value_counts()
cat_dict(col) = {
"unique_values": int(df(col).nunique()),
"top_values": {str(k): int(v) for k, v in vc.head(5).items()},
"missing": int(df(col).isnull().sum())
}
result("categorical_summary") = cat_dict
DATA_STORE.log_analysis("describe", dataset_name, "Statistics generated")
return make_serializable(result)
def correlation_analysis(dataset_name: str, method: str = "pearson", tool_context: ToolContext = None) -> dict:
print(f"📊 Correlation analysis: {dataset_name} ({method})")
df = DATA_STORE.get_dataset(dataset_name)
if df is None:
return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
numeric_df = df.select_dtypes(include=(np.number))
if numeric_df.shape(1) < 2:
return {"status": "error", "message": "Need at least 2 numeric columns"}
corr_matrix = numeric_df.corr(method=method)
strong_corrs = ()
for i in range(len(corr_matrix.columns)):
for j in range(i + 1, len(corr_matrix.columns)):
col1, col2 = corr_matrix.columns(i), corr_matrix.columns(j)
val = corr_matrix.iloc(i, j)
if abs(val) > 0.5:
strong_corrs.append({
"var1": col1,
"var2": col2,
"correlation": round(float(val), 3),
"strength": "strong" if abs(val) > 0.7 else "moderate"
})
strong_corrs.sort(key=lambda x: abs(x("correlation")), reverse=True)
corr_dict = {}
for col in corr_matrix.columns:
corr_dict(col) = {k: round(float(v), 3) for k, v in corr_matrix(col).items()}
DATA_STORE.log_analysis("correlation", dataset_name, f"{method} correlation")
return make_serializable({
"status": "success",
"method": method,
"correlation_matrix": corr_dict,
"strong_correlations": strong_corrs(:10),
"insight": f"Found {len(strong_corrs)} pairs with |correlation| > 0.5"
})
def hypothesis_test(dataset_name: str, test_type: str, column1: str,
column2: str = None, group_column: str = None,
tool_context: ToolContext = None) -> dict:
print(f"📊 Hypothesis test: {test_type} on {dataset_name}")
df = DATA_STORE.get_dataset(dataset_name)
if df is None:
return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
if column1 not in df.columns:
return {"status": "error", "message": f"Column '{column1}' not found"}
try:
if test_type == "normality":
data = df(column1).dropna()
if len(data) > 5000:
data = data.sample(5000)
stat, p = stats.shapiro(data)
return make_serializable({
"status": "success",
"test": "Shapiro-Wilk Normality Test",
"column": column1,
"statistic": round(float(stat), 4),
"p_value": round(float(p), 6),
"is_normal": bool(p > 0.05),
"interpretation": "Data appears normally distributed" if p > 0.05 else "Data is NOT normally distributed"
})
elif test_type == "ttest":
if group_column is None:
return {"status": "error", "message": "group_column required for t-test"}
groups = df(group_column).dropna().unique()
if len(groups) != 2:
return {"status": "error", "message": f"T-test needs exactly 2 groups, found {len(groups)}: {list(groups)}"}
g1 = df(df(group_column) == groups(0))(column1).dropna()
g2 = df(df(group_column) == groups(1))(column1).dropna()
stat, p = stats.ttest_ind(g1, g2)
return make_serializable({
"status": "success",
"test": "Independent Samples T-Test",
"comparing": column1,
"group1": {"name": str(groups(0)), "mean": round(float(g1.mean()), 3), "n": int(len(g1))},
"group2": {"name": str(groups(1)), "mean": round(float(g2.mean()), 3), "n": int(len(g2))},
"t_statistic": round(float(stat), 4),
"p_value": round(float(p), 6),
"significant": bool(p < 0.05),
"interpretation": "Significant difference" if p < 0.05 else "No significant difference"
})
elif test_type == "anova":
if group_column is None:
return {"status": "error", "message": "group_column required for ANOVA"}
groups_data = (grp(column1).dropna().values for _, grp in df.groupby(group_column))
group_names = list(df(group_column).unique())
stat, p = stats.f_oneway(*groups_data)
group_stats = ()
for name in group_names:
grp_data = df(df(group_column) == name)(column1).dropna()
group_stats.append({
"group": str(name),
"mean": round(float(grp_data.mean()), 3),
"std": round(float(grp_data.std()), 3),
"n": int(len(grp_data))
})
return make_serializable({
"status": "success",
"test": "One-Way ANOVA",
"comparing": column1,
"across": group_column,
"n_groups": int(len(group_names)),
"group_statistics": group_stats,
"f_statistic": round(float(stat), 4),
"p_value": round(float(p), 6),
"significant": bool(p < 0.05),
"interpretation": "Significant differences among groups" if p < 0.05 else "No significant differences"
})
elif test_type == "chi2":
if column2 is None:
return {"status": "error", "message": "column2 required for chi-square test"}
contingency = pd.crosstab(df(column1), df(column2))
chi2, p, dof, _ = stats.chi2_contingency(contingency)
return make_serializable({
"status": "success",
"test": "Chi-Square Test of Independence",
"variables": (column1, column2),
"chi2_statistic": round(float(chi2), 4),
"p_value": round(float(p), 6),
"degrees_of_freedom": int(dof),
"significant": bool(p < 0.05),
"interpretation": "Variables are dependent" if p < 0.05 else "Variables are independent"
})
else:
return {"status": "error", "message": f"Unknown test: {test_type}. Use: normality, ttest, anova, chi2"}
except Exception as e:
return {"status": "error", "message": f"Test failed: {str(e)}"}
def outlier_detection(dataset_name: str, column: str, method: str = "iqr",
tool_context: ToolContext = None) -> dict:
print(f"📊 Outlier detection: {column} in {dataset_name}")
df = DATA_STORE.get_dataset(dataset_name)
if df is None:
return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}
if column not in df.columns:
return {"status": "error", "message": f"Column '{column}' not found"}
data = df(column).dropna()
if method == "iqr":
Q1 = float(data.quantile(0.25))
Q3 = float(data.quantile(0.75))
IQR = Q3 - Q1
lower = Q1 - 1.5 * IQR
upper = Q3 + 1.5 * IQR
outliers = data((data < lower) | (data > upper))
return make_serializable({
"status": "success",
"method": "IQR (Interquartile Range)",
"column": column,
"bounds": {"lower": round(lower, 3), "upper": round(upper, 3)},
"iqr": round(IQR, 3),
"total_values": int(len(data)),
"outlier_count": int(len(outliers)),
"outlier_pct": round(float(len(outliers) / len(data) * 100), 2),
"outlier_examples": (round(float(x), 2) for x in outliers.head(10).tolist())
})
elif method == "zscore":
z = np.abs(stats.zscore(data))
outliers = data(z > 3)
return make_serializable({
"status": "success",
"method": "Z-Score (threshold: 3)",
"column": column,
"total_values": int(len(data)),
"outlier_count": int(len(outliers)),
"outlier_pct": round(float(len(outliers) / len(data) * 100), 2),
"outlier_examples": (round(float(x), 2) for x in outliers.head(10).tolist())
})
return {"status": "error", "message": f"Unknown method: {method}. Use: iqr, zscore"}
print("✅ Statistical analysis tools defined!")