Ripyr: sampled metrics on datasets using Python’s asyncio

Today I’d like to introduce a little Python library I’ve toyed around with here and there for the past year or so: ripyr. Originally it was written just as an excuse to try out some newer features in modern Python: asyncio and type hinting. The whole package is type hinted, which turned out to take surprisingly little effort, and the asyncio-based streaming ended up being pretty speedy.

But first the goal: we wanted to stream through large datasets stored on disk in a memory-efficient way, and parse out some basic metrics from them. Things like cardinality, what a date field’s format might be, an inferred type, that sort of thing.
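
To make that concrete, the streaming pattern boils down to something like the sketch below: a generator yields rows one at a time, and each metric updates incrementally as rows go by, so memory use stays flat no matter how big the file is. To be clear, this is purely an illustration of the pattern, not ripyr’s actual internals (stream_rows and count_rows are hypothetical names):

import asyncio
import csv
from typing import AsyncIterator, Dict

# Illustrative sketch of the streaming pattern; stream_rows and count_rows
# are hypothetical names, not ripyr's actual API.
async def stream_rows(filename: str) -> AsyncIterator[Dict[str, str]]:
    """Yield rows one at a time so the whole file never sits in memory."""
    with open(filename, newline='') as f:
        for row in csv.DictReader(f):
            yield row
            await asyncio.sleep(0)   # yield control so other tasks can interleave

async def count_rows(filename: str) -> int:
    """The simplest possible incremental metric: a running row count."""
    n = 0
    async for _ in stream_rows(filename):
        n += 1
    return n

print(asyncio.run(count_rows('sample.csv')))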

It’s an interesting use case because, in many situations, pandas is actually quite performant here. The way pandas pulls data off disk into a DataFrame can balloon memory consumption for a short time, which makes analyzing very large files prohibitive, but short of that it’s fast and easy. So keep that in mind if you’re dealing with smaller datasets; YMMV.

So using asyncio to lower memory overhead is a big benefit, but beyond that I also wanted a nicer interface for the developer. Anyone who’s written a lot of pandas-based code can probably parse what the following does (and it could no doubt be written more cleanly), but it’s not pretty:

import pandas as pd

# Everything is loaded into memory up front, then summarized per column.
df = pd.read_csv('sample.csv')
df['date'] = pd.to_datetime(df['date'], infer_datetime_format=True)
report = {
    "columns": df.columns.values.tolist(),
    "metrics": {
        "A": {
            "count": df.shape[0]
        },
        "B": {
            "count": df.shape[0],
            "approximate_cardinality": len(pd.unique(df['B']))
        },
        "C": {
            "count": df.shape[0],
            "approximate_cardinality": len(pd.unique(df['C'])),
            "max": df['C'].max()
        },
        "D": {
            "count": df.shape[0]
        },
        "date": {
            "count": df.shape[0],
            "estimated_schema": 'pandas-internal'
        }
    }
}

The equivalent with ripyr is:

import json
# (ripyr imports omitted here; the class names below are as exposed by the library)

cleaner = StreamingColCleaner(source=CSVDiskSource(filename='sample.csv'))
cleaner.add_metric_to_all(CountMetric())                        # every column gets a count
cleaner.add_metric('B', [CountMetric(), CardinalityMetric()])
cleaner.add_metric('C', [CardinalityMetric(size=10e6), MaxMetric()])
cleaner.add_metric('date', DateFormat())
cleaner.process_source(limit=10000, prob_skip=0.5)              # sample while streaming
print(json.dumps(cleaner.report(), indent=4, sort_keys=True))

I think that is a lot more readable. The second-to-last line also shows the third and final cool thing ripyr does: sampling. In this example we end up with a sample of at most 10,000 records regardless of the total size of the dataset, and each row is skipped with 50% probability as it comes off disk. Running the analysis on just the first N rows, or on a random sample of M rows, is trivially easy. In many cases that’s all you need, so there’s no reason to pull a huge dataset into memory in the first place.
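
For the curious, the sampling idea behind limit and prob_skip can be sketched in plain Python like this (a hypothetical simplification, not ripyr’s actual implementation; sample_rows is a made-up name):

import csv
import random
from typing import Dict, Iterator

# Hypothetical sketch of limit/prob_skip-style sampling; sample_rows is a
# made-up name, not ripyr's internals.
def sample_rows(filename: str, limit: int, prob_skip: float) -> Iterator[Dict[str, str]]:
    """Yield at most `limit` rows, skipping each row with probability `prob_skip`."""
    taken = 0
    with open(filename, newline='') as f:
        for row in csv.DictReader(f):
            if taken >= limit:
                break
            if random.random() < prob_skip:
                continue   # skip this row and move on
            taken += 1
            yield row

rows = list(sample_rows('sample.csv', limit=10000, prob_skip=0.5))

With prob_skip=0.5, roughly every other row is considered until the limit is hit, and the file is only ever read one row at a time.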

Currently there are two supported source types:

  • CSVDisk: a CSV file on disk, streamed with Python’s standard library csv reader
  • JSONDisk: a file with one JSON object per line, with no newlines inside the JSON itself (a minimal sketch of reading this format follows the list)
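
For a sense of what that one-object-per-line format implies, here’s a minimal sketch of a reader for it (hypothetical code, not ripyr’s actual JSONDisk source):

import json
from typing import Any, Dict, Iterator

# Minimal sketch of a one-JSON-object-per-line reader; hypothetical, not
# ripyr's actual JSONDisk source.
def read_json_lines(filename: str) -> Iterator[Dict[str, Any]]:
    """Parse each line as an independent JSON object, one at a time."""
    with open(filename) as f:
        for line in f:
            line = line.strip()
            if line:   # tolerate blank lines
                yield json.loads(line)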

For a given source, you can apply any of a number of metrics:

  • categorical
    • approximate cardinality, based on a bloom filter (sketched after this list)
  • dates
    • date format inference
  • inference
    • type inference
  • numeric
    • count
    • min
    • max
    • histogram
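
The bloom-filter approach to approximate cardinality works roughly like the sketch below: each value sets a few hash-derived bits in a bit array, and a value whose bits are all already set is assumed to be a repeat. This is a simplified, hypothetical illustration, not ripyr’s actual CardinalityMetric:

import hashlib

# Simplified, hypothetical sketch of bloom-filter-based approximate
# cardinality; ripyr's actual CardinalityMetric may differ.
class ApproxCardinality:
    def __init__(self, size: int = 1000000, hashes: int = 3) -> None:
        self.bits = bytearray(size)   # one byte per "bit" keeps the sketch simple
        self.size = size
        self.hashes = hashes
        self.count = 0

    def _positions(self, value: str):
        # Derive a few independent bit positions by salting the hash input.
        for i in range(self.hashes):
            digest = hashlib.md5(f"{i}:{value}".encode()).hexdigest()
            yield int(digest, 16) % self.size

    def add(self, value: str) -> None:
        positions = list(self._positions(value))
        if not all(self.bits[p] for p in positions):
            self.count += 1   # at least one bit was unset: definitely a new value
        for p in positions:
            self.bits[p] = 1

est = ApproxCardinality()
for v in ["a", "b", "a", "c"]:
    est.add(v)
print(est.count)   # 3, give or take collisions

The estimate can undercount when distinct values happen to collide on all the same bits, which is the usual bloom-filter trade-off: bounded memory in exchange for a small, tunable error rate.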

It’s not on PyPI or anything yet, and isn’t very actively developed, since it was mostly a learning exercise for asyncio and type hinting. But if you think it’s interesting, find me on GitHub or comment below; I’d be happy to collaborate with others who find this sort of thing useful.

https://github.com/predikto/ripyr