Examples

This section provides practical examples of using pyspark-analyzer in various scenarios.

Basic Examples

Example 1: Simple CSV Profiling

from pyspark.sql import SparkSession
from pyspark_analyzer import DataFrameProfiler

# Initialize Spark
spark = SparkSession.builder \
    .appName("CSVProfiling") \
    .getOrCreate()

# Load CSV data
df = spark.read.csv("sales_data.csv", header=True, inferSchema=True)

# Profile the data
profiler = DataFrameProfiler(df)
profile = profiler.profile()

# Print overview
print(f"Total rows: {profile['overview']['row_count']:,}")
print(f"Total columns: {profile['overview']['column_count']}")

# Examine a specific column
sales_stats = profile['columns']['sales_amount']
print(f"\nSales Amount Statistics:")
print(f"  Mean: ${sales_stats['mean']:,.2f}")
print(f"  Median: ${sales_stats['median']:,.2f}")
print(f"  Std Dev: ${sales_stats['std']:,.2f}")

Example 2: Data Quality Check

def check_data_quality(df, null_threshold=0.05, unique_threshold=0.95):
    """
    Check data quality using profiler results.

    Args:
        df: PySpark DataFrame
        null_threshold: Maximum acceptable null rate
        unique_threshold: Minimum uniqueness for ID columns

    Returns:
        List of quality issues
    """
    profiler = DataFrameProfiler(df)
    profile = profiler.profile()

    issues = []

    for col_name, stats in profile['columns'].items():
        # Check null rate
        null_rate = stats['null_count'] / stats['count']
        if null_rate > null_threshold:
            issues.append({
                'column': col_name,
                'issue': 'high_nulls',
                'rate': null_rate,
                'message': f"{col_name} has {null_rate:.1%} null values"
            })

        # Check uniqueness for potential ID columns
        if 'id' in col_name.lower():
            unique_rate = stats['distinct_count'] / stats['count']
            if unique_rate < unique_threshold:
                issues.append({
                    'column': col_name,
                    'issue': 'low_uniqueness',
                    'rate': unique_rate,
                    'message': f"{col_name} is only {unique_rate:.1%} unique"
                })

        # Check for constant columns
        if stats['distinct_count'] == 1:
            issues.append({
                'column': col_name,
                'issue': 'constant',
                'message': f"{col_name} has only one unique value"
            })

    return issues

# Usage
issues = check_data_quality(df)
for issue in issues:
    print(f"⚠️  {issue['message']}")

Advanced Examples

Example 3: Comparative Profiling

def compare_datasets(df1, df2, df1_name="Dataset 1", df2_name="Dataset 2"):
    """
    Compare profiles of two datasets.
    """
    profiler1 = DataFrameProfiler(df1)
    profiler2 = DataFrameProfiler(df2)

    profile1 = profiler1.profile()
    profile2 = profiler2.profile()

    print(f"\n📊 Dataset Comparison: {df1_name} vs {df2_name}")
    print("=" * 50)

    # Compare overview
    print("\nOverview:")
    print(f"  Rows: {profile1['overview']['row_count']:,} vs {profile2['overview']['row_count']:,}")
    print(f"  Columns: {profile1['overview']['column_count']} vs {profile2['overview']['column_count']}")

    # Compare common columns
    cols1 = set(profile1['columns'].keys())
    cols2 = set(profile2['columns'].keys())
    common_cols = cols1.intersection(cols2)

    print(f"\nCommon columns: {len(common_cols)}")
    print(f"Unique to {df1_name}: {cols1 - cols2}")
    print(f"Unique to {df2_name}: {cols2 - cols1}")

    # Compare statistics for numeric columns
    print("\nNumeric Column Comparison:")
    for col in common_cols:
        stats1 = profile1['columns'][col]
        stats2 = profile2['columns'][col]

        if stats1['data_type'] in ['integer', 'double']:
            mean_diff = abs(stats1['mean'] - stats2['mean'])
            mean_pct = mean_diff / stats1['mean'] * 100 if stats1['mean'] != 0 else 0

            print(f"\n  {col}:")
            print(f"    Mean: {stats1['mean']:.2f} vs {stats2['mean']:.2f} ({mean_pct:.1f}% diff)")
            print(f"    Std:  {stats1['std']:.2f} vs {stats2['std']:.2f}")

# Usage
train_df = spark.read.parquet("train_data.parquet")
test_df = spark.read.parquet("test_data.parquet")
compare_datasets(train_df, test_df, "Training", "Test")

Example 4: Automated Feature Engineering

from pyspark_analyzer import DataFrameProfiler, SamplingConfig

