Multi-Dimensional Aggregation¶
Aggregate cost-per-view across two dimensions (item and time-of-day) from CSV data -- the kind of multi-dimensional rollup you constantly build in feature engineering pipelines.
Problem¶
Given two CSV files:
- times.csv -- named time windows (e.g., Morning, Afternoon, Prime) with start/end times
- data.csv -- transaction records with timestamp, item ID, cost, and output (views)
Produce two summaries:
- Cost per view by item -- total cost / total views for each item
- Cost per view by time window and date -- same metric grouped by (time window, date)
No external libraries -- standard library only.
times.csv
"Start","End","Name"
"6:00 AM","12:00 PM","Morning"
"12:00 PM","4:00 PM","Afternoon"
"3:00 PM","8:00 PM","Prime"
data.csv
"Date","Time","Item","Cost","Output"
"01/02/2016","8:30 AM","ITEM001",120.50,100
"01/02/2016","11:30 AM","ITEM001",240.50,110
"01/02/2016","3:30 PM","ITEM002",500,80
...
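A quick sketch of how `csv.DictReader` handles rows like these -- parsing the first sample row in-memory (via `io.StringIO`, so no file is needed). Note that every field comes back as a string, which is why the implementation casts Cost and Output explicitly:

```python
import csv
import io

# Parse the sample rows above in-memory; csv.DictReader maps each row
# to a dict keyed by the header fields, stripping the quotes.
sample = ('"Date","Time","Item","Cost","Output"\n'
          '"01/02/2016","8:30 AM","ITEM001",120.50,100\n')
rows = list(csv.DictReader(io.StringIO(sample)))

# All values are strings -- numeric conversion is the caller's job.
# rows[0]['Cost'] == '120.50', rows[0]['Output'] == '100'
```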
Approach¶
- Parse time windows into (start, end, name) tuples so we can match any timestamp to its window.
- Stream through data rows -- for each row, parse its time, find the matching window, then accumulate cost and views into two separate groupings.
- Use a helper class (CostCounter) to track running totals and compute cost-per-view on demand. This avoids a second pass over the data.
This is essentially a manual GROUP BY with two different key expressions -- something pandas handles in one line, but implementing it from scratch shows you understand the underlying mechanics.
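The mechanics in miniature -- a minimal sketch (with hypothetical toy rows, not the CSV schema above) of one accumulator function applied under two different key expressions:

```python
# Hypothetical toy rows illustrating a manual GROUP BY.
rows = [
    {"item": "A", "slot": "AM", "cost": 10.0, "views": 5},
    {"item": "A", "slot": "PM", "cost": 20.0, "views": 5},
    {"item": "B", "slot": "AM", "cost": 30.0, "views": 10},
]

def group_totals(rows, key):
    """Single-pass accumulation of (cost, views) per group key."""
    totals = {}
    for row in rows:
        acc = totals.setdefault(key(row), [0.0, 0])  # shared mutable accumulator
        acc[0] += row["cost"]
        acc[1] += row["views"]
    # Final cost-per-view per group.
    return {k: c / v for k, (c, v) in totals.items()}

by_item = group_totals(rows, key=lambda r: r["item"])  # {'A': 3.0, 'B': 3.0}
by_slot = group_totals(rows, key=lambda r: r["slot"])
```

Swapping the `key` callable is all it takes to produce a second grouping from the same stream -- the same idea as the two dictionaries in the implementation below.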
Implementation¶
import csv
from datetime import datetime
from collections import namedtuple

# File names match the problem statement: times.csv holds the named time
# windows ("rotations"), data.csv holds the transaction records ("spots").
ROTATIONS_FILE_PATH = 'times.csv'
SPOTS_FILE_PATH = 'data.csv'

Rotation = namedtuple('Rotation', ['start', 'end', 'name'])


class CostCounter:
    def __init__(self):
        self.total_cost = 0.0
        self.total_views = 0

    def add_spot(self, spot):
        self.total_cost += float(spot['Cost'])
        self.total_views += int(spot['Output'])

    @property
    def cost(self):
        return self.total_cost / self.total_views if self.total_views else float('inf')


def load_csv_data(file_path):
    with open(file_path, mode='r', newline='', encoding='utf-8') as file:
        return list(csv.DictReader(file))


def parse_time(time_str):
    return datetime.strptime(time_str, '%I:%M %p').time()


def get_rotation(spot_time, rotations):
    # Windows may overlap (e.g. Afternoon and Prime); the first match wins,
    # so file order in times.csv is significant.
    for rotation in rotations:
        if rotation.start <= spot_time <= rotation.end:
            return rotation.name
    return None


def calculate_cost_metrics(spots, rotations):
    cost_by_item = {}
    cost_by_rotation_day = {}
    for spot in spots:
        spot['Rotation'] = get_rotation(parse_time(spot['Time']), rotations)
        item = spot['Item']
        rotation_day_key = (spot['Rotation'], spot['Date'])
        cost_by_item.setdefault(item, CostCounter()).add_spot(spot)
        cost_by_rotation_day.setdefault(rotation_day_key, CostCounter()).add_spot(spot)
    return cost_by_item, cost_by_rotation_day


def main():
    rotations = [Rotation(parse_time(rot['Start']), parse_time(rot['End']), rot['Name'])
                 for rot in load_csv_data(ROTATIONS_FILE_PATH)]
    spots = load_csv_data(SPOTS_FILE_PATH)
    cost_by_item, cost_by_rotation_day = calculate_cost_metrics(spots, rotations)
    print({k: v.cost for k, v in cost_by_item.items()})
    print({k: v.cost for k, v in cost_by_rotation_day.items()})


if __name__ == "__main__":
    main()
Output
{'ITEM001': 2.8096153846153844, 'ITEM002': 3.404255319148936}
{('Morning', '01/02/2016'): 1.719..., ('Afternoon', '01/02/2016'): 4.642..., ...}
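One subtlety worth calling out: Afternoon (12:00 PM to 4:00 PM) and Prime (3:00 PM to 8:00 PM) overlap between 3 and 4 PM, and `get_rotation` scans windows in file order, so the first match wins. A standalone re-creation of that lookup (window tuples hand-coded from times.csv) makes the behaviour concrete:

```python
from datetime import time

# Windows in the same order as times.csv; Afternoon and Prime overlap 3-4 PM.
windows = [
    ("Morning",   time(6, 0),  time(12, 0)),
    ("Afternoon", time(12, 0), time(16, 0)),
    ("Prime",     time(15, 0), time(20, 0)),
]

def match_window(t):
    # Linear scan; first match wins, so file order decides overlaps.
    for name, start, end in windows:
        if start <= t <= end:
            return name
    return None

match_window(time(15, 30))  # 'Afternoon', not 'Prime' -- file order wins
```

So the 3:30 PM spot in the sample data lands in Afternoon, and reordering times.csv would change the rollup.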
Complexity¶
- Time: O(n * m) where n = data rows and m = time windows (linear scan per row to match window)
- Space: O(k) where k = number of unique group keys
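If m grew large, the per-row linear scan could drop to O(log m) with a binary search over window start times -- a sketch assuming sorted, non-overlapping windows (which the sample data is not, since Afternoon and Prime overlap; the window names here are hypothetical):

```python
import bisect
from datetime import time

# Hypothetical non-overlapping windows, sorted by start time.
starts = [time(6, 0),  time(12, 0), time(16, 0)]
ends   = [time(12, 0), time(16, 0), time(20, 0)]
names  = ["Morning", "Afternoon", "Evening"]

def find_window(t):
    # Index of the last window starting at or before t.
    i = bisect.bisect_right(starts, t) - 1
    if i >= 0 and t <= ends[i]:
        return names[i]
    return None
```

With overlapping windows like the real times.csv, the linear scan (or an interval tree) is the correct tool, since a single timestamp can fall in more than one window.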
Takeaway¶
Building a GROUP BY from scratch clarifies what pandas does under the hood. It's also the right move when you need a lightweight, dependency-free computation in a Lambda or edge deployment where pulling in pandas isn't worth it.