Skip to main content

Utilities Module

The utilities module provides core technical calculations, data processing functions, and supporting tools for PVTools optimization. This comprehensive guide covers all utility modules and their practical applications.

Module Organization

utilities/
├── __init__.py # Module initialization
├── technical.py # Grid integration and billing calculations
├── finance.py # Financial modeling and analysis
├── bess.py # Battery energy storage system modeling
├── processes.py # Data processing and API integration
├── profile_estimation.py # Load profile estimation algorithms
├── projection.py # Future projections and forecasting
├── validation.py # Data validation and error checking
└── tools.py # General utility functions

technical.py - Grid Integration

Core Functions

Bill Calculation Engine

The `calculate_bill` function is the core of the TNB (Tenaga Nasional Berhad) tariff calculations and supports all Malaysian tariff structures:

def calculate_bill(c: Dict, tariff_category: str, total_consumption: float,
                   peak_power_demand: Optional[float] = None,
                   peak_fraction: Optional[float] = None) -> float:
    """
    Compute the monthly TNB electricity bill for a single tariff category.

    Tariff families covered:
    - Domestic (B): tiered energy charges
    - Commercial (C1-C3): energy charge plus demand charge
    - Industrial (D-F): multi-part tariffs
    - Agricultural (G): specialized farming rates

    Parameters:
        c: Configuration holding the tariff data
        tariff_category: TNB tariff code (B, C1, C2, etc.)
        total_consumption: Energy consumed in the month (kWh)
        peak_power_demand: Peak demand (kW), used by tariffs with a demand charge
        peak_fraction: Share of consumption in peak hours (0.0-1.0),
            used by time-of-use tariffs

    Returns:
        Total monthly bill in RM

    Example:
        # Residential tariff (tiered structure)
        bill = calculate_bill(config, "B", 350.0)

        # Commercial tariff with demand charges
        bill = calculate_bill(config, "C1", 2500.0, peak_power_demand=15.0)

        # Time-of-use tariff
        bill = calculate_bill(config, "C2", 1800.0, peak_fraction=0.6)
    """

Implementation Details:

def calculate_bill(c, tariff_category, total_consumption, peak_power_demand=None, peak_fraction=None):
    """
    Route a monthly bill calculation to the calculator for the tariff's charge type.

    Parameters:
        c: Configuration; reads c["grid_tariffs"][tariff_category] and, for
            negative consumption, c["standard_parameters"]["SMP_export"]["rate"]
        tariff_category: Key into c["grid_tariffs"]
        total_consumption: Monthly energy consumption (kWh); negative means export
        peak_power_demand: Peak demand (kW); required for "energy_demand" tariffs
        peak_fraction: Peak-hour consumption share, passed to the time-of-use calculator

    Returns:
        The bill amount; for negative consumption, the (negative) export credit

    Raises:
        KeyError: If the tariff category or a required config key is missing
        ValueError: If the charge type is unsupported, or an "energy_demand"
            tariff is billed without a peak demand
    """
    # Negative consumption is a net export: value it at the export rate
    if total_consumption < 0:
        return total_consumption * c["standard_parameters"]["SMP_export"]["rate"]

    tariff_data = c["grid_tariffs"][tariff_category]
    charge_type = tariff_data.get("charge_type", "energy_only")

    if charge_type == "tiered":
        return calculate_tiered_bill(tariff_data, total_consumption)

    if charge_type == "energy_demand":
        # Fail with a clear message instead of a TypeError from None * rate
        if peak_power_demand is None:
            raise ValueError(
                f"peak_power_demand is required for tariff {tariff_category!r} "
                "(charge type 'energy_demand')"
            )
        return calculate_energy_demand_bill(tariff_data, total_consumption, peak_power_demand)

    if charge_type == "time_of_use":
        return calculate_tou_bill(tariff_data, total_consumption, peak_fraction)

    # "energy_only" is the default charge type, so it must be billable.
    # NOTE(review): assumes the same "energy_rate"/"service_charge" keys as the
    # energy+demand tariffs -- confirm against the tariff config schema.
    if charge_type == "energy_only" and "energy_rate" in tariff_data:
        return (total_consumption * tariff_data["energy_rate"]
                + tariff_data.get("service_charge", 0))

    raise ValueError(f"Unsupported charge type: {charge_type}")

