Utilities Module
The utilities module provides core technical calculations, data processing functions, and supporting tools for PVTools optimization. This comprehensive guide covers all utility modules and their practical applications.
Module Organization
utilities/
├── __init__.py # Module initialization
├── technical.py # Grid integration and billing calculations
├── finance.py # Financial modeling and analysis
├── bess.py # Battery energy storage system modeling
├── processes.py # Data processing and API integration
├── profile_estimation.py # Load profile estimation algorithms
├── projection.py # Future projections and forecasting
├── validation.py # Data validation and error checking
└── tools.py # General utility functions
technical.py - Grid Integration
Core Functions
Bill Calculation Engine
`calculate_bill` is the heart of the TNB (Tenaga Nasional Berhad) tariff calculations and supports all Malaysian tariff structures:
def calculate_bill(c: Dict, tariff_category: str, total_consumption: float,
                   peak_power_demand: Optional[float] = None,
                   peak_fraction: Optional[float] = None) -> float:
    """
    Calculate TNB electricity bill with comprehensive tariff support.

    Supports all TNB tariff categories:
    - Domestic (B): Tiered energy charges
    - Commercial (C1-C3): Energy + demand charges
    - Industrial (D-F): Complex multi-part tariffs
    - Agricultural (G): Specialized farming rates

    The billing method is selected by the tariff's "charge_type" entry
    ("tiered", "energy_demand" or "time_of_use"). Negative consumption
    (net export) is not billed against the tariff; it is multiplied by
    c["standard_parameters"]["SMP_export"]["rate"], giving a negative
    amount.

    Parameters:
        c: Configuration with tariff data (reads c["grid_tariffs"] and
            c["standard_parameters"])
        tariff_category: TNB tariff code (B, C1, C2, etc.)
        total_consumption: Monthly energy consumption (kWh)
        peak_power_demand: Peak demand for commercial tariffs (kW)
        peak_fraction: Peak hour consumption ratio (0.0-1.0)

    Returns:
        Total monthly bill in RM

    Raises:
        KeyError: If tariff_category is not present in c["grid_tariffs"]
        ValueError: If the tariff's charge type is not supported

    Example:
        # Residential tariff (tiered structure)
        bill = calculate_bill(config, "B", 350.0)

        # Commercial tariff with demand charges
        bill = calculate_bill(config, "C1", 2500.0, peak_power_demand=15.0)

        # Time-of-use tariff
        bill = calculate_bill(config, "C2", 1800.0, peak_fraction=0.6)
    """
Implementation Details:
def calculate_bill(c, tariff_category, total_consumption, peak_power_demand=None, peak_fraction=None):
    """
    Dispatch a monthly bill calculation to the routine for the tariff's charge type.

    Parameters:
        c: Configuration dict; reads c["grid_tariffs"][tariff_category] and,
            for exports, c["standard_parameters"]["SMP_export"]["rate"]
        tariff_category: Key into c["grid_tariffs"]
        total_consumption: Monthly energy (kWh); negative means net export
        peak_power_demand: Peak demand (kW), used by "energy_demand" tariffs
        peak_fraction: Peak-hour share of consumption, used by "time_of_use" tariffs

    Returns:
        Bill amount in RM (negative for net export)

    Raises:
        KeyError: If tariff_category is not configured
        ValueError: If the tariff declares an unsupported charge type
    """
    # Handle negative consumption (export scenarios): exported energy is
    # credited at the export rate regardless of the tariff category.
    if total_consumption < 0:
        return total_consumption * c["standard_parameters"]["SMP_export"]["rate"]

    tariff_data = c["grid_tariffs"][tariff_category]
    charge_type = tariff_data.get("charge_type", "energy_only")

    if charge_type == "tiered":
        return calculate_tiered_bill(tariff_data, total_consumption)
    if charge_type == "energy_demand":
        return calculate_energy_demand_bill(tariff_data, total_consumption, peak_power_demand)
    if charge_type == "time_of_use":
        return calculate_tou_bill(tariff_data, total_consumption, peak_fraction)
    if charge_type == "energy_only":
        # "energy_only" is the default when a tariff omits "charge_type", so it
        # must be billable; previously it always fell through to the error.
        # NOTE(review): assumes flat-rate tariffs store their price under
        # "energy_rate", like "energy_demand" tariffs do - confirm against the
        # tariff configuration schema.
        return (total_consumption * tariff_data["energy_rate"]
                + tariff_data.get("service_charge", 0))

    raise ValueError(f"Unsupported charge type: {charge_type}")
