Skip to content

soc-pipeline

Clauset-grade power-law validation for self-organized criticality (SOC) data.

pip install soc-pipeline

Quick start

from soc_pipeline import validate
import numpy as np

# Synthetic Pareto sample
data = np.random.pareto(1.5, 5000) + 1

verdict = validate(data, label="my_events", expected_band=(2.4, 2.6))
print(verdict.verdict, verdict.alpha, verdict.in_band)

Top-level entry points

validate

Unified one-call validation API.

This is the canonical entry point for users who want a quick PASS/FAIL/INCONCLUSIVE verdict on whether a sample looks like a self-organized criticality (SOC) power-law relative to a pre-registered band.

The contract:

>>> from soc_pipeline import validate
>>> v = validate(data, label="earthquake_M", expected_band=(1.9, 2.1))
>>> print(v.verdict, v.alpha, v.in_band)
INCONCLUSIVE rules
  • n_total < min_samples (default 100)
  • powerlaw fit raises / returns NaN
  • bootstrap CI cannot be computed (too few values)
  • LR test prefers lognormal or exponential alternative with high confidence
PASS rules
  • Fit succeeds
  • alpha lies inside expected_band (if provided); otherwise just "fit OK + no alternative preferred" -> PASS
  • No alternative model (lognormal / exponential) significantly beats the power-law (R > 0 OR p >= 0.1)
FAIL rules
  • Fit succeeds but alpha is outside the pre-registered band
  • or an alternative model significantly beats power-law (R<0 AND p<0.1)

Verdict dataclass

Verdict(verdict: Literal['PASS', 'FAIL', 'INCONCLUSIVE'], alpha: float, alpha_ci_lo: float, alpha_ci_hi: float, xmin: float, n_tail: int, ks_distance: float, vs_lognormal_R: float, vs_lognormal_p: float, vs_exponential_R: float, vs_exponential_p: float, pre_registered_band: tuple[float, float] | None, in_band: bool | None, label: str = 'data', reason: str = '')

Unified verdict returned by :func:validate.

Attributes:

Name Type Description
verdict Literal['PASS', 'FAIL', 'INCONCLUSIVE']

One of "PASS" / "FAIL" / "INCONCLUSIVE".

alpha float

Power-law exponent estimate (Clauset 2009 MLE).

alpha_ci_lo float

Lower bound of 95% bootstrap CI on alpha.

alpha_ci_hi float

Upper bound of 95% bootstrap CI on alpha.

xmin float

Lower bound selected by KS minimisation.

n_tail int

Sample size at and above xmin.

ks_distance float

Kolmogorov-Smirnov distance of empirical tail vs fit.

vs_lognormal_R float

Vuong LR statistic (positive favours power-law).

vs_lognormal_p float

Two-sided p-value of LR vs lognormal.

vs_exponential_R float

Vuong LR statistic vs exponential.

vs_exponential_p float

Two-sided p-value of LR vs exponential.

pre_registered_band tuple[float, float] | None

Optional (low, high) band supplied by caller.

in_band bool | None

True/False if alpha falls inside the pre-registered band, or None when no band was supplied / fit failed.

label str

Caller-supplied label.

reason str

Human-readable reason string (most useful for INCONCLUSIVE).

validate

validate(data: ndarray, label: str = 'data', expected_band: tuple[float, float] | None = None, *, discrete: bool = False, n_boot: int = 200, seed: int = 42, min_samples: int = 100) -> Verdict

Run the canonical Clauset-grade SOC validation on a 1-D sample.

Parameters:

Name Type Description Default
data ndarray

1-D positive array of event sizes / durations.

required
label str

Caller-supplied label for round-tripping (e.g. "earthquake_M").

'data'
expected_band tuple[float, float] | None

Optional (low, high) pre-registered band on alpha. When supplied, the verdict checks whether the fitted alpha falls inside this band; otherwise the band check is skipped.

None
discrete bool

Pass-through to :func:fit_clauset_powerlaw.

False
n_boot int

Number of bootstrap resamples for the CI (default 200).

200
seed int

RNG seed for reproducibility of the bootstrap CI.

42
min_samples int

Minimum sample size; below this returns INCONCLUSIVE.

100

Returns:

Name Type Description
A Verdict

class:Verdict with the unified PASS/FAIL/INCONCLUSIVE outcome.

Source code in packages/soc-pipeline/src/soc_pipeline/validate.py
def validate(
    data: np.ndarray,
    label: str = "data",
    expected_band: tuple[float, float] | None = None,
    *,
    discrete: bool = False,
    n_boot: int = 200,
    seed: int = 42,
    min_samples: int = 100,
) -> Verdict:
    """Run the canonical Clauset-grade SOC validation on a 1-D sample.

    Args:
        data: 1-D positive array of event sizes / durations.
        label: Caller-supplied label for round-tripping (e.g. ``"earthquake_M"``).
        expected_band: Optional ``(low, high)`` pre-registered band on alpha.
            When supplied, the verdict checks whether the fitted alpha falls
            inside this band; otherwise the band check is skipped.
        discrete: Pass-through to :func:`fit_clauset_powerlaw`.
        n_boot: Number of bootstrap resamples for the CI (default 200).
        seed: RNG seed for reproducibility of the bootstrap CI.
        min_samples: Minimum sample size; below this returns INCONCLUSIVE.

    Returns:
        A :class:`Verdict` with the unified PASS/FAIL/INCONCLUSIVE outcome.
    """
    arr = np.asarray(data, dtype=float)
    arr = arr[np.isfinite(arr) & (arr > 0)]
    n_total = int(arr.size)

    if n_total < min_samples:
        return _inconclusive(
            label,
            f"too few values: {n_total} < {min_samples}",
            band=expected_band,
        )

    fit = fit_clauset_powerlaw(arr, name=label, discrete=discrete)
    if fit.error or fit.alpha is None or not np.isfinite(fit.alpha):
        return _inconclusive(
            label,
            f"fit failed: {fit.error or 'non-finite alpha'}",
            band=expected_band,
        )

    if n_boot <= 0:
        # caller explicitly disabled bootstrap (fast path for tests / CI)
        ci_lo, ci_hi = _NAN, _NAN
    else:
        boot = bootstrap_ci(arr, n_boot=n_boot, seed=seed, discrete=discrete)
        if boot.error or boot.ci_low is None or boot.ci_high is None:
            ci_lo, ci_hi = _NAN, _NAN
        else:
            ci_lo = float(boot.ci_low)
            ci_hi = float(boot.ci_high)

    alpha = float(fit.alpha)
    xmin = float(fit.xmin) if fit.xmin is not None else _NAN
    n_tail = int(fit.n_tail)
    ks = float(fit.ks_statistic) if fit.ks_statistic is not None else _NAN

    R_ln = float(fit.vs_lognormal_R) if fit.vs_lognormal_R is not None else _NAN
    p_ln = float(fit.vs_lognormal_p) if fit.vs_lognormal_p is not None else _NAN
    R_exp = float(fit.vs_exponential_R) if fit.vs_exponential_R is not None else _NAN
    p_exp = float(fit.vs_exponential_p) if fit.vs_exponential_p is not None else _NAN

    # band check
    if expected_band is None:
        in_band: bool | None = None
    else:
        lo, hi = expected_band
        in_band = bool(lo <= alpha <= hi)

    # alternative-model rejection (Clauset 2009 §6 rule of thumb)
    rejects = False
    rejection_reason = ""
    if np.isfinite(R_ln) and R_ln < 0 and np.isfinite(p_ln) and p_ln < 0.1:
        rejects = True
        rejection_reason = (
            f"lognormal preferred: R={R_ln:.2f} p={p_ln:.3f}"
        )
    if np.isfinite(R_exp) and R_exp < 0 and np.isfinite(p_exp) and p_exp < 0.1:
        rejects = True
        rejection_reason = (
            f"exponential preferred: R={R_exp:.2f} p={p_exp:.3f}"
        )

    if rejects:
        verdict: Literal["PASS", "FAIL", "INCONCLUSIVE"] = "FAIL"
        reason = rejection_reason
    elif expected_band is not None and not in_band:
        verdict = "FAIL"
        reason = (
            f"alpha={alpha:.3f} outside pre-registered band "
            f"[{expected_band[0]:.3f}, {expected_band[1]:.3f}]"
        )
    else:
        verdict = "PASS"
        if expected_band is not None:
            reason = (
                f"alpha={alpha:.3f} inside band "
                f"[{expected_band[0]:.3f}, {expected_band[1]:.3f}]"
            )
        else:
            reason = f"alpha={alpha:.3f} (no band check)"

    return Verdict(
        verdict=verdict,
        alpha=alpha,
        alpha_ci_lo=ci_lo,
        alpha_ci_hi=ci_hi,
        xmin=xmin,
        n_tail=n_tail,
        ks_distance=ks,
        vs_lognormal_R=R_ln,
        vs_lognormal_p=p_ln,
        vs_exponential_R=R_exp,
        vs_exponential_p=p_exp,
        pre_registered_band=expected_band,
        in_band=in_band,
        label=label,
        reason=reason,
    )