Tiered Billing Calculation

def calculate_tiered_bill(tariff_data: Dict, consumption: float) -> float:
    """
    Bill consumption against a tiered tariff (e.g., Domestic B).

    Tiers in tariff_data["tiers"] are filled in order. Each tier's "limit" is
    the amount of energy that tier can absorb; a limit of None marks an
    unbounded top tier. Consumption left over after the last tier is not
    charged.

    Parameters:
        tariff_data: Tariff entry with "tiers" (each having "limit" and "rate")
            and an optional "service_charge"
        consumption: Energy to bill (kWh)

    Returns:
        Sum of the per-tier charges plus the service charge (0 if absent)
    """
    bill = 0.0
    unbilled = consumption

    for band in tariff_data["tiers"]:
        width = band["limit"]
        price = band["rate"]

        # An unbounded tier takes whatever is left
        billed_here = unbilled if width is None else min(unbilled, width)

        bill += billed_here * price
        unbilled -= billed_here

        if unbilled <= 0:
            break

    return bill + tariff_data.get("service_charge", 0)

Energy + Demand Billing

def calculate_energy_demand_bill(tariff_data: Dict, consumption: float, peak_demand: float) -> float:
    """
    Bill an energy + demand tariff (e.g., Commercial C1).

    Parameters:
        tariff_data: Tariff entry with "energy_rate", "demand_rate" and an
            optional "service_charge"
        consumption: Energy to bill (kWh)
        peak_demand: Peak demand to bill (kW)

    Returns:
        consumption * energy_rate + peak_demand * demand_rate + service charge
    """
    energy_component = consumption * tariff_data["energy_rate"]
    demand_component = peak_demand * tariff_data["demand_rate"]
    fixed_component = tariff_data.get("service_charge", 0)

    return energy_component + demand_component + fixed_component

Time-of-Use Billing

def calculate_tou_bill(tariff_data: Dict, consumption: float, peak_fraction: float) -> float:
    """
    Bill a time-of-use tariff by splitting consumption into peak and off-peak energy.

    Parameters:
        tariff_data: Tariff entry with "peak_rate", "off_peak_rate" and an
            optional "service_charge"
        consumption: Energy to bill (kWh)
        peak_fraction: Share of consumption billed at the peak rate; None is
            treated as 0.5

    Returns:
        Peak charge + off-peak charge + service charge (0 if absent)
    """
    # With no measured split, assume half of the energy falls in peak hours
    share = 0.5 if peak_fraction is None else peak_fraction

    peak_kwh = consumption * share
    off_peak_kwh = consumption * (1 - share)

    energy_charge = (peak_kwh * tariff_data["peak_rate"] +
                     off_peak_kwh * tariff_data["off_peak_rate"])

    return energy_charge + tariff_data.get("service_charge", 0)

Inverter Sizing Algorithms

Optimal Inverter Configuration

def size_inverters(system_capacity: float, inverter_data: Dict,
                   design_constraints: Optional[Dict] = None) -> Dict:
    """
    Pick the cheapest feasible single-model inverter configuration.

    Each model in inverter_data["models"] is evaluated with
    optimize_inverter_combination(); among the configurations flagged
    feasible, the one with the lowest "total_cost" is returned.

    Parameters:
        system_capacity: Total system capacity (kWp)
        inverter_data: Available inverter specifications under the "models" key
        design_constraints: Optional design requirements (accepted for
            interface compatibility; not used by the current selection logic)

    Returns:
        Dictionary with the chosen inverter configuration and its costs

    Raises:
        ValueError: If no model yields a feasible configuration
    """
    evaluated = (
        optimize_inverter_combination(system_capacity, inverter_model)
        for inverter_model in inverter_data["models"]
    )
    configurations = [config for config in evaluated if config["feasible"]]

    # min() on an empty list raises an unhelpful "empty sequence" error;
    # report the actual problem instead (still a ValueError).
    if not configurations:
        raise ValueError(
            f"No feasible inverter configuration found for a {system_capacity} kWp system"
        )

    return min(configurations, key=lambda x: x["total_cost"])

