import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import t
from reportlab.platypus import (
SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image, PageBreak
)
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib import colors
# ==========================================================
# Monte Carlo simulation for the risk analysis of a trading strategy
# - Sampling methods: t-distribution, normal distribution, reshuffle
#   (permutation), bootstrap
# - Metrics: MDD, CAR, drawdown duration (in trades)
# - Performance: per-path metrics are vectorised (no inner per-trade loops)
# ==========================================================
# ------------------------- Input file ----------------------
# Adjust the file name to the workbook that should be analysed.
#file_path = '25025_LIT_Portfolio_moderat.xlsx'
#file_path = '25025_LIT_Portfolio_defensiv.xlsx'
file_path = '25025_LIT_Portfolio_agressiv.xlsx'
#file_path ='2026_01_04_GuV_RalfS.xlsx'
# Placeholder backtest figures. Both names are reassigned later in this script
# with values computed from the historical returns, so these literals only
# matter if that later section is removed.
BACKTEST_DD_PCT = 12.6
BACKTEST_DUR_TRADES = 145
# Only the first two columns of the first sheet are read.
df = pd.read_excel(file_path, usecols=[0, 1])
# Historical per-trade returns (2nd column); rows without a value are dropped.
hist_returns = df.iloc[:, 1].dropna().to_numpy(dtype=float)
mu = float(hist_returns.mean())
# Sample standard deviation (ddof=1).
sigma = float(hist_returns.std(ddof=1))
print("Der Mittelwert der zweiten Spalte ist:", mu)
print("Die Standardabweichung der zweiten Spalte ist:", sigma)
# ------------------------- Inputs --------------------------
# int() raises ValueError on non-numeric input; there is no retry loop.
num_simulation = int(input("Bitte die Anzahl der Simulationen eingeben: "))
num_trades = int(input("Bitte die Anzahl der Trades/Simulation eingeben: "))
# An empty answer falls back to 7 degrees of freedom.
dff = int(input("Anzahl der Freiheitsgrade (default 7): ") or "7")
# Each flag is True only when the answer, lower-cased, is exactly "y".
run_t = input("Monte Carlo mit t-Verteilung durchführen? (y/n): ").lower() == "y"
run_n = input("Monte Carlo mit Normalverteilung durchführen? (y/n): ").lower() == "y"
run_r = input("Monte Carlo mit Reshuffle durchführen? (y/n): ").lower() == "y"
run_b = input("Monte Carlo mit Bootstrap (Resample mit Zurücklegen) durchführen? (y/n): ").lower() == "y"
# ------------------------- Settings ------------------------
START_CAPITAL = 10_000.0
# Time span (in years) used to annualise the final equity into CAR. It is a
# fixed constant and does not scale with the number of trades entered above.
YEARS = 23
# Plot option: how many equity paths to draw per panel (0 = none).
PLOT_PATHS = 50
# Print progress only every X simulations (printing is expensive).
PRINT_EVERY = 250
# ========================= Helpers =========================
def equity_from_returns(returns: np.ndarray, start_capital: float) -> np.ndarray:
    """Compound relative per-trade returns into an equity curve.

    Element ``t`` of the result is ``start_capital * prod(1 + r_k for k <= t)``,
    i.e. the account value *after* trade ``t``. The starting capital itself is
    not part of the returned curve.
    """
    growth_factors = 1.0 + returns
    compounded = np.cumprod(growth_factors)
    return start_capital * compounded
def mdd_from_equity(equity: np.ndarray) -> float:
    """Return the maximum drawdown of an equity curve as a fraction.

    The drawdown at each step is measured against the running peak of the
    curve, so ``0.25`` means the curve fell 25% below an earlier high.

    Args:
        equity: Equity values in chronological order (any array-like).

    Returns:
        The largest relative drop from a running peak, or ``0.0`` for an
        empty curve (matching ``drawdown_duration_max``, which reports 0).
    """
    equity = np.asarray(equity, dtype=float)
    # An empty curve has no drawdown; without this guard ``max()`` would raise.
    if equity.size == 0:
        return 0.0
    peak = np.maximum.accumulate(equity)
    # NOTE: a non-positive running peak (first return <= -100%) makes this
    # ratio inf/nan; that case is left untouched on purpose.
    dd = (peak - equity) / peak
    return float(dd.max())
def drawdown_duration_max(equity: np.ndarray) -> int:
    """Return the length, in trades, of the longest drawdown phase.

    A drawdown phase is a maximal run of consecutive points that lie strictly
    below the running peak. A phase that is still open at the end of the curve
    is counted up to the last point. Returns 0 when the curve never drops
    below its running peak.
    """
    running_high = np.maximum.accumulate(equity)
    below_high = equity < running_high
    if not below_high.any():
        return 0
    # Frame the flags with a 0 on both sides so that every underwater run has
    # a rising edge (+1) and a falling edge (-1), including runs that touch
    # the first or the last element.
    framed = np.concatenate(([0], below_high.astype(np.int8), [0]))
    edges = np.diff(framed)
    run_starts = np.flatnonzero(edges == 1)
    run_ends = np.flatnonzero(edges == -1)
    return int((run_ends - run_starts).max())
def car_from_equity(equity: np.ndarray, start_capital: float, years: float) -> float:
    """Return the compound annual rate implied by the final equity value.

    Computed as ``(equity[-1] / start_capital) ** (1 / years) - 1``.
    """
    total_growth = equity[-1] / start_capital
    annual_exponent = 1.0 / years
    return float(total_growth ** annual_exponent - 1.0)
