#!/usr/bin/env python3
import asyncio
from collections import defaultdict
import pickle
import zmq.asyncio
class _VarInfo(list):
    """Observation samples for a single variable, tagged with version markers.

    Behaves exactly like a built-in ``list``. On top of that, every instance
    carries ``key_version`` and ``s3_version_id`` attributes, both of which
    start out as ``None``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Version markers stay None until something assigns them.
        self.key_version = None
        self.s3_version_id = None
class _StationInfo(defaultdict):
    """Per-station record: coordinates plus observations grouped by variable.

    Works like a dictionary shaped as
    ``{var_name: [(obs_epoch, obs_value), ...]}``. A variable name that has
    not been seen before is created on first access as an empty ``_VarInfo``.
    """

    def __init__(self, lat: float, lon: float, elev_m: float):
        # Equivalent to defaultdict(_VarInfo): missing keys default to an empty _VarInfo.
        super().__init__(_VarInfo)
        self.lat, self.lon, self.elev_m = lat, lon, elev_m

    def add_var_obs(self, cleaned_obs_data: dict, obs_epoch: float):
        """Append one ``(obs_epoch, value)`` sample for every variable in *cleaned_obs_data*."""
        for name, value in cleaned_obs_data.items():
            sample = (obs_epoch, value)
            self[name].append(sample)
def _process_queued_obs(_input_lock, _queued_obs, _backlog):
    """Drain ``_queued_obs`` into ``_backlog``, oldest batch first.

    The caller must already hold ``_input_lock``; this function only checks
    that it is locked and never acquires or releases it.

    Args:
        _input_lock: lock object exposing ``locked()``.
        _queued_obs: list of batches, consumed in place from the front. Each
            batch is an iterable of
            ``(stn_tuple, lat, lon, elev_m, obs_epoch, cleaned_obs_data)``
            tuples.
        _backlog: mapping of ``stn_tuple`` to ``_StationInfo``; updated in
            place.

    Raises:
        AssertionError: if ``_input_lock`` is not locked (the check is
            skipped when Python runs with ``-O``).

    Any exception raised while handling a batch propagates unchanged. The
    batch being handled has already been removed from ``_queued_obs`` at that
    point; batches behind it are left in the queue.
    """
    assert _input_lock.locked()
    while _queued_obs:
        # FIFO: always take the batch at the front of the queue.
        batch = _queued_obs.pop(0)
        for stn_tuple, lat, lon, elev_m, obs_epoch, cleaned_obs_data in batch:
            stn_info = _backlog.get(stn_tuple)
            if stn_info is None:
                stn_info = _backlog[stn_tuple] = _StationInfo(lat, lon, elev_m)
            else:
                # Known station: overwrite its coordinates with the ones from
                # this observation. Original note: list_objects returns files
                # from oldest to newest, so the last write is the "latest"
                # position for the station.
                stn_info.lat = lat
                stn_info.lon = lon
                stn_info.elev_m = elev_m
            # Record the new samples under the station.
            stn_info.add_var_obs(cleaned_obs_data, obs_epoch)
async def main():
    """Repeatedly load the pickled observation queue and fold it into a fresh backlog.

    Runs 500 rounds. Each round reads ``/tmp/q.pickle``, runs
    ``_process_queued_obs`` in the event loop's default executor while holding
    an ``asyncio.Lock``, and then asserts that the resulting backlog holds
    more than 30000 entries.

    Raises:
        AssertionError: if a round produces 30000 or fewer backlog entries.
        OSError: if ``/tmp/q.pickle`` cannot be opened.
    """
    # Take the loop that is driving this coroutine rather than relying on a
    # module-level name that only exists when the file is run as a script.
    loop = asyncio.get_event_loop()
    _input_lock = asyncio.Lock()
    for _ in range(500):
        # NOTE(security): unpickling can execute arbitrary code and /tmp is
        # usually world-writable; only run this against a trusted file.
        with open('/tmp/q.pickle', 'rb') as f:
            queued_obs = pickle.load(f)
        # A fresh backlog each round; processing drains queued_obs in place.
        backlog = {}
        async with _input_lock:
            await loop.run_in_executor(None, _process_queued_obs, _input_lock, queued_obs, backlog)
        assert len(backlog) > 30000
if __name__ == '__main__':
    # Install pyzmq's event loop as the current asyncio loop and drive main()
    # to completion on it. The stock loop from asyncio.get_event_loop() is the
    # alternative that was tried here previously.
    _loop = zmq.asyncio.ZMQEventLoop()
    asyncio.set_event_loop(_loop)
    entry_point = main()
    _loop.run_until_complete(entry_point)