Tiered Billing Calculation
def calculate_tiered_bill(tariff_data: Dict, consumption: float) -> float:
    """
    Calculate bill for tiered tariff structure (e.g., Domestic B).

    Tiers are consumed in order. Each tier's "limit" is treated as the width
    of that band in kWh; a limit of None marks an open-ended top tier.

    Parameters:
        tariff_data: Tariff entry with "tiers" (list of {"limit", "rate"})
            and an optional "service_charge"
        consumption: Monthly energy consumption (kWh)

    Returns:
        Energy charge across all tiers plus the service charge (RM)
    """
    total_charge = 0.0
    remaining_consumption = consumption
    last_rate = None

    for tier in tariff_data["tiers"]:
        tier_limit = tier["limit"]
        last_rate = tier["rate"]

        if tier_limit is None:  # Unlimited top tier
            tier_consumption = remaining_consumption
        else:
            tier_consumption = min(remaining_consumption, tier_limit)

        total_charge += tier_consumption * last_rate
        remaining_consumption -= tier_consumption

        if remaining_consumption <= 0:
            break

    # If the table has no open-ended tier, energy above the last band would
    # otherwise go unbilled; charge it at the last tier's rate instead.
    if remaining_consumption > 0 and last_rate is not None:
        total_charge += remaining_consumption * last_rate

    # Add service charge
    total_charge += tariff_data.get("service_charge", 0)
    return total_charge
Energy + Demand Billing
def calculate_energy_demand_bill(tariff_data: Dict, consumption: float,
                                 peak_demand: Optional[float]) -> float:
    """
    Calculate bill for energy + demand tariff structure (e.g., Commercial C1).

    Parameters:
        tariff_data: Tariff entry with "energy_rate", "demand_rate" and an
            optional "service_charge"
        consumption: Monthly energy consumption (kWh)
        peak_demand: Peak power demand (kW); required for this tariff type

    Returns:
        Energy charge + demand charge + service charge (RM)

    Raises:
        ValueError: If peak_demand is None
    """
    # calculate_bill forwards its optional peak_power_demand unchanged, so a
    # missing value arrives here as None; fail with a clear message instead of
    # a TypeError from the multiplication below.
    if peak_demand is None:
        raise ValueError("peak_demand is required for energy + demand tariffs")

    energy_charge = consumption * tariff_data["energy_rate"]
    demand_charge = peak_demand * tariff_data["demand_rate"]
    service_charge = tariff_data.get("service_charge", 0)

    return energy_charge + demand_charge + service_charge
Time-of-Use Billing
def calculate_tou_bill(tariff_data: Dict, consumption: float,
                       peak_fraction: Optional[float]) -> float:
    """
    Calculate bill for time-of-use tariff structure.

    Parameters:
        tariff_data: Tariff entry with "peak_rate", "off_peak_rate" and an
            optional "service_charge"
        consumption: Monthly energy consumption (kWh)
        peak_fraction: Share of consumption in peak hours (0.0-1.0);
            None falls back to 0.5

    Returns:
        Peak charge + off-peak charge + service charge (RM)

    Raises:
        ValueError: If peak_fraction lies outside 0.0-1.0
    """
    if peak_fraction is None:
        peak_fraction = 0.5  # Default assumption

    # A fraction outside [0, 1] would bill a negative amount of energy in one
    # of the two periods, so reject it rather than return a wrong bill.
    if not 0.0 <= peak_fraction <= 1.0:
        raise ValueError(f"peak_fraction must be between 0.0 and 1.0, got {peak_fraction}")

    peak_consumption = consumption * peak_fraction
    off_peak_consumption = consumption * (1 - peak_fraction)

    total_charge = (peak_consumption * tariff_data["peak_rate"]
                    + off_peak_consumption * tariff_data["off_peak_rate"])

    # Add service charge
    total_charge += tariff_data.get("service_charge", 0)
    return total_charge
Inverter Sizing Algorithms
Optimal Inverter Configuration
def size_inverters(system_capacity: float, inverter_data: Dict,
                   design_constraints: Optional[Dict] = None) -> Dict:
    """
    Determine the cheapest feasible inverter configuration for a system capacity.

    Each model in inverter_data["models"] is evaluated with
    optimize_inverter_combination; among the feasible results the one with the
    lowest "total_cost" is returned.

    Parameters:
        system_capacity: Total system capacity (kWp)
        inverter_data: Available inverter specifications (reads "models")
        design_constraints: Optional design requirements. Accepted for
            interface compatibility; not currently used in the selection.

    Returns:
        Dictionary with inverter configuration and costs

    Raises:
        ValueError: If no inverter model yields a feasible configuration
    """
    available_inverters = inverter_data["models"]

    # Evaluate every model and keep only the feasible configurations
    candidates = (
        optimize_inverter_combination(system_capacity, inverter_model)
        for inverter_model in available_inverters
    )
    configurations = [config for config in candidates if config["feasible"]]

    # min() on an empty list raises an opaque error; say what actually went wrong.
    if not configurations:
        raise ValueError(
            f"No feasible inverter configuration for system capacity {system_capacity} kWp"
        )

    # Select the lowest-cost configuration
    return min(configurations, key=lambda x: x["total_cost"])