def identify_feature_types(df, cardinality_threshold=50):
    """
    Automatically identify feature types for ML preprocessing.
    """
    # Use sampling for large datasets
    config = SamplingConfig(target_size=100_000)
    profiler = DataFrameProfiler(df, sampling_config=config)
    profile = profiler.profile()

    feature_types = {
        'numeric': [],
        'categorical': [],
        'high_cardinality': [],
        'datetime': [],
        'text': [],
        'binary': [],
        'to_drop': []
    }

    for col_name, stats in profile['columns'].items():
        # Skip target variable if specified
        if col_name == 'target':
            continue

        data_type = stats['data_type']
        distinct_count = stats['distinct_count']
        null_rate = stats['null_count'] / stats['count']

        # Drop columns with too many nulls
        if null_rate > 0.9:
            feature_types['to_drop'].append(col_name)
            continue

        # Identify feature type
        if data_type in ['integer', 'double', 'float']:
            if distinct_count == 2:
                feature_types['binary'].append(col_name)
            else:
                feature_types['numeric'].append(col_name)

        elif data_type == 'string':
            if distinct_count == 2:
                feature_types['binary'].append(col_name)
            elif distinct_count < cardinality_threshold:
                feature_types['categorical'].append(col_name)
            elif stats.get('avg_length', 0) > 50:
                feature_types['text'].append(col_name)
            else:
                feature_types['high_cardinality'].append(col_name)

        elif data_type in ['timestamp', 'date']:
            feature_types['datetime'].append(col_name)

    return feature_types

# Usage
feature_types = identify_feature_types(df)
print("Feature Types Identified:")
for ftype, columns in feature_types.items():
    if columns:
        print(f"\n{ftype.upper()}: {len(columns)} features")
        print(f"  {', '.join(columns[:5])}" + (" ..." if len(columns) > 5 else ""))

Example 5: Performance Monitoring

import time
from pyspark_analyzer import DataFrameProfiler, SamplingConfig

def profile_with_monitoring(df, name="DataFrame"):
    """
    Profile DataFrame with performance monitoring.
    """
    print(f"\n⏱️  Profiling {name}...")

    # Test different configurations
    configs = [
        ("No sampling", None),
        ("Auto sampling", SamplingConfig()),
        ("Aggressive sampling", SamplingConfig(target_size=10_000)),
    ]

    results = []

    for config_name, config in configs:
        start_time = time.time()

        if config:
            profiler = DataFrameProfiler(df, sampling_config=config)
        else:
            profiler = DataFrameProfiler(df)

        profile = profiler.profile()
        elapsed = time.time() - start_time

        # Get actual sampling info
        sampling_info = profile.get('sampling', {})
        actual_fraction = sampling_info.get('sample_fraction', 1.0)
        quality_score = sampling_info.get('quality_score', 1.0)

        results.append({
            'config': config_name,
            'time': elapsed,
            'sample_fraction': actual_fraction,
            'quality': quality_score
        })

        print(f"\n  {config_name}:")
        print(f"    Time: {elapsed:.2f}s")
        print(f"    Sample: {actual_fraction:.1%}")
        print(f"    Quality: {quality_score:.3f}")

    # Show speedup
    baseline_time = results[0]['time']
    for result in results[1:]:
        speedup = baseline_time / result['time']
        print(f"\n  {result['config']} speedup: {speedup:.1f}x")

    return results

# Usage with large dataset
large_df = spark.range(10_000_000).selectExpr(
    "id",
    "rand() as value1",
    "randn() as value2",
    "cast(rand() * 100 as int) as category"
)

profile_with_monitoring(large_df, "10M row dataset")

Real-World Scenarios

Example 6: E-commerce Data Profiling

# Profile e-commerce transaction data
def profile_ecommerce_data(transactions_df):
    profiler = DataFrameProfiler(transactions_df, optimize_for_large_datasets=True)
    profile = profiler.profile()

    # Generate business insights
    insights = []

    # Revenue analysis
    revenue_stats = profile['columns'].get('revenue', {})
    if revenue_stats:
        insights.append(f"Average order value: ${revenue_stats['mean']:.2f}")
        insights.append(f"Revenue range: ${revenue_stats['min']:.2f} - ${revenue_stats['max']:.2f}")

    # Customer analysis
    customer_stats = profile['columns'].get('customer_id', {})
    if customer_stats:
        repeat_rate = 1 - (customer_stats['distinct_count'] / customer_stats['count'])
        insights.append(f"Repeat purchase rate: {repeat_rate:.1%}")

    # Product analysis
    product_stats = profile['columns'].get('product_category', {})
    if product_stats:
        insights.append(f"Number of categories: {product_stats['distinct_count']}")

    return insights

Example 7: Time Series Data Profiling

