"""Credit Enhancement Collateral (CEC) contract implementation.
This module implements credit enhancement collateral contracts that track
collateral value versus covered contract exposure. Similar to margin accounts,
CEC contracts compare collateral provided by covering contracts against the
exposure from covered contracts.
Key Features:
- Two sets of contracts: covered and covering
- Covering contracts provide collateral value
- Covered contracts represent exposure
- Compares collateral vs required amount
- Releases excess or seizes shortfall
- Credit event triggers evaluation
Example:
>>> from jactus.contracts import CreditEnhancementCollateralContract
>>> from jactus.core import ContractAttributes, ActusDateTime
>>> from jactus.observers import ConstantRiskFactorObserver, MockChildContractObserver
>>>
>>> # Create collateral contract for a loan
>>> attrs = ContractAttributes(
... contract_id="CEC-001",
... contract_type=ContractType.CEC,
... contract_role=ContractRole.RPA,
... status_date=ActusDateTime(2024, 1, 1, 0, 0, 0),
... maturity_date=ActusDateTime(2029, 1, 1, 0, 0, 0),
... coverage=1.2, # 120% collateral requirement
... credit_enhancement_guarantee_extent="NO", # Notional only
... contract_structure='{"CoveredContract": "LOAN-001", "CoveringContract": "STK-001"}',
... )
>>> rf_obs = ConstantRiskFactorObserver(0.03)
>>> child_obs = MockChildContractObserver()
>>> cec = CreditEnhancementCollateralContract(attrs, rf_obs, child_obs)
>>> cashflows = cec.simulate(rf_obs, child_obs)
References:
ACTUS Technical Specification v1.1, Section 7.18
"""
import json
from typing import Any
import jax.numpy as jnp
from jactus.contracts.base import BaseContract, SimulationHistory
from jactus.core import (
ActusDateTime,
ContractAttributes,
ContractEvent,
ContractPerformance,
ContractState,
ContractType,
EventSchedule,
EventType,
)
from jactus.core.types import DayCountConvention
from jactus.functions import BasePayoffFunction, BaseStateTransitionFunction
from jactus.observers import ChildContractObserver, RiskFactorObserver
from jactus.observers.behavioral import BehaviorRiskFactorObserver
from jactus.observers.child_contract import SimulatedChildContractObserver
from jactus.observers.scenario import Scenario
[docs]
class CECPayoffFunction(BasePayoffFunction):
"""Payoff function for CEC contracts.
CEC payoffs represent collateral settlement (return or seizure).
"""
[docs]
def calculate_payoff(
self,
event_type: EventType,
state: ContractState,
attributes: ContractAttributes,
time: ActusDateTime,
risk_factor_observer: RiskFactorObserver,
) -> jnp.ndarray:
"""Calculate payoff for collateral events.
Args:
event_type: Type of event
state: Current contract state
attributes: Contract attributes
time: Event time
risk_factor_observer: Risk factor observer
Returns:
Payoff amount (collateral settlement)
"""
# All payoffs are calculated in the event schedule
# based on covered and covering contract states
return jnp.array(0.0, dtype=jnp.float32)
[docs]
class CECStateTransitionFunction(BaseStateTransitionFunction):
"""State transition function for CEC contracts.
CEC state tracks collateral value vs exposure.
"""
[docs]
def transition_state(
self,
event_type: EventType,
state_pre: ContractState,
attributes: ContractAttributes,
time: ActusDateTime,
risk_factor_observer: RiskFactorObserver,
) -> ContractState:
"""Calculate state transition for collateral events.
Args:
event_type: Type of event
state_pre: State before event
attributes: Contract attributes
time: Event time
risk_factor_observer: Risk factor observer
Returns:
Updated contract state
"""
# State updates handled per event type
# Most state is in the child contracts
return ContractState(
tmd=state_pre.tmd,
sd=time,
nt=state_pre.nt,
ipnr=state_pre.ipnr,
ipac=state_pre.ipac,
feac=state_pre.feac,
nsc=state_pre.nsc,
isc=state_pre.isc,
prf=state_pre.prf,
)
[docs]
class CreditEnhancementCollateralContract(BaseContract):
"""Credit Enhancement Collateral (CEC) contract.
A collateral contract that tracks collateral value from covering contracts
against exposure from covered contracts. When credit events occur or at
maturity, compares collateral vs required amount and settles appropriately.
Attributes:
attributes: Contract terms and conditions
risk_factor_observer: Observer for market rates
child_contract_observer: Observer for covered/covering contracts (required)
"""
[docs]
def __init__(
self,
attributes: ContractAttributes,
risk_factor_observer: RiskFactorObserver,
child_contract_observer: ChildContractObserver | None = None,
):
"""Initialize CEC contract.
Args:
attributes: Contract attributes
risk_factor_observer: Observer for market data
child_contract_observer: Observer for covered/covering contracts (required)
Raises:
ValueError: If required attributes are missing or invalid
"""
# Validate contract type
if attributes.contract_type != ContractType.CEC:
raise ValueError(f"Expected contract_type=CEC, got {attributes.contract_type}")
# Validate child contract observer is provided
if child_contract_observer is None:
raise ValueError("child_contract_observer is required for CEC contracts")
# Validate contract structure contains both covered and covering references
if attributes.contract_structure is None:
raise ValueError(
"contract_structure (CTST) is required and must contain "
"CoveredContract and CoveringContract references"
)
# Parse contract structure (JSON string)
try:
ctst = json.loads(attributes.contract_structure)
except (json.JSONDecodeError, TypeError) as e:
raise ValueError(f"contract_structure must be valid JSON: {e}") from e
if not isinstance(ctst, dict):
raise ValueError("contract_structure must be a JSON object (dictionary)")
# Check for both covered and covering contracts
has_covered = "CoveredContract" in ctst or "CoveredContracts" in ctst
has_covering = "CoveringContract" in ctst or "CoveringContracts" in ctst
if not has_covered:
raise ValueError(
"contract_structure must contain 'CoveredContract' or 'CoveredContracts' key"
)
if not has_covering:
raise ValueError(
"contract_structure must contain 'CoveringContract' or 'CoveringContracts' key"
)
# Default coverage to 1.0 (full coverage) if not specified
if attributes.coverage is None:
attributes.coverage = 1.0
# Default credit event type to "DF" if not specified
if attributes.credit_event_type is None:
attributes.credit_event_type = ContractPerformance("DF")
# Default guarantee extent to "NO" (notional only) if not specified
if attributes.credit_enhancement_guarantee_extent is None:
attributes.credit_enhancement_guarantee_extent = "NO"
if attributes.credit_enhancement_guarantee_extent not in ["NO", "NI", "MV"]:
raise ValueError(
f"credit_enhancement_guarantee_extent must be NO, NI, or MV, "
f"got {attributes.credit_enhancement_guarantee_extent}"
)
super().__init__(attributes, risk_factor_observer, child_contract_observer)
def _parse_contract_structure(self) -> dict[str, Any]:
"""Parse contract_structure JSON string into dictionary.
Returns:
Dictionary with CoveredContract and CoveringContract keys
"""
return dict(json.loads(self.attributes.contract_structure or "{}"))
def _get_covered_contract_ids(self) -> list[str]:
"""Get list of covered contract IDs.
Returns:
List of covered contract IDs
"""
ctst = self._parse_contract_structure()
# Handle single or multiple covered contracts
if "CoveredContract" in ctst:
return [ctst["CoveredContract"]]
if "CoveredContracts" in ctst:
contracts = ctst["CoveredContracts"]
if isinstance(contracts, list):
return contracts
if isinstance(contracts, str):
return [contracts]
raise ValueError(f"CoveredContracts must be list or string, got {type(contracts)}")
raise ValueError("contract_structure must contain CoveredContract or CoveredContracts")
def _get_covering_contract_ids(self) -> list[str]:
"""Get list of covering contract IDs.
Returns:
List of covering contract IDs
"""
ctst = self._parse_contract_structure()
# Handle single or multiple covering contracts
if "CoveringContract" in ctst:
return [ctst["CoveringContract"]]
if "CoveringContracts" in ctst:
contracts = ctst["CoveringContracts"]
if isinstance(contracts, list):
return contracts
if isinstance(contracts, str):
return [contracts]
raise ValueError(f"CoveringContracts must be list or string, got {type(contracts)}")
raise ValueError("contract_structure must contain CoveringContract or CoveringContracts")
def _calculate_collateral_value(self, time: ActusDateTime) -> float:
"""Calculate total collateral value from covering contracts.
Args:
time: Time at which to calculate collateral value
Returns:
Total collateral value
"""
assert self.child_contract_observer is not None
covering_ids = self._get_covering_contract_ids()
total_value = 0.0
for contract_id in covering_ids:
# Query covering contract state
state = self.child_contract_observer.observe_state(
contract_id,
time,
None, # State
None, # Attributes (child has its own)
)
# Use notional as proxy for market value
# In production, would query actual market value
value = float(state.nt) if hasattr(state, "nt") else 0.0
total_value += abs(value)
return total_value
def _calculate_exposure(self, time: ActusDateTime) -> float:
"""Calculate total exposure from covered contracts.
Args:
time: Time at which to calculate exposure
Returns:
Total exposure amount
"""
assert self.child_contract_observer is not None
covered_ids = self._get_covered_contract_ids()
cege = self.attributes.credit_enhancement_guarantee_extent
total_exposure = 0.0
for contract_id in covered_ids:
# Query covered contract state
state = self.child_contract_observer.observe_state(
contract_id,
time,
None, # State
None, # Attributes
)
# Calculate exposure based on CEGE mode
if cege == "NO":
# Notional only
exposure = float(state.nt) if hasattr(state, "nt") else 0.0
elif cege == "NI":
# Notional + interest
nt = float(state.nt) if hasattr(state, "nt") else 0.0
ipac = float(state.ipac) if hasattr(state, "ipac") else 0.0
exposure = nt + ipac
elif cege == "MV":
# Market value (approximated as notional for now)
exposure = float(state.nt) if hasattr(state, "nt") else 0.0
else:
exposure = 0.0
total_exposure += abs(exposure)
return total_exposure
def _check_collateral_sufficiency(self, time: ActusDateTime) -> tuple[bool, float]:
"""Check if collateral is sufficient to cover exposure.
Args:
time: Time at which to check sufficiency
Returns:
Tuple of (is_sufficient, shortfall_or_excess)
- is_sufficient: True if collateral >= required
- shortfall_or_excess: Negative if shortfall, positive if excess
"""
collateral_value = self._calculate_collateral_value(time)
exposure = self._calculate_exposure(time)
coverage_ratio = float(self.attributes.coverage or 0.0)
required_collateral = coverage_ratio * exposure
difference = collateral_value - required_collateral
return (difference >= 0, difference)
[docs]
def generate_event_schedule(self) -> EventSchedule:
"""Generate event schedule for CEC contract.
The schedule includes:
1. Analysis dates (AD) if specified
2. Credit event detection (XD) if covered contract defaults
3. Settlement event (STD) after credit event or at maturity
4. Maturity event (MD) if no credit event occurs
Returns:
EventSchedule with collateral events
"""
events = []
# Add analysis dates if specified
if self.attributes.analysis_dates:
for ad_time in self.attributes.analysis_dates:
events.append(
ContractEvent(
event_type=EventType.AD,
event_time=ad_time,
payoff=jnp.array(0.0, dtype=jnp.float32),
currency=self.attributes.currency or "USD",
)
)
# Check for credit events (simplified - in production would observe from children)
# For now, we don't generate XD/STD events as they're event-driven
# They would be detected during simulation when querying covered contracts
# Add termination date if specified
if self.attributes.termination_date:
events.append(
ContractEvent(
event_type=EventType.TD,
event_time=self.attributes.termination_date,
payoff=jnp.array(0.0, dtype=jnp.float32),
currency=self.attributes.currency or "USD",
)
)
# Add maturity event
if self.attributes.maturity_date:
# At maturity, settle collateral
is_sufficient, difference = self._check_collateral_sufficiency(
self.attributes.maturity_date
)
# Generate STD event for settlement
# Positive payoff = return excess collateral
# Negative payoff = seize shortfall
settlement_amount = min(
self._calculate_collateral_value(self.attributes.maturity_date),
(self.attributes.coverage or 0.0)
* self._calculate_exposure(self.attributes.maturity_date),
)
events.append(
ContractEvent(
event_type=EventType.STD,
event_time=self.attributes.maturity_date,
payoff=jnp.array(settlement_amount, dtype=jnp.float32),
currency=self.attributes.currency or "USD",
)
)
events.append(
ContractEvent(
event_type=EventType.MD,
event_time=self.attributes.maturity_date,
payoff=jnp.array(0.0, dtype=jnp.float32),
currency=self.attributes.currency or "USD",
)
)
# Sort events by time
events.sort(
key=lambda e: (e.event_time.year, e.event_time.month, e.event_time.day, e.sequence)
)
return EventSchedule(
contract_id=self.attributes.contract_id,
events=tuple(events),
)
[docs]
def initialize_state(self) -> ContractState:
"""Initialize contract state at status date.
State includes collateral value compared to required amount.
Returns:
Initial ContractState
"""
# Calculate initial collateral value and exposure
collateral_value = self._calculate_collateral_value(self.attributes.status_date)
exposure = self._calculate_exposure(self.attributes.status_date)
coverage_ratio = float(self.attributes.coverage or 0.0)
# Nt = min(collateral_value, CECV × exposure)
required_collateral = coverage_ratio * exposure
nt = min(collateral_value, required_collateral)
return ContractState(
tmd=self.attributes.maturity_date or self.attributes.status_date,
sd=self.attributes.status_date,
nt=jnp.array(nt, dtype=jnp.float32),
ipnr=jnp.array(0.0, dtype=jnp.float32),
ipac=jnp.array(0.0, dtype=jnp.float32),
feac=jnp.array(0.0, dtype=jnp.float32),
nsc=jnp.array(1.0, dtype=jnp.float32),
isc=jnp.array(1.0, dtype=jnp.float32),
prf=self.attributes.contract_performance or ContractPerformance.PF,
)
[docs]
def get_payoff_function(self, event_type: Any) -> CECPayoffFunction:
"""Get payoff function for CEC contract.
Args:
event_type: Type of event (not used, kept for interface compatibility)
Returns:
CECPayoffFunction instance
"""
return CECPayoffFunction(
contract_role=self.attributes.contract_role,
currency=self.attributes.currency,
)
[docs]
def get_state_transition_function(self, event_type: Any) -> CECStateTransitionFunction:
"""Get state transition function for CEC contract.
Args:
event_type: Type of event (not used, kept for interface compatibility)
Returns:
CECStateTransitionFunction instance
"""
return CECStateTransitionFunction()
def _get_child_dcc(self, child_id: str) -> DayCountConvention:
"""Get day count convention for a child contract."""
if isinstance(self.child_contract_observer, SimulatedChildContractObserver):
try:
child_attrs = self.child_contract_observer._attributes.get(child_id)
if child_attrs and child_attrs.day_count_convention:
return child_attrs.day_count_convention
except (AttributeError, KeyError):
pass
return DayCountConvention.A365
def _calculate_coverage_with_accrual(self, time: ActusDateTime) -> float:
"""Calculate coverage amount with proper accrued interest for NI mode."""
from jactus.utilities.conventions import year_fraction
assert self.child_contract_observer is not None
covered_ids = self._get_covered_contract_ids()
cege = self.attributes.credit_enhancement_guarantee_extent
coverage_ratio = float(self.attributes.coverage or 0.0)
total = 0.0
for cid in covered_ids:
try:
state = self.child_contract_observer.observe_state(cid, time, None, None)
except (KeyError, ValueError):
continue
nt = abs(float(state.nt))
if cege == "NI":
ipnr = abs(float(state.ipnr)) if state.ipnr is not None else 0.0
ipac = abs(float(state.ipac)) if state.ipac is not None else 0.0
dcc = self._get_child_dcc(cid)
yf = year_fraction(state.sd, time, dcc)
accrued = ipac + yf * ipnr * nt
total += nt + accrued
else:
total += nt
return coverage_ratio * total
def _get_collateral_market_value(self, time: ActusDateTime) -> float:
"""Get total collateral value from covering contracts at a given time.
For COM-backed collateral, queries market price * quantity.
For other types, uses the child's notional.
"""
assert self.child_contract_observer is not None
covering_ids = self._get_covering_contract_ids()
total = 0.0
for cid in covering_ids:
child_attrs: ContractAttributes | None = None
if isinstance(self.child_contract_observer, SimulatedChildContractObserver):
try:
child_attrs = self.child_contract_observer._attributes.get(cid)
except (AttributeError, KeyError):
child_attrs = None
# For COM contracts, get market value = quantity * market_price
ct_val = (
getattr(
getattr(child_attrs, "contract_type", None),
"value",
str(getattr(child_attrs, "contract_type", "")),
)
if child_attrs
else ""
)
if child_attrs and ct_val == "COM":
moc = getattr(child_attrs, "market_object_code", None)
qty = float(getattr(child_attrs, "quantity", 1) or 1)
if moc:
try:
price = float(
self.risk_factor_observer.observe_risk_factor(moc, time, None, None)
)
total += abs(qty * price)
continue
except Exception:
pass
# Fallback: use child state notional
try:
state = self.child_contract_observer.observe_state(cid, time, None, None)
total += abs(float(state.nt))
except (KeyError, ValueError):
pass
return total
def _adjust_business_day(self, time: ActusDateTime) -> ActusDateTime:
"""Adjust date to next business day if on weekend."""
from jactus.core.types import Calendar
calendar = self.attributes.calendar
if calendar in (Calendar.NO_CALENDAR, None):
return time
dt = time.to_datetime()
while dt.weekday() >= 5:
from datetime import timedelta
dt += timedelta(days=1)
return ActusDateTime(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second)
def _compute_settlement_time(self, xd_time: ActusDateTime) -> ActusDateTime:
"""Compute settlement time from exercise time + settlement period."""
import re
sp = self.attributes.settlement_period
if not sp:
return self._adjust_business_day(xd_time)
sp_str = sp[1:] if sp.startswith("P") else sp
if "L" in sp_str:
sp_str = sp_str[: sp_str.index("L")]
m = re.match(r"(\d+)([DWMY])", sp_str)
if not m:
return self._adjust_business_day(xd_time)
n, unit = int(m.group(1)), m.group(2)
if n == 0:
return self._adjust_business_day(xd_time)
from dateutil.relativedelta import relativedelta
xd_py = xd_time.to_datetime()
delta_map = {
"D": relativedelta(days=n),
"W": relativedelta(weeks=n),
"M": relativedelta(months=n),
"Y": relativedelta(years=n),
}
std_py = xd_py + delta_map.get(unit, relativedelta())
result = ActusDateTime(
std_py.year, std_py.month, std_py.day, std_py.hour, std_py.minute, std_py.second
)
return self._adjust_business_day(result)
[docs]
def simulate(
self,
risk_factor_observer: RiskFactorObserver | None = None,
child_contract_observer: ChildContractObserver | None = None,
scenario: Scenario | None = None, # noqa: ARG002
behavior_observers: list[BehaviorRiskFactorObserver] | None = None, # noqa: ARG002
) -> SimulationHistory:
"""Simulate CEC contract with comprehensive event generation.
Generates XD, STD, and MD events based on covered/covering contract
states and credit events observed through the child observer.
"""
assert self.child_contract_observer is not None
role_sign = self.attributes.contract_role.get_sign()
currency = self.attributes.currency or "USD"
events: list[ContractEvent] = []
covered_ids = self._get_covered_contract_ids()
# Determine effective maturity from covered children
effective_maturity = self.attributes.maturity_date
if effective_maturity is None and isinstance(
self.child_contract_observer, SimulatedChildContractObserver
):
for cid in covered_ids:
try:
child_attrs = self.child_contract_observer._attributes.get(cid)
if child_attrs and child_attrs.maturity_date:
child_md = child_attrs.maturity_date
if effective_maturity is None or child_md > effective_maturity:
effective_maturity = child_md
except (AttributeError, KeyError):
pass
# Calculate coverage at a time when children are funded
coverage_time = self.attributes.purchase_date or self.attributes.status_date
# For CNT children whose IED > status_date, try using IED
if isinstance(self.child_contract_observer, SimulatedChildContractObserver):
for cid in covered_ids:
try:
child_attrs = self.child_contract_observer._attributes.get(cid)
if child_attrs and child_attrs.initial_exchange_date:
ied = child_attrs.initial_exchange_date
if ied > coverage_time:
coverage_time = ied
break
except (AttributeError, KeyError):
pass
coverage_amount = self._calculate_coverage_with_accrual(coverage_time)
current_nt = role_sign * coverage_amount
def _make_state(
time: ActusDateTime,
nt: float,
prf: ContractPerformance = ContractPerformance.PF,
) -> ContractState:
return ContractState(
tmd=effective_maturity or time,
sd=time,
nt=jnp.array(nt, dtype=jnp.float32),
ipnr=jnp.array(0.0, dtype=jnp.float32),
ipac=jnp.array(0.0, dtype=jnp.float32),
feac=jnp.array(0.0, dtype=jnp.float32),
nsc=jnp.array(1.0, dtype=jnp.float32),
isc=jnp.array(1.0, dtype=jnp.float32),
prf=prf,
)
exercised = False
# Detect credit events from child observer
target_perf = self.attributes.credit_event_type or "DF"
ce_time = None
for cid in covered_ids:
try:
child_events = self.child_contract_observer.observe_events(
cid, self.attributes.status_date, None
)
except (KeyError, ValueError):
continue
for ce in child_events:
if ce.event_type == EventType.CE and ce.state_post is not None:
prf_match = str(ce.state_post.prf) == target_perf or (
hasattr(ce.state_post.prf, "value")
and ce.state_post.prf.value == target_perf
)
if not prf_match:
continue
if effective_maturity and ce.event_time > effective_maturity:
continue
ce_time = ce.event_time
break
if ce_time:
break
if ce_time is not None:
exercised = True
# Calculate coverage and collateral at CE time
ce_coverage = self._calculate_coverage_with_accrual(ce_time)
collateral_value = self._get_collateral_market_value(ce_time)
exercise_amount = min(ce_coverage, collateral_value)
xd_nt = role_sign * ce_coverage
xd_state = _make_state(ce_time, xd_nt)
events.append(
ContractEvent(
event_type=EventType.XD,
event_time=ce_time,
payoff=jnp.array(0.0, dtype=jnp.float32),
currency=currency,
state_pre=xd_state,
state_post=xd_state,
)
)
std_time = self._compute_settlement_time(ce_time)
std_payoff = role_sign * exercise_amount
std_state = _make_state(std_time, 0.0)
events.append(
ContractEvent(
event_type=EventType.STD,
event_time=std_time,
payoff=jnp.array(std_payoff, dtype=jnp.float32),
currency=currency,
state_pre=xd_state,
state_post=std_state,
)
)
# MD event at effective maturity (if not exercised)
if effective_maturity and not exercised:
md_state = _make_state(effective_maturity, 0.0)
events.append(
ContractEvent(
event_type=EventType.MD,
event_time=effective_maturity,
payoff=jnp.array(0.0, dtype=jnp.float32),
currency=currency,
state_pre=_make_state(effective_maturity, current_nt),
state_post=md_state,
)
)
events.sort(
key=lambda e: (e.event_time.year, e.event_time.month, e.event_time.day, e.sequence)
)
initial_state = _make_state(self.attributes.status_date, current_nt)
states = [e.state_post for e in events if e.state_post is not None]
final_state = states[-1] if states else initial_state
return SimulationHistory(
events=events,
states=states,
initial_state=initial_state,
final_state=final_state,
)