# ========================= Runner ==========================
def run_mc(generator_fn, title: str, ax_paths, plot_paths: int):
    """Run ``num_simulation`` Monte Carlo paths and collect their risk metrics.

    Args:
        generator_fn: Callable taking the simulation index and returning the
            array of relative returns for that path.
        title: Label used in the progress output.
        ax_paths: Matplotlib axis for the equity paths, or None to skip drawing.
        plot_paths: Number of leading simulations whose path is drawn.

    Returns:
        Tuple ``(MDD, CAR, DDUR)`` of arrays with one entry per simulation:
        max drawdown (fraction), compound annual rate, and longest drawdown
        duration in trades.

    The simulation count, progress interval, starting capital and time span
    are read from the module-level ``num_simulation``, ``PRINT_EVERY``,
    ``START_CAPITAL`` and ``YEARS``.
    """
    max_drawdowns = np.empty(num_simulation, dtype=float)
    annual_rates = np.empty(num_simulation, dtype=float)
    dd_lengths = np.empty(num_simulation, dtype=np.int32)
    draw_paths = ax_paths is not None

    for sim_idx in range(num_simulation):
        completed = sim_idx + 1
        if completed % PRINT_EVERY == 0:
            print(f"{title}: {completed}/{num_simulation}")

        path = equity_from_returns(generator_fn(sim_idx), START_CAPITAL)
        max_drawdowns[sim_idx] = mdd_from_equity(path)
        annual_rates[sim_idx] = car_from_equity(path, START_CAPITAL, YEARS)
        dd_lengths[sim_idx] = drawdown_duration_max(path)

        # Only the first ``plot_paths`` simulations are drawn.
        if draw_paths and completed <= plot_paths:
            ax_paths.plot(path, alpha=0.30)

    return max_drawdowns, annual_rates, dd_lengths
# ========================= Plots: equity paths ==============
# One figure with a panel per sampling method; run_mc draws the first
# PLOT_PATHS equity curves of each executed method into its panel.
fig_paths, (ax1, ax2, ax3, ax4) = plt.subplots(1, 4, figsize=(16, 5))
ax1.set_title("MCS - Randomized t-Distribution")
ax2.set_title("MCS - Randomized Gauss-Distribution")
ax3.set_title("MCS - Reshuffle")
ax4.set_title("MCS - Bootstrap")
# ========================= Simulation ======================
# Result arrays stay None for every method the user chose to skip.
MDD_t = CAR_t = DDUR_t = None
MDD_n = CAR_n = DDUR_n = None
MDD_r = CAR_r = DDUR_r = None
MDD_b = CAR_b = DDUR_b = None
# ---- Student-t distribution ----
if run_t:
    print("*************************************")
    print("MCS Simulation - Randomized t-Verteil")

    def gen_t(_):
        # NOTE(review): scipy's `scale` is not the standard deviation of a
        # Student-t. For df > 2 the variance is scale**2 * df / (df - 2), so
        # these draws are more dispersed than `sigma` - confirm this extra
        # conservatism is intended.
        return t.rvs(df=dff, loc=mu, scale=sigma, size=num_trades)

    MDD_t, CAR_t, DDUR_t = run_mc(gen_t, "t-Dist", ax1, PLOT_PATHS)
else:
    print("t-Verteilung Monte Carlo übersprungen")
# ---- Normal (Gauss) ----
if run_n:
    print("*************************************")
    print("MCS Simulation - Randomized Norm")

    def gen_n(_):
        # Original author's note: faster than, and equivalent to, the
        # ppf(uniform) approach.
        return np.random.normal(loc=mu, scale=sigma, size=num_trades)

    MDD_n, CAR_n, DDUR_n = run_mc(gen_n, "Gauss", ax2, PLOT_PATHS)
else:
    print("Gauss Monte Carlo übersprungen")
# ---- Reshuffle (permutation) ----
if run_r:
    print("*************************************")
    print("MCS Simulation - Reshuffle (STRICT)")
    # Strict reshuffle: always use ALL historical trades.
    # NOTE(review): neither run_mc nor gen_r reads num_trades, so swapping
    # and restoring it here has no effect on this run.
    num_trades_backup = num_trades
    num_trades = len(hist_returns)

    def gen_r(_):
        # Pure permutation - no sub-selection, no resizing. Every permutation
        # compounds to the same final equity (up to rounding), so CAR_r is
        # essentially constant; only the path-dependent metrics vary.
        return np.random.permutation(hist_returns)

    MDD_r, CAR_r, DDUR_r = run_mc(
        gen_r,
        "Reshuffle STRICT",
        ax3,
        PLOT_PATHS
    )
    # Restore the user-supplied value.
    num_trades = num_trades_backup
else:
    print("Reshuffle Monte Carlo übersprungen")
# ---- Bootstrap (resampling with replacement) ----
if run_b:
    print("*************************************")
    print("MCS Simulation - Bootstrap (Resample mit Zurücklegen)")

    def gen_b(_):
        return np.random.choice(hist_returns, size=num_trades, replace=True)

    MDD_b, CAR_b, DDUR_b = run_mc(gen_b, "Bootstrap", ax4, PLOT_PATHS)
else:
    print("Bootstrap Monte Carlo übersprungen")
# Show the equity-path figure.
plt.show()
# ========================= Results: MDD =====================
# Console banner that introduces the maximum-drawdown summary.
for _banner_line in (
    "",
    "*************************************",
    " Results MDD ",
    "*************************************",
):
    print(_banner_line)
def print_dist(name, arr):
    """Print the 1%/50%/99% percentiles and the maximum of a drawdown sample.

    ``arr`` holds fractions (0.25 = 25%); values are printed in percent,
    rounded to two decimals, followed by an empty line.
    """
    def as_percent(fraction):
        return round(fraction * 100, 2)

    print(f"{name}")
    for caption, quantile in (
        ("1% Perzentil: ", 1),
        ("50% Perzentil:", 50),
        ("99% Perzentil:", 99),
    ):
        print(caption, as_percent(np.percentile(arr, quantile)), "%")
    print("Max DD:", as_percent(np.max(arr)), "%")
    print("")
# Print the MDD summary for every method that was actually simulated.
for _enabled, _label, _values in (
    (run_t, "MDD results Randomized t-Dist", MDD_t),
    (run_n, "MDD results Gauss-Dist", MDD_n),
    (run_r, "MDD results Reshuffle-Dist", MDD_r),
    (run_b, "MDD results Bootstrap", MDD_b),
):
    if _enabled:
        print_dist(_label, _values)