def profile_time_series(df, timestamp_col='timestamp', value_col='value'):
    """
    Specialized profiling for time series data.
    """
    profiler = DataFrameProfiler(df)
    profile = profiler.profile()

    # Get temporal statistics
    ts_stats = profile['columns'][timestamp_col]
    value_stats = profile['columns'][value_col]

    # Calculate additional time series metrics
    from pyspark.sql import functions as F

    # Time range
    time_range = pd.to_datetime(ts_stats['max']) - pd.to_datetime(ts_stats['min'])

    # Sampling frequency
    total_points = profile['overview']['row_count']
    avg_frequency = total_points / time_range.total_seconds()

    # Missing periods (simplified)
    expected_points = time_range.total_seconds() * avg_frequency
    missing_rate = 1 - (total_points / expected_points)

    print(f"Time Series Profile for {value_col}:")
    print(f"  Period: {ts_stats['min']} to {ts_stats['max']}")
    print(f"  Duration: {time_range}")
    print(f"  Data points: {total_points:,}")
    print(f"  Average frequency: {avg_frequency:.2f} points/second")
    print(f"  Missing data rate: {missing_rate:.1%}")
    print(f"  Value range: [{value_stats['min']:.2f}, {value_stats['max']:.2f}]")
    print(f"  Value mean: {value_stats['mean']:.2f}{value_stats['std']:.2f})")

Integration Examples

Example 8: Integration with MLflow

import mlflow

def log_data_profile_to_mlflow(df, dataset_name="training"):
    """
    Log data profile to MLflow for experiment tracking.
    """
    profiler = DataFrameProfiler(df)
    profile = profiler.profile()

    with mlflow.start_run():
        # Log overview metrics
        mlflow.log_metric(f"{dataset_name}_rows", profile['overview']['row_count'])
        mlflow.log_metric(f"{dataset_name}_columns", profile['overview']['column_count'])

        # Log column statistics
        for col_name, stats in profile['columns'].items():
            if stats['data_type'] in ['integer', 'double']:
                mlflow.log_metric(f"{dataset_name}_{col_name}_mean", stats['mean'])
                mlflow.log_metric(f"{dataset_name}_{col_name}_std", stats['std'])
                mlflow.log_metric(f"{dataset_name}_{col_name}_nulls", stats['null_count'])

        # Log profile as artifact
        import json
        with open(f"{dataset_name}_profile.json", "w") as f:
            json.dump(profile, f, indent=2)
        mlflow.log_artifact(f"{dataset_name}_profile.json")

Example 9: Automated Report Generation

def generate_html_report(df, output_file="profile_report.html"):
    """
    Generate an HTML report from profile data.
    """
    profiler = DataFrameProfiler(df)
    profile = profiler.profile()

    html_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Data Profile Report</title>
        <style>
            body { font-family: Arial, sans-serif; margin: 20px; }
            table { border-collapse: collapse; width: 100%; }
            th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
            th { background-color: #4CAF50; color: white; }
            .overview { background-color: #f0f0f0; padding: 10px; margin-bottom: 20px; }
        </style>
    </head>
    <body>
        <h1>Data Profile Report</h1>
        <div class="overview">
            <h2>Overview</h2>
            <p>Rows: {row_count:,}</p>
            <p>Columns: {column_count}</p>
            <p>Memory Usage: {memory_mb:.2f} MB</p>
        </div>

        <h2>Column Statistics</h2>
        <table>
            <tr>
                <th>Column</th>
                <th>Type</th>
                <th>Non-Null</th>
                <th>Unique</th>
                <th>Mean</th>
                <th>Std</th>
                <th>Min</th>
                <th>Max</th>
            </tr>
            {column_rows}
        </table>
    </body>
    </html>
    """

    # Generate column rows
    column_rows = []
    for col_name, stats in profile['columns'].items():
        non_null_pct = (1 - stats['null_count'] / stats['count']) * 100

        row = f"""
        <tr>
            <td>{col_name}</td>
            <td>{stats['data_type']}</td>
            <td>{non_null_pct:.1f}%</td>
            <td>{stats['distinct_count']:,}</td>
            <td>{stats.get('mean', 'N/A')}</td>
            <td>{stats.get('std', 'N/A')}</td>
            <td>{stats.get('min', 'N/A')}</td>
            <td>{stats.get('max', 'N/A')}</td>
        </tr>
        """
        column_rows.append(row)

    # Fill template
    html_content = html_template.format(
        row_count=profile['overview']['row_count'],
        column_count=profile['overview']['column_count'],
        memory_mb=profile['overview']['memory_usage_bytes'] / 1024 / 1024,
        column_rows=''.join(column_rows)
    )

    with open(output_file, 'w') as f:
        f.write(html_content)

    print(f"Report generated: {output_file}")

# Usage
generate_html_report(df)