def optimize_inverter_combination(system_capacity: float, inverter_spec: Dict) -> Dict:
    """
    Size a bank of identical inverters for the given system capacity.

    The inverter count is the system capacity divided by the unit capacity,
    rounded up, so the installed inverter capacity always covers the system.

    Parameters:
        system_capacity: Total system capacity (kWp)
        inverter_spec: Inverter model with "model", "capacity_kw" and "cost"

    Returns:
        {"feasible": False} when the configuration is rejected, otherwise a
        dict with "model", "quantity", "total_capacity", "total_cost",
        "sizing_ratio" and "feasible": True
    """
    inverter_capacity = inverter_spec["capacity_kw"]
    inverter_cost = inverter_spec["cost"]

    # Non-positive capacities would divide by zero or yield a negative
    # inverter count; treat them as infeasible instead.
    if system_capacity <= 0 or inverter_capacity <= 0:
        return {"feasible": False}

    # Calculate number of inverters needed
    num_inverters = math.ceil(system_capacity / inverter_capacity)
    total_inverter_capacity = num_inverters * inverter_capacity

    # Sizing ratio = system capacity / installed inverter capacity. Because
    # the count is rounded up, this never exceeds 1.0, so only the 0.8 lower
    # bound can reject a model (inverters oversized by more than 25%).
    sizing_ratio = system_capacity / total_inverter_capacity
    if not 0.8 <= sizing_ratio <= 1.3:
        return {"feasible": False}

    return {
        "model": inverter_spec["model"],
        "quantity": num_inverters,
        "total_capacity": total_inverter_capacity,
        "total_cost": num_inverters * inverter_cost,
        "sizing_ratio": sizing_ratio,
        "feasible": True,
    }
Reverse Bill Calculation
def solve_pre_surcharge_bill(c: Dict, tariff_category: str, target_bill: float,
                             peak_demand: float, peak_fraction: float) -> float:
    """
    Reverse-engineer energy consumption from target bill amount.

    Used for load profile estimation when only bill amount is known.
    Minimizes the squared difference between calculate_bill(...) and the
    target bill with bounded L-BFGS-B.

    Parameters:
        c: Configuration with tariff data
        tariff_category: TNB tariff code
        target_bill: Target monthly bill (RM)
        peak_demand: Peak power demand (kW)
        peak_fraction: Peak hour consumption ratio

    Returns:
        Monthly energy consumption (kWh). If the optimizer reports failure,
        a warning is logged and the initial estimate is returned.
    """
    def bill_error(consumption):
        """Squared error between calculated and target bill."""
        calculated_bill = calculate_bill(c, tariff_category, consumption[0],
                                         peak_demand, peak_fraction)
        return (calculated_bill - target_bill) ** 2

    # Initial guess based on simple rate; clamped so it is never below the
    # lower bound of the search interval.
    simple_rate = 0.40  # Rough RM/kWh estimate
    initial_guess = [max(target_bill / simple_rate, 0.0)]

    # A fixed 10,000 kWh ceiling cuts off large consumers and can even exclude
    # the initial guess. Keep it as a floor, but let the ceiling grow with the
    # bill so the true consumption stays inside the search interval.
    upper_bound = max(10000.0, 10.0 * initial_guess[0])

    # Optimize to find consumption
    result = minimize(bill_error, initial_guess,
                      bounds=[(0, upper_bound)], method='L-BFGS-B')

    if result.success:
        return float(result.x[0])

    logger.warning("Failed to converge in bill reversal calculation")
    return initial_guess[0]
finance.py - Financial Modeling
Core Financial Functions
Net Present Value Calculations
def calculate_npv(cash_flows: List[float], discount_rate: float) -> float:
    """
    Calculate Net Present Value of a series of annual cash flows.

    The cash flow at index n is discounted by (1 + discount_rate) ** n, so
    index 0 is taken at face value.

    Parameters:
        cash_flows: List of annual cash flows (year 0 = initial investment)
        discount_rate: Annual discount rate (e.g., 0.08 for 8%)

    Returns:
        Net Present Value in same currency as cash flows

    Raises:
        ValueError: If cash_flows is empty or discount_rate <= -1

    Example:
        # Solar system investment analysis
        initial_cost = -50000  # Negative = cash outflow
        annual_savings = [5000] * 20  # 20 years of savings
        cash_flows = [initial_cost] + annual_savings
        npv = calculate_npv(cash_flows, 0.08)
    """
    if not cash_flows:
        raise ValueError("Cash flows list cannot be empty")
    # At -100% the discount factor is zero (division by zero); below it the
    # factor alternates in sign and the result is meaningless.
    if discount_rate <= -1:
        raise ValueError("Discount rate must be greater than -1 (-100%)")

    growth = 1 + discount_rate
    return sum(cash_flow / growth ** year for year, cash_flow in enumerate(cash_flows))
