I need to import a zipped CSV file into a MongoDB collection, but there is a catch: every record contains a timestamp in Pacific Time, which must be converted to the local time corresponding to the (longitude, latitude) pair found in the same record.
The code looks like this:
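# Imports are assumed from usage; get_aux_stuff, allow_field and
# tz_lookup_radius are defined elsewhere in the script.
import csv
import itertools
from datetime import timedelta
from zipfile import ZipFile

from bson.son import SON
from pymongo import Connection
from pytz import timezone, utc


def read_csv_zip(path, timezones):
    with ZipFile(path) as z, z.open(z.namelist()[0]) as input:
        csv_rows = csv.reader(input)
        header = csv_rows.next()
        check, converters = get_aux_stuff(header)
        for csv_row in csv_rows:
            if check(csv_row):
                # Keep only the allowed fields, renaming and converting each value.
                row = {
                    converter[0]: converter[1](value)
                    for converter, value in zip(converters, csv_row)
                    if allow_field(converter)
                }
                ts = row['ts']
                lng, lat = row['loc']
                # Look for a known time zone point in a small box around the record.
                found_tz_entry = timezones.find_one(SON({'loc': {'$within': {'$box': [
                    [lng - tz_lookup_radius, lat - tz_lookup_radius],
                    [lng + tz_lookup_radius, lat + tz_lookup_radius]]}}}))
                if found_tz_entry:
                    tz_name = found_tz_entry['tz']
                    local_ts = ts.astimezone(timezone(tz_name)).replace(tzinfo=None)
                    row['tz'] = tz_name
                else:
                    # No match: approximate one hour of offset per 15 degrees of longitude.
                    local_ts = (ts.astimezone(utc) + timedelta(hours=int(lng / 15))).replace(tzinfo=None)
                row['local_ts'] = local_ts
                yield row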

def insert_documents(collection, source, batch_size):
    while True:
        # Pull the next batch_size documents from the generator.
        items = list(itertools.islice(source, batch_size))
        if len(items) == 0:
            break
        try:
            collection.insert(items)
        except Exception:
            # The batch insert failed: retry one by one to report the bad records.
            for item in items:
                try:
                    collection.insert(item)
                except Exception as exc:
                    print("Failed to insert record {0} - {1}".format(item['_id'], exc))

def main(zip_path):
    with Connection() as connection:
        data = connection.mydb.data
        timezones = connection.timezones.data
        insert_documents(data, read_csv_zip(zip_path, timezones), 1000)
The code proceeds as follows:
Every record read from the CSV is checked and converted to a dictionary. Some fields may be skipped, some titles may be renamed (relative to those appearing in the CSV header), and some values may be converted (to datetime, integers, floats, etc.).
For each record read from the CSV, a lookup is made into the timezones collection to map the record location to the respective time zone. If the mapping is successful, that time zone is used to convert the record timestamp (Pacific Time) to the respective local timestamp. If no mapping is found, a rough approximation is calculated from the longitude (one hour of offset per 15 degrees).
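To make the fallback concrete, here is a minimal sketch of the rough approximation on its own. It assumes Python 3 and, to stay free of third-party packages, a fixed UTC-8 offset for the Pacific timestamp (real Pacific Time also observes DST). The name approximate_local_time is made up for this illustration.

```python
from datetime import datetime, timedelta, timezone


def approximate_local_time(ts, lng):
    """Shift an aware timestamp to a naive 'local' one using longitude only."""
    # One hour per 15 degrees of longitude; int() truncates toward zero.
    offset_hours = int(lng / 15)
    shifted = ts.astimezone(timezone.utc) + timedelta(hours=offset_hours)
    return shifted.replace(tzinfo=None)


# Assumption: fixed UTC-8 stands in for Pacific Time (no DST handling).
pacific_standard = timezone(timedelta(hours=-8))
ts = datetime(2012, 4, 10, 12, 0, tzinfo=pacific_standard)

print(approximate_local_time(ts, 1.48333))  # 2012-04-10 20:00:00
print(approximate_local_time(ts, -122.4))   # 2012-04-10 12:00:00
```

The first call uses the longitude of the Andorra sample point, which truncates to a zero-hour offset, so the result is simply the UTC time.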
The timezones collection is appropriately indexed, of course - calling explain() confirms it.
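For clarity, this is the shape of the filter sent for every record, written as a small helper. It is only a sketch: tz_box_query is a made-up name, and the radius of 0.25 degrees is an arbitrary value chosen for the example, not the tz_lookup_radius actually used.

```python
def tz_box_query(lng, lat, radius):
    """Build the bounding-box filter for a time zone lookup near (lng, lat)."""
    return {
        'loc': {
            '$within': {
                '$box': [
                    [lng - radius, lat - radius],  # lower-left corner
                    [lng + radius, lat + radius],  # upper-right corner
                ]
            }
        }
    }


# Hypothetical radius, for illustration only.
query = tz_box_query(1.5, 42.5, 0.25)
print(query['loc']['$within']['$box'])  # [[1.25, 42.25], [1.75, 42.75]]
```

With pymongo, a dictionary like this would be passed to timezones.find_one(query), and it is the index on loc that lets the server resolve the box.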
The process is slow. Naturally, having to query the timezones collection for every record kills the performance. I am looking for advice on how to improve it.
Thanks.
EDIT
The timezones collection contains 8,176,040 records, each holding four values (an _id, a longitude/latitude pair, and a time zone name):
> db.data.findOne()
{ "_id" : 3038814, "loc" : [ 1.48333, 42.5 ], "tz" : "Europe/Andorra" }
EDIT2
OK, I have compiled a release build of http://toblerity.github.com/rtree/ and configured the rtree package. Then I created an rtree dat/idx pair of files corresponding to my timezones collection, so instead of calling collection.find_one I now call index.intersection. Surprisingly, not only is there no improvement, it is even slower now! Maybe rtree could be fine-tuned to load the entire dat/idx pair into RAM (704M), but I do not know how to do that. Until then, it is not an alternative.
In general, I think the solution should involve parallelization of the task.
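What I have in mind is roughly the following. This is only a sketch of the idea, assuming Python 3 and a thread pool (the lookups mostly wait on a round trip, so threads may be enough). lookup_tz is a hypothetical stand-in for the real per-record query, and batches and convert_in_parallel are names made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def lookup_tz(row):
    # Hypothetical stand-in for the per-record time zone lookup.
    lng, _lat = row['loc']
    row['tz'] = 'east' if lng >= 0 else 'west'
    return row


def batches(source, size):
    """Yield lists of up to `size` items from any iterable."""
    it = iter(source)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def convert_in_parallel(rows, workers=4, batch_size=1000):
    """Run lookup_tz over the rows concurrently, one batch at a time."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for batch in batches(rows, batch_size):
            # pool.map returns results in the same order as its input.
            for row in pool.map(lookup_tz, batch):
                yield row


rows = ({'loc': [lng, 0.0]} for lng in (-120.0, 1.5, 30.0))
result = list(convert_in_parallel(rows, workers=2, batch_size=2))
print([r['tz'] for r in result])  # ['west', 'east', 'east']
```

Because the function is still a generator, it could be handed to insert_documents in place of the plain record stream.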
EDIT3
Profile output when using collection.find_one:
>>> p.sort_stats('cumulative').print_stats(10)
Tue Apr 10 14:28:39 2012    ImportDataIntoMongo.profile
         64549590 function calls (64549180 primitive calls) in 1231.257 seconds
   Ordered by: cumulative time
   List reduced from 730 to 10 due to restriction <10>
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.012    0.012 1231.257 1231.257 ImportDataIntoMongo.py:1(<module>)
        1    0.001    0.001 1230.959 1230.959 ImportDataIntoMongo.py:187(main)
        1  853.558  853.558  853.558  853.558 {raw_input}
        1    0.598    0.598  370.510  370.510 ImportDataIntoMongo.py:165(insert_documents)
   343407    9.965    0.000  359.034    0.001 ImportDataIntoMongo.py:137(read_csv_zip)
   343408    2.927    0.000  287.035    0.001 c:\python27\lib\site-packages\pymongo\collection.py:489(find_one)
   343408    1.842    0.000  274.803    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:699(next)
   343408    2.542    0.000  271.212    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:644(_refresh)
   343408    4.512    0.000  253.673    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:605(__send_message)
   343408    0.971    0.000  242.078    0.001 c:\python27\lib\site-packages\pymongo\connection.py:871(_send_message_with_response)
Profile output when using index.intersection:
>>> p.sort_stats('cumulative').print_stats(10)
Wed Apr 11 16:21:31 2012    ImportDataIntoMongo.profile
         41542960 function calls (41542536 primitive calls) in 2889.164 seconds
   Ordered by: cumulative time
   List reduced from 778 to 10 due to restriction <10>
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.028    0.028 2889.164 2889.164 ImportDataIntoMongo.py:1(<module>)
        1    0.017    0.017 2888.679 2888.679 ImportDataIntoMongo.py:202(main)
        1 2365.526 2365.526 2365.526 2365.526 {raw_input}
        1    0.766    0.766  502.817  502.817 ImportDataIntoMongo.py:180(insert_documents)
   343407    9.147    0.000  491.433    0.001 ImportDataIntoMongo.py:152(read_csv_zip)
   343406    0.571    0.000  391.394    0.001 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:384(intersection)
   343406  379.957    0.001  390.824    0.001 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:435(_intersection_obj)
   686513   22.616    0.000   38.705    0.000 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:451(_get_objects)
   343406    6.134    0.000   33.326    0.000 ImportDataIntoMongo.py:162(<dictcomp>)
      346    0.396    0.001   30.665    0.089 c:\python27\lib\site-packages\pymongo\collection.py:240(insert)
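For anyone who wants to reproduce listings like these, a profile file can be produced and inspected roughly as follows. This is a sketch assuming Python 3 and the standard cProfile and pstats modules. work is a dummy stand-in for main, and the file is written to a temporary directory rather than next to the script.

```python
import cProfile
import os
import pstats
import tempfile


def work():
    # Dummy workload standing in for main(); not the real import.
    return sum(i * i for i in range(100000))


# Assumed location for the example; any writable path would do.
profile_path = os.path.join(tempfile.mkdtemp(), "ImportDataIntoMongo.profile")

profiler = cProfile.Profile()
profiler.runcall(work)             # run the function under the profiler
profiler.dump_stats(profile_path)  # save the raw statistics to disk

p = pstats.Stats(profile_path)
p.sort_stats('cumulative').print_stats(10)  # top 10 entries by cumulative time
```

Note that the totals include everything that ran under the profiler, so time spent waiting at an interactive prompt (the {raw_input} rows) is counted too.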
EDIT4
I have parallelized the code, but the results are still not very encouraging. I am convinced it could be done better. See my own answer to this question for details.