import numpy as np
from scipy import sparse
from dataclasses import dataclass
from typing import Literal
class AdjacenyMatrixConnectType:
    """String identifiers for how the ball is connected to players.

    Attributes:
        BALL: all players are connected to the ball.
        BALL_CARRIER: only the ball carrier is connected to the ball.
        NO_CONNECTION: no connection between the ball and the players.

    NOTE: the class name is spelled "Adjaceny" (sic); it is left unchanged so
    that existing references to this name keep working.
    """

    BALL = "ball"
    BALL_CARRIER = "ball_carrier"
    NO_CONNECTION = "no_connection"
class AdjacencyMatrixType:
    """String identifiers for how players are connected to each other.

    Attributes:
        DELAUNAY: connect via a Delaunay triangulation
            (https://en.wikipedia.org/wiki/Delaunay_triangulation).
        SPLIT_BY_TEAM: connect every player with all players on the same team.
        DENSE: connect all players.
        DENSE_AP: connect only the attacking team's players.
        DENSE_DP: connect only the defending team's players.
    """

    DELAUNAY = "delaunay"
    SPLIT_BY_TEAM = "split_by_team"
    DENSE = "dense"
    DENSE_AP = "dense_ap"
    DENSE_DP = "dense_dp"
class PredictionLabelType:
    """String identifiers for the supported prediction label types.

    Attributes:
        BINARY: the label type identified by the string "binary".
    """

    BINARY = "binary"
@dataclass
class Pad:
    """Container for size limits: node count, edge count and players per team.

    NOTE(review): presumably consumed when padding graphs to a fixed size;
    the consumer is not visible in this file — confirm against callers.
    """

    # Upper bound on the number of nodes.
    max_nodes: int
    # Upper bound on the number of edges.
    max_edges: int
    # Number of players; defaults to 11.
    n_players: int = 11
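# NOTE: a stray "[docs]" link marker (documentation-page extraction artifact) was removed here; it is not code.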
def graph_feature(feature_type: Literal["edge", "node"], is_custom: bool = False):
    """Build a decorator that tags a function as a graph feature.

    The decorated function is not wrapped; it is returned as-is after two
    marker attributes have been attached to it.

    Args:
        feature_type: The type of feature ("edge" or "node").
        is_custom: Whether this is a custom feature.

    Returns:
        A decorator that sets ``is_custom`` and ``feature_type`` on the
        function it receives and returns that same function.
    """

    def mark_feature(func):
        """Attach the marker attributes to ``func`` and hand it back."""
        setattr(func, "is_custom", is_custom)
        setattr(func, "feature_type", feature_type)
        return func

    return mark_feature
def normalize_angles(angle):
    """Linearly rescale an angle from [-pi, pi] to [0, 1].

    Args:
        angle: Angle(s) in radians; a scalar or a numpy array.

    Returns:
        ``(angle + pi) / (2 * pi)``; values outside [-pi, pi] are not clipped.
    """
    lower, upper = -np.pi, np.pi
    # The target range is [0, 1], so the usual "* new_range + new_min" step
    # of a min-max rescale reduces to a plain division by the source span.
    return (angle - lower) / (upper - lower)
def normalize_between(min_value, max_value, value):
    """Min-max scale ``value`` relative to ``[min_value, max_value]``.

    Returns 0 at ``min_value`` and 1 at ``max_value``; no clipping is applied
    and ``min_value == max_value`` is not guarded against.
    """
    offset = value - min_value
    span = max_value - min_value
    return offset / span
def normalize_distance(value, max_distance):
    """Express ``value`` as a fraction of ``max_distance`` (no clipping)."""
    fraction = value / max_distance
    return fraction
def unit_vector(vector):
    """Scale ``vector`` to unit length.

    Args:
        vector: Array-like whose norm is taken with ``np.linalg.norm``.

    Returns:
        ``vector`` divided by its norm, or a float array of zeros with the
        same shape when the norm is exactly zero (avoids dividing by zero).
    """
    length = np.linalg.norm(vector)
    if length != 0:
        return vector / length
    return np.zeros_like(vector, dtype=float)
def unit_vectors(vectors):
    """Normalise each row of ``vectors`` to unit length.

    Args:
        vectors: 2-D array; the norm is taken along axis 1 (one per row).

    Returns:
        Array of the same shape with every row divided by its norm. Rows with
        zero norm are divided by 1 instead, so they stay all-zero.
    """
    row_lengths = np.linalg.norm(vectors, axis=1, keepdims=True)
    # Replace zero lengths by 1 so zero rows are left untouched by the division.
    zero_rows = row_lengths == 0
    row_lengths[zero_rows] = 1
    return vectors / row_lengths
def normalize_coords(value, max_value):
    """Express a coordinate ``value`` as a fraction of ``max_value`` (no clipping)."""
    fraction = value / max_value
    return fraction
def normalize_sincos(value):
    """Map a value from [-1, 1] (e.g. a sine or cosine) onto [0, 1]."""
    shifted = value + 1
    return shifted / 2
def angle_between(v):
    """Return the angle in radians between two 2-D vectors packed into ``v``.

    Args:
        v: Sequence of at least four numbers; ``v[0:2]`` is the first vector
            and ``v[2:4]`` is the second.

    Returns:
        The angle in radians, in ``[0, pi]``, between the two vectors.
    """
    first = unit_vector(v[0:2])
    second = unit_vector(v[2:4])
    # Clip so rounding error cannot push the cosine outside arccos' domain.
    cosine = np.clip(np.dot(first, second), -1.0, 1.0)
    return np.arccos(cosine)
def non_zeros(A):
    """Locate the entries of ``A`` equal to 1 and count its non-zero entries.

    Args:
        A: Array (e.g. an adjacency matrix).

    Returns:
        A tuple ``(indices, count)`` where ``indices`` is the tuple of index
        arrays of the entries equal to 1 and ``count`` is the number of
        non-zero entries in ``A``.

    NOTE(review): the indices select ``A == 1`` while the count includes any
    non-zero value, so the two only agree when ``A`` holds just 0s and 1s.
    """
    ones_idxs = np.nonzero(A == 1)
    n_nonzero = np.count_nonzero(A)
    return ones_idxs, n_nonzero
def reindex(m, non_zero_idxs, len_a):
    """Pick the entries of ``m`` at ``non_zero_idxs`` as a column.

    Args:
        m: Array to index.
        non_zero_idxs: Index (e.g. a tuple of index arrays) applied to ``m``.
        len_a: Number of rows of the result; must match the number of
            selected entries.

    Returns:
        The selected entries reshaped to ``(len_a, 1)``.
    """
    selected = m[non_zero_idxs]
    return selected.reshape(len_a, 1)
def make_sparse(a):
    """Convert ``a`` to a CSR sparse matrix with NaN/inf values sanitised.

    ``np.nan_to_num`` cannot look inside a scipy sparse matrix (numpy treats
    it as an opaque object and returns it unchanged), so the replacement has
    to happen on the dense input, or on the stored values of a sparse input.

    Args:
        a: Dense array-like, or a scipy sparse matrix.

    Returns:
        A ``scipy.sparse.csr_matrix`` in which NaN/inf entries have been
        replaced as by ``np.nan_to_num`` (NaN becomes 0, which is not stored).
        A sparse input is copied, never modified in place.
    """
    if sparse.issparse(a):
        # Copy first so the caller's matrix is left untouched.
        A = sparse.csr_matrix(a, copy=True)
        A.data = np.nan_to_num(A.data)
        # NaNs turned into explicit zeros; drop them from the storage.
        A.eliminate_zeros()
        return A
    return sparse.csr_matrix(np.nan_to_num(a))
def unit_vector_from_angle(value, angle_radians):
    """Build the unit vector of a magnitude/direction pair.

    NaN inputs are treated as 0 before the components are computed.

    Args:
        value: Magnitude (e.g. a speed).
        angle_radians: Direction in radians.

    Returns:
        ``[value * cos(angle), value * sin(angle)]`` divided by its norm, or
        an all-zero float array of the same shape when that norm is zero.
    """
    magnitude = np.nan_to_num(value, nan=0.0)
    heading = np.nan_to_num(angle_radians, nan=0.0)

    # x/y components of the vector before normalisation.
    components = np.array(
        [magnitude * np.cos(heading), magnitude * np.sin(heading)]
    )

    length = np.linalg.norm(components)
    if length != 0:
        return components / length
    return np.zeros_like(components, dtype=float)
def normalize_speed(value, max_speed):
    """Scale a speed by ``max_speed`` and clip the result to [0, 1]."""
    return np.clip(value / max_speed, 0, 1)
def normalize_acceleration(value, max_acceleration):
    """Scale an acceleration by ``max_acceleration`` and clip to [-1, 1]."""
    return np.clip(value / max_acceleration, -1, 1)
def normalize_speeds(v, team_id, ball_id, settings):
    """Normalise speeds, using a separate maximum for the ball.

    Args:
        v: Array of speeds.
        team_id: Array of identifiers aligned with ``v``; entries equal to
            ``ball_id`` mark the ball.
        ball_id: Identifier that marks the ball in ``team_id``.
        settings: Object providing ``max_ball_speed`` and ``max_player_speed``.

    Returns:
        Float array shaped like ``v``: ball entries are normalised with
        ``settings.max_ball_speed``, all others with
        ``settings.max_player_speed`` (see ``normalize_speed``).
    """
    is_ball = team_id == ball_id
    is_player = ~is_ball

    normed = np.zeros_like(v, dtype=float)
    normed[is_ball] = normalize_speed(v[is_ball], settings.max_ball_speed)
    normed[is_player] = normalize_speed(v[is_player], settings.max_player_speed)
    return normed
def normalize_speed_differences(v, team_id, ball_id, settings, **kwargs):
    """Normalise signed speed differences to [-1, 1].

    ``normalize_speeds`` clips its result to [0, 1], so feeding it the signed
    values would turn every negative difference into 0 before the sign is
    re-applied. The magnitude is therefore normalised first and the original
    sign restored afterwards.

    Args:
        v: Array of (possibly negative) speed differences.
        team_id: Array of identifiers aligned with ``v``; entries equal to
            ``ball_id`` mark the ball.
        ball_id: Identifier that marks the ball in ``team_id``.
        settings: Object providing ``max_ball_speed`` and ``max_player_speed``.
        **kwargs: Accepted for call-compatibility; unused.

    Returns:
        Float array shaped like ``v`` with values in [-1, 1] that keep the
        sign of the input.
    """
    magnitudes = normalize_speeds(np.abs(v), team_id, ball_id, settings)
    return magnitudes * np.sign(v)
def normalize_accelerations_nfl(acceleration, team, ball_id, settings):
    """Normalise accelerations, using a separate maximum for the ball.

    Args:
        acceleration: Array of accelerations.
        team: Array of identifiers aligned with ``acceleration``; entries
            equal to ``ball_id`` mark the ball.
        ball_id: Identifier that marks the ball in ``team``.
        settings: Object providing ``max_ball_acceleration`` and
            ``max_player_acceleration``.

    Returns:
        Float array shaped like ``acceleration``: ball entries are normalised
        with ``settings.max_ball_acceleration``, all others with
        ``settings.max_player_acceleration`` (see ``normalize_acceleration``).
    """
    is_ball = team == ball_id
    is_player = ~is_ball

    normed = np.zeros_like(acceleration, dtype=float)
    normed[is_ball] = normalize_acceleration(
        acceleration[is_ball], settings.max_ball_acceleration
    )
    normed[is_player] = normalize_acceleration(
        acceleration[is_player], settings.max_player_acceleration
    )
    return normed
def flatten_to_reshaped_array(arr, s0, s1, as_list=False):
    """Convert a nested structure to an array of shape ``(s0, s1)``.

    This function used to be defined twice in a row; the first definition
    (flatten one level, then ``np.concatenate``) was shadowed by the second
    and could never be called. Only the effective implementation is kept.

    Args:
        arr: Nested, regularly shaped sequence convertible by ``np.array``.
        s0: Number of rows of the result.
        s1: Number of columns of the result.
        as_list: When True, return nested Python lists instead of an array.

    Returns:
        A ``(s0, s1)`` numpy array, or its ``tolist()`` form if ``as_list``.
    """
    result_array = np.array(arr).reshape(s0, s1)
    return result_array.tolist() if as_list else result_array
def reshape_array(arr):
    """Rebuild ``arr.to_numpy()`` as a regular numpy array.

    Args:
        arr: Object exposing ``to_numpy()`` (e.g. a pandas Series whose
            elements are themselves sequences).

    Returns:
        ``np.array`` built from the individual elements of ``arr.to_numpy()``.
    """
    elements = list(arr.to_numpy())
    return np.array(elements)
def reshape_from_size(arr, s0, s1):
    """Flatten ``arr`` by one level and reshape it to ``(s0, s1)``.

    Args:
        arr: Iterable of iterables.
        s0: Number of rows of the result.
        s1: Number of columns of the result.

    Returns:
        A numpy array of shape ``(s0, s1)`` holding the flattened items.
    """
    flat_items = []
    for sublist in arr:
        flat_items.extend(sublist)
    return np.array(flat_items).reshape(s0, s1)
def distance_to_ball(
    x: np.array, y: np.array, team: np.array, ball_id: str, z: np.array = None
):
    """Compute every entity's Euclidean distance to the ball.

    Args:
        x: Array of x coordinates.
        y: Array of y coordinates.
        team: Array of identifiers aligned with the coordinates; entries
            equal to ``ball_id`` mark the ball.
        ball_id: Identifier that marks the ball in ``team``.
        z: Optional array of z coordinates; when given, positions are 3-D.

    Returns:
        A tuple ``(position, ball_position, dist_to_ball)``: the stacked
        coordinates (one row per entity), the ball's coordinates, and the
        per-row distance to the ball. If no entry of ``team`` equals
        ``ball_id``, the origin is used as the ball position; if several do,
        the first one is used.
    """
    coords = (x, y) if z is None else (x, y, z)
    position = np.stack(coords, axis=-1)

    ball_rows = np.where(team == ball_id)[0]
    if ball_rows.size >= 1:
        ball_position = position[ball_rows][0]
    elif z is not None:
        ball_position = np.asarray([0.0, 0.0, 0.0])
    else:
        ball_position = np.asarray([0.0, 0.0])

    dist_to_ball = np.linalg.norm(position - ball_position, axis=1)
    return position, ball_position, dist_to_ball
def get_ball_carrier_idx(x, y, z, team, possession_team, ball_id, threshold):
    """Return the index of the ball carrier, or None if there is none.

    The ball carrier is the player of ``possession_team`` closest to the
    ball, provided that player is within ``threshold`` of it. The previous
    condition was inverted: it discarded players within the threshold and
    picked the nearest one beyond it.

    Args:
        x: Array of x coordinates.
        y: Array of y coordinates.
        z: Array of z coordinates, or None for 2-D positions.
        team: Array of identifiers aligned with the coordinates.
        possession_team: Identifier of the team in possession.
        ball_id: Identifier that marks the ball in ``team``.
        threshold: Maximum distance to the ball for a player to qualify.

    Returns:
        The index (as returned by ``np.argmin``) of the qualifying player
        nearest to the ball, or None when no player qualifies.
    """
    _, _, dist_to_ball = distance_to_ball(x=x, y=y, z=z, team=team, ball_id=ball_id)

    # Candidates: on the team in possession AND within reach of the ball.
    # NaN distances compare False here and are therefore excluded as well.
    is_candidate = (team == possession_team) & (dist_to_ball <= threshold)
    candidate_distances = np.where(is_candidate, dist_to_ball, np.inf)

    if not np.isfinite(candidate_distances).any():
        return None
    return np.argmin(candidate_distances)