def calculate_npv_with_escalation(initial_cash_flow: float, annual_growth: float,
                                  years: int, discount_rate: float,
                                  initial_investment: float = 0) -> float:
    """
    Calculate NPV with escalating cash flows.

    The investment is placed in year 0 and the escalating cash flows in
    years 1..years, with the k-th flow equal to
    initial_cash_flow * (1 + annual_growth) ** (k - 1).

    Parameters:
        initial_cash_flow: Cash flow of the first operating year
        annual_growth: Year-on-year escalation (e.g., 0.03 for 3%)
        years: Number of operating years
        discount_rate: Annual discount rate
        initial_investment: Up-front investment; its magnitude is booked as a
            year-0 outflow regardless of sign

    Returns:
        Net Present Value in same currency as the cash flows
    """
    # Always reserve year 0 for the investment. Previously the year-0 slot was
    # only present when initial_investment > 0, which shifted every operating
    # cash flow by one year (and dropped a negative investment entirely)
    # whenever it was absent.
    cash_flows = [-abs(initial_investment)]
    cash_flows.extend(
        initial_cash_flow * (1 + annual_growth) ** year for year in range(years)
    )
    return calculate_npv(cash_flows, discount_rate)
Internal Rate of Return
def calculate_irr(cash_flows: List[float], guess: float = 0.1) -> float:
    """
    Calculate Internal Rate of Return using numerical methods.

    Uses scipy.optimize.fsolve when SciPy is importable and falls back to
    newton_raphson_irr otherwise.

    Parameters:
        cash_flows: List of annual cash flows
        guess: Initial guess for IRR (default 10%)

    Returns:
        IRR as decimal (e.g., 0.12 for 12%)

    Raises:
        ValueError: If cash_flows is empty, has no sign change, or the
            solver does not reach an NPV within 1e-6 of zero
    """
    if not cash_flows:
        raise ValueError("Cash flows list cannot be empty")
    # For any rate above -100% every discount factor is positive, so NPV can
    # only cross zero when inflows and outflows are both present.
    has_inflow = any(cf > 0 for cf in cash_flows)
    has_outflow = any(cf < 0 for cf in cash_flows)
    if not (has_inflow and has_outflow):
        raise ValueError("IRR is undefined without both positive and negative cash flows")

    def npv_function(rate):
        """NPV function for root finding."""
        return sum(cf / (1 + rate) ** i for i, cf in enumerate(cash_flows))

    try:
        from scipy.optimize import fsolve
    except ImportError:
        # Fallback to manual Newton-Raphson method
        return newton_raphson_irr(cash_flows, guess)

    irr = fsolve(npv_function, guess)[0]

    # Validate solution: fsolve returns its last iterate even on failure
    if abs(npv_function(irr)) > 1e-6:
        raise ValueError("IRR calculation did not converge")

    return float(irr)
def newton_raphson_irr(cash_flows: List[float], guess: float,
                       max_iterations: int = 100, tolerance: float = 1e-6) -> float:
    """
    Manual IRR calculation using Newton-Raphson method.

    Parameters:
        cash_flows: List of annual cash flows
        guess: Starting rate for the iteration
        max_iterations: Maximum number of Newton steps
        tolerance: Threshold applied both to |NPV| (convergence) and to the
            derivative magnitude (abort)

    Returns:
        Rate at which |NPV| < tolerance

    Raises:
        ValueError: If the iterate reaches -100% or below, the derivative
            becomes too small, or the iteration limit is reached
    """
    def npv(rate):
        return sum(cf / (1 + rate) ** i for i, cf in enumerate(cash_flows))

    def npv_derivative(rate):
        return sum(-i * cf / (1 + rate) ** (i + 1) for i, cf in enumerate(cash_flows))

    rate = guess
    for _ in range(max_iterations):
        # NPV is undefined at -100% (division by zero) and not meaningful
        # below it; stop instead of crashing or chasing a spurious root.
        if rate <= -1:
            raise ValueError("IRR iteration diverged to a rate of -100% or below")

        npv_value = npv(rate)
        if abs(npv_value) < tolerance:
            return rate

        npv_deriv = npv_derivative(rate)
        if abs(npv_deriv) < tolerance:
            raise ValueError("Derivative too small, cannot continue")

        rate = rate - npv_value / npv_deriv

    raise ValueError(f"IRR did not converge after {max_iterations} iterations")