def optimize_inverter_combination(system_capacity: float, inverter_spec: Dict) -> Dict:
    """
    Size a bank of identical inverters for the given system capacity.

    The inverter count is system_capacity / capacity_kw rounded up, so the
    sizing ratio (system capacity over installed inverter capacity) can never
    exceed 1.0; in practice only the 0.8 lower bound of the accepted
    0.8-1.3 window rejects a model (inverters far too large for the system).

    Parameters:
        system_capacity: Total system capacity (kWp)
        inverter_spec: Inverter model with "model", "capacity_kw" and "cost"

    Returns:
        {"feasible": False} when the model cannot be used; otherwise a dict
        with "model", "quantity", "total_capacity", "total_cost",
        "sizing_ratio" and "feasible": True
    """
    inverter_capacity = inverter_spec["capacity_kw"]
    inverter_cost = inverter_spec["cost"]

    # Non-positive capacities would either divide by zero or produce a
    # "feasible" result with a negative inverter count and negative cost.
    if system_capacity <= 0 or inverter_capacity <= 0:
        return {"feasible": False}

    num_inverters = math.ceil(system_capacity / inverter_capacity)
    total_inverter_capacity = num_inverters * inverter_capacity
    sizing_ratio = system_capacity / total_inverter_capacity

    if not 0.8 <= sizing_ratio <= 1.3:
        return {"feasible": False}

    return {
        "model": inverter_spec["model"],
        "quantity": num_inverters,
        "total_capacity": total_inverter_capacity,
        "total_cost": num_inverters * inverter_cost,
        "sizing_ratio": sizing_ratio,
        "feasible": True,
    }

Reverse Bill Calculation

def solve_pre_surcharge_bill(c: Dict, tariff_category: str, target_bill: float,
                             peak_demand: float, peak_fraction: float) -> float:
    """
    Estimate the monthly consumption that produces a given bill.

    Minimizes the squared difference between calculate_bill() and the target
    using L-BFGS-B. The search is bounded to 0-10000 kWh and starts from
    target_bill / 0.40 (a rough RM/kWh rate).

    Parameters:
        c: Configuration with tariff data
        tariff_category: TNB tariff code
        target_bill: Target monthly bill (RM)
        peak_demand: Peak power demand (kW)
        peak_fraction: Peak hour consumption ratio

    Returns:
        Monthly energy consumption (kWh); if the optimizer reports failure,
        a warning is logged and the starting estimate is returned instead
    """
    def squared_bill_gap(candidate):
        """Squared difference between the bill for `candidate` and the target."""
        bill = calculate_bill(c, tariff_category, candidate[0],
                              peak_demand, peak_fraction)
        return (bill - target_bill) ** 2

    # Starting point derived from a rough flat rate (RM/kWh)
    rough_rate = 0.40
    starting_point = [target_bill / rough_rate]

    outcome = minimize(squared_bill_gap, starting_point,
                       bounds=[(0, 10000)], method='L-BFGS-B')

    if not outcome.success:
        logger.warning("Failed to converge in bill reversal calculation")
        return starting_point[0]

    return outcome.x[0]

finance.py - Financial Modeling

Core Financial Functions

Net Present Value Calculations

def calculate_npv(cash_flows: List[float], discount_rate: float) -> float:
    """
    Discount a series of annual cash flows to year 0 and add them up.

    Parameters:
        cash_flows: Annual cash flows; index 0 is year 0 (the initial
            investment) and is not discounted
        discount_rate: Annual discount rate as a decimal (e.g., 0.08 for 8%)

    Returns:
        Net Present Value, in the same currency as the cash flows

    Raises:
        ValueError: If cash_flows is empty

    Example:
        # Solar system investment analysis
        initial_cost = -50000  # Negative = cash outflow
        annual_savings = [5000] * 20  # 20 years of savings
        cash_flows = [initial_cost] + annual_savings
        npv = calculate_npv(cash_flows, 0.08)
    """
    if not cash_flows:
        raise ValueError("Cash flows list cannot be empty")

    growth = 1 + discount_rate
    present_value = 0.0
    for period, amount in enumerate(cash_flows):
        present_value += amount / growth ** period

    return present_value