Verdict dataclass

Verdict(verdict: Literal['PASS', 'FAIL', 'INCONCLUSIVE'], alpha: float, alpha_ci_lo: float, alpha_ci_hi: float, xmin: float, n_tail: int, ks_distance: float, vs_lognormal_R: float, vs_lognormal_p: float, vs_exponential_R: float, vs_exponential_p: float, pre_registered_band: tuple[float, float] | None, in_band: bool | None, label: str = 'data', reason: str = '')

Unified verdict returned by :func:validate.

Attributes:

Name Type Description
verdict Literal['PASS', 'FAIL', 'INCONCLUSIVE']

One of "PASS" / "FAIL" / "INCONCLUSIVE".

alpha float

Power-law exponent estimate (Clauset 2009 MLE).

alpha_ci_lo float

Lower bound of 95% bootstrap CI on alpha.

alpha_ci_hi float

Upper bound of 95% bootstrap CI on alpha.

xmin float

Lower bound selected by KS minimisation.

n_tail int

Sample size at and above xmin.

ks_distance float

Kolmogorov-Smirnov distance of empirical tail vs fit.

vs_lognormal_R float

Vuong LR statistic (positive favours power-law).

vs_lognormal_p float

Two-sided p-value of LR vs lognormal.

vs_exponential_R float

Vuong LR statistic vs exponential.

vs_exponential_p float

Two-sided p-value of LR vs exponential.

pre_registered_band tuple[float, float] | None

Optional (low, high) band supplied by caller.

in_band bool | None

True/False if alpha falls inside the pre-registered band, or None when no band was supplied / fit failed.

label str

Caller-supplied label.

reason str

Human-readable reason string (most useful for INCONCLUSIVE).

Power-law fit

fit_clauset_powerlaw

fit_clauset_powerlaw(x_data: ndarray, name: str = 'values', discrete: bool = False, xmin_method: str = 'ks', min_samples: int = 100) -> FitResult

Fit a power-law to the tail of x_data using the Clauset 2009 method.

Parameters:

Name Type Description Default
x_data ndarray

1-D array of positive observed sizes/durations.

required
name str

Caller-provided label for logging / round-tripping.

'values'
discrete bool

True for integer event counts, False for continuous sizes.

False
xmin_method str

Currently only 'ks' (Kolmogorov-Smirnov minimization).

'ks'
min_samples int

Minimum sample size; below this, the function returns a FitResult with an error message and unset fields.

100

Returns:

Type Description
FitResult

FitResult dataclass.

Notes

Requires the powerlaw package (Alstott et al. 2014). If the package is not installed, FitResult.error is set and other fields are None.

Source code in packages/soc-pipeline/src/soc_pipeline/fit.py
def fit_clauset_powerlaw(
    x_data: np.ndarray,
    name: str = "values",
    discrete: bool = False,
    xmin_method: str = "ks",
    min_samples: int = 100,
) -> FitResult:
    """Fit a power-law to the tail of x_data using the Clauset 2009 method.

    Args:
        x_data: 1-D array of positive observed sizes/durations.
        name: Caller-provided label for logging / round-tripping.
        discrete: True for integer event counts, False for continuous sizes.
        xmin_method: Currently only 'ks' (Kolmogorov-Smirnov minimization).
        min_samples: Minimum sample size; below this, the function returns a
            FitResult with an error message and unset fields.

    Returns:
        FitResult dataclass.

    Notes:
        Requires the `powerlaw` package (Alstott et al. 2014). If the package
        is not installed, FitResult.error is set and other fields are None.
    """
    try:
        import powerlaw  # type: ignore
    except Exception as exc:  # pragma: no cover - import-time only
        return FitResult(name=name, error=f"powerlaw missing: {exc}")

    if xmin_method != "ks":
        return FitResult(name=name, error=f"xmin_method {xmin_method} not supported")

    x_data = np.asarray(x_data, dtype=float)
    x_data = x_data[np.isfinite(x_data) & (x_data > 0)]
    n_total = int(len(x_data))
    if n_total < min_samples:
        return FitResult(
            name=name,
            n_total=n_total,
            error=f"too few values: {n_total} < {min_samples}",
        )

    fit = powerlaw.Fit(x_data, discrete=discrete, xmin_distance="D", verbose=False)
    alpha = float(fit.power_law.alpha)
    sigma = float(fit.power_law.sigma)
    xmin = float(fit.power_law.xmin)
    n_tail = int(np.sum(x_data >= xmin))
    try:
        ks = float(fit.power_law.D)
    except Exception:
        ks = None

    try:
        R_ln, p_ln = fit.distribution_compare(
            "power_law", "lognormal", normalized_ratio=True
        )
    except Exception:
        R_ln, p_ln = (None, None)
    try:
        R_exp, p_exp = fit.distribution_compare(
            "power_law", "exponential", normalized_ratio=True
        )
    except Exception:
        R_exp, p_exp = (None, None)

    rejects = False
    if R_exp is not None and R_exp < 0:
        rejects = True
    if R_ln is not None and R_ln < 0:
        rejects = True

    if R_ln is None:
        winner = "inconclusive"
    elif R_ln > 0 and (p_ln is None or p_ln < 0.1):
        winner = "power_law"
    elif R_ln < 0 and (p_ln is None or p_ln < 0.1):
        winner = "lognormal"
    else:
        winner = "inconclusive"

    return FitResult(
        alpha=alpha,
        xmin=xmin,
        sigma=sigma,
        n_total=n_total,
        n_tail=n_tail,
        ks_statistic=ks,
        vs_lognormal_R=None if R_ln is None else float(R_ln),
        vs_lognormal_p=None if p_ln is None else float(p_ln),
        vs_exponential_R=None if R_exp is None else float(R_exp),
        vs_exponential_p=None if p_exp is None else float(p_exp),
        vs_powerlaw_lognormal_winner=winner,
        rejects_power_law=bool(rejects),
        name=name,
    )

