Multi-Dimensional Aggregation
Aggregate cost-per-view across two dimensions (item and time-of-day) from CSV data, the kind of multi-dimensional rollup you constantly build in feature engineering pipelines.
Problem
Given two CSV files:
- times.csv: named time windows (e.g., Morning, Afternoon, Prime) with start/end times
- data.csv: transaction records with date, time, item ID, cost, and output (views)
Produce two summaries:
- Cost per view by item: total cost / total views for each item
- Cost per view by time window and date: the same metric grouped by (time window, date)
No external libraries allowed beyond the standard library.
times.csv
"Start","End","Name"
"6:00 AM","12:00 PM","Morning"
"12:00 PM","4:00 PM","Afternoon"
"3:00 PM","8:00 PM","Prime"
data.csv
"Date","Time","Item","Cost","Output"
"01/02/2016","8:30 AM","ITEM001",120.50,100
"01/02/2016","11:30 AM","ITEM001",240.50,110
"01/02/2016","3:30 PM","ITEM002",500,80
...
Approach
- Parse the time windows into (start, end, name) tuples so we can match any timestamp to its window.
- Stream through the data rows. For each row, parse its time, find the matching window, then accumulate cost and views into two separate groupings.
- Use a helper class (CostCounter) to track running totals and compute cost-per-view on demand. This avoids a second pass over the data.
This is essentially a manual GROUP BY with two different key expressions, something pandas handles in one line, but implementing it from scratch shows you understand the underlying mechanics.
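As a small illustration of the first step, the sketch below parses the three sample windows from times.csv and matches a few timestamps against them. The WINDOWS list and the match_window helper are names made up for this example. It assumes inclusive bounds and a first-match-wins rule. Note that Afternoon and Prime overlap between 3:00 PM and 4:00 PM, so under that rule 3:30 PM resolves to Afternoon.

```python
from datetime import datetime


def parse_time(time_str):
    # '%I:%M %p' parses 12-hour clock strings such as "6:00 AM".
    return datetime.strptime(time_str, '%I:%M %p').time()


# Sample windows copied from times.csv, as (start, end, name) tuples.
WINDOWS = [
    (parse_time('6:00 AM'), parse_time('12:00 PM'), 'Morning'),
    (parse_time('12:00 PM'), parse_time('4:00 PM'), 'Afternoon'),
    (parse_time('3:00 PM'), parse_time('8:00 PM'), 'Prime'),
]


def match_window(time_str, windows=WINDOWS):
    # Hypothetical helper: return the first window containing the time, else None.
    t = parse_time(time_str)
    for start, end, name in windows:
        if start <= t <= end:
            return name
    return None


for sample in ('8:30 AM', '3:30 PM', '7:15 PM', '11:00 PM'):
    print(sample, '->', match_window(sample))
```

A time outside every window (11:00 PM here) yields None, which then becomes part of the (time window, date) group key, so unmatched rows end up in their own group rather than being dropped.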
Implementation
import csv
from collections import namedtuple
from datetime import datetime

# Input files as named in the Problem section.
ROTATIONS_FILE_PATH = 'times.csv'
SPOTS_FILE_PATH = 'data.csv'

Rotation = namedtuple('Rotation', ['start', 'end', 'name'])


class CostCounter:
    """Running totals of cost and views for one group."""

    def __init__(self):
        self.total_cost = 0.0
        self.total_views = 0

    def add_spot(self, spot):
        self.total_cost += float(spot['Cost'])
        self.total_views += int(spot['Output'])

    @property
    def cost(self):
        # Cost per view; infinity when the group has no views.
        return self.total_cost / self.total_views if self.total_views else float('inf')


def load_csv_data(file_path):
    # Reads every row into memory as a list of dicts keyed by column name.
    with open(file_path, mode='r', newline='', encoding='utf-8') as file:
        return list(csv.DictReader(file))


def parse_time(time_str):
    # Parses 12-hour clock strings such as "8:30 AM".
    return datetime.strptime(time_str, '%I:%M %p').time()


def get_rotation(time, rotations):
    # Bounds are inclusive and the first match wins, so a time that falls in
    # two overlapping windows is assigned to the window listed first.
    for rotation in rotations:
        if rotation.start <= time <= rotation.end:
            return rotation.name
    return None


def calculate_cost_metrics(spots, rotations):
    cost_by_item = {}
    cost_by_rotation_day = {}
    for spot in spots:
        spot['Rotation'] = get_rotation(parse_time(spot['Time']), rotations)
        item = spot['Item']
        rotation_day_key = (spot['Rotation'], spot['Date'])
        # setdefault creates the counter the first time a key is seen.
        cost_by_item.setdefault(item, CostCounter()).add_spot(spot)
        cost_by_rotation_day.setdefault(rotation_day_key, CostCounter()).add_spot(spot)
    return cost_by_item, cost_by_rotation_day


def main():
    rotations = [Rotation(parse_time(rot['Start']), parse_time(rot['End']), rot['Name'])
                 for rot in load_csv_data(ROTATIONS_FILE_PATH)]
    spots = load_csv_data(SPOTS_FILE_PATH)
    cost_by_item, cost_by_rotation_day = calculate_cost_metrics(spots, rotations)
    print({k: v.cost for k, v in cost_by_item.items()})
    print({k: v.cost for k, v in cost_by_rotation_day.items()})


if __name__ == "__main__":
    main()
Output:
{'ITEM001': 2.8096153846153844, 'ITEM002': 3.404255319148936}
{('Morning', '01/02/2016'): 1.719..., ('Afternoon', '01/02/2016'): 4.642..., ...}
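The Morning figure can be checked by hand from the two sample rows shown in data.csv, both of which fall in the Morning window on 01/02/2016. This is a sketch that assumes the elided rows add nothing else to that group, which is consistent with the printed value.

```python
# The two visible data.csv rows that fall in the Morning window on 01/02/2016.
morning_rows = [(120.50, 100), (240.50, 110)]  # (cost, views)

total_cost = sum(cost for cost, _ in morning_rows)     # 361.0
total_views = sum(views for _, views in morning_rows)  # 210
cost_per_view = total_cost / total_views

print(round(cost_per_view, 3))  # 1.719
```

The per-item values and the Afternoon value depend on rows that are elided from the excerpt, so they cannot be reproduced from the sample alone.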
Complexity
- Time: O(n * m) where n = data rows and m = time windows (linear scan per row to match window)
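- Space: O(k) for the aggregates, where k = number of unique group keys. As written, load_csv_data also reads every row into a list, so peak memory is O(n + k); iterating the csv.DictReader directly would bring it down to O(k).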
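One way to keep only the per-group totals in memory is to iterate the reader row by row instead of calling list on it. The sketch below is a variation, not the implementation above: iter_csv_rows and cost_per_view_by_item are hypothetical names, it groups by item only to stay short, and it uses an in-memory string in place of data.csv.

```python
import csv
import io


def iter_csv_rows(file_obj):
    # Hypothetical helper: yield one dict per row instead of building a list.
    yield from csv.DictReader(file_obj)


def cost_per_view_by_item(rows):
    totals = {}  # item -> [total_cost, total_views]
    for row in rows:
        entry = totals.setdefault(row['Item'], [0.0, 0])
        entry[0] += float(row['Cost'])
        entry[1] += int(row['Output'])
    # Same convention as CostCounter.cost: infinity when a group has no views.
    return {item: (cost / views if views else float('inf'))
            for item, (cost, views) in totals.items()}


# In-memory stand-in for data.csv, using only the three sample rows shown earlier.
sample = io.StringIO(
    '"Date","Time","Item","Cost","Output"\n'
    '"01/02/2016","8:30 AM","ITEM001",120.50,100\n'
    '"01/02/2016","11:30 AM","ITEM001",240.50,110\n'
    '"01/02/2016","3:30 PM","ITEM002",500,80\n'
)
result = cost_per_view_by_item(iter_csv_rows(sample))
print(result)
```

With a real file, the generator has to be consumed while the file is still open, so the aggregation call belongs inside the with block that opens it.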
Takeaway
Building a GROUP BY from scratch clarifies what pandas does under the hood. It's also the right move when you need a lightweight, dependency-free computation in a Lambda or edge deployment where pulling in pandas isn't worth it.