def calculate_npv_with_escalation(initial_cash_flow: float, annual_growth: float,
                                  years: int, discount_rate: float,
                                  initial_investment: float = 0) -> float:
    """
    Calculate the NPV of a cash flow that grows by a fixed rate each year.

    The k-th flow (k = 0 .. years - 1) is initial_cash_flow * (1 + annual_growth) ** k.

    Timing note: when initial_investment is positive it is placed at year 0
    as an outflow, so the escalating flows occupy years 1..years. When it is
    zero or negative it is ignored, and the escalating flows start at year 0
    (the first one is undiscounted).

    Parameters:
        initial_cash_flow: First cash flow of the series
        annual_growth: Yearly escalation rate as a decimal
        years: Number of escalating cash flows
        discount_rate: Annual discount rate as a decimal
        initial_investment: Up-front cost (positive number), optional

    Returns:
        NPV of the assembled series, as computed by calculate_npv()
    """
    series = []
    if initial_investment > 0:
        series.append(-initial_investment)

    series.extend(
        initial_cash_flow if k == 0
        else initial_cash_flow * (1 + annual_growth) ** k
        for k in range(years)
    )

    return calculate_npv(series, discount_rate)

Internal Rate of Return

def calculate_irr(cash_flows: List[float], guess: float = 0.1) -> float:
    """
    Calculate the Internal Rate of Return: the rate at which the NPV is zero.

    Uses scipy.optimize.fsolve when SciPy is installed, otherwise falls back
    to newton_raphson_irr().

    Parameters:
        cash_flows: List of annual cash flows (index 0 = year 0)
        guess: Initial guess for IRR (default 10%)

    Returns:
        IRR as decimal (e.g., 0.12 for 12%)

    Raises:
        ValueError: If the cash flows do not contain both an inflow and an
            outflow (no IRR exists), or if the solver does not converge
    """
    # Without a sign change the NPV never crosses zero; a root finder would
    # only "converge" to a meaningless rate (or echo back the guess when the
    # list is empty).
    has_inflow = any(cf > 0 for cf in cash_flows)
    has_outflow = any(cf < 0 for cf in cash_flows)
    if not (has_inflow and has_outflow):
        raise ValueError("IRR requires at least one positive and one negative cash flow")

    def npv_function(rate):
        """NPV function for root finding."""
        return sum(cf / (1 + rate) ** i for i, cf in enumerate(cash_flows))

    try:
        from scipy.optimize import fsolve
    except ImportError:
        # Fallback to manual Newton-Raphson method
        return newton_raphson_irr(cash_flows, guess)

    irr = float(fsolve(npv_function, guess)[0])

    # Scale the residual tolerance with the cash flow magnitude so large
    # projects are not rejected over floating-point noise; written as
    # "not <=" so a NaN residual is treated as non-convergence.
    tolerance = 1e-6 * max(1.0, max(abs(cf) for cf in cash_flows))
    if not abs(npv_function(irr)) <= tolerance:
        raise ValueError("IRR calculation did not converge")

    return irr

def newton_raphson_irr(cash_flows: List[float], guess: float,
                       max_iterations: int = 100, tolerance: float = 1e-6) -> float:
    """
    Find the IRR with Newton-Raphson iteration on the NPV function.

    Parameters:
        cash_flows: List of annual cash flows (index 0 = year 0)
        guess: Starting rate for the iteration
        max_iterations: Maximum number of Newton steps
        tolerance: Threshold on |NPV| for convergence; also the threshold
            below which the derivative is considered too small to divide by

    Returns:
        The first rate at which |NPV| < tolerance

    Raises:
        ValueError: If the derivative becomes too small or the iteration
            budget is exhausted
    """
    def present_value(r):
        return sum(cf / (1 + r) ** i for i, cf in enumerate(cash_flows))

    def present_value_slope(r):
        return sum(-i * cf / (1 + r) ** (i + 1) for i, cf in enumerate(cash_flows))

    estimate = guess

    for _ in range(max_iterations):
        residual = present_value(estimate)
        if abs(residual) < tolerance:
            return estimate

        slope = present_value_slope(estimate)
        if abs(slope) < tolerance:
            raise ValueError("Derivative too small, cannot continue")

        # Newton step toward the root of the NPV function
        estimate -= residual / slope

    raise ValueError(f"IRR did not converge after {max_iterations} iterations")

Loan Calculations