# ========================= Hist: MDD ========================
# One histogram panel per method; panels of skipped methods stay empty.
fig_mdd, (ax5, ax6, ax7, ax8) = plt.subplots(1, 4, figsize=(16, 5))
for _axis, _panel_title, _enabled, _values in (
    (ax5, "MDD - t-Distribution", run_t, MDD_t),
    (ax6, "MDD - Gauss", run_n, MDD_n),
    (ax7, "MDD - Reshuffle", run_r, MDD_r),
    (ax8, "MDD - Bootstrap", run_b, MDD_b),
):
    _axis.set_title(_panel_title)
    if _enabled:
        _axis.hist(_values, bins=30, alpha=0.7, edgecolor='black')
plt.show()

# ========================= Results: CAR =====================
# Console banner that introduces the CAR summary.
for _banner_line in (
    "*************************************",
    " Results CAR ",
    "*************************************",
):
    print(_banner_line)
def print_dist_car(name, arr):
    """Print the 1%/50%/99% percentiles plus maximum and minimum of a CAR sample.

    ``arr`` holds fractions (0.10 = 10%); values are printed in percent,
    rounded to two decimals, followed by an empty line.
    """
    def as_percent(fraction):
        return round(fraction * 100, 2)

    print(f"{name}")
    for caption, quantile in (
        ("1% Perzentil: ", 1),
        ("50% Perzentil:", 50),
        ("99% Perzentil:", 99),
    ):
        print(caption, as_percent(np.percentile(arr, quantile)), "%")
    for caption, extreme in (("Max CAR:", np.max), ("Min CAR:", np.min)):
        print(caption, as_percent(extreme(arr)), "%")
    print("")
# Print the CAR summary for every method that was actually simulated.
for _enabled, _label, _values in (
    (run_t, "CAR results Randomized t-Dist", CAR_t),
    (run_n, "CAR results Gauss-Dist", CAR_n),
    (run_r, "CAR results Reshuffle-Dist", CAR_r),
    (run_b, "CAR results Bootstrap-Dist", CAR_b),
):
    if _enabled:
        print_dist_car(_label, _values)

# ========================= Hist: CAR ========================
# One histogram panel per method; panels of skipped methods stay empty.
fig_car, (ax9, ax10, ax11, ax12) = plt.subplots(1, 4, figsize=(16, 5))
for _axis, _panel_title, _enabled, _values in (
    (ax9, "CAR - t-Distribution", run_t, CAR_t),
    (ax10, "CAR - Gauss", run_n, CAR_n),
    (ax11, "CAR - Reshuffle", run_r, CAR_r),
    (ax12, "CAR - Bootstrap", run_b, CAR_b),
):
    _axis.set_title(_panel_title)
    if _enabled:
        _axis.hist(_values, bins=30, alpha=0.7, edgecolor='black')
plt.show()

# ========================= Results: DD Duration =============
# Console banner that introduces the drawdown-duration summary.
for _banner_line in (
    "*************************************",
    " Results Drawdown Duration (Trades) ",
    "*************************************",
):
    print(_banner_line)
def print_ddur(name, arr):
    """Print the 1%/50%/99% percentiles and the maximum of a duration sample.

    Values are drawdown durations in trades and are truncated to integers
    for display; an empty line follows the summary.
    """
    print(name)
    for quantile in (1, 50, 99):
        print(f"{quantile}% Perzentil:", int(np.percentile(arr, quantile)))
    print("Max:", int(np.max(arr)))
    print("")
# Print the drawdown-duration summary for every simulated method.
for _enabled, _label, _values in (
    (run_t, "DD Duration t-Dist (max pro Simulation)", DDUR_t),
    (run_n, "DD Duration Gauss (max pro Simulation)", DDUR_n),
    (run_r, "DD Duration Reshuffle (max pro Simulation)", DDUR_r),
    (run_b, "DD Duration Bootstrap (max pro Simulation)", DDUR_b),
):
    if _enabled:
        print_ddur(_label, _values)

# ========================= Hist: DDUR =======================
# One histogram panel per method; panels of skipped methods stay empty.
fig_ddur, (ax13, ax14, ax15, ax16) = plt.subplots(1, 4, figsize=(16, 5))
for _axis, _panel_title, _enabled, _values in (
    (ax13, "DDUR - t-Distribution", run_t, DDUR_t),
    (ax14, "DDUR - Gauss", run_n, DDUR_n),
    (ax15, "DDUR - Reshuffle", run_r, DDUR_r),
    (ax16, "DDUR - Bootstrap", run_b, DDUR_b),
):
    _axis.set_title(_panel_title)
    if _enabled:
        _axis.hist(_values, bins=30, alpha=0.7, edgecolor='black')
plt.show()

