|
30 | 30 | class RollRule: |
31 | 31 | offset_business_days: int |
32 | 32 | anchor: str |
| 33 | + contract_months: Optional[Tuple[int, ...]] = None |
| 34 | + |
| 35 | + |
| 36 | +_DEFAULT_CONTRACT_MONTHS: Tuple[int, ...] = (3, 6, 9, 12) |
33 | 37 |
|
34 | 38 |
|
35 | 39 | ROLL_RULES: Dict[str, RollRule] = { |
36 | | - symbol: RollRule(offset_business_days=8, anchor="third_friday") |
| 40 | + symbol: RollRule(offset_business_days=8, anchor="third_friday", contract_months=_DEFAULT_CONTRACT_MONTHS) |
37 | 41 | for symbol in {"ES", "MES", "NQ", "MNQ", "YM", "MYM"} |
38 | 42 | } |
39 | 43 |
|
| 44 | +ROLL_RULES.update( |
| 45 | + { |
| 46 | + "GC": RollRule( |
| 47 | + offset_business_days=7, |
| 48 | + anchor="third_last_business_day", |
| 49 | + contract_months=(2, 4, 6, 8, 10, 12), |
| 50 | + ), |
| 51 | + "SI": RollRule( |
| 52 | + offset_business_days=7, |
| 53 | + anchor="third_last_business_day", |
| 54 | + contract_months=(1, 3, 5, 7, 9, 12), |
| 55 | + ), |
| 56 | + } |
| 57 | +) |
| 58 | + |
40 | 59 | YearMonth = Tuple[int, int] |
41 | 60 |
|
42 | 61 |
|
@@ -72,25 +91,61 @@ def _subtract_business_days(dt: datetime, days: int) -> datetime: |
72 | 91 | return result |
73 | 92 |
|
74 | 93 |
|
| 94 | +def _third_last_business_day(year: int, month: int) -> datetime: |
| 95 | + if month == 12: |
| 96 | + next_month = 1 |
| 97 | + next_year = year + 1 |
| 98 | + else: |
| 99 | + next_month = month + 1 |
| 100 | + next_year = year |
| 101 | + |
| 102 | + last_day = _to_timezone(datetime(next_year, next_month, 1)) - timedelta(days=1) |
| 103 | + |
| 104 | + remaining = 3 |
| 105 | + cursor = last_day |
| 106 | + while remaining > 0: |
| 107 | + if cursor.weekday() < 5: |
| 108 | + remaining -= 1 |
| 109 | + if remaining == 0: |
| 110 | + break |
| 111 | + cursor -= timedelta(days=1) |
| 112 | + return cursor.replace(hour=0, minute=0, second=0, microsecond=0) |
| 113 | + |
| 114 | + |
75 | 115 | def _calculate_roll_trigger(year: int, month: int, rule: RollRule) -> datetime: |
76 | 116 | if rule.anchor == "third_friday": |
77 | 117 | anchor = _third_friday(year, month) |
| 118 | + elif rule.anchor == "third_last_business_day": |
| 119 | + anchor = _third_last_business_day(year, month) |
78 | 120 | else: |
79 | 121 | anchor = _to_timezone(datetime(year, month, 15)) |
80 | 122 | if rule.offset_business_days <= 0: |
81 | 123 | return anchor |
82 | 124 | return _subtract_business_days(anchor, rule.offset_business_days) |
83 | 125 |
|
84 | 126 |
|
85 | | -def _advance_quarter(current_month: int, current_year: int) -> YearMonth: |
86 | | - quarter_months = [3, 6, 9, 12] |
87 | | - idx = quarter_months.index(current_month) |
88 | | - next_idx = (idx + 1) % len(quarter_months) |
89 | | - next_month = quarter_months[next_idx] |
90 | | - next_year = current_year + (1 if next_idx == 0 else 0) |
| 127 | +def _get_contract_months(rule: Optional[RollRule]) -> Tuple[int, ...]: |
| 128 | + if rule and rule.contract_months: |
| 129 | + return tuple(sorted(rule.contract_months)) |
| 130 | + return _DEFAULT_CONTRACT_MONTHS |
| 131 | + |
| 132 | + |
| 133 | +def _advance_contract(current_month: int, current_year: int, months: Tuple[int, ...]) -> YearMonth: |
| 134 | + months_sorted = tuple(sorted(months)) |
| 135 | + idx = months_sorted.index(current_month) |
| 136 | + next_idx = (idx + 1) % len(months_sorted) |
| 137 | + next_month = months_sorted[next_idx] |
| 138 | + next_year = current_year + (1 if next_idx <= idx else 0) |
91 | 139 | return next_year, next_month |
92 | 140 |
|
93 | 141 |
|
| 142 | +def _select_contract(year: int, month: int, months: Tuple[int, ...]) -> YearMonth: |
| 143 | + for candidate in sorted(months): |
| 144 | + if month <= candidate: |
| 145 | + return year, candidate |
| 146 | + return year + 1, sorted(months)[0] |
| 147 | + |
| 148 | + |
94 | 149 | def _legacy_mid_month(reference_date: datetime) -> YearMonth: |
95 | 150 | quarter_months = [3, 6, 9, 12] |
96 | 151 | year = reference_date.year |
@@ -118,27 +173,22 @@ def determine_contract_year_month(symbol: str, reference_date: Optional[datetime |
118 | 173 | ref = _normalize_reference_date(reference_date) |
119 | 174 | symbol_upper = symbol.upper() |
120 | 175 | rule = ROLL_RULES.get(symbol_upper) |
121 | | - |
122 | | - quarter_months = [3, 6, 9, 12] |
123 | 176 | year = ref.year |
124 | 177 | month = ref.month |
125 | 178 |
|
126 | 179 | if rule is None: |
127 | 180 | return _legacy_mid_month(ref) |
128 | 181 |
|
129 | | - if month in quarter_months: |
| 182 | + contract_months = _get_contract_months(rule) |
| 183 | + |
| 184 | + if month in contract_months: |
130 | 185 | target_year, target_month = year, month |
131 | | - roll_point = _calculate_roll_trigger(target_year, target_month, rule) |
132 | | - if ref >= roll_point: |
133 | | - target_year, target_month = _advance_quarter(target_month, target_year) |
134 | 186 | else: |
135 | | - candidates = [m for m in quarter_months if m > month] |
136 | | - if candidates: |
137 | | - target_month = candidates[0] |
138 | | - target_year = year |
139 | | - else: |
140 | | - target_month = quarter_months[0] |
141 | | - target_year = year + 1 |
| 187 | + target_year, target_month = _select_contract(year, month, contract_months) |
| 188 | + |
| 189 | + roll_point = _calculate_roll_trigger(target_year, target_month, rule) |
| 190 | + if ref >= roll_point: |
| 191 | + target_year, target_month = _advance_contract(target_month, target_year, contract_months) |
142 | 192 |
|
143 | 193 | return target_year, target_month |
144 | 194 |
|
@@ -201,6 +251,7 @@ def build_roll_schedule(asset, start: datetime, end: datetime, year_digits: int |
201 | 251 |
|
202 | 252 | symbol_upper = asset.symbol.upper() |
203 | 253 | rule = ROLL_RULES.get(symbol_upper) |
| 254 | + contract_months = _get_contract_months(rule) |
204 | 255 |
|
205 | 256 | schedule = [] |
206 | 257 | cursor = start |
|
0 commit comments