def calculate_loan_payment(principal: float, annual_rate: float, years: int) -> float:
    """
    Calculate the fixed monthly payment of an amortizing loan.

    Parameters:
        principal: Amount borrowed
        annual_rate: Nominal annual interest rate as a decimal; divided by 12
            to obtain the monthly rate
        years: Loan term in years (12 payments per year)

    Returns:
        Monthly payment; for a zero rate, simply principal / number of payments
    """
    periods = years * 12
    periodic_rate = annual_rate / 12

    # Interest-free loan: the annuity formula would divide by zero
    if periodic_rate == 0:
        return principal / periods

    compound = (1 + periodic_rate) ** periods
    return principal * (periodic_rate * compound) / (compound - 1)

def generate_amortization_schedule(principal: float, annual_rate: float,
                                   years: int) -> pd.DataFrame:
    """
    Generate the month-by-month amortization schedule of a loan.

    Parameters:
        principal: Amount borrowed
        annual_rate: Nominal annual interest rate as a decimal
        years: Loan term in years (12 payments per year)

    Returns:
        DataFrame with one row per payment and the columns payment_number,
        payment_amount, principal_payment, interest_payment,
        remaining_balance, cumulative_interest and cumulative_principal
    """
    monthly_payment = calculate_loan_payment(principal, annual_rate, years)
    monthly_rate = annual_rate / 12
    num_payments = years * 12

    schedule = []
    remaining_balance = principal
    # Running total: avoids re-summing every earlier row on each payment
    cumulative_interest = 0.0

    for payment_num in range(1, num_payments + 1):
        interest_payment = remaining_balance * monthly_rate
        principal_payment = monthly_payment - interest_payment
        remaining_balance -= principal_payment
        cumulative_interest += interest_payment

        # Rounding can push the final balance slightly below zero; report the
        # clamped value and derive cumulative principal from it so the two
        # columns always add up to the original principal.
        reported_balance = max(0, remaining_balance)

        schedule.append({
            'payment_number': payment_num,
            'payment_amount': monthly_payment,
            'principal_payment': principal_payment,
            'interest_payment': interest_payment,
            'remaining_balance': reported_balance,
            'cumulative_interest': cumulative_interest,
            'cumulative_principal': principal - reported_balance,
        })

    return pd.DataFrame(schedule)

Advanced Financial Analysis

Risk Analysis

def calculate_var(returns: List[float], confidence_level: float = 0.05) -> float:
    """
    Calculate Value at Risk as an empirical quantile of the returns.

    Parameters:
        returns: Observed or simulated investment returns
        confidence_level: Tail probability in [0, 1]; the default 0.05 picks
            the return at the 5% position of the sorted (ascending) sample

    Returns:
        The return found at position int(confidence_level * n) of the sorted
        sample, capped at the last element

    Raises:
        ValueError: If returns is empty or confidence_level is outside [0, 1]
    """
    sorted_returns = sorted(returns)
    if not sorted_returns:
        raise ValueError("Returns list cannot be empty")

    # A negative level would silently index from the best returns instead
    if not 0.0 <= confidence_level <= 1.0:
        raise ValueError("confidence_level must be between 0 and 1")

    # confidence_level == 1.0 maps to n, one past the end: cap at the last item
    index = min(int(confidence_level * len(sorted_returns)), len(sorted_returns) - 1)

    return sorted_returns[index]

def monte_carlo_npv_analysis(base_cash_flows: List[float],
                             volatility: float, discount_rate: float,
                             num_simulations: int = 10000) -> Dict:
    """
    Estimate the NPV distribution by randomly perturbing the cash flows.

    In every simulation each cash flow is multiplied by an independent
    factor drawn from a normal distribution with mean 1.0 and standard
    deviation `volatility`, and the NPV of the perturbed series is computed
    with calculate_npv().

    Parameters:
        base_cash_flows: Baseline annual cash flows
        volatility: Standard deviation of the multiplicative noise
        discount_rate: Annual discount rate as a decimal
        num_simulations: Number of random scenarios

    Returns:
        Dict with mean_npv, std_npv, percentile_5, percentile_95 and
        probability_positive (share of scenarios with NPV > 0)
    """
    import numpy as np

    flow_count = len(base_cash_flows)

    def simulate_once():
        # One multiplicative shock per cash flow
        shocks = np.random.normal(1.0, volatility, flow_count)
        perturbed = [base * shock for base, shock in zip(base_cash_flows, shocks)]
        return calculate_npv(perturbed, discount_rate)

    outcomes = [simulate_once() for _ in range(num_simulations)]
    positive_runs = sum(1 for value in outcomes if value > 0)

    return {
        'mean_npv': np.mean(outcomes),
        'std_npv': np.std(outcomes),
        'percentile_5': np.percentile(outcomes, 5),
        'percentile_95': np.percentile(outcomes, 95),
        'probability_positive': positive_runs / len(outcomes)
    }