FitResult dataclass

FitResult(alpha: float | None = None, xmin: float | None = None, sigma: float | None = None, n_total: int = 0, n_tail: int = 0, ks_statistic: float | None = None, vs_lognormal_R: float | None = None, vs_lognormal_p: float | None = None, vs_exponential_R: float | None = None, vs_exponential_p: float | None = None, vs_powerlaw_lognormal_winner: str = 'inconclusive', rejects_power_law: bool = False, name: str = 'values', error: str | None = None, extra: dict[str, Any] = dict())

Result of a Clauset 2009 power-law fit.

Attributes:

Name Type Description
alpha float | None

Power-law scaling exponent (P(x) ~ x^-alpha).

xmin float | None

Lower-bound xmin selected by KS minimization.

sigma float | None

Standard error on alpha.

n_total int

Total sample size before xmin cut.

n_tail int

Number of samples >= xmin (used for the fit).

ks_statistic float | None

KS distance between empirical tail and fitted power-law.

vs_lognormal_R float | None

Vuong LR statistic vs lognormal (positive -> power-law).

vs_lognormal_p float | None

Two-sided p-value for the LR test vs lognormal.

vs_exponential_R float | None

Vuong LR statistic vs exponential.

vs_exponential_p float | None

Two-sided p-value for the LR test vs exponential.

vs_powerlaw_lognormal_winner str

'power_law' / 'lognormal' / 'inconclusive'.

rejects_power_law bool

True if comparison vs simpler model rejects PL.

name str

Caller-provided label.

error str | None

If non-None, fit failed and other fields are unset.

to_dict

to_dict() -> dict[str, Any]

Backward-compatible dict view (matches legacy v4/lib API).

Source code in packages/soc-pipeline/src/soc_pipeline/fit.py
def to_dict(self) -> dict[str, Any]:
    """Backward-compatible dict view (matches legacy v4/lib API)."""
    d = {
        "name": self.name,
        "alpha": self.alpha,
        "sigma_alpha": self.sigma,
        "xmin": self.xmin,
        "n_total": self.n_total,
        "n_tail": self.n_tail,
        "vs_lognormal_R": self.vs_lognormal_R,
        "vs_lognormal_p": self.vs_lognormal_p,
        "vs_exponential_R": self.vs_exponential_R,
        "vs_exponential_p": self.vs_exponential_p,
        "vs_powerlaw_lognormal_winner": self.vs_powerlaw_lognormal_winner,
        "rejects_power_law": self.rejects_power_law,
        "ks_statistic": self.ks_statistic,
    }
    if self.error:
        d["error"] = self.error
    return d

Bootstrap

bootstrap_ci

bootstrap_ci(x_data: ndarray, n_boot: int = 200, seed: int = 42, discrete: bool = False, ci_pct: tuple[float, float] = (2.5, 97.5), min_samples: int = 200) -> BootstrapResult

Bootstrap a CI on the Clauset 2009 alpha estimator.

Parameters:

Name Type Description Default
x_data ndarray

1-D positive sample.

required
n_boot int

Number of resamples (default 200; >=200 recommended).

200
seed int

RNG seed for reproducibility.

42
discrete bool

Pass-through to powerlaw.Fit.

False
ci_pct tuple[float, float]

Lower/upper percentile pair for the CI (default 95% CI).

(2.5, 97.5)
min_samples int

Minimum sample size to attempt bootstrap.

200

Returns:

Type Description
BootstrapResult

BootstrapResult dataclass. If fewer than 20 fits succeed, returns

BootstrapResult

a result with error set.

Source code in packages/soc-pipeline/src/soc_pipeline/bootstrap.py
def bootstrap_ci(
    x_data: np.ndarray,
    n_boot: int = 200,
    seed: int = 42,
    discrete: bool = False,
    ci_pct: tuple[float, float] = (2.5, 97.5),
    min_samples: int = 200,
) -> BootstrapResult:
    """Bootstrap a CI on the Clauset 2009 alpha estimator.

    Args:
        x_data: 1-D positive sample.
        n_boot: Number of resamples (default 200; >=200 recommended).
        seed: RNG seed for reproducibility.
        discrete: Pass-through to powerlaw.Fit.
        ci_pct: Lower/upper percentile pair for the CI (default 95% CI).
        min_samples: Minimum sample size to attempt bootstrap.

    Returns:
        BootstrapResult dataclass. If fewer than 20 fits succeed, returns
        a result with error set.
    """
    try:
        import powerlaw  # type: ignore
    except Exception as exc:
        return BootstrapResult(error=f"powerlaw missing: {exc}")

    x_data = np.asarray(x_data, dtype=float)
    x_data = x_data[np.isfinite(x_data) & (x_data > 0)]
    if len(x_data) < min_samples:
        return BootstrapResult(error=f"too few values: {len(x_data)} < {min_samples}")
    rng = np.random.default_rng(seed)
    n = len(x_data)
    alphas: list[float] = []
    for _ in range(n_boot):
        sample = rng.choice(x_data, size=n, replace=True)
        try:
            f = powerlaw.Fit(sample, discrete=discrete, xmin_distance="D", verbose=False)
            alphas.append(float(f.power_law.alpha))
        except Exception:
            continue
    if len(alphas) < 20:
        return BootstrapResult(
            n_boot_succeeded=len(alphas),
            error=f"only {len(alphas)}/{n_boot} fits succeeded",
        )
    arr = np.asarray(alphas)
    return BootstrapResult(
        alpha_mean=float(arr.mean()),
        alpha_median=float(np.median(arr)),
        alpha_std=float(arr.std()),
        ci_low=float(np.percentile(arr, ci_pct[0])),
        ci_high=float(np.percentile(arr, ci_pct[1])),
        n_boot_succeeded=int(len(arr)),
    )