# ========================= 2D plot: DD depth vs DD duration =======================
# Per-method results as (name, MDD array, DDUR array); the arrays of a skipped
# method are None.
method_data = list(zip(
    ("t-Dist", "Gauss", "Reshuffle", "Bootstrap"),
    (MDD_t, MDD_n, MDD_r, MDD_b),
    (DDUR_t, DDUR_n, DDUR_r, DDUR_b),
))
def plot_dd_2d_scatter_all(
    method_data,
    trades_per_month=28.0,
    show_quantile_lines=(50, 95, 99),
    anomaly_mode="auto_ref",  # "auto_ref" | "per_panel" | "fixed"
    fixed_anomaly_dd=None,  # only used with anomaly_mode="fixed"
    fixed_anomaly_dur=None,  # only used with anomaly_mode="fixed"
    anomaly_dd_ref="t-Dist",  # method that supplies the DD threshold
    anomaly_dur_ref="Reshuffle",  # method that supplies the duration threshold
    anomaly_quantile=99,  # e.g. 99
    show_anomaly_zone=True,
    portfolio_point=None,  # (dd_pct, dur_trades) or None
    portfolio_label="Portfolio (real)"
):
    """
    Show a scatter plot of drawdown depth (%) versus drawdown duration (trades),
    with one panel per method.

    method_data: list of tuples (name, MDD_array, DDUR_array)
    - MDD_array: max drawdown as a fraction (0.25 = 25%)
    - DDUR_array: max drawdown duration in trades (int)
    Entries whose arrays are None or empty are skipped; if nothing remains a
    message is printed and the function returns without plotting.

    trades_per_month: conversion factor for the secondary "years" x-axis
    (years = trades / (trades_per_month * 12)).
    show_quantile_lines: percentiles drawn as horizontal/vertical lines in
    each panel; a falsy value disables them. Only the 99 line gets a label.

    anomaly_mode:
    - "auto_ref": thresholds are the `anomaly_quantile` percentiles of the
      reference methods `anomaly_dd_ref` / `anomaly_dur_ref`
    - "per_panel": each panel uses its own percentiles as thresholds
    - "fixed": uses fixed_anomaly_dd / fixed_anomaly_dur as thresholds

    portfolio_point: optional (dd_pct, dur_trades) marker drawn in every panel.

    Raises ValueError for an unknown anomaly_mode, for "fixed" without both
    fixed thresholds, and for "auto_ref" when a reference method is not among
    the methods that have data. The thresholds are resolved even when
    show_anomaly_zone is False.

    Returns None; the figure is displayed with plt.show().
    """
    import numpy as np
    import matplotlib.pyplot as plt
    # ----------------- Filter data (only methods that have data) -----------------
    filtered = []
    for name, mdd, ddur in method_data:
        if mdd is None or ddur is None:
            continue
        if len(mdd) == 0 or len(ddur) == 0:
            continue
        filtered.append((name, mdd, ddur))
    if not filtered:
        print("Keine Daten für 2D-Plot vorhanden.")
        return

    # ----------------- Helper: trades <-> years -----------------
    def trades_to_years(x):
        return x / (trades_per_month * 12.0)

    def years_to_trades(x):
        return x * (trades_per_month * 12.0)

    # ----------------- Helper: fetch reference arrays -----------------
    def _get_method_arrays(ref_name):
        # Returns (DD values in %, duration values), each with its own
        # non-finite entries removed.
        for n, mdd, ddur in filtered:
            if n == ref_name:
                dd_vals = np.asarray(mdd, dtype=float) * 100.0
                dur_vals = np.asarray(ddur, dtype=float)
                dd_mask = np.isfinite(dd_vals)
                dur_mask = np.isfinite(dur_vals)
                return dd_vals[dd_mask], dur_vals[dur_mask]
        raise ValueError(
            f"Referenzmethode '{ref_name}' nicht gefunden. Verfügbar: {[n for n,_,_ in filtered]}"
        )

    # ----------------- Determine anomaly thresholds -----------------
    dd_thr_global = None
    dur_thr_global = None
    if anomaly_mode == "auto_ref":
        dd_vals_ref, _ = _get_method_arrays(anomaly_dd_ref)
        _, dur_vals_ref = _get_method_arrays(anomaly_dur_ref)
        dd_thr_global = float(np.percentile(dd_vals_ref, anomaly_quantile))
        dur_thr_global = float(np.percentile(dur_vals_ref, anomaly_quantile))
    elif anomaly_mode == "fixed":
        if fixed_anomaly_dd is None or fixed_anomaly_dur is None:
            raise ValueError("Bei anomaly_mode='fixed' müssen fixed_anomaly_dd und fixed_anomaly_dur gesetzt sein.")
        dd_thr_global = float(fixed_anomaly_dd)
        dur_thr_global = float(fixed_anomaly_dur)
    elif anomaly_mode == "per_panel":
        # Thresholds are computed per panel further down.
        pass
    else:
        raise ValueError("anomaly_mode muss 'auto_ref', 'fixed' oder 'per_panel' sein.")
    print(f"[Anomaly thresholds] mode={anomaly_mode} | "
          f"DD_thr={dd_thr_global if dd_thr_global is not None else 'per-panel'} | "
          f"DUR_thr={dur_thr_global if dur_thr_global is not None else 'per-panel'}")
    # ----------------- Figure setup -----------------
    n_panels = len(filtered)
    fig, axes = plt.subplots(1, n_panels, figsize=(6 * n_panels, 5.6), sharey=True)
    # plt.subplots returns a bare Axes (not an array) for a single panel.
    if n_panels == 1:
        axes = [axes]
    # ----------------- Panels -----------------
    for ax, (name, mdd, ddur) in zip(axes, filtered):
        mdd_pct = np.asarray(mdd, dtype=float) * 100.0
        ddur_tr = np.asarray(ddur, dtype=float)
        # Keep only simulations where both coordinates are finite.
        mask = np.isfinite(mdd_pct) & np.isfinite(ddur_tr)
        mdd_pct = mdd_pct[mask]
        ddur_tr = ddur_tr[mask]
        # Scatter
        ax.scatter(ddur_tr, mdd_pct, s=10, alpha=0.25)
        ax.set_title(f"DD Tiefe vs. DD Dauer – {name}")
        ax.set_xlabel("Drawdown-Dauer (Trades) – max pro Simulation")
        ax.grid(True)
        # Secondary x-axis: duration in years
        secax = ax.secondary_xaxis('top', functions=(trades_to_years, years_to_trades))
        secax.set_xlabel("Drawdown-Dauer (Jahre)")
        # Quantile lines per panel (optional)
        if show_quantile_lines:
            qs = list(show_quantile_lines)
            yq = np.percentile(mdd_pct, qs)
            xq = np.percentile(ddur_tr, qs)
            for q, y in zip(qs, yq):
                ax.axhline(y, linewidth=1.2)
                if q == 99:
                    ax.text(ax.get_xlim()[0] + 5, y, f"{q}% DD: {y:.2f}%", va="bottom")
            for q, x in zip(qs, xq):
                ax.axvline(x, linewidth=1.2)
                if q == 99:
                    ax.text(x, ax.get_ylim()[0] + 0.5, f"{q}% Dauer: {int(x)}", rotation=90, va="bottom")
        # Shade the anomaly zone
        if show_anomaly_zone:
            if anomaly_mode == "per_panel":
                dd_thr = float(np.percentile(mdd_pct, anomaly_quantile))
                dur_thr = float(np.percentile(ddur_tr, anomaly_quantile))
            else:
                dd_thr = dd_thr_global
                dur_thr = dur_thr_global
            # Limits are read at this point, i.e. before later panels (which
            # share the y-axis) have been drawn.
            x_min, x_max = ax.get_xlim()
            y_min, y_max = ax.get_ylim()
            if dur_thr < x_max and dd_thr < y_max:
                # Two bands are shaded; their overlap (darker corner) is the
                # region where both thresholds are exceeded.
                ax.axvspan(dur_thr, x_max, alpha=0.08)
                ax.axhspan(dd_thr, y_max, alpha=0.08)
                ax.text(
                    dur_thr + 0.02 * (x_max - x_min),
                    dd_thr + 0.02 * (y_max - y_min),
                    f"Anomalie-Zone\n(>{dd_thr:.2f}% & >{int(dur_thr)} Trades)",
                    fontsize=9,
                    va="bottom"
                )
        # ----------------- Backtest / portfolio point (highlighted) -----------------
        if portfolio_point is not None:
            dd_p, dur_p = portfolio_point  # (dd_pct, dur_trades)
            ax.scatter(
                [dur_p],
                [dd_p],
                s=180,
                marker="X",
                color="red",
                edgecolor="black",
                linewidth=1.3,
                zorder=10,
                label=portfolio_label
            )
            ax.text(
                dur_p,
                dd_p,
                f" {portfolio_label}",
                va="center",
                fontsize=10,
                fontweight="bold",
                color="black",
                zorder=10
            )
    axes[0].set_ylabel("Max. Drawdown-Tiefe (%) – max pro Simulation")
    # Suptitle plus layout rect so that nothing is clipped
    fig.suptitle("Monte Carlo 2D: Drawdown-Tiefe × Drawdown-Dauer", fontsize=14, y=0.98)
    plt.tight_layout(rect=[0, 0, 1, 0.95])
    plt.show()