Loan Calculations
def calculate_loan_payment(principal: float, annual_rate: float, years: int) -> float:
    """
    Calculate the fixed monthly payment of a fully amortizing loan.

    Parameters:
        principal: Amount borrowed
        annual_rate: Nominal annual interest rate (e.g., 0.06 for 6%);
            divided by 12 to obtain the monthly rate
        years: Loan term in years

    Returns:
        Monthly payment in the same currency as principal

    Raises:
        ValueError: If years is not positive
    """
    # Zero payments would divide by zero in both branches below.
    if years <= 0:
        raise ValueError("years must be positive")

    monthly_rate = annual_rate / 12
    num_payments = years * 12

    if monthly_rate == 0:  # Interest-free loan
        return principal / num_payments

    # Standard annuity formula: P * r * (1 + r)^n / ((1 + r)^n - 1)
    compound = (1 + monthly_rate) ** num_payments
    return principal * (monthly_rate * compound) / (compound - 1)
def generate_amortization_schedule(principal: float, annual_rate: float,
                                   years: int) -> pd.DataFrame:
    """
    Generate complete loan amortization schedule.

    Parameters:
        principal: Amount borrowed
        annual_rate: Nominal annual interest rate (divided by 12 per payment)
        years: Loan term in years

    Returns:
        DataFrame with one row per monthly payment and the columns
        payment_number, payment_amount, principal_payment, interest_payment,
        remaining_balance, cumulative_interest and cumulative_principal
    """
    monthly_payment = calculate_loan_payment(principal, annual_rate, years)
    monthly_rate = annual_rate / 12
    num_payments = years * 12

    schedule = []
    remaining_balance = principal
    # Running total instead of re-summing all earlier rows for every payment,
    # which made the schedule quadratic in the number of payments.
    cumulative_interest = 0.0

    for payment_num in range(1, num_payments + 1):
        interest_payment = remaining_balance * monthly_rate
        principal_payment = monthly_payment - interest_payment
        remaining_balance -= principal_payment
        cumulative_interest += interest_payment

        schedule.append({
            'payment_number': payment_num,
            'payment_amount': monthly_payment,
            'principal_payment': principal_payment,
            'interest_payment': interest_payment,
            'remaining_balance': max(0, remaining_balance),  # Avoid negative due to rounding
            'cumulative_interest': cumulative_interest,
            'cumulative_principal': principal - remaining_balance
        })

    return pd.DataFrame(schedule)
Advanced Financial Analysis
Risk Analysis
def calculate_var(returns: List[float], confidence_level: float = 0.05) -> float:
    """
    Calculate Value at Risk for investment returns.

    Returns the observation at position int(confidence_level * n) of the
    returns sorted in ascending order (n = number of observations).

    Parameters:
        returns: Observed investment returns
        confidence_level: Tail probability between 0.0 and 1.0 (default 5%)

    Returns:
        The return at the requested tail position

    Raises:
        ValueError: If returns is empty or confidence_level is outside 0.0-1.0
    """
    if len(returns) == 0:
        raise ValueError("returns cannot be empty")
    # A negative level would produce a negative index and silently read from
    # the wrong end of the sorted list.
    if not 0.0 <= confidence_level <= 1.0:
        raise ValueError("confidence_level must be between 0.0 and 1.0")

    sorted_returns = sorted(returns)
    # confidence_level == 1.0 maps to one past the end; clamp to the last item.
    index = min(int(confidence_level * len(sorted_returns)), len(sorted_returns) - 1)
    return sorted_returns[index]