BootstrapResult dataclass

BootstrapResult(alpha_mean: float | None = None, alpha_median: float | None = None, alpha_std: float | None = None, ci_low: float | None = None, ci_high: float | None = None, n_boot_succeeded: int = 0, error: str | None = None)

Bootstrap CI on the power-law alpha.

Attributes:

Name Type Description
alpha_mean float | None

Mean of bootstrap-resampled alpha.

alpha_median float | None

Median of bootstrap-resampled alpha.

alpha_std float | None

Standard deviation across bootstrap samples.

ci_low float | None

Lower percentile of alpha (default 2.5).

ci_high float | None

Upper percentile of alpha (default 97.5).

n_boot_succeeded int

Number of resamples where fitting succeeded.

error str | None

Optional error string when fewer than 20 resamples succeeded.

Likelihood-ratio test (Vuong)

vuong_lr_test

vuong_lr_test(x_data: ndarray, vs: Distribution = 'lognormal', discrete: bool = False, alpha_threshold: float = 0.1) -> LRResult

Run Vuong 1989 LR test on power-law vs alternative.

Parameters:

Name Type Description Default
x_data ndarray

Positive 1-D sample.

required
vs Distribution

Alternative distribution ('lognormal' / 'exponential' / 'stretched_exponential' / 'truncated_power_law').

'lognormal'
discrete bool

Pass-through to powerlaw.Fit.

False
alpha_threshold float

p-value cutoff for declaring a winner.

0.1

Returns:

Type Description
LRResult

LRResult dataclass.

Source code in packages/soc-pipeline/src/soc_pipeline/lr_test.py
def vuong_lr_test(
    x_data: np.ndarray,
    vs: Distribution = "lognormal",
    discrete: bool = False,
    alpha_threshold: float = 0.1,
) -> LRResult:
    """Run Vuong 1989 LR test on power-law vs alternative.

    Args:
        x_data: Positive 1-D sample.
        vs: Alternative distribution ('lognormal' / 'exponential' /
            'stretched_exponential' / 'truncated_power_law').
        discrete: Pass-through to powerlaw.Fit.
        alpha_threshold: p-value cutoff for declaring a winner.

    Returns:
        LRResult dataclass.
    """
    try:
        import powerlaw  # type: ignore
    except Exception as exc:
        return LRResult(vs=vs, error=f"powerlaw missing: {exc}")

    x_data = np.asarray(x_data, dtype=float)
    x_data = x_data[np.isfinite(x_data) & (x_data > 0)]
    if len(x_data) < 100:
        return LRResult(vs=vs, error=f"too few values: {len(x_data)}")
    try:
        fit = powerlaw.Fit(x_data, discrete=discrete, xmin_distance="D", verbose=False)
        R, p = fit.distribution_compare("power_law", vs, normalized_ratio=True)
    except Exception as exc:
        return LRResult(vs=vs, error=f"lr test failed: {exc}")

    winner = "inconclusive"
    if R is not None and p is not None and p < alpha_threshold:
        winner = "power_law" if R > 0 else vs

    return LRResult(vs=vs, R=float(R) if R is not None else None,
                    p=float(p) if p is not None else None, winner=winner)

LRResult dataclass

LRResult(vs: str, R: float | None = None, p: float | None = None, winner: str = 'inconclusive', error: str | None = None)

Result of a Vuong LR test power_law vs vs.

Attributes:

Name Type Description
vs str

Alternative distribution name.

R float | None

Normalized LR statistic. R > 0 favors power-law, R < 0 favors alternative.

p float | None

Two-sided p-value (small => one model significantly better).

winner str

'power_law' / vs / 'inconclusive'.

error str | None

Error string when fit failed.

Null controls

synthetic_null

synthetic_null(kind: NullKind | None = None, n: int = 20000, seed: int = 42) -> dict[str, NullCase] | NullCase

Run the pipeline on one or all synthetic nulls.

Parameters:

Name Type Description Default
kind NullKind | None

If None, run all three null kinds (gaussian_walk, exponential, poisson_iat) and return a dict keyed by name. If a specific kind, return a single NullCase.

None
n int

Sample size per null.

20000
seed int

RNG seed.

42

Returns:

Type Description
dict[str, NullCase] | NullCase

Either a dict[str, NullCase] (when kind is None) or a single NullCase.

Notes

Use the dict form for the standard validation suite. The returned cases each carry a correctly_rejected flag — pipeline is healthy iff all three reject power-law.

Source code in packages/soc-pipeline/src/soc_pipeline/null_controls.py
def synthetic_null(
    kind: NullKind | None = None,
    n: int = 20_000,
    seed: int = 42,
) -> dict[str, NullCase] | NullCase:
    """Run the pipeline on one or all synthetic nulls.

    Args:
        kind: If None, run all three null kinds (gaussian_walk, exponential,
            poisson_iat) and return a dict keyed by name. If a specific kind,
            return a single NullCase.
        n: Sample size per null.
        seed: RNG seed.

    Returns:
        Either a dict[str, NullCase] (when kind is None) or a single NullCase.

    Notes:
        Use the dict form for the standard validation suite. The returned
        cases each carry a `correctly_rejected` flag — pipeline is healthy
        iff all three reject power-law.
    """
    rng = np.random.default_rng(seed)
    if kind is not None:
        sample = _generate(kind, n, rng)
        fit = fit_clauset_powerlaw(sample, name=kind)
        return NullCase(name=kind, fit=fit, correctly_rejected=bool(fit.rejects_power_law))

    out: dict[str, NullCase] = {}
    for k in ("gaussian_walk", "exponential", "poisson_iat"):
        sample = _generate(k, n, rng)  # type: ignore[arg-type]
        fit = fit_clauset_powerlaw(sample, name=k)
        out[k] = NullCase(
            name=k,  # type: ignore[arg-type]
            fit=fit,
            correctly_rejected=bool(fit.rejects_power_law),
        )
    return out

NullCase dataclass

NullCase(name: NullKind, fit: FitResult, correctly_rejected: bool)

Result of running the pipeline on one synthetic null distribution.

Attributes:

Name Type Description
name NullKind

Which null distribution generated the sample.

fit FitResult

FitResult from passing the sample through the SOC pipeline.

