Source code for unravel.soccer.models.pressing_intensity

import numpy as np
import polars as pl

from dataclasses import dataclass, field

from typing import Literal, List, Union

from ..dataset.kloppy_polars import (
    KloppyPolarsDataset,
    MetricPitchDimensions,
    Group,
    Column,
    Constant,
)

from .utils import time_to_intercept, probability_to_intercept


@dataclass
class PressingIntensity:
    """Compute pressing intensity metrics for soccer tracking data.

    Pressing Intensity quantifies the defensive pressure applied to ball
    carriers by measuring spatial coverage, defender proximity, and velocity
    components. The metric computes time-to-intercept and
    probability-to-intercept matrices between players, capturing how
    effectively defenders can close down passing options.

    The model outputs two matrices per frame:

    - **Time-to-Intercept (TTI)**: Time in seconds for each defender to reach
      each attacker, accounting for positions, velocities, and reaction time.
    - **Probability-to-Intercept (PTI)**: Probability (0-1) that a defender can
      successfully press each attacker, derived from TTI via
      ``probability_to_intercept``.

    These matrices enable analysis of:

    - Defensive compactness and coverage
    - Pressing triggers and coordination
    - Passing lane availability
    - Individual pressing effectiveness

    Args:
        dataset (KloppyPolarsDataset): Dataset containing soccer tracking data
            with positions, velocities, and ball ownership information. After
            construction ``self.dataset`` holds the underlying ``.data`` frame
            and ``self.settings`` holds the dataset settings.
        chunk_size (int, optional): Number of frames to process in each batch.
            Defaults to 20000. NOTE: this value is stored but not read by any
            method of this class.

    Attributes:
        output (pl.DataFrame): Set by :meth:`fit`. One row per frame group,
            containing the ``Group.BY_TIMESTAMP`` key columns plus:

            - time_to_intercept: List[List[float]] - TTI matrix (rows × columns)
            - probability_to_intercept: List[List[float]] - PTI matrix
              (rows × columns)
            - columns: List[str] - Object IDs for the matrix columns (the
              ball-owning team with the default ``orient="ball_owning"``)
            - rows: List[str] - Object IDs for the matrix rows (the non-owning
              team with the default ``orient="ball_owning"``)

    Raises:
        ValueError: If dataset is not of type KloppyPolarsDataset.

    Example:
        >>> from unravel.soccer.dataset import KloppyPolarsDataset
        >>> from unravel.soccer.models import PressingIntensity
        >>> from kloppy import datasets
        >>>
        >>> # Load tracking data
        >>> dataset = datasets.load(
        ...     provider="skillcorner",
        ...     match_id="123",
        ...     competition="EPL"
        ... )
        >>> soccer_data = KloppyPolarsDataset(kloppy_dataset=dataset)
        >>>
        >>> # Initialize pressing intensity model
        >>> pi = PressingIntensity(dataset=soccer_data)
        >>>
        >>> # Compute pressing intensity for all frames
        >>> pi.fit(
        ...     method="teams",       # 11x11 matrix (defenders × attackers)
        ...     ball_method="max",    # Merge ball and ball carrier
        ...     reaction_time=0.7,    # 0.7 second defender reaction time
        ...     time_threshold=1.5,   # 1.5 second pressing window
        ...     sigma=0.45            # Sigmoid steepness parameter
        ... )
        >>>
        >>> # Access results
        >>> print(pi.output)
        >>> # Shows time_to_intercept and probability_to_intercept matrices per frame
        >>>
        >>> # Compute pressing intensity for specific period
        >>> pi.fit(
        ...     start_time=pl.duration(minutes=0),
        ...     end_time=pl.duration(minutes=5),
        ...     period_id=1,
        ...     method="teams"
        ... )

    Note:
        - The model requires velocity data (the VX, VY, VZ and SPEED columns
          are read for every frame).
        - ``settings.max_player_speed`` is passed to ``time_to_intercept`` as
          the maximum object speed.
        - Probability values near 1.0 indicate high pressing pressure; values
          near 0.0 indicate low pressure or distant defenders.

    See Also:
        :class:`~unravel.soccer.dataset.KloppyPolarsDataset`: Data loading and
        preprocessing.
        :meth:`fit`: Configure and compute pressing intensity metrics.
        :doc:`../tutorials/pressing_intensity`: Tutorial on pressing intensity
        analysis.
    """

    dataset: KloppyPolarsDataset
    chunk_size: int = field(init=True, repr=False, default=20_000)

    # Configuration captured by `fit`; not constructor arguments.
    _method: str = field(init=False, repr=False, default="teams")
    _ball_method: str = field(init=False, repr=False, default="max")
    _speed_threshold: float = field(init=False, repr=False, default=None)
    _reaction_time: float = field(init=False, repr=False, default=0.7)
    _sigma: float = field(init=False, repr=False, default=0.45)
    _time_threshold: float = field(init=False, repr=False, default=1.5)
    _orient: str = field(init=False, repr=False, default="ball_owning")
    _line_method: str = field(init=False, repr=False, default=None)

    def __post_init__(self):
        """Validate the dataset and unpack its settings and data frame."""
        if not isinstance(self.dataset, KloppyPolarsDataset):
            raise ValueError("dataset should be of type KloppyPolarsDataset...")

        self.settings = self.dataset.settings
        # From here on `self.dataset` is the underlying data frame.
        self.dataset = self.dataset.data

    def __repr__(self):
        n_frames = (
            self.output[Column.FRAME_ID].n_unique() if hasattr(self, "output") else None
        )
        return f"PressingIntensity(n_frames={n_frames})"

    @property
    def __exprs_variables(self):
        """Columns handed to `__compute`, in the order it unpacks them."""
        return [
            Column.X,
            Column.Y,
            Column.Z,
            Column.VX,
            Column.VY,
            Column.VZ,
            Column.SPEED,
            Column.TEAM_ID,
            Column.BALL_OWNING_TEAM_ID,
            Column.OBJECT_ID,
            Column.IS_BALL_CARRIER,
        ]

    def __compute(self, args: List[pl.Series]) -> dict:
        """Compute the TTI and PTI matrices for a single frame group.

        Args:
            args: One series per entry of `__exprs_variables`, in that order,
                holding the values of all objects in the frame.

        Returns:
            dict with keys "time_to_intercept", "probability_to_intercept",
            "columns" and "rows", matching `__get_return_dtype`.
        """

        def _set_minimum(matrix, ball_carrier_idx, ball_idx):
            # Take the element-wise minimum TTI of the ball carrier and the
            # ball, i.e. whichever of the two can be reached soonest.
            matrix[:, ball_carrier_idx] = np.minimum(
                matrix[:, ball_carrier_idx], matrix[:, ball_idx]
            )
            # Delete ball column
            return np.delete(matrix, ball_idx, axis=1)

        d = {col: args[i].to_numpy() for i, col in enumerate(self.__exprs_variables)}

        ball_idx, ball_carrier_idx = None, None

        if self._ball_method in ["max", "include"]:
            # The ball is grouped with the ball-owning side.
            ball_mask = d[Column.TEAM_ID] == Constant.BALL
            ball_owning_mask = (d[Column.TEAM_ID] == d[Column.BALL_OWNING_TEAM_ID]) | (
                ball_mask
            )
            non_ball_owning_mask = ~ball_owning_mask
        elif self._ball_method == "exclude":
            # Here `ball_mask` selects everything that is NOT the ball.
            ball_mask = d[Column.TEAM_ID] != Constant.BALL
            ball_owning_mask = (d[Column.TEAM_ID] == d[Column.BALL_OWNING_TEAM_ID]) & (
                ball_mask
            )
            non_ball_owning_mask = (
                d[Column.TEAM_ID] != d[Column.BALL_OWNING_TEAM_ID]
            ) & ball_mask

        if self._method == "teams":
            ball_owning_idxs = np.where(ball_owning_mask)[0]
            non_ball_owning_idxs = np.where(non_ball_owning_mask)[0]

            if self._ball_method == "max":
                # Indices are relative to the ball-owning subset (the columns).
                ball_idx = np.where(
                    d[Column.TEAM_ID][ball_owning_idxs] == Constant.BALL
                )[0][0]
                ball_carrier_idx = np.where(
                    d[Column.IS_BALL_CARRIER][ball_owning_idxs]
                )[0][0]

            xs1, ys1, zs1 = (
                d[Column.X][ball_owning_idxs],
                d[Column.Y][ball_owning_idxs],
                d[Column.Z][ball_owning_idxs],
            )
            xs2, ys2, zs2 = (
                d[Column.X][non_ball_owning_idxs],
                d[Column.Y][non_ball_owning_idxs],
                d[Column.Z][non_ball_owning_idxs],
            )
            vxs1, vys1, vzs1 = (
                d[Column.VX][ball_owning_idxs],
                d[Column.VY][ball_owning_idxs],
                d[Column.VZ][ball_owning_idxs],
            )
            vxs2, vys2, vzs2 = (
                d[Column.VX][non_ball_owning_idxs],
                d[Column.VY][non_ball_owning_idxs],
                d[Column.VZ][non_ball_owning_idxs],
            )
            column_objects, row_objects = (
                d[Column.OBJECT_ID][ball_owning_idxs],
                d[Column.OBJECT_ID][non_ball_owning_idxs],
            )
            if self._speed_threshold:
                column_mask = d[Column.SPEED][ball_owning_idxs] < self._speed_threshold
                row_mask = d[Column.SPEED][non_ball_owning_idxs] < self._speed_threshold

        elif self._method == "full":
            if self._ball_method == "exclude":
                mask = np.where(ball_mask)[0]
            else:
                # Select every object in the frame.
                mask = np.where(d[Column.TEAM_ID] == d[Column.TEAM_ID])[0]
                if self._ball_method == "max":
                    ball_idx = np.where(ball_mask)[0][0]
                    ball_carrier_idx = np.where(d[Column.IS_BALL_CARRIER][mask])[0][0]

            xs1, ys1, zs1 = xs2, ys2, zs2 = (
                d[Column.X][mask],
                d[Column.Y][mask],
                d[Column.Z][mask],
            )
            vxs1, vys1, vzs1 = vxs2, vys2, vzs2 = (
                d[Column.VX][mask],
                d[Column.VY][mask],
                d[Column.VZ][mask],
            )
            column_objects, row_objects = (
                d[Column.OBJECT_ID][mask],
                d[Column.OBJECT_ID][mask],
            )
            if self._speed_threshold:
                column_mask = d[Column.SPEED][mask] < self._speed_threshold
                row_mask = d[Column.SPEED][mask] < self._speed_threshold

        if ball_idx is not None:
            # The ball column is merged into the ball carrier further down, so
            # drop the ball from the column labels (and column speed mask).
            column_objects = np.delete(column_objects, ball_idx, axis=0)
            if self._speed_threshold:
                column_mask = np.delete(column_mask, ball_idx, axis=0)

        # `self._line_method` is validated in `fit` but reserved for future
        # development; it currently has no effect on the computation.

        p1 = np.stack((xs1, ys1, zs1), axis=-1)
        p2 = np.stack((xs2, ys2, zs2), axis=-1)
        v1 = np.stack((vxs1, vys1, vzs1), axis=-1)
        v2 = np.stack((vxs2, vys2, vzs2), axis=-1)

        tti = time_to_intercept(
            p1=p1,
            p2=p2,
            v1=v1,
            v2=v2,
            reaction_time=self._reaction_time,
            max_object_speed=self.settings.max_player_speed,
        )

        if self._ball_method == "max":
            tti = _set_minimum(
                matrix=tti, ball_carrier_idx=ball_carrier_idx, ball_idx=ball_idx
            )
            if self._method == "full":
                # In the full matrix the ball is also a row; remove it as well.
                tti = np.delete(tti, ball_idx, axis=0)
                row_objects = np.delete(row_objects, ball_idx, axis=0)
                if self._speed_threshold:
                    row_mask = np.delete(row_mask, ball_idx, axis=0)

        pti = probability_to_intercept(
            time_to_intercept=tti,
            tti_sigma=self._sigma,
            tti_time_threshold=self._time_threshold,
        )

        if self._method == "full":
            # An object cannot press itself: infinite time to intercept and
            # zero probability on the diagonal.
            np.fill_diagonal(tti, np.inf)
            np.fill_diagonal(pti, 0.0)

        if self._speed_threshold:
            # Objects slower than the threshold do not contribute any pressure.
            pti[row_mask, :] = 0.0
            pti[:, column_mask] = 0.0

        ball_owning_is_home = (
            d[Column.BALL_OWNING_TEAM_ID][0] == self.settings.home_team_id
        )
        # Untransposed, rows are the non-owning side and columns the owning
        # side. Transpose whenever the requested orientation wants the
        # ball-owning side on the rows.
        transpose = (
            self._orient == "pressing"
            or (self._orient == "away_home" and not ball_owning_is_home)
            or (self._orient == "home_away" and ball_owning_is_home)
        )

        if transpose:
            return {
                "time_to_intercept": tti.T.tolist(),
                "probability_to_intercept": pti.T.tolist(),
                "columns": row_objects.tolist(),
                "rows": column_objects.tolist(),
            }
        return {
            "time_to_intercept": tti.tolist(),
            "probability_to_intercept": pti.tolist(),
            "columns": column_objects.tolist(),
            "rows": row_objects.tolist(),
        }

    @property
    def __get_return_dtype(self):
        """Polars struct dtype of the dict returned by `__compute`."""
        return pl.Struct(
            {
                "time_to_intercept": pl.List(pl.List(pl.Float64)),
                "probability_to_intercept": pl.List(pl.List(pl.Float64)),
                "columns": pl.List(pl.String),
                "rows": pl.List(pl.String),
            }
        )

    def fit(
        self,
        start_time: pl.duration = None,
        end_time: pl.duration = None,
        period_id: int = None,
        speed_threshold: float = None,
        reaction_time: float = 0.7,
        time_threshold: float = 1.5,
        sigma: float = 0.45,
        method: Literal["teams", "full"] = "teams",
        ball_method: Literal["include", "exclude", "max"] = "max",
        orient: Literal[
            "ball_owning", "pressing", "home_away", "away_home"
        ] = "ball_owning",
        line_method: Union[None, Literal["touchline", "byline", "all"]] = None,
    ):
        """Compute pressing intensity metrics for tracking data.

        Calculates time-to-intercept (TTI) and probability-to-intercept (PTI)
        matrices quantifying defensive pressure, one pair of matrices per
        frame group (``Group.BY_TIMESTAMP``).

        Args:
            start_time (pl.duration, optional): Start time for analysis window.
                Must be specified together with end_time and period_id.
                Defaults to None (processes all frames).
            end_time (pl.duration, optional): End time for analysis window.
                Defaults to None.
            period_id (int, optional): Period ID to analyze (e.g., 1 for first
                half). Defaults to None.
            speed_threshold (float, optional): Minimum speed to count as
                pressing. Rows and columns of objects whose speed is below this
                threshold get PTI 0.0 (TTI is left untouched). Defaults to None
                (no filtering).
            reaction_time (float, optional): Reaction time passed to
                ``time_to_intercept``. Defaults to 0.7.
            time_threshold (float, optional): Time threshold passed to
                ``probability_to_intercept``. Defaults to 1.5.
            sigma (float, optional): Sigma passed to
                ``probability_to_intercept``. Defaults to 0.45.
            method (Literal["teams", "full"], optional): Matrix structure:

                - "teams": non-owning team (rows) × ball-owning team (columns)
                - "full": all objects × all objects; the diagonal is set to
                  TTI = inf and PTI = 0.0

                Defaults to "teams".
            ball_method (Literal["include", "exclude", "max"], optional): Ball
                handling:

                - "include": Keep the ball as a separate node on the
                  ball-owning side (e.g. 11×12 for "teams", 23×23 for "full")
                - "exclude": Ignore ball entirely
                - "max": Merge the ball into the ball carrier by taking the
                  element-wise minimum of their TTI (i.e. the higher pressure
                  of the two) and dropping the ball, preserving matrix
                  dimensions

                Defaults to "max".
            orient (Literal["ball_owning", "pressing", "home_away", "away_home"], optional):
                Matrix orientation:

                - "ball_owning": Rows = non-owning team, Cols = ball-owning team
                - "pressing": Rows = ball-owning team, Cols = non-owning team
                  (transpose of "ball_owning")
                - "home_away": Rows = home team, Cols = away team
                - "away_home": Rows = away team, Cols = home team

                The orientation also controls the order in which objects are
                sorted within each frame. Defaults to "ball_owning".
            line_method (Union[None, Literal["touchline", "byline", "all"]], optional):
                Reserved for future development. Validated but currently has no
                effect. Defaults to None.

        Returns:
            PressingIntensity: Self, with computed results stored in
            :attr:`output`.

        Raises:
            TypeError: If period_id is not an integer.
            ValueError: If method, ball_method, orient, or line_method have
                invalid values.
            TypeError: If reaction_time, speed_threshold, time_threshold, or
                sigma are not float or int.
            ValueError: If start_time, end_time, and period_id are partially
                specified (must be all or none).

        Example:
            >>> # Basic usage: compute pressing intensity for all frames
            >>> pi = PressingIntensity(dataset=soccer_data)
            >>> pi.fit(method="teams", ball_method="max")
            >>>
            >>> # Analyze specific time window
            >>> pi.fit(
            ...     start_time=pl.duration(minutes=10),
            ...     end_time=pl.duration(minutes=15),
            ...     period_id=1,
            ...     method="teams"
            ... )
            >>>
            >>> # Filter for active pressing (players moving > 2 m/s)
            >>> pi.fit(
            ...     method="teams",
            ...     speed_threshold=2.0,
            ...     reaction_time=0.5,
            ...     time_threshold=1.0
            ... )
            >>>
            >>> # Full matrix with ball as separate node
            >>> pi.fit(method="full", ball_method="include")
            >>>
            >>> # Extract pressing intensity for frame 1000
            >>> frame_data = pi.output.filter(pl.col("frame_id") == 1000)
            >>> tti_matrix = np.array(frame_data["time_to_intercept"][0])
            >>> pti_matrix = np.array(frame_data["probability_to_intercept"][0])
            >>> print(f"Max pressing probability: {pti_matrix.max():.2f}")

        Note:
            - Time windows (start_time, end_time, period_id) must be specified
              together or all set to None. Partial specification raises
              ValueError.
            - The output DataFrame contains nested lists for TTI and PTI
              matrices.
            - Typical matrix dimensions (rows × columns) for 11 v 11:

              - "teams" + "max": 11×11
              - "teams" + "include": 11×12
              - "full" + "max": 22×22
              - "full" + "include": 23×23
            - Object IDs in "columns" and "rows" label the matrix axes.

        See Also:
            :class:`PressingIntensity`: Class documentation with conceptual
            overview.
            :doc:`../tutorials/pressing_intensity`: Complete tutorial with
            visualizations.
        """
        # A tuple of types keeps these checks working on Python < 3.10, where
        # isinstance() rejects typing.Union.
        numeric = (float, int)

        if period_id is not None and not isinstance(period_id, int):
            raise TypeError("period_id should be of type integer")
        if method not in ["teams", "full"]:
            raise ValueError("method should be 'teams' or 'full'")
        if ball_method not in ["include", "exclude", "max"]:
            raise ValueError("ball_method should be 'include', 'exclude' or 'max'")
        if orient not in ["ball_owning", "pressing", "home_away", "away_home"]:
            raise ValueError(
                "orient should be 'ball_owning', 'pressing', 'home_away', 'away_home'"
            )
        if line_method is not None and line_method not in [
            "touchline",
            "byline",
            "all",
        ]:
            raise ValueError(
                "line_method should be 'touchline', 'byline', 'all' or None"
            )
        if not isinstance(reaction_time, numeric):
            raise TypeError("reaction_time should be of type float")
        if speed_threshold is not None and not isinstance(speed_threshold, numeric):
            raise TypeError("speed_threshold should be of type float (or None)")
        if not isinstance(time_threshold, numeric):
            raise TypeError("time_threshold should be of type float")
        if not isinstance(sigma, numeric):
            raise TypeError("sigma should be of type float")

        self._method = method
        self._ball_method = ball_method
        self._speed_threshold = speed_threshold
        self._reaction_time = reaction_time
        self._time_threshold = time_threshold
        self._sigma = sigma
        self._orient = orient
        self._line_method = line_method

        window = [start_time, end_time, period_id]
        if all(x is None for x in window):
            df = self.dataset
        elif all(x is not None for x in window):
            df = self.dataset.filter(
                (pl.col(Column.TIMESTAMP).is_between(start_time, end_time))
                & (pl.col(Column.PERIOD_ID) == period_id)
            )
        else:
            raise ValueError(
                "Please specificy all of start_time, end_time and period_id or none of them..."
            )

        # Within each frame, order objects by a team flag so matrix rows and
        # columns come out in a stable order; the ball gets a null flag and is
        # sorted last.
        sort_descending = [False] * len(Group.BY_TIMESTAMP)

        if self._orient in ["home_away", "away_home"]:
            alias = "is_home"
            sort_by = Group.BY_TIMESTAMP + [alias]
            sort_descending = sort_descending + (
                [True] if self._orient == "home_away" else [False]
            )
            with_columns = [
                pl.when(pl.col(Column.TEAM_ID) == self.settings.home_team_id)
                .then(True)
                .when(pl.col(Column.TEAM_ID) == Constant.BALL)
                .then(None)
                .otherwise(False)
                .alias(alias)
            ]
        elif self._orient in ["ball_owning", "pressing"]:
            alias = "is_ball_owning"
            sort_by = Group.BY_TIMESTAMP + [alias]
            sort_descending = sort_descending + (
                [True] if self._orient == "ball_owning" else [False]
            )
            with_columns = [
                pl.when(pl.col(Column.TEAM_ID) == pl.col(Column.BALL_OWNING_TEAM_ID))
                .then(True)
                .when(pl.col(Column.TEAM_ID) == Constant.BALL)
                .then(None)
                .otherwise(False)
                .alias(alias)
            ]

        self.output = (
            df.with_columns(with_columns)
            .sort(by=sort_by, descending=sort_descending, nulls_last=True)
            .group_by(Group.BY_TIMESTAMP, maintain_order=True)
            .agg(
                pl.map_groups(
                    exprs=self.__exprs_variables,
                    function=self.__compute,
                    return_dtype=self.__get_return_dtype,
                    returns_scalar=True,
                ).alias("results")
            )
            .unnest("results")
        )

        return self