Multi-Dimensional Aggregation

Aggregate cost-per-view across two dimensions (item and time-of-day) from CSV data -- the kind of multi-dimensional rollup you constantly build in feature engineering pipelines.

Problem

Given two CSV files:

  • times.csv -- named time windows (e.g., Morning, Afternoon, Prime) with start/end times
  • data.csv -- transaction records with timestamp, item ID, cost, and output (views)

Produce two summaries:

  1. Cost per view by item -- total cost / total views for each item
  2. Cost per view by time window and date -- same metric grouped by (time window, date)

No external libraries allowed beyond the standard library.

times.csv

"Start","End","Name"
"6:00 AM","12:00 PM","Morning"
"12:00 PM","4:00 PM","Afternoon"
"3:00 PM","8:00 PM","Prime"

data.csv

"Date","Time","Item","Cost","Output"
"01/02/2016","8:30 AM","ITEM001",120.50,100
"01/02/2016","11:30 AM","ITEM001",240.50,110
"01/02/2016","3:30 PM","ITEM002",500,80
...
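Both files use 12-hour clock strings, so it's worth checking how strptime's %I (12-hour hour) and %p (AM/PM) directives behave at the noon and midnight boundaries -- a classic off-by-twelve bug. This is the same parsing the implementation below uses:

```python
from datetime import datetime

# Parse a 12-hour clock string like "6:00 AM" into a time object.
def parse_time(time_str):
    return datetime.strptime(time_str, '%I:%M %p').time()

print(parse_time('6:00 AM'))   # 06:00:00
print(parse_time('12:00 PM'))  # 12:00:00 -- noon, not midnight
print(parse_time('12:00 AM'))  # 00:00:00 -- midnight
```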

Approach

  1. Parse time windows into (start, end, name) tuples so we can match any timestamp to its window.
  2. Stream through data rows -- for each row, parse its time, find the matching window, then accumulate cost and views into two separate groupings.
  3. Use a helper class (CostCounter) to track running totals and compute cost-per-view on demand. This avoids a second pass over the data.

This is essentially a manual GROUP BY with two different key expressions -- something pandas handles in one line, but implementing it from scratch shows you understand the underlying mechanics.
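The "two different key expressions" idea can be sketched generically: one accumulator function, parameterized by a key function, covers both groupings. This is a simplified sketch (the inline rows and names are illustrative, not from the actual dataset):

```python
from collections import defaultdict

def group_totals(rows, key_fn):
    """Manual GROUP BY: accumulate (cost, views) per key, then divide."""
    totals = defaultdict(lambda: [0.0, 0])  # key -> [total_cost, total_views]
    for row in rows:
        t = totals[key_fn(row)]
        t[0] += float(row['Cost'])
        t[1] += int(row['Output'])
    return {k: cost / views for k, (cost, views) in totals.items()}

rows = [
    {'Item': 'A', 'Date': 'd1', 'Cost': 10, 'Output': 5},
    {'Item': 'A', 'Date': 'd1', 'Cost': 20, 'Output': 5},
    {'Item': 'B', 'Date': 'd1', 'Cost': 9, 'Output': 3},
]
print(group_totals(rows, lambda r: r['Item']))            # {'A': 3.0, 'B': 3.0}
print(group_totals(rows, lambda r: (r['Item'], r['Date'])))
```

Swapping the lambda is all it takes to go from the per-item rollup to the per-(window, date) rollup.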

Implementation

import csv
from datetime import datetime
from collections import namedtuple

# "Rotations" are the named time windows (times.csv);
# "spots" are the transaction records (data.csv).
ROTATIONS_FILE_PATH = 'times.csv'
SPOTS_FILE_PATH = 'data.csv'

Rotation = namedtuple('Rotation', ['start', 'end', 'name'])


class CostCounter:
    def __init__(self):
        self.total_cost = 0.0
        self.total_views = 0

    def add_spot(self, spot):
        self.total_cost += float(spot['Cost'])
        self.total_views += int(spot['Output'])

    @property
    def cost(self):
        # Guard against division by zero for a group with no recorded views.
        return self.total_cost / self.total_views if self.total_views else float('inf')


def load_csv_data(file_path):
    with open(file_path, mode='r', newline='', encoding='utf-8') as file:
        return list(csv.DictReader(file))


def parse_time(time_str):
    return datetime.strptime(time_str, '%I:%M %p').time()


def get_rotation(time, rotations):
    # Windows may overlap (Afternoon ends 4:00 PM, Prime starts 3:00 PM);
    # the first matching window in file order wins.
    for rotation in rotations:
        if rotation.start <= time <= rotation.end:
            return rotation.name
    return None


def calculate_cost_metrics(spots, rotations):
    cost_by_item = {}
    cost_by_rotation_day = {}

    for spot in spots:
        spot['Rotation'] = get_rotation(parse_time(spot['Time']), rotations)
        item = spot['Item']
        rotation_day_key = (spot['Rotation'], spot['Date'])

        cost_by_item.setdefault(item, CostCounter()).add_spot(spot)
        cost_by_rotation_day.setdefault(rotation_day_key, CostCounter()).add_spot(spot)

    return cost_by_item, cost_by_rotation_day


def main():
    rotations = [Rotation(parse_time(rot['Start']), parse_time(rot['End']), rot['Name'])
                 for rot in load_csv_data(ROTATIONS_FILE_PATH)]
    spots = load_csv_data(SPOTS_FILE_PATH)
    cost_by_item, cost_by_rotation_day = calculate_cost_metrics(spots, rotations)

    print({k: v.cost for k, v in cost_by_item.items()})
    print({k: v.cost for k, v in cost_by_rotation_day.items()})


if __name__ == "__main__":
    main()

Output

{'ITEM001': 2.8096153846153844, 'ITEM002': 3.404255319148936}
{('Morning', '01/02/2016'): 1.719..., ('Afternoon', '01/02/2016'): 4.642..., ...}

Complexity

  • Time: O(n * m) where n = data rows and m = time windows (linear scan per row to match a window)
  • Space: O(n + k) -- load_csv_data reads every row into memory (O(n)), and the group dictionaries add O(k) for k unique group keys
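If the windows were sorted by start time and non-overlapping (they aren't here: Afternoon and Prime overlap from 3:00 to 4:00 PM, and the linear scan's first-match behavior handles that), the per-row scan could be replaced with a binary search, dropping the match cost to O(log m) per row. A sketch under that assumption, with illustrative windows:

```python
import bisect
from datetime import time

# Assumes windows are sorted by start and do not overlap.
windows = [(time(6, 0), time(12, 0), 'Morning'),
           (time(12, 0), time(16, 0), 'Afternoon'),
           (time(16, 0), time(20, 0), 'Evening')]
starts = [w[0] for w in windows]

def find_window(t):
    # Rightmost window whose start is <= t, then check its end.
    i = bisect.bisect_right(starts, t) - 1
    if i >= 0 and t <= windows[i][1]:
        return windows[i][2]
    return None

print(find_window(time(8, 30)))   # Morning
print(find_window(time(21, 0)))   # None
```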

Takeaway

Building a GROUP BY from scratch clarifies what pandas does under the hood. It's also the right move when you need a lightweight, dependency-free computation in a Lambda or edge deployment where pulling in pandas isn't worth it.