correctly_rejected bool

True iff the pipeline reported rejects_power_law.

Universal collapse

shape_normalized_collapse

shape_normalized_collapse(samples: dict[str, tuple[ndarray, float]], s_star_percentile: float = 99.0, n_points: int = 120) -> CollapseResult

Compute shape-normalized collapse curves across systems.

Parameters:

Name Type Description Default
samples dict[str, tuple[ndarray, float]]

dict mapping system name -> (event_sizes, known_alpha). event_sizes: 1-D positive array. known_alpha: scaling exponent from Clauset fit (or literature value).

required
s_star_percentile float

Percentile used to choose system-specific cutoff s*. 99 means s* = 99th percentile (robust to outliers).

99.0
n_points int

Number of log-spaced CCDF grid points per system.

120

Returns:

Type Description
CollapseResult

CollapseResult with one SystemCurve per system.

Source code in packages/soc-pipeline/src/soc_pipeline/universal_collapse.py
def shape_normalized_collapse(
    samples: dict[str, tuple[np.ndarray, float]],
    s_star_percentile: float = 99.0,
    n_points: int = 120,
) -> CollapseResult:
    """Compute shape-normalized collapse curves across systems.

    Args:
        samples: dict mapping system name -> (event_sizes, known_alpha).
            event_sizes: 1-D positive array. known_alpha: scaling exponent
            from Clauset fit (or literature value).
        s_star_percentile: Percentile used to choose system-specific cutoff s*.
            99 means s* = 99th percentile (robust to outliers).
        n_points: Number of log-spaced CCDF grid points per system.

    Returns:
        CollapseResult with one SystemCurve per system.
    """
    out: dict[str, SystemCurve] = {}
    alpha_min, alpha_max = float("inf"), float("-inf")
    for name, (vals, alpha) in samples.items():
        vals = np.asarray(vals, dtype=float)
        vals = vals[np.isfinite(vals) & (vals > 0)]
        if len(vals) < 100:
            continue
        grid, ccdf = empirical_ccdf(vals, n_points=n_points)
        if grid is None or ccdf is None:
            continue
        s_star = float(np.percentile(vals, s_star_percentile))
        x_rescaled = grid / s_star
        y_rescaled = (s_star ** (alpha - 1)) * ccdf
        out[name] = SystemCurve(
            name=name,
            alpha=float(alpha),
            s_star=s_star,
            n=int(len(vals)),
            x_rescaled=np.asarray(x_rescaled),
            y_rescaled=np.asarray(y_rescaled),
            raw_grid=np.asarray(grid),
            raw_ccdf=np.asarray(ccdf),
        )
        alpha_min = min(alpha_min, float(alpha))
        alpha_max = max(alpha_max, float(alpha))

    return CollapseResult(
        systems=out,
        alpha_range=(alpha_min, alpha_max) if out else None,
        n_systems=len(out),
        notes=(
            "Strict alpha collapse fails across different observables. "
            "Functional-form collapse (power-law tail with exponential cutoff) succeeds."
        ),
    )

CollapseResult dataclass

CollapseResult(systems: dict[str, SystemCurve] = dict(), alpha_range: tuple[float, float] | None = None, n_systems: int = 0, notes: str = '')

Result of shape-normalized collapse over multiple systems.

Attributes:

Name Type Description
systems dict[str, SystemCurve]

Dict mapping system name -> SystemCurve.

alpha_range tuple[float, float] | None

(min_alpha, max_alpha) across systems.

n_systems int

Number of systems included.

notes str

Free-text summary.

Omori-law decay

fit_omori_p

fit_omori_p(dts_sec: ndarray, min_sec: float = 300.0, max_sec: float = 30 * 86400, n_bins: int = 24, c_grid_days: tuple[float, ...] = (0.001, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5)) -> OmoriResult

Omori-Utsu fit on a stack of aftershock times after a main shock.

Parameters:

Name Type Description Default
dts_sec ndarray

Delay (seconds) of each aftershock after its main shock.

required
min_sec float

Lower edge of fit window (default 5 min).

300.0
max_sec float

Upper edge of fit window (default 30 days).

30 * 86400
n_bins int

Number of log-spaced time bins.

24
c_grid_days tuple[float, ...]

Grid for the c (time-shift) parameter; best by R^2 wins.

(0.001, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5)

Returns:

Type Description
OmoriResult

OmoriResult dataclass.

Source code in packages/soc-pipeline/src/soc_pipeline/omori.py
def fit_omori_p(
    dts_sec: np.ndarray,
    min_sec: float = 300.0,
    max_sec: float = 30 * 86400,
    n_bins: int = 24,
    c_grid_days: tuple[float, ...] = (0.001, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5),
) -> OmoriResult:
    """Omori-Utsu fit on a stack of aftershock times after a main shock.

    Args:
        dts_sec: Delay (seconds) of each aftershock after its main shock.
        min_sec: Lower edge of fit window (default 5 min).
        max_sec: Upper edge of fit window (default 30 days).
        n_bins: Number of log-spaced time bins.
        c_grid_days: Grid for the c (time-shift) parameter; best by R^2 wins.

    Returns:
        OmoriResult dataclass.
    """
    dts = np.asarray(dts_sec, dtype=float)
    if len(dts) < 100:
        return OmoriResult(error=f"too few aftershocks ({len(dts)})")

    dts = dts[(dts >= min_sec) & (dts <= max_sec)]
    if len(dts) == 0:
        return OmoriResult(error="no aftershocks in [min_sec, max_sec]")

    bins = np.logspace(math.log10(min_sec), math.log10(max_sec), n_bins + 1)
    centers = np.sqrt(bins[:-1] * bins[1:])
    widths = bins[1:] - bins[:-1]
    counts, _ = np.histogram(dts, bins=bins)
    rate_per_sec = counts / widths
    valid = counts >= 3
    if valid.sum() < 6:
        return OmoriResult(error=f"too few valid bins ({int(valid.sum())})")

    t_sec = centers[valid]
    r = rate_per_sec[valid]
    w = np.sqrt(counts[valid].astype(float))

    best: dict[str, Any] | None = None
    for c_day in c_grid_days:
        c_sec = c_day * 86400
        x = np.log10(t_sec + c_sec)
        y = np.log10(r)
        W = w ** 2
        Sw = np.sum(W)
        Sx = np.sum(W * x)
        Sy = np.sum(W * y)
        Sxx = np.sum(W * x * x)
        Sxy = np.sum(W * x * y)
        denom = Sw * Sxx - Sx * Sx
        if denom == 0:
            continue
        slope = (Sw * Sxy - Sx * Sy) / denom
        intercept = (Sy - slope * Sx) / Sw
        p_val = -slope
        logK = intercept
        pred_y = intercept + slope * x
        ss_res = float(np.sum(W * (y - pred_y) ** 2))
        ss_tot = float(np.sum(W * (y - np.average(y, weights=W)) ** 2))
        r2 = 1 - ss_res / ss_tot if ss_tot > 0 else None
        residuals = y - pred_y
        dof = max(len(y) - 2, 1)
        mse = float(np.sum(W * residuals ** 2) / dof)
        var_slope = mse * Sw / denom
        sigma_p = float(math.sqrt(abs(var_slope)))
        if best is None or (r2 is not None and r2 > best["R2"]):
            best = {
                "c_days": c_day,
                "p": float(p_val),
                "p_sigma": sigma_p,
                "logK": float(logK),
                "R2": float(r2) if r2 is not None else None,
            }

    if best is None:
        return OmoriResult(error="no valid Omori fit")

    return OmoriResult(
        p=best["p"],
        p_sigma=best["p_sigma"],
        c_days=best["c_days"],
        logK=best["logK"],
        R2=best["R2"],
        n_aftershocks_in_fit=int(np.sum(counts[valid])),
        n_bins_used=int(valid.sum()),
        t_range_days=(float(t_sec.min() / 86400), float(t_sec.max() / 86400)),
    )