bess.py - Battery Storage Modeling

Battery System Classes

Comprehensive Battery Model

class BatteryModel:
    """
    Battery energy storage system model with state-of-charge tracking.

    Features:
    - State of charge tracking within [min_soc, max_soc]
    - Fixed conversion efficiency applied to every charge and every discharge
    - Equivalent-full-cycle counting and energy throughput tracking
    - Linear cycle-based capacity degradation (down to 80% of nominal)

    Temperature effects and calendar aging are not modelled by this class.
    """

    def __init__(self, capacity_kwh: float, power_rating_kw: float,
                 efficiency: float = 0.90, min_soc: float = 0.20,
                 max_soc: float = 1.0, cycle_life: int = 6000):
        """
        Initialize battery model.

        Parameters:
            capacity_kwh: Usable battery capacity (kWh)
            power_rating_kw: Maximum charge/discharge power (kW)
            efficiency: Conversion efficiency (0.0-1.0), applied once when
                charging and once when discharging (so a full round trip
                retains efficiency ** 2 of the input energy)
            min_soc: Minimum state of charge (0.0-1.0)
            max_soc: Maximum state of charge (0.0-1.0)
            cycle_life: Expected number of full cycles

        Raises:
            ValueError: If capacity_kwh or cycle_life is not positive, or
                min_soc exceeds max_soc
        """
        # Both values are used as divisors later on
        if capacity_kwh <= 0:
            raise ValueError("capacity_kwh must be positive")
        if cycle_life <= 0:
            raise ValueError("cycle_life must be positive")
        if min_soc > max_soc:
            raise ValueError("min_soc cannot exceed max_soc")

        self.capacity = capacity_kwh
        self.power_rating = power_rating_kw
        self.efficiency = efficiency
        self.min_soc = min_soc
        self.max_soc = max_soc
        self.cycle_life = cycle_life

        # State variables
        # Start at 50%, moved inside the allowed window when 50% is outside it
        self.soc = min(max(0.5, min_soc), max_soc)
        self.total_cycles = 0.0
        self.total_throughput = 0.0  # Total energy throughput (kWh)

        # Performance tracking
        self.charge_events = []
        self.discharge_events = []

    @staticmethod
    def _validate_request(power_kw: float, duration_hours: float) -> None:
        """Reject negative power or duration, which would reverse the operation."""
        if power_kw < 0:
            raise ValueError("power_kw must be non-negative")
        if duration_hours < 0:
            raise ValueError("duration_hours must be non-negative")

    def charge(self, power_kw: float, duration_hours: float) -> float:
        """
        Charge battery with given power for specified duration.

        Power is capped at the power rating, the efficiency is applied to the
        incoming energy, and the state of charge is capped at max_soc.

        Parameters:
            power_kw: Charging power (non-negative)
            duration_hours: Charging duration (non-negative)

        Returns:
            Actual energy stored (kWh)

        Raises:
            ValueError: If power_kw or duration_hours is negative
        """
        self._validate_request(power_kw, duration_hours)

        # Limit power to rating
        actual_power = min(power_kw, self.power_rating)

        # Energy arriving at the cells after conversion losses
        energy_input = actual_power * duration_hours
        energy_stored = energy_input * self.efficiency

        # New SOC, capped at the upper limit
        soc_increase = energy_stored / self.capacity
        new_soc = min(self.soc + soc_increase, self.max_soc)

        # Energy actually stored once the SOC cap is taken into account
        actual_stored = (new_soc - self.soc) * self.capacity

        # Update state
        self.soc = new_soc
        self.total_throughput += actual_stored

        # Track charging event
        self.charge_events.append({
            'timestamp': datetime.now(),
            'power': actual_power,
            'duration': duration_hours,
            'energy_stored': actual_stored
        })

        return actual_stored

    def discharge(self, power_kw: float, duration_hours: float) -> float:
        """
        Discharge battery with given power for specified duration.

        Power is capped at the power rating and the energy drawn from the
        cells is capped by what is available above min_soc; the efficiency is
        then applied to obtain the delivered energy.

        Parameters:
            power_kw: Discharge power (non-negative)
            duration_hours: Discharge duration (non-negative)

        Returns:
            Actual energy delivered (kWh)

        Raises:
            ValueError: If power_kw or duration_hours is negative
        """
        self._validate_request(power_kw, duration_hours)

        # Limit power to rating
        actual_power = min(power_kw, self.power_rating)

        # Energy that can be drawn before hitting the lower SOC limit
        available_energy = (self.soc - self.min_soc) * self.capacity

        # Energy drawn from the cells
        energy_requested = actual_power * duration_hours
        energy_deliverable = min(energy_requested, available_energy)

        # Energy reaching the load after conversion losses
        energy_delivered = energy_deliverable * self.efficiency

        # Update SOC
        soc_decrease = energy_deliverable / self.capacity
        self.soc = max(self.soc - soc_decrease, self.min_soc)

        # Update throughput and cycles (cycles are counted on discharge only)
        self.total_throughput += energy_deliverable
        cycle_equivalent = energy_deliverable / self.capacity
        self.total_cycles += cycle_equivalent

        # Track discharge event
        self.discharge_events.append({
            'timestamp': datetime.now(),
            'power': actual_power,
            'duration': duration_hours,
            'energy_delivered': energy_delivered
        })

        return energy_delivered

    def get_degradation_factor(self) -> float:
        """
        Calculate current capacity degradation due to cycling.

        Returns:
            Factor between 0.8 and 1.0: 1.0 when unused, falling linearly to
            0.8 at cycle_life and staying at 0.8 afterwards
        """
        if self.total_cycles <= self.cycle_life:
            # Linear degradation model (simplified): 20% loss at end of life
            degradation = 0.2 * (self.total_cycles / self.cycle_life)
            return max(0.8, 1.0 - degradation)  # Minimum 80% capacity
        return 0.8  # End of life

    def get_effective_capacity(self) -> float:
        """
        Get current effective capacity considering degradation.

        Note: charge() and discharge() work with the nominal capacity; this
        value is informational.
        """
        return self.capacity * self.get_degradation_factor()