# --------------------------
# Example call: rebuild the per-method result list from the simulation
# outputs MDD_* / DDUR_* (None for a skipped method).
# --------------------------
method_data = list(zip(
    ("t-Dist", "Gauss", "Reshuffle", "Bootstrap"),
    (MDD_t, MDD_n, MDD_r, MDD_b),
    (DDUR_t, DDUR_n, DDUR_r, DDUR_b),
))
def mc_thresholds(method_data, dd_method="t-Dist", dur_method="Reshuffle", q=99):
    """Return the ``q``-th percentile thresholds for drawdown depth and duration.

    Args:
        method_data: Iterable of ``(name, MDD_array, DDUR_array)`` tuples;
            MDD values are fractions (0.25 = 25%). The arrays of a method
            that was not simulated may be None.
        dd_method: Name of the method that supplies the depth threshold.
        dur_method: Name of the method that supplies the duration threshold.
        q: Percentile to evaluate (0-100).

    Returns:
        ``(dd_thr, dur_thr)``: depth threshold in percent and duration
        threshold in trades, computed over finite values only.

    Raises:
        ValueError: If a reference method is missing, was skipped (None), or
            contains no finite values.
    """
    dd_vals = None
    dur_vals = None
    for name, mdd, ddur in method_data:
        # A skipped method carries None; treat it like a missing method
        # instead of turning it into a NaN array.
        if name == dd_method and mdd is not None:
            dd_vals = np.asarray(mdd, dtype=float) * 100.0
        if name == dur_method and ddur is not None:
            dur_vals = np.asarray(ddur, dtype=float)
    if dd_vals is None or dur_vals is None:
        raise ValueError("Referenz-Methode für Schwellen nicht gefunden.")

    dd_vals = dd_vals[np.isfinite(dd_vals)]
    dur_vals = dur_vals[np.isfinite(dur_vals)]
    # np.percentile is not meaningful on empty input; fail with a clear message.
    if dd_vals.size == 0 or dur_vals.size == 0:
        raise ValueError("Referenz-Methode für Schwellen enthält keine gültigen Werte.")

    dd_thr = float(np.percentile(dd_vals, q))
    dur_thr = float(np.percentile(dur_vals, q))
    return dd_thr, dur_thr
# The 99% thresholds need the t-Dist depths and the Reshuffle durations. If the
# user skipped one of these methods their arrays are None, so the check is
# skipped instead of crashing the script.
_refs_available = MDD_t is not None and DDUR_r is not None
if _refs_available:
    dd_thr, dur_thr = mc_thresholds(method_data, "t-Dist", "Reshuffle", 99)
    print(f"[CHECK] 99% Schwellen: DD={dd_thr:.2f}% | Dauer={dur_thr:.0f} Trades (~{dur_thr/(28*12):.2f} Jahre)")
else:
    print("[CHECK] 99% Schwellen übersprungen: t-Dist und Reshuffle müssen beide simuliert werden.")
# ========================= Backtest point from the XLSX =========================
# Equity curve of the real backtest, built from the historical returns (2nd column)
equity_bt = equity_from_returns(hist_returns, START_CAPITAL)
# Max drawdown (fraction) -> percent
BACKTEST_DD_PCT = mdd_from_equity(equity_bt) * 100.0
# Longest drawdown duration (trades)
BACKTEST_DUR_TRADES = drawdown_duration_max(equity_bt)
print(f"[BACKTEST] Max DD: {BACKTEST_DD_PCT:.2f}% | Max DD-Dauer: {BACKTEST_DUR_TRADES} Trades")
plot_dd_2d_scatter_all(
    method_data=method_data,
    trades_per_month=28.0,
    show_quantile_lines=(50, 95, 99),
    # "auto_ref" uses the same reference methods as the check above; without
    # them the plot falls back to per-panel thresholds rather than raising.
    anomaly_mode="auto_ref" if _refs_available else "per_panel",
    anomaly_dd_ref="t-Dist",
    anomaly_dur_ref="Reshuffle",
    anomaly_quantile=99,
    show_anomaly_zone=True,
    portfolio_point=(BACKTEST_DD_PCT, BACKTEST_DUR_TRADES),
    portfolio_label="Backtest"
)
def _percentiles(arr, qs=(1, 50, 95, 99)):
    """Return ``({q: percentile}, maximum)`` over the finite values of ``arr``.

    When no finite value is present, every percentile and the maximum are NaN.
    """
    finite = np.asarray(arr, dtype=float)
    finite = finite[np.isfinite(finite)]
    if finite.size:
        by_quantile = dict(zip(qs, map(float, np.percentile(finite, qs))))
        return by_quantile, float(finite.max())
    return dict.fromkeys(qs, np.nan), np.nan