OmoriResult dataclass

OmoriResult(p: float | None = None, p_sigma: float | None = None, c_days: float | None = None, logK: float | None = None, R2: float | None = None, n_aftershocks_in_fit: int = 0, n_bins_used: int = 0, t_range_days: tuple[float, float] | None = None, n_main: int | None = None, mu: float | None = None, sigma: float | None = None, threshold: float | None = None, error: str | None = None, extra: dict[str, Any] = dict())

Result of an Omori-Utsu p-value fit.

Attributes:

Name Type Description
p float | None

Omori-Utsu decay exponent.

p_sigma float | None

Standard error on p.

c_days float | None

Time-shift constant (days) chosen from grid by best R^2.

logK float | None

log10 of productivity K.

R2 float | None

Goodness-of-fit on log-spaced bins.

n_aftershocks_in_fit int

How many aftershocks fed the regression.

n_bins_used int

How many time bins survived the count>=3 filter.

t_range_days tuple[float, float] | None

(t_min, t_max) of valid bins in days.

n_main int | None

For bin_and_omori only: how many main shocks were detected.

mu float | None

For bin_and_omori only: mean bin count.

sigma float | None

For bin_and_omori only: std bin count.

threshold float | None

For bin_and_omori only: mu + sigma_k * sigma.

error str | None

Error string when fit failed.

extra dict[str, Any]

Misc fields preserved for backward-compat.

bin_and_omori_from_events

bin_and_omori_from_events(event_times_sec: ndarray, bin_seconds: float = 60.0, sigma_k: float = 3.0, window_bins: int = 60) -> OmoriResult

Discrete-time Omori detector for event streams without main/aftershock labels.

Pipeline:

  1. Bin events into windows of bin_seconds.
  2. Detect main shocks where count > mu + sigma_k * sigma.
  3. Stack post-shock counts in window_bins succeeding bins.
  4. Fit log(rate - mu) = logK - p * log(tau + 1) on positive-excess bins.

Parameters:

Name Type Description Default
event_times_sec ndarray

1-D array of event timestamps in seconds.

required
bin_seconds float

Bin width in seconds.

60.0
sigma_k float

Number of std above mean for main-shock threshold.

3.0
window_bins int

How many bins after a main shock to track.

60

Returns:

Type Description
OmoriResult

OmoriResult dataclass.

Source code in packages/soc-pipeline/src/soc_pipeline/omori.py
def bin_and_omori_from_events(
    event_times_sec: np.ndarray,
    bin_seconds: float = 60.0,
    sigma_k: float = 3.0,
    window_bins: int = 60,
) -> OmoriResult:
    """Discrete-time Omori detector for event streams without main/aftershock labels.

    Pipeline:

    1. Bin events into windows of bin_seconds.
    2. Detect main shocks where count > mu + sigma_k * sigma.
    3. Stack post-shock counts in window_bins succeeding bins.
    4. Fit log(rate - mu) = logK - p * log(tau + 1) on positive-excess bins.

    Args:
        event_times_sec: 1-D array of event timestamps in seconds.
        bin_seconds: Bin width in seconds.
        sigma_k: Number of std above mean for main-shock threshold.
        window_bins: How many bins after a main shock to track.

    Returns:
        OmoriResult dataclass.
    """
    times = np.asarray(event_times_sec, dtype=float)
    if len(times) == 0:
        return OmoriResult(error="no events")

    t0 = times.min()
    span = times.max() - t0
    if span <= 0:
        return OmoriResult(error="zero time span")
    n_bins = int(span / bin_seconds) + 1
    counts = np.zeros(n_bins, dtype=np.int64)
    idx = ((times - t0) / bin_seconds).astype(np.int64)
    idx = np.clip(idx, 0, n_bins - 1)
    np.add.at(counts, idx, 1)

    mu = float(counts.mean())
    sig = float(counts.std())
    threshold = mu + sigma_k * sig
    main_idx = np.where(counts > threshold)[0]
    valid = main_idx[(main_idx + window_bins) < n_bins]
    if len(valid) < 10:
        return OmoriResult(
            n_main=int(len(valid)),
            mu=mu,
            sigma=sig,
            threshold=threshold,
            error=f"too few main shocks ({len(valid)})",
            extra={"n_total_bins": int(n_bins)},
        )

    rates = np.zeros(window_bins)
    for tau in range(1, window_bins + 1):
        rates[tau - 1] = float(counts[valid + tau].mean() - mu)
    tau_b = np.arange(1, window_bins + 1, dtype=float)
    keep = rates > 0
    if keep.sum() < 6:
        return OmoriResult(
            n_main=int(len(valid)),
            mu=mu,
            sigma=sig,
            threshold=threshold,
            error="no excess post-shock rate (Omori absent)",
            extra={"n_pos_excess_bins": int(keep.sum())},
        )

    t_fit = tau_b[keep]
    r_fit = rates[keep]
    x = np.log10(t_fit + 1.0)
    y = np.log10(r_fit)
    slope, intercept = np.polyfit(x, y, 1)
    pred = slope * x + intercept
    ss_res = float(np.sum((y - pred) ** 2))
    ss_tot = float(np.sum((y - y.mean()) ** 2))
    r2 = 1 - ss_res / ss_tot if ss_tot > 0 else None
    return OmoriResult(
        p=float(-slope),
        logK=float(intercept),
        R2=float(r2) if r2 is not None else None,
        n_main=int(len(valid)),
        mu=mu,
        sigma=sig,
        threshold=threshold,
        n_bins_used=int(keep.sum()),
        extra={
            "n_pos_excess_bins": int(keep.sum()),
            "n_total_bins": int(n_bins),
            "bin_seconds": float(bin_seconds),
        },
    )