def monte_carlo_npv_analysis(base_cash_flows: List[float],
                             volatility: float, discount_rate: float,
                             num_simulations: int = 10000,
                             seed: Optional[int] = None) -> Dict:
    """
    Monte Carlo analysis for NPV uncertainty.

    Every cash flow is multiplied by an independent random factor drawn from
    a normal distribution with mean 1.0 and standard deviation `volatility`;
    the NPV of each perturbed series is then summarized.

    Parameters:
        base_cash_flows: Expected annual cash flows (index 0 = year 0)
        volatility: Standard deviation of the multiplicative random factor
        discount_rate: Annual discount rate
        num_simulations: Number of simulated cash-flow series
        seed: Optional seed for reproducible results. When omitted, NumPy's
            global random state is used, as before.

    Returns:
        Dict with 'mean_npv', 'std_npv', 'percentile_5', 'percentile_95'
        and 'probability_positive' (share of simulations with NPV > 0)

    Raises:
        ValueError: If base_cash_flows is empty, num_simulations is not
            positive, or discount_rate <= -1
    """
    import numpy as np

    if len(base_cash_flows) == 0:
        raise ValueError("Cash flows list cannot be empty")
    # Zero simulations would divide by zero in the probability estimate.
    if num_simulations <= 0:
        raise ValueError("num_simulations must be a positive integer")
    if discount_rate <= -1:
        raise ValueError("Discount rate must be greater than -1 (-100%)")

    rng = np.random.default_rng(seed) if seed is not None else np.random

    # Discounting does not depend on the random draw, so discount the base
    # cash flows once and scale them by the random factors in one matrix
    # product instead of looping over simulations in Python.
    base = np.asarray(base_cash_flows, dtype=float)
    present_values = base / (1.0 + discount_rate) ** np.arange(base.size)
    random_factors = rng.normal(1.0, volatility, size=(num_simulations, base.size))
    npv_results = random_factors @ present_values

    return {
        'mean_npv': float(np.mean(npv_results)),
        'std_npv': float(np.std(npv_results)),
        'percentile_5': float(np.percentile(npv_results, 5)),
        'percentile_95': float(np.percentile(npv_results, 95)),
        'probability_positive': float(np.mean(npv_results > 0))
    }
bess.py - Battery Storage Modeling
Battery System Classes
Comprehensive Battery Model
class BatteryModel:
    """
    Battery energy storage system model.

    Features:
    - State of charge tracking within [min_soc, max_soc]
    - Efficiency losses on both charging and discharging
    - Equivalent-cycle counting and linear cycle-based capacity degradation
    - Logs of charge and discharge events

    Temperature effects and calendar aging are not modeled by this class.
    """

    def __init__(self, capacity_kwh: float, power_rating_kw: float,
                 efficiency: float = 0.90, min_soc: float = 0.20,
                 max_soc: float = 1.0, cycle_life: int = 6000):
        """
        Initialize battery model.

        Parameters:
            capacity_kwh: Usable battery capacity (kWh)
            power_rating_kw: Maximum charge/discharge power (kW)
            efficiency: Efficiency factor (0.0-1.0]. It is applied once when
                charging and once when discharging, so the effective
                round-trip efficiency is efficiency ** 2.
            min_soc: Minimum state of charge (0.0-1.0)
            max_soc: Maximum state of charge (0.0-1.0)
            cycle_life: Expected number of full cycles

        Raises:
            ValueError: If any parameter is outside its valid range
        """
        # Reject configurations that would later divide by zero or let the
        # state of charge leave its own bounds.
        if capacity_kwh <= 0:
            raise ValueError("capacity_kwh must be positive")
        if power_rating_kw < 0:
            raise ValueError("power_rating_kw must be non-negative")
        if not 0.0 < efficiency <= 1.0:
            raise ValueError("efficiency must be in the range (0.0, 1.0]")
        if not 0.0 <= min_soc <= max_soc <= 1.0:
            raise ValueError("SOC limits must satisfy 0.0 <= min_soc <= max_soc <= 1.0")
        if cycle_life <= 0:
            raise ValueError("cycle_life must be positive")

        self.capacity = capacity_kwh
        self.power_rating = power_rating_kw
        self.efficiency = efficiency
        self.min_soc = min_soc
        self.max_soc = max_soc
        self.cycle_life = cycle_life

        # State variables
        # Start at 50%, moved inside the allowed SOC window when 50% lies
        # outside it (otherwise the first discharge would see negative
        # available energy).
        self.soc = min(max(0.5, min_soc), max_soc)
        self.total_cycles = 0.0
        self.total_throughput = 0.0  # Total energy throughput (kWh)

        # Performance tracking
        self.charge_events = []
        self.discharge_events = []

    def charge(self, power_kw: float, duration_hours: float) -> float:
        """
        Charge battery with given power for specified duration.

        Parameters:
            power_kw: Charging power (non-negative); capped at the power rating
            duration_hours: Charging duration (non-negative)

        Returns:
            Actual energy stored (kWh), after efficiency losses and limited
            by max_soc

        Raises:
            ValueError: If power_kw or duration_hours is negative
        """
        # Negative inputs would lower the SOC through the charge path and
        # bypass the min_soc limit.
        if power_kw < 0 or duration_hours < 0:
            raise ValueError("power_kw and duration_hours must be non-negative")

        # Limit power to rating
        actual_power = min(power_kw, self.power_rating)

        # Energy drawn from the source, then reduced by charging losses
        energy_input = actual_power * duration_hours
        energy_stored = energy_input * self.efficiency

        # Calculate new SOC, capped at the upper limit
        soc_increase = energy_stored / self.capacity
        new_soc = min(self.soc + soc_increase, self.max_soc)

        # Calculate actual energy stored (limited by SOC)
        actual_stored = (new_soc - self.soc) * self.capacity

        # Update state
        self.soc = new_soc
        self.total_throughput += actual_stored

        # Track charging event
        self.charge_events.append({
            'timestamp': datetime.now(),
            'power': actual_power,
            'duration': duration_hours,
            'energy_stored': actual_stored
        })

        return actual_stored

    def discharge(self, power_kw: float, duration_hours: float) -> float:
        """
        Discharge battery with given power for specified duration.

        Parameters:
            power_kw: Discharge power (non-negative); capped at the power rating
            duration_hours: Discharge duration (non-negative)

        Returns:
            Actual energy delivered (kWh), after efficiency losses and
            limited by the energy available above min_soc

        Raises:
            ValueError: If power_kw or duration_hours is negative
        """
        if power_kw < 0 or duration_hours < 0:
            raise ValueError("power_kw and duration_hours must be non-negative")

        # Limit power to rating
        actual_power = min(power_kw, self.power_rating)

        # Energy above the minimum SOC; never negative, so a battery sitting
        # at or below min_soc simply delivers nothing.
        available_energy = max(0.0, (self.soc - self.min_soc) * self.capacity)

        # Calculate energy to deliver
        energy_requested = actual_power * duration_hours
        energy_deliverable = min(energy_requested, available_energy)

        # Apply discharge efficiency
        energy_delivered = energy_deliverable * self.efficiency

        # Update SOC by the energy taken out of the cells (before losses)
        soc_decrease = energy_deliverable / self.capacity
        self.soc = max(self.soc - soc_decrease, self.min_soc)

        # Update throughput and cycles; only discharged energy counts towards
        # equivalent full cycles.
        self.total_throughput += energy_deliverable
        self.total_cycles += energy_deliverable / self.capacity

        # Track discharge event
        self.discharge_events.append({
            'timestamp': datetime.now(),
            'power': actual_power,
            'duration': duration_hours,
            'energy_delivered': energy_delivered
        })

        return energy_delivered

    def get_degradation_factor(self) -> float:
        """
        Calculate current capacity degradation due to cycling.

        Linear model (simplified): capacity falls from 100% to 80% as
        total_cycles goes from 0 to cycle_life and stays at 80% afterwards.

        Returns:
            Remaining capacity as a fraction of nominal (0.8-1.0)
        """
        life_used = min(self.total_cycles / self.cycle_life, 1.0)
        return max(0.8, 1.0 - 0.2 * life_used)

    def get_effective_capacity(self) -> float:
        """Get current effective capacity (kWh) considering degradation."""
        return self.capacity * self.get_degradation_factor()