def generate_mc_report_pdf(
    method_data,
    hist_returns,
    start_capital,
    trades_per_month=28.0,
    anomaly_dd_ref="t-Dist",
    anomaly_dur_ref="Reshuffle",
    anomaly_quantile=99,
    persist_fraction=0.25,  # persistence window = 25% of DUR99
    out_pdf="MonteCarlo_Drawdown_Report_Auto.pdf",
    out_plot_png="mc_dd_2d.png",
):
    """
    Build the Monte Carlo drawdown report as a PDF (plus the 2D plot as PNG).

    Steps:
    - compute the backtest point (max DD in %, max DD duration in trades)
      from ``hist_returns``
    - derive the stress (95%) and pause (``anomaly_quantile``) thresholds from
      the reference methods
    - derive the persistence window for the shutdown rule
    - render the depth-vs-duration scatter plot to ``out_plot_png``
    - write the PDF with tables, risk rules and the plot to ``out_pdf``

    Args:
        method_data: list of ``(name, MDD_array, DDUR_array)``; MDD values are
            fractions. Entries whose arrays are None or empty (methods that
            were not simulated) are ignored.
        hist_returns: historical per-trade returns of the backtest.
        start_capital: starting capital for the backtest equity curve.
        trades_per_month: conversion factor between trades and years.
        anomaly_dd_ref: method that supplies the depth thresholds.
        anomaly_dur_ref: method that supplies the duration thresholds.
        anomaly_quantile: percentile used for the pause thresholds.
        persist_fraction: share of the pause duration threshold used as the
            persistence window.
        out_pdf: output path of the PDF.
        out_plot_png: output path of the plot image.

    Returns:
        dict with the absolute output paths, the backtest point, the
        thresholds, the persistence window, the state and the risk factor.

    Raises:
        ValueError: if no method has data or a reference method is missing.

    Requires these functions to exist at module level when called:
    ``equity_from_returns``, ``mdd_from_equity``, ``drawdown_duration_max``
    and ``evaluate_portfolio_state``.
    """
    import os
    import numpy as np
    import matplotlib.pyplot as plt
    from reportlab.platypus import (
        SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image
    )
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import getSampleStyleSheet
    from reportlab.lib import colors

    styles = getSampleStyleSheet()

    # ---------------- Helpers ----------------
    def _percentiles(arr, qs=(1, 50, 95, 99)):
        """Percentiles and maximum over the finite values (NaN if none)."""
        a = np.asarray(arr, dtype=float)
        a = a[np.isfinite(a)]
        if a.size == 0:
            return {q: np.nan for q in qs}, np.nan
        vals = np.percentile(a, qs)
        return {q: float(v) for q, v in zip(qs, vals)}, float(np.max(a))

    def _grid_table(rows, col_widths, valign="MIDDLE"):
        """Build a table with the report's shared header/grid styling."""
        table = Table(rows, colWidths=col_widths)
        table.setStyle(TableStyle([
            ("BACKGROUND", (0, 0), (-1, 0), colors.lightgrey),
            ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
            ("GRID", (0, 0), (-1, -1), 0.5, colors.grey),
            ("VALIGN", (0, 0), (-1, -1), valign),
            ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.whitesmoke, colors.white]),
        ]))
        return table

    def _multiline(*lines):
        """Paragraph with explicit line breaks.

        ReportLab collapses plain newlines inside a Paragraph, so the lines
        are joined with <br/> to keep one statement per line.
        """
        return Paragraph("<br/>".join(lines), styles["Normal"])

    def trades_to_years(x):
        return x / (trades_per_month * 12.0)

    def years_to_trades(x):
        return x * (trades_per_month * 12.0)

    # ---------------- Keep only methods that were simulated ----------------
    active = []
    for name, mdd, ddur in method_data:
        if mdd is None or ddur is None:
            continue
        if len(mdd) == 0 or len(ddur) == 0:
            continue
        active.append((name, mdd, ddur))
    if not active:
        raise ValueError("Keine Monte-Carlo-Daten für den Report vorhanden.")

    # ---------------- Backtest point ----------------
    equity_bt = equity_from_returns(hist_returns, start_capital)
    bt_dd_pct = float(mdd_from_equity(equity_bt) * 100.0)  # in %
    bt_dur_trades = int(drawdown_duration_max(equity_bt))  # in trades
    bt_years = bt_dur_trades / (trades_per_month * 12.0)

    # ---------------- Thresholds from the reference methods ----------------
    dd99 = None
    dur99 = None
    dd95 = None
    dur95 = None
    for name, mdd, ddur in active:
        if name == anomaly_dd_ref:
            mdd_pct = np.asarray(mdd, dtype=float) * 100.0
            mdd_pct = mdd_pct[np.isfinite(mdd_pct)]
            dd99 = float(np.percentile(mdd_pct, anomaly_quantile))
            dd95 = float(np.percentile(mdd_pct, 95))
        if name == anomaly_dur_ref:
            dur = np.asarray(ddur, dtype=float)
            dur = dur[np.isfinite(dur)]
            dur99 = float(np.percentile(dur, anomaly_quantile))
            dur95 = float(np.percentile(dur, 95))
    if dd99 is None or dur99 is None:
        raise ValueError("99%-Schwellen konnten nicht berechnet werden. Prüfe anomaly_dd_ref/anomaly_dur_ref.")
    if dd95 is None or dur95 is None:
        raise ValueError("95%-Schwellen konnten nicht berechnet werden. Prüfe anomaly_dd_ref/anomaly_dur_ref.")
    dur99_years = dur99 / (trades_per_month * 12.0)
    dur95_years = dur95 / (trades_per_month * 12.0)

    # Persistence window (shutdown rule)
    persist_trades = int(np.ceil(persist_fraction * dur99))
    persist_years = persist_trades / (trades_per_month * 12.0)

    # State and risk factor, evaluated at the backtest point
    state, risk_factor = evaluate_portfolio_state(
        dd_now=bt_dd_pct,
        dur_now=bt_dur_trades,
        dd95=dd95, dur95=dur95,
        dd99=dd99, dur99=dur99
    )

    # ---------------- Save the 2D plot as PNG ----------------
    n_panels = len(active)
    fig, axes = plt.subplots(1, n_panels, figsize=(6 * n_panels, 5.6), sharey=True)
    # plt.subplots returns a bare Axes (not an array) for a single panel.
    if n_panels == 1:
        axes = [axes]
    for ax, (name, mdd, ddur) in zip(axes, active):
        mdd_pct = np.asarray(mdd, dtype=float) * 100.0
        ddur_tr = np.asarray(ddur, dtype=float)
        mask = np.isfinite(mdd_pct) & np.isfinite(ddur_tr)
        mdd_pct = mdd_pct[mask]
        ddur_tr = ddur_tr[mask]
        ax.scatter(ddur_tr, mdd_pct, s=10, alpha=0.25)
        ax.set_title(f"DD Tiefe vs DD Dauer – {name}")
        ax.set_xlabel("DD-Dauer (Trades) – max pro Simulation")
        ax.grid(True)
        secax = ax.secondary_xaxis('top', functions=(trades_to_years, years_to_trades))
        secax.set_xlabel("DD-Dauer (Jahre)")
        # Quantile lines (50/95/99) per panel
        qs = [50, 95, 99]
        for y in np.percentile(mdd_pct, qs):
            ax.axhline(y, linewidth=1.2)
        for x in np.percentile(ddur_tr, qs):
            ax.axvline(x, linewidth=1.2)
        # Anomaly zone (global thresholds dd99 and dur99)
        x_min, x_max = ax.get_xlim()
        y_min, y_max = ax.get_ylim()
        if dur99 < x_max and dd99 < y_max:
            ax.axvspan(dur99, x_max, alpha=0.08)
            ax.axhspan(dd99, y_max, alpha=0.08)
        # Backtest point (red, prominent)
        ax.scatter([bt_dur_trades], [bt_dd_pct], s=220, marker="X",
                   color="red", edgecolor="black", linewidth=1.6, zorder=100)
        ax.text(bt_dur_trades, bt_dd_pct, " Backtest", va="center",
                fontsize=10, fontweight="bold", color="black", zorder=101)
    axes[0].set_ylabel("Max. Drawdown-Tiefe (%) – max pro Simulation")
    fig.suptitle("Monte Carlo 2D: Drawdown-Tiefe × Drawdown-Dauer", fontsize=14, y=0.98)
    plt.tight_layout(rect=[0, 0, 1, 0.95])
    plt.savefig(out_plot_png, dpi=200)
    plt.close(fig)

    # ---------------- Build the PDF ----------------
    doc = SimpleDocTemplate(
        out_pdf, pagesize=A4,
        rightMargin=36, leftMargin=36,
        topMargin=36, bottomMargin=36
    )
    story = []

    # Title and context
    story.append(Paragraph("Monte-Carlo Drawdown Report", styles["Title"]))
    story.append(Spacer(1, 10))
    story.append(Paragraph(
        "Dieser Report fasst die Monte-Carlo-Auswertung deines EA-Portfolios zusammen. "
        "Analysiert wurden (1) maximale Drawdown-Tiefe und (2) maximale Drawdown-Dauer (in Trades). "
        f"Eine zweite Achse zeigt die Dauer in Jahren auf Basis von {trades_per_month:.0f} Trades/Monat.",
        styles["Normal"]
    ))
    story.append(Spacer(1, 12))

    # Backtest summary
    story.append(Paragraph("Backtest-Kennzahlen (real)", styles["Heading2"]))
    bt_table = [
        ["Kennzahl", "Wert"],
        ["Max Drawdown (Backtest)", f"{bt_dd_pct:.2f} %"],
        ["Max Drawdown-Dauer (Backtest)", f"{bt_dur_trades} Trades (~{bt_years:.2f} Jahre)"],
    ]
    story.append(_grid_table(bt_table, [220, 260]))
    story.append(Spacer(1, 12))

    # Risk rules and thresholds (95/99)
    story.append(Paragraph("Risikosteuerung & Abschaltkriterien", styles["Heading2"]))
    story.append(Spacer(1, 6))
    risk_table = [
        ["Stufe", "Bedingung", "Maßnahme"],
        ["Normalbetrieb", "DD < DD95 UND DUR < DUR95", "Risiko 100%"],
        ["Stress-Modus", "DD ≥ DD95 ODER DUR ≥ DUR95", "Risiko 50% (halbieren)"],
        ["Pause-Modus", "DD ≥ DD99 UND DUR ≥ DUR99", "Keine neuen Trades"],
        ["Abschalten", f"Pause + Persistenz ≥ {persist_trades} Trades (~{persist_years:.2f} Jahre)\nODER Strukturbruch", "System deaktivieren"],
    ]
    story.append(_grid_table(risk_table, [90, 250, 140], valign="TOP"))
    story.append(Spacer(1, 10))
    story.append(_multiline(
        "Schwellenwerte (dynamisch aus Monte-Carlo):",
        f"DD95 = {dd95:.2f}% | DUR95 = {dur95:.0f} Trades (~{dur95_years:.2f} Jahre)",
        f"DD99 = {dd99:.2f}% | DUR99 = {dur99:.0f} Trades (~{dur99_years:.2f} Jahre)",
    ))
    story.append(Spacer(1, 8))
    story.append(_multiline(
        "Aktueller Status (auf Basis Backtest-Punkt):",
        f"Zustand: {state}",
        f"Risikofaktor: {risk_factor:.2f}",
        f"Backtest: DD = {bt_dd_pct:.2f}% | Dauer = {bt_dur_trades} Trades (~{bt_years:.2f} Jahre)",
    ))
    story.append(Spacer(1, 10))
    story.append(Paragraph("Strukturbruch (Abschalten, wenn mind. 2 von 4 erfüllt):", styles["Normal"]))
    story.append(_multiline(
        "1) Erwartungswert bricht: Mean(Return) < 0 und Profit Factor < 0.9 (Rolling 100 Trades)",
        "2) Trefferquote signifikant schlechter als historisch (z. B. < p_hist − 2σ)",
        "3) Diversifikation bricht: Ø-Korrelation > 0.75 (Rolling)",
        "4) Execution-Bruch: Spread/Slippage dauerhaft deutlich über historischem Niveau",
    ))
    story.append(Spacer(1, 12))

    # Percentiles per method (depth)
    story.append(Paragraph("Perzentile je Methode", styles["Heading2"]))
    dd_rows = [["Methode", "DD 1%", "DD 50%", "DD 95%", "DD 99%", "Max DD"]]
    for name, mdd, _ in active:
        p, mx = _percentiles(np.asarray(mdd, dtype=float) * 100.0, qs=(1, 50, 95, 99))
        dd_rows.append([name, f"{p[1]:.2f}%", f"{p[50]:.2f}%", f"{p[95]:.2f}%", f"{p[99]:.2f}%", f"{mx:.2f}%"])
    story.append(Paragraph("A) Max Drawdown-Tiefe", styles["Heading3"]))
    story.append(_grid_table(dd_rows, [90, 70, 70, 70, 70, 70]))
    story.append(Spacer(1, 12))

    # Percentiles per method (duration)
    dur_rows = [["Methode", "DUR 1%", "DUR 50%", "DUR 95%", "DUR 99%", "Max DUR"]]
    for name, _, ddur in active:
        p, mx = _percentiles(np.asarray(ddur, dtype=float), qs=(1, 50, 95, 99))
        dur_rows.append([name, f"{p[1]:.0f}", f"{p[50]:.0f}", f"{p[95]:.0f}", f"{p[99]:.0f}", f"{mx:.0f}"])
    story.append(Paragraph("B) Max Drawdown-Dauer (Trades)", styles["Heading3"]))
    story.append(_grid_table(dur_rows, [90, 70, 70, 70, 70, 70]))
    story.append(Spacer(1, 12))

    # Plot
    story.append(Paragraph("2D-Visualisierung: Drawdown-Tiefe × Drawdown-Dauer", styles["Heading2"]))
    story.append(Spacer(1, 6))
    # The figure's aspect ratio depends on the number of panels; scale it
    # proportionally into the 520x300 box instead of stretching it.
    story.append(Image(out_plot_png, width=520, height=300, kind="proportional"))
    story.append(Spacer(1, 10))
    story.append(Paragraph(
        "Hinweis: Der rote X-Punkt markiert den realen Backtest (max DD, max DD-Dauer). "
        "Die schattierte Ecke entspricht der Anomalie-Zone (gleichzeitige Überschreitung beider 99%-Schwellen).",
        styles["Normal"]
    ))
    doc.build(story)

    return {
        "pdf": os.path.abspath(out_pdf),
        "plot_png": os.path.abspath(out_plot_png),
        "bt_dd_pct": bt_dd_pct,
        "bt_dur_trades": bt_dur_trades,
        "dd95": dd95,
        "dur95": dur95,
        "dd99": dd99,
        "dur99": dur99,
        "persist_trades": persist_trades,
        "state": state,
        "risk_factor": risk_factor,
    }