Gutenberg-Richter b-value

fit_b_value

fit_b_value(magnitudes: ndarray, mc: float | None = None, bin_width: float = 0.1, bootstrap: bool = False, n_boot: int = 500, seed: int = 42) -> BValueResult

Fit a Gutenberg-Richter b-value using Aki 1965 MLE.

Parameters:

Name Type Description Default
magnitudes ndarray

1-D array of earthquake magnitudes.

required
mc float | None

Magnitude of completeness; if None, estimated by max-curvature.

None
bin_width float

Magnitude binning step (used for bias correction).

0.1
bootstrap bool

If True, compute 95% bootstrap CI on b.

False
n_boot int

Number of bootstrap resamples.

500
seed int

RNG seed for the bootstrap.

42

Returns:

Type Description
BValueResult

BValueResult dataclass.

Source code in packages/soc-pipeline/src/soc_pipeline/b_value.py
def fit_b_value(
    magnitudes: np.ndarray,
    mc: float | None = None,
    bin_width: float = 0.1,
    bootstrap: bool = False,
    n_boot: int = 500,
    seed: int = 42,
) -> BValueResult:
    """Fit a Gutenberg-Richter b-value using Aki 1965 MLE.

    Args:
        magnitudes: 1-D array of earthquake magnitudes.
        mc: Magnitude of completeness; if None, estimated by max-curvature.
        bin_width: Magnitude binning step (used for bias correction).
        bootstrap: If True, compute 95% bootstrap CI on b.
        n_boot: Number of bootstrap resamples.
        seed: RNG seed for the bootstrap.

    Returns:
        BValueResult dataclass.
    """
    mags = np.asarray(magnitudes, dtype=float)
    mags = mags[np.isfinite(mags)]
    if len(mags) < 50:
        return BValueResult(error=f"too few magnitudes: {len(mags)}")

    if mc is None:
        mc = estimate_mc_maxc(mags, bin_width=bin_width)
    above = mags[mags >= mc]
    n = int(len(above))
    if n < 50:
        return BValueResult(error=f"too few events above Mc={mc}: n={n}")

    mean_mag = float(np.mean(above))
    b = math.log10(math.e) / (mean_mag - (mc - bin_width / 2))
    var = float(np.sum((above - mean_mag) ** 2) / (n * (n - 1)))
    sigma_b = 2.3 * (b ** 2) * math.sqrt(var)

    ci_low: float | None = None
    ci_high: float | None = None
    if bootstrap:
        rng = np.random.default_rng(seed)
        bs = np.empty(n_boot)
        for i in range(n_boot):
            sample = rng.choice(above, size=n, replace=True)
            ms = float(np.mean(sample))
            bs[i] = math.log10(math.e) / (ms - (mc - bin_width / 2))
        ci_low = float(np.percentile(bs, 2.5))
        ci_high = float(np.percentile(bs, 97.5))

    return BValueResult(
        b=float(b),
        sigma_b=float(sigma_b),
        mc=float(mc),
        n_above_mc=n,
        ci_low=ci_low,
        ci_high=ci_high,
        alpha_equivalent=b_to_clauset_alpha(b),
    )

b_to_clauset_alpha

b_to_clauset_alpha(b: float) -> float

Map G-R b-value to equivalent Clauset alpha on energy.

Energy scaling s = 10^(1.5 M) (Hanks-Kanamori). The CDF transformation gives alpha = 1 + b/1.5.

Source code in packages/soc-pipeline/src/soc_pipeline/b_value.py
def b_to_clauset_alpha(b: float) -> float:
    """Map G-R b-value to equivalent Clauset alpha on energy.

    Energy scaling s = 10^(1.5 M) (Hanks-Kanamori). The CDF transformation
    gives alpha = 1 + b/1.5.
    """
    return 1.0 + b / 1.5

Time-resolution sweep

time_resolution_sweep

time_resolution_sweep(event_times: ndarray, bin_sizes_sec: Sequence[float] = (60.0, 300.0, 900.0, 3600.0, 21600.0, 86400.0), stability_threshold: float = 0.15) -> dict[str, Any]

Bin events at multiple time resolutions and fit alpha to each.

Parameters:

Name Type Description Default
event_times ndarray

1-D array of event timestamps in seconds.

required
bin_sizes_sec Sequence[float]

Sequence of bin sizes (seconds) to sweep.

(60.0, 300.0, 900.0, 3600.0, 21600.0, 86400.0)
stability_threshold float

Max alpha spread (max-min) for "stable" verdict.

0.15

Returns:

Type Description
dict[str, Any]

dict with:

dict[str, Any]
  • sweep: list of {bin_seconds, alpha, n_tail, xmin, error?}
dict[str, Any]
  • alpha_min, alpha_max, alpha_spread
dict[str, Any]
  • is_stable: True if spread < threshold
Notes

Genuinely SOC processes should be roughly bin-invariant. Sharp alpha dependence on bin size suggests artifacts (e.g., Poisson smoothing from undersampling).

Source code in packages/soc-pipeline/src/soc_pipeline/time_resolution.py
def time_resolution_sweep(
    event_times: np.ndarray,
    bin_sizes_sec: Sequence[float] = (60.0, 300.0, 900.0, 3600.0, 21600.0, 86400.0),
    stability_threshold: float = 0.15,
) -> dict[str, Any]:
    """Bin events at multiple time resolutions and fit alpha to each.

    Args:
        event_times: 1-D array of event timestamps in seconds.
        bin_sizes_sec: Sequence of bin sizes (seconds) to sweep.
        stability_threshold: Max alpha spread (max-min) for "stable" verdict.

    Returns:
        dict with:

        - ``sweep``: list of ``{bin_seconds, alpha, n_tail, xmin, error?}``
        - ``alpha_min``, ``alpha_max``, ``alpha_spread``
        - ``is_stable``: True if spread < threshold

    Notes:
        Genuinely SOC processes should be roughly bin-invariant. Sharp
        alpha dependence on bin size suggests artifacts (e.g., Poisson
        smoothing from undersampling).
    """
    times = np.asarray(event_times, dtype=float)
    times = times[np.isfinite(times)]
    if len(times) < 100:
        return {"error": f"too few events: {len(times)}"}

    t0 = times.min()
    span = times.max() - t0
    sweep: list[dict[str, Any]] = []
    alphas: list[float] = []
    for bs in bin_sizes_sec:
        n_bins = int(span / bs) + 1
        if n_bins < 50:
            sweep.append({"bin_seconds": float(bs), "error": f"too few bins: {n_bins}"})
            continue
        counts = np.zeros(n_bins, dtype=np.int64)
        idx = ((times - t0) / bs).astype(np.int64)
        idx = np.clip(idx, 0, n_bins - 1)
        np.add.at(counts, idx, 1)
        nonzero = counts[counts > 0].astype(float)
        if len(nonzero) < 100:
            sweep.append({"bin_seconds": float(bs), "error": f"too few nonzero bins: {len(nonzero)}"})
            continue
        fit = fit_clauset_powerlaw(nonzero, name=f"bin_{bs}", discrete=True)
        entry: dict[str, Any] = {"bin_seconds": float(bs)}
        if fit.error:
            entry["error"] = fit.error
        else:
            entry["alpha"] = fit.alpha
            entry["xmin"] = fit.xmin
            entry["n_tail"] = fit.n_tail
            if fit.alpha is not None:
                alphas.append(fit.alpha)
        sweep.append(entry)

    if not alphas:
        return {"sweep": sweep, "error": "no successful fits"}

    arr = np.asarray(alphas)
    spread = float(arr.max() - arr.min())
    return {
        "sweep": sweep,
        "alpha_min": float(arr.min()),
        "alpha_max": float(arr.max()),
        "alpha_spread": spread,
        "alpha_median": float(np.median(arr)),
        "is_stable": spread < stability_threshold,
    }