Battery Optimization Algorithms
Peak Shaving Optimization
def optimize_peak_shaving(load_profile: List[float], battery: "BatteryModel",
                          demand_charge_rate: float) -> Tuple[List[float], float]:
    """
    Optimize battery dispatch for peak demand reduction.

    Solves a linear program that minimizes the maximum net load
    (load minus battery power) subject to the battery's power and
    state-of-charge limits, returning to the starting SOC at the end.

    Parameters:
        load_profile: Hourly load profile (kW)
        battery: Battery model instance (reads capacity, efficiency,
            min_soc, max_soc, soc and power_rating)
        demand_charge_rate: Demand charge rate (RM/kW)

    Returns:
        Tuple of (optimal_battery_schedule, monthly_savings). The schedule
        is positive when discharging and negative when charging. If the
        solver does not report an optimal solution, a warning is logged
        and an all-zero schedule with 0.0 savings is returned.
    """
    hours = len(load_profile)
    if hours == 0:
        # Nothing to optimize; avoids max() on an empty profile below.
        return [], 0.0

    import cvxpy as cp

    # Decision variables. Charging and discharging are separate non-negative
    # variables because a cvxpy expression cannot be tested with a Python
    # `if`; splitting them keeps the direction-dependent efficiency linear.
    charge = cp.Variable(hours, nonneg=True)
    discharge = cp.Variable(hours, nonneg=True)
    battery_power = discharge - charge  # Positive = discharge, negative = charge
    soc = cp.Variable(hours + 1)
    peak_demand = cp.Variable()

    # Objective: minimize peak demand
    objective = cp.Minimize(peak_demand)

    constraints = []
    eta = battery.efficiency
    for t in range(hours):
        # Peak demand constraint
        constraints.append(peak_demand >= load_profile[t] - battery_power[t])
        # Battery SOC dynamics: charging stores eta per unit drawn,
        # discharging removes 1/eta per unit delivered.
        constraints.append(
            soc[t + 1] == soc[t] + (charge[t] * eta - discharge[t] / eta) / battery.capacity
        )

    # SOC bounds
    constraints.append(soc >= battery.min_soc)
    constraints.append(soc <= battery.max_soc)
    constraints.append(soc[0] == battery.soc)  # Initial SOC
    constraints.append(soc[-1] == battery.soc)  # End at same SOC

    # Power bounds
    constraints.append(charge <= battery.power_rating)  # Charge limit
    constraints.append(discharge <= battery.power_rating)  # Discharge limit

    # Solve optimization
    problem = cp.Problem(objective, constraints)
    problem.solve()

    if problem.status == cp.OPTIMAL:
        # NOTE(review): the LP does not forbid charging and discharging in the
        # same hour; only the net power is returned.
        optimal_schedule = battery_power.value
        optimized_peak = peak_demand.value
        original_peak = max(load_profile)
        monthly_savings = (original_peak - optimized_peak) * demand_charge_rate
        return optimal_schedule, monthly_savings

    logger.warning("Peak shaving optimization failed")
    return [0] * hours, 0.0