Battery Optimization Algorithms

Peak Shaving Optimization

def optimize_peak_shaving(load_profile: List[float], battery: BatteryModel,
                          demand_charge_rate: float) -> Tuple[List[float], float]:
    """
    Optimize battery dispatch for peak demand reduction.

    Solves a linear program that minimizes the peak of (load - battery
    power). Charging and discharging are modelled as two separate
    non-negative variables so that the efficiency loss can be applied per
    direction while keeping the problem convex. The state of charge must
    return to its starting value at the end of the horizon.

    Parameters:
        load_profile: Hourly load profile (kW); one-hour steps are assumed
            when converting power to energy
        battery: Battery model instance (reads capacity, efficiency, min_soc,
            max_soc, soc and power_rating)
        demand_charge_rate: Demand charge rate (RM/kW)

    Returns:
        Tuple of (optimal_battery_schedule, monthly_savings). The schedule is
        the net battery power per step (positive = discharge, negative =
        charge). If the solver does not reach an optimal status, an all-zero
        schedule and 0.0 are returned.
    """
    import cvxpy as cp

    hours = len(load_profile)
    if hours == 0:
        # Nothing to optimize
        return [], 0.0

    # Decision variables
    charge_power = cp.Variable(hours, nonneg=True)
    discharge_power = cp.Variable(hours, nonneg=True)
    soc = cp.Variable(hours + 1)
    peak_demand = cp.Variable()

    # Net battery power: positive = discharge, negative = charge
    battery_power = discharge_power - charge_power

    # Objective: minimize peak demand
    objective = cp.Minimize(peak_demand)

    # Peak demand must cover the net load in every step
    constraints = [
        peak_demand >= load_profile[t] - battery_power[t]
        for t in range(hours)
    ]

    eta = battery.efficiency
    constraints += [
        # SOC dynamics: charging stores eta * power, discharging draws power / eta
        soc[1:] == soc[:-1] + (eta * charge_power - discharge_power / eta) / battery.capacity,
        # SOC bounds
        soc >= battery.min_soc,
        soc <= battery.max_soc,
        soc[0] == battery.soc,   # Initial SOC
        soc[-1] == battery.soc,  # End at same SOC
        # Power bound; the sum also rules out charging and discharging at
        # full power in the same step
        charge_power + discharge_power <= battery.power_rating,
    ]

    # Solve optimization
    problem = cp.Problem(objective, constraints)
    problem.solve()

    if problem.status == cp.OPTIMAL:
        optimal_schedule = battery_power.value
        optimized_peak = peak_demand.value
        original_peak = max(load_profile)
        monthly_savings = (original_peak - optimized_peak) * demand_charge_rate

        return optimal_schedule, monthly_savings

    logger.warning("Peak shaving optimization failed")
    return [0] * hours, 0.0