Utilities

empirical_ccdf

empirical_ccdf(vals: ndarray, n_points: int = 200) -> tuple[np.ndarray | None, np.ndarray | None]

Compute the complementary CDF P(X > s) on a log-spaced grid.

Parameters:

Name Type Description Default
vals ndarray

1-D array of positive values.

required
n_points int

Number of log-spaced sample points.

200

Returns:

Type Description
ndarray | None

(grid, ccdf) — both shape (n_points,). Returns (None, None) if input

ndarray | None

is empty after filtering for positive finite values.

Source code in packages/soc-pipeline/src/soc_pipeline/utils.py
def empirical_ccdf(
    vals: np.ndarray, n_points: int = 200
) -> tuple[np.ndarray | None, np.ndarray | None]:
    """Compute the complementary CDF P(X > s) on a log-spaced grid.

    Args:
        vals: 1-D array of positive values.
        n_points: Number of log-spaced sample points.

    Returns:
        (grid, ccdf) — both shape (n_points,). Returns (None, None) if input
        is empty after filtering for positive finite values.
    """
    vals = np.asarray(vals, dtype=float)
    vals = vals[np.isfinite(vals) & (vals > 0)]
    if len(vals) == 0:
        return None, None
    vals = np.sort(vals)
    grid = np.geomspace(vals.min() * 1.001, vals.max(), n_points)
    n = len(vals)
    ccdf = np.array([(vals > g).sum() / n for g in grid])
    return grid, ccdf

verdict_from_alpha_band

verdict_from_alpha_band(alpha: float | None, predicted: tuple[float, float], literature: tuple[float, float] | None = None) -> str

Standard 3-tier SOC verdict.

Parameters:

Name Type Description Default
alpha float | None

Fitted alpha value or None.

required
predicted tuple[float, float]

(low, high) first-principles prediction band.

required
literature tuple[float, float] | None

(low, high) wider literature-acceptance band. Optional.

None

Returns:

Type Description
str

'CONFIRMED' | 'CONFIRMED (literature band)' | 'DEVIATING' | 'INCONCLUSIVE'

Source code in packages/soc-pipeline/src/soc_pipeline/utils.py
def verdict_from_alpha_band(
    alpha: float | None,
    predicted: tuple[float, float],
    literature: tuple[float, float] | None = None,
) -> str:
    """Standard 3-tier SOC verdict.

    Args:
        alpha: Fitted alpha value or None.
        predicted: (low, high) first-principles prediction band.
        literature: (low, high) wider literature-acceptance band. Optional.

    Returns:
        'CONFIRMED' | 'CONFIRMED (literature band)' | 'DEVIATING' | 'INCONCLUSIVE'
    """
    if alpha is None:
        return "INCONCLUSIVE"
    if predicted[0] <= alpha <= predicted[1]:
        return "CONFIRMED"
    if literature and literature[0] <= alpha <= literature[1]:
        return "CONFIRMED (literature band)"
    return "DEVIATING"

Pandas accessor

The package registers a .soc accessor on pandas.Series:

import pandas as pd
import soc_pipeline  # registers .soc accessor as side-effect

s = pd.Series([...])
v = s.soc.validate(label="my_series", expected_band=(2.0, 3.0))

SocAccessor

SocAccessor(pandas_obj: Series)

Pandas Series .soc accessor — see module docstring for usage.

Source code in packages/soc-pipeline/src/soc_pipeline/pandas_accessor.py
def __init__(self, pandas_obj: pd.Series) -> None:
    self._validate_dtype(pandas_obj)
    self._obj = pandas_obj

validate

validate(label: str | None = None, expected_band: tuple[float, float] | None = None, n_boot: int | None = 200, discrete: bool = False, seed: int = 42, min_samples: int = 100) -> Verdict

Full SOC validation on this series. See module-level validate().

Source code in packages/soc-pipeline/src/soc_pipeline/pandas_accessor.py
def validate(
    self,
    label: str | None = None,
    expected_band: tuple[float, float] | None = None,
    n_boot: int | None = 200,
    discrete: bool = False,
    seed: int = 42,
    min_samples: int = 100,
) -> Verdict:
    """Full SOC validation on this series. See module-level `validate()`."""
    name = label or (self._obj.name if self._obj.name is not None else "series")
    return validate(
        self._obj.values,
        label=str(name),
        expected_band=expected_band,
        n_boot=n_boot,
        discrete=discrete,
        seed=seed,
        min_samples=min_samples,
    )

fit_alpha

fit_alpha(discrete: bool = False, min_samples: int = 100) -> float | None

Convenience: return just the point-estimate alpha (skip bootstrap).

Source code in packages/soc-pipeline/src/soc_pipeline/pandas_accessor.py
def fit_alpha(self, discrete: bool = False, min_samples: int = 100) -> float | None:
    """Convenience: return just the point-estimate alpha (skip bootstrap)."""
    v = self.validate(n_boot=0, discrete=discrete, min_samples=min_samples)
    return v.alpha

is_pass

is_pass(expected_band: tuple[float, float], n_boot: int | None = 0, discrete: bool = False) -> bool

Convenience: True iff verdict is PASS.

Source code in packages/soc-pipeline/src/soc_pipeline/pandas_accessor.py
def is_pass(
    self,
    expected_band: tuple[float, float],
    n_boot: int | None = 0,
    discrete: bool = False,
) -> bool:
    """Convenience: True iff verdict is PASS."""
    return (
        self.validate(
            expected_band=expected_band, n_boot=n_boot, discrete=discrete
        ).verdict
        == "PASS"
    )