Energy Arbitrage Optimization
def optimize_energy_arbitrage(tariff_schedule: List[float], battery: "BatteryModel",
                              base_load: Optional[List[float]] = None) -> Tuple[List[float], float]:
    """
    Optimize battery for energy arbitrage (buy low, sell high).

    Solves a linear program that minimizes the cost of grid imports
    (base load minus battery power, never negative) subject to the
    battery's power and state-of-charge limits, returning to the starting
    SOC at the end of the horizon.

    Parameters:
        tariff_schedule: Hourly electricity rates (RM/kWh)
        battery: Battery model instance (reads capacity, efficiency,
            min_soc, max_soc, soc and power_rating)
        base_load: Base load profile (optional; defaults to zero load)

    Returns:
        Tuple of (optimal_battery_schedule, daily_savings). The schedule is
        positive when discharging and negative when charging. If the solver
        does not report an optimal solution, a warning is logged and an
        all-zero schedule with 0.0 savings is returned.

    Raises:
        ValueError: If base_load and tariff_schedule differ in length
    """
    hours = len(tariff_schedule)
    if base_load is None:
        base_load = [0] * hours
    if len(base_load) != hours:
        raise ValueError("base_load must have the same length as tariff_schedule")
    if hours == 0:
        return [], 0.0

    import cvxpy as cp

    # Decision variables. Charging and discharging are separate non-negative
    # variables: a direction-dependent efficiency cannot be expressed by
    # multiplying two variables (non-convex), but is linear once split.
    charge = cp.Variable(hours, nonneg=True)
    discharge = cp.Variable(hours, nonneg=True)
    battery_power = discharge - charge  # Positive = discharge, negative = charge
    soc = cp.Variable(hours + 1)
    grid_import = cp.Variable(hours)

    # Objective: minimize electricity costs
    objective = cp.Minimize(cp.sum(cp.multiply(tariff_schedule, grid_import)))

    constraints = []
    eta = battery.efficiency
    for t in range(hours):
        # Energy balance: discharging offsets load, charging adds to it
        constraints.append(grid_import[t] == base_load[t] - battery_power[t])
        # Battery SOC dynamics, using the same sign convention as the
        # energy balance: charging stores eta per unit drawn, discharging
        # removes 1/eta per unit delivered.
        constraints.append(
            soc[t + 1] == soc[t] + (charge[t] * eta - discharge[t] / eta) / battery.capacity
        )

    # No export to grid (non-negative import)
    constraints.append(grid_import >= 0)

    # SOC bounds and cycling
    constraints.append(soc >= battery.min_soc)
    constraints.append(soc <= battery.max_soc)
    constraints.append(soc[0] == battery.soc)
    constraints.append(soc[-1] == battery.soc)  # Daily cycling

    # Power bounds
    constraints.append(charge <= battery.power_rating)
    constraints.append(discharge <= battery.power_rating)

    # Solve optimization
    problem = cp.Problem(objective, constraints)
    problem.solve()

    if problem.status == cp.OPTIMAL:
        optimal_schedule = battery_power.value
        optimized_cost = problem.value
        baseline_cost = sum(rate * load for rate, load in zip(tariff_schedule, base_load))
        daily_savings = baseline_cost - optimized_cost
        return optimal_schedule, daily_savings

    logger.warning("Energy arbitrage optimization failed")
    return [0] * hours, 0.0
This comprehensive utilities documentation provides developers with a detailed understanding of the technical calculations, financial modeling, and battery optimization algorithms that power PVTools' optimization capabilities.