Energy Arbitrage Optimization

def optimize_energy_arbitrage(tariff_schedule: List[float], battery: BatteryModel,
                              base_load: Optional[List[float]] = None) -> Tuple[List[float], float]:
    """
    Optimize battery for energy arbitrage (charge when cheap, discharge when expensive).

    Solves a linear program that minimizes the cost of grid imports. Charging
    and discharging are modelled as two separate non-negative variables so
    that the efficiency loss can be applied per direction while keeping the
    problem convex. Grid export is not allowed, and the state of charge must
    return to its starting value at the end of the horizon.

    Parameters:
        tariff_schedule: Hourly electricity rates (RM/kWh); one-hour steps
            are assumed when converting power to energy
        battery: Battery model instance (reads capacity, efficiency, min_soc,
            max_soc, soc and power_rating)
        base_load: Base load profile (kW), optional; defaults to zero load

    Returns:
        Tuple of (optimal_battery_schedule, daily_savings). The schedule is
        the net battery power per step (positive = discharge, negative =
        charge); savings are the baseline import cost minus the optimized
        cost. If the solver does not reach an optimal status, an all-zero
        schedule and 0.0 are returned.
    """
    import cvxpy as cp

    hours = len(tariff_schedule)
    if hours == 0:
        # Nothing to optimize
        return [], 0.0
    if base_load is None:
        base_load = [0] * hours

    # Decision variables
    charge_power = cp.Variable(hours, nonneg=True)
    discharge_power = cp.Variable(hours, nonneg=True)
    soc = cp.Variable(hours + 1)
    grid_import = cp.Variable(hours)

    # Net battery power: positive = discharge, negative = charge
    battery_power = discharge_power - charge_power

    # Objective: minimize electricity costs
    objective = cp.Minimize(cp.sum(cp.multiply(tariff_schedule, grid_import)))

    # Energy balance: discharging reduces the import, charging increases it
    constraints = [
        grid_import[t] == base_load[t] - battery_power[t]
        for t in range(hours)
    ]

    eta = battery.efficiency
    constraints += [
        # No export to grid (non-negative import)
        grid_import >= 0,
        # SOC dynamics: charging stores eta * power, discharging draws power / eta
        soc[1:] == soc[:-1] + (eta * charge_power - discharge_power / eta) / battery.capacity,
        # SOC bounds and cycling
        soc >= battery.min_soc,
        soc <= battery.max_soc,
        soc[0] == battery.soc,
        soc[-1] == battery.soc,  # Daily cycling
        # Power bound; the sum also rules out charging and discharging at
        # full power in the same step
        charge_power + discharge_power <= battery.power_rating,
    ]

    # Solve optimization
    problem = cp.Problem(objective, constraints)
    problem.solve()

    if problem.status == cp.OPTIMAL:
        optimal_schedule = battery_power.value
        optimized_cost = problem.value
        baseline_cost = sum(rate * load for rate, load in zip(tariff_schedule, base_load))
        daily_savings = baseline_cost - optimized_cost

        return optimal_schedule, daily_savings

    logger.warning("Energy arbitrage optimization failed")
    return [0] * hours, 0.0

This utilities documentation gives developers a detailed understanding of the technical calculations, financial modeling, and battery optimization algorithms that power PVTools' optimization capabilities.