# Per-method results as (name, MDD array, DDUR array) for the PDF report;
# the arrays of a skipped method are None.
method_data = list(zip(
    ("t-Dist", "Gauss", "Reshuffle", "Bootstrap"),
    (MDD_t, MDD_n, MDD_r, MDD_b),
    (DDUR_t, DDUR_n, DDUR_r, DDUR_b),
))
def evaluate_portfolio_state(dd_now, dur_now, dd95, dur95, dd99, dur99):
    """
    Classify the portfolio state and the matching risk factor.

    dd_now : current drawdown in % (e.g. 23.4)
    dur_now : current drawdown duration in trades (int)
    dd95/dur95 : stress thresholds (95% quantiles)
    dd99/dur99 : pause thresholds (99% quantiles)

    Returns:
    (state, risk_factor)
    - ("PAUSE", 0.0) when BOTH pause thresholds are reached
    - ("STRESS", 0.5) when at least ONE stress threshold is reached
    - ("NORMAL", 1.0) otherwise
    """
    pause_reached = (dd_now >= dd99) and (dur_now >= dur99)
    if not pause_reached:
        # The stress check is only evaluated when the pause rule did not fire.
        stress_reached = (dd_now >= dd95) or (dur_now >= dur95)
        return ("STRESS", 0.5) if stress_reached else ("NORMAL", 1.0)
    return "PAUSE", 0.0
# Only methods that were actually simulated carry arrays; skipped ones hold
# None and must not reach the report's percentile calculations.
_active_methods = [
    entry for entry in method_data
    if entry[1] is not None and entry[2] is not None
]
_active_names = {entry[0] for entry in _active_methods}
# The report derives its thresholds from t-Dist (depth) and Reshuffle
# (duration); without both it cannot be built.
if {"t-Dist", "Reshuffle"} <= _active_names:
    result = generate_mc_report_pdf(
        method_data=_active_methods,
        hist_returns=hist_returns,  # loaded from the XLSX
        start_capital=START_CAPITAL,
        trades_per_month=28.0,
        anomaly_dd_ref="t-Dist",
        anomaly_dur_ref="Reshuffle",
        anomaly_quantile=99,
        out_pdf="MonteCarlo_Drawdown_Report_Auto.pdf",
        out_plot_png="mc_dd_2d.png",
    )
    print("PDF gespeichert unter:", result["pdf"])
    print("Plot gespeichert unter:", result["plot_png"])
else:
    print("PDF-Report übersprungen: t-Dist und Reshuffle müssen beide simuliert werden (Referenz für die Schwellen).")