Google ADK Multi-Agent Pipeline Tutorial: Data Loading, Statistical Testing, Visualization, and Report Generation in Python

by ai-intensify

import numpy as np
import pandas as pd
from scipy import stats
from google.adk.tools import ToolContext

# DATA_STORE and make_serializable are defined in the earlier sections of this tutorial.

def describe_dataset(dataset_name: str, tool_context: ToolContext) -> dict:
    print(f"📊 Describing dataset: {dataset_name}")

    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}

    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = df.select_dtypes(include=['object', 'category']).columns.tolist()

    result = {
        "status": "success",
        "dataset": dataset_name,
        "overview": {
            "total_rows": int(len(df)),
            "total_columns": int(len(df.columns)),
            "numeric_columns": numeric_cols,
            "categorical_columns": categorical_cols,
            "memory_mb": round(float(df.memory_usage(deep=True).sum() / 1024 / 1024), 2),
            "duplicate_rows": int(df.duplicated().sum()),
            "missing_total": int(df.isnull().sum().sum())
        }
    }

    if numeric_cols:
        stats_dict = {}
        for col in numeric_cols:
            col_data = df[col].dropna()
            if len(col_data) > 0:
                stats_dict[col] = {
                    "count": int(len(col_data)),
                    "mean": round(float(col_data.mean()), 3),
                    "std": round(float(col_data.std()), 3),
                    "min": round(float(col_data.min()), 3),
                    "25%": round(float(col_data.quantile(0.25)), 3),
                    "50%": round(float(col_data.median()), 3),
                    "75%": round(float(col_data.quantile(0.75)), 3),
                    "max": round(float(col_data.max()), 3),
                    "skewness": round(float(col_data.skew()), 3),
                    "missing": int(df[col].isnull().sum())
                }
        result["numeric_summary"] = stats_dict

    if categorical_cols:
        cat_dict = {}
        for col in categorical_cols[:10]:  # cap at 10 columns to keep the payload small
            vc = df[col].value_counts()
            cat_dict[col] = {
                "unique_values": int(df[col].nunique()),
                "top_values": {str(k): int(v) for k, v in vc.head(5).items()},
                "missing": int(df[col].isnull().sum())
            }
        result["categorical_summary"] = cat_dict

    DATA_STORE.log_analysis("describe", dataset_name, "Statistics generated")
    return make_serializable(result)
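
For a quick sanity check outside the agent loop, the tool can also be called directly. A minimal sketch, assuming a hypothetical "sales" dataset has already been registered in DATA_STORE; tool_context may be None here since the function never dereferences it:

summary = describe_dataset("sales", tool_context=None)
if summary["status"] == "success":
    # Print the row count and the numeric columns the agent will be able to analyze
    print(summary["overview"]["total_rows"], summary["overview"]["numeric_columns"])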




def correlation_analysis(dataset_name: str, method: str = "pearson", tool_context: ToolContext = None) -> dict:
    print(f"📊 Correlation analysis: {dataset_name} ({method})")

    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}

    numeric_df = df.select_dtypes(include=[np.number])

    if numeric_df.shape[1] < 2:
        return {"status": "error", "message": "Need at least 2 numeric columns"}

    corr_matrix = numeric_df.corr(method=method)

    # Collect every pair with |correlation| > 0.5 from the upper triangle
    strong_corrs = []
    for i in range(len(corr_matrix.columns)):
        for j in range(i + 1, len(corr_matrix.columns)):
            col1, col2 = corr_matrix.columns[i], corr_matrix.columns[j]
            val = corr_matrix.iloc[i, j]
            if abs(val) > 0.5:
                strong_corrs.append({
                    "var1": col1,
                    "var2": col2,
                    "correlation": round(float(val), 3),
                    "strength": "strong" if abs(val) > 0.7 else "moderate"
                })

    strong_corrs.sort(key=lambda x: abs(x["correlation"]), reverse=True)

    corr_dict = {}
    for col in corr_matrix.columns:
        corr_dict[col] = {k: round(float(v), 3) for k, v in corr_matrix[col].items()}

    DATA_STORE.log_analysis("correlation", dataset_name, f"{method} correlation")

    return make_serializable({
        "status": "success",
        "method": method,
        "correlation_matrix": corr_dict,
        "strong_correlations": strong_corrs[:10],
        "insight": f"Found {len(strong_corrs)} pairs with |correlation| > 0.5"
    })
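
pandas accepts "pearson", "spearman", or "kendall" for the method argument. Pearson measures linear association; when relationships are monotonic but non-linear, Spearman is often the safer choice. A minimal sketch, again using the hypothetical "sales" dataset:

result = correlation_analysis("sales", method="spearman")
if result["status"] == "success":
    # Strongest pairs first, thanks to the sort above
    for pair in result["strong_correlations"]:
        print(pair["var1"], "<->", pair["var2"], pair["correlation"])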




def hypothesis_test(dataset_name: str, test_type: str, column1: str,
                    column2: str = None, group_column: str = None,
                    tool_context: ToolContext = None) -> dict:
    print(f"📊 Hypothesis test: {test_type} on {dataset_name}")

    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}

    if column1 not in df.columns:
        return {"status": "error", "message": f"Column '{column1}' not found"}

    try:
        if test_type == "normality":
            data = df[column1].dropna()
            if len(data) > 5000:  # Shapiro-Wilk is slow and overly sensitive on very large samples
                data = data.sample(5000)
            stat, p = stats.shapiro(data)

            return make_serializable({
                "status": "success",
                "test": "Shapiro-Wilk Normality Test",
                "column": column1,
                "statistic": round(float(stat), 4),
                "p_value": round(float(p), 6),
                "is_normal": bool(p > 0.05),
                "interpretation": "Data appears normally distributed" if p > 0.05 else "Data is NOT normally distributed"
            })

        elif test_type == "ttest":
            if group_column is None:
                return {"status": "error", "message": "group_column required for t-test"}

            groups = df[group_column].dropna().unique()
            if len(groups) != 2:
                return {"status": "error", "message": f"T-test needs exactly 2 groups, found {len(groups)}: {list(groups)}"}

            g1 = df[df[group_column] == groups[0]][column1].dropna()
            g2 = df[df[group_column] == groups[1]][column1].dropna()

            stat, p = stats.ttest_ind(g1, g2)

            return make_serializable({
                "status": "success",
                "test": "Independent Samples T-Test",
                "comparing": column1,
                "group1": {"name": str(groups[0]), "mean": round(float(g1.mean()), 3), "n": int(len(g1))},
                "group2": {"name": str(groups[1]), "mean": round(float(g2.mean()), 3), "n": int(len(g2))},
                "t_statistic": round(float(stat), 4),
                "p_value": round(float(p), 6),
                "significant": bool(p < 0.05),
                "interpretation": "Significant difference" if p < 0.05 else "No significant difference"
            })

        elif test_type == "anova":
            if group_column is None:
                return {"status": "error", "message": "group_column required for ANOVA"}

            groups_data = [grp[column1].dropna().values for _, grp in df.groupby(group_column)]
            group_names = list(df[group_column].unique())

            stat, p = stats.f_oneway(*groups_data)

            group_stats = []
            for name in group_names:
                grp_data = df[df[group_column] == name][column1].dropna()
                group_stats.append({
                    "group": str(name),
                    "mean": round(float(grp_data.mean()), 3),
                    "std": round(float(grp_data.std()), 3),
                    "n": int(len(grp_data))
                })

            return make_serializable({
                "status": "success",
                "test": "One-Way ANOVA",
                "comparing": column1,
                "across": group_column,
                "n_groups": int(len(group_names)),
                "group_statistics": group_stats,
                "f_statistic": round(float(stat), 4),
                "p_value": round(float(p), 6),
                "significant": bool(p < 0.05),
                "interpretation": "Significant differences among groups" if p < 0.05 else "No significant differences"
            })

        elif test_type == "chi2":
            if column2 is None:
                return {"status": "error", "message": "column2 required for chi-square test"}

            contingency = pd.crosstab(df[column1], df[column2])
            chi2, p, dof, _ = stats.chi2_contingency(contingency)

            return make_serializable({
                "status": "success",
                "test": "Chi-Square Test of Independence",
                "variables": [column1, column2],
                "chi2_statistic": round(float(chi2), 4),
                "p_value": round(float(p), 6),
                "degrees_of_freedom": int(dof),
                "significant": bool(p < 0.05),
                "interpretation": "Variables are dependent" if p < 0.05 else "Variables are independent"
            })

        else:
            return {"status": "error", "message": f"Unknown test: {test_type}. Use: normality, ttest, anova, chi2"}

    except Exception as e:
        return {"status": "error", "message": f"Test failed: {str(e)}"}




def outlier_detection(dataset_name: str, column: str, method: str = "iqr",
                      tool_context: ToolContext = None) -> dict:
    print(f"📊 Outlier detection: {column} in {dataset_name}")

    df = DATA_STORE.get_dataset(dataset_name)
    if df is None:
        return {"status": "error", "message": f"Dataset '{dataset_name}' not found"}

    if column not in df.columns:
        return {"status": "error", "message": f"Column '{column}' not found"}

    data = df[column].dropna()

    if method == "iqr":
        # Tukey's rule: flag values beyond 1.5 * IQR from the quartiles
        Q1 = float(data.quantile(0.25))
        Q3 = float(data.quantile(0.75))
        IQR = Q3 - Q1
        lower = Q1 - 1.5 * IQR
        upper = Q3 + 1.5 * IQR
        outliers = data[(data < lower) | (data > upper)]

        return make_serializable({
            "status": "success",
            "method": "IQR (Interquartile Range)",
            "column": column,
            "bounds": {"lower": round(lower, 3), "upper": round(upper, 3)},
            "iqr": round(IQR, 3),
            "total_values": int(len(data)),
            "outlier_count": int(len(outliers)),
            "outlier_pct": round(float(len(outliers) / len(data) * 100), 2),
            "outlier_examples": [round(float(x), 2) for x in outliers.head(10).tolist()]
        })

    elif method == "zscore":
        # Flag values more than 3 standard deviations from the mean
        z = np.abs(stats.zscore(data))
        outliers = data[z > 3]

        return make_serializable({
            "status": "success",
            "method": "Z-Score (threshold: 3)",
            "column": column,
            "total_values": int(len(data)),
            "outlier_count": int(len(outliers)),
            "outlier_pct": round(float(len(outliers) / len(data) * 100), 2),
            "outlier_examples": [round(float(x), 2) for x in outliers.head(10).tolist()]
        })

    return {"status": "error", "message": f"Unknown method: {method}. Use: iqr, zscore"}




print("✅ Statistical analysis tools defined!")
