Thursday, February 10, 2011

Schedule MapReduce daily on AppEngine with Cron.yaml in Python

To generate rich daily statistics for MiuMeet, I need to run MapReduce jobs every day.

With the standard MapReduce library and a small helper class, this is very easy to do.

cron.yaml
The cron.yaml file lets you define tasks that AppEngine runs on a schedule, in this case once a day. Each task is an HTTP GET request to a URL in your app.


cron:
- description: DailyStats MapReduce
  url: /cron_mapreduce?name=Stats&reader_spec=mymr.map&entity_kind=model.MyModel
  schedule: every day 00:00

cron_mapreduce.py accepts the following query parameters:
  • name: The name of your MapReduce
  • reader_spec: The mapper function of your MapReduce (despite its name, the handler passes this to start_map as the mapper, not as the input reader)
  • entity_kind: The Datastore entity kind you want to process
  • reader_parameters (optional): The input reader class (default: mapreduce.input_readers.DatastoreInputReader)
  • processing_rate (optional): The processing rate of the input reader (default: 100)
  • done_callback (optional): A URL that should be called after the MapReduce finishes (default: None)
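
If you schedule several jobs, it can help to build the cron URL in code so that every value is URL-encoded correctly (a done_callback path, for example, contains a "/"). Here is a minimal sketch. Note that build_cron_url and the /stats_done callback path are hypothetical names I made up for illustration; only the parameter names come from cron_mapreduce.py.

```python
try:
    from urllib.parse import urlencode  # Python 3
except ImportError:
    from urllib import urlencode  # Python 2


def build_cron_url(name, reader_spec, entity_kind, **optional):
    """Return the /cron_mapreduce URL for the given parameters.

    build_cron_url is a hypothetical helper; the parameter names match
    the ones cron_mapreduce.py reads from the request.
    """
    allowed = ("reader_parameters", "processing_rate", "done_callback")
    unknown = sorted(set(optional) - set(allowed))
    if unknown:
        raise ValueError("unknown parameters: %s" % ", ".join(unknown))
    # A list of pairs keeps the parameter order stable in the output.
    pairs = [("name", name),
             ("reader_spec", reader_spec),
             ("entity_kind", entity_kind)]
    pairs += [(key, optional[key]) for key in allowed if key in optional]
    return "/cron_mapreduce?" + urlencode(pairs)


# Reproduces the URL used in the cron.yaml above.
print(build_cron_url("Stats", "mymr.map", "model.MyModel"))
# Optional parameters are appended and encoded ("/" becomes %2F).
print(build_cron_url("Stats", "mymr.map", "model.MyModel",
                     processing_rate=50, done_callback="/stats_done"))
```

The resulting string can be pasted into the url field of a cron.yaml entry.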

app.yaml
Add my library to your app.yaml:


handlers:
- url: /cron_mapreduce.*
  script: /cron_mapreduce.py
  login: admin


My cron_mapreduce.py library


#!/usr/bin/env python

"""
 Created by Andrin von Rechenberg, 2011.
 
 This library is free software: you can redistribute it
 and/or modify it under the terms of the GNU General Public License
 as published by the Free Software Foundation, either version 3 of
 the License, or (at your option) any later version.

 Example usage:
   http://devblog.miumeet.com/2011/02/schedule-mapreduce-daily-on-appengine.html    
 
Cheers,
 -Andrin

"""

from google.appengine.ext import webapp
from google.appengine.ext.webapp import util

from mapreduce import control as mr_control

class ScheduleMapReduce(webapp.RequestHandler):
  """Starts a MapReduce job from the query parameters of a GET request."""

  def get(self):
    mr_control.start_map(
        # Name of the job.
        self.request.get("name"),
        # Mapper function (the second argument of start_map).
        self.request.get("reader_spec", "your_mapreduce.map"),
        # Input reader class.
        self.request.get("reader_parameters",
                         "mapreduce.input_readers.DatastoreInputReader"),
        # Parameters handed to the input reader.
        {"entity_kind": self.request.get("entity_kind", "models.YourModel"),
         "processing_rate": int(self.request.get("processing_rate", 100))},
        mapreduce_parameters={
            "done_callback": self.request.get("done_callback", None)})
    self.response.out.write("MapReduce scheduled")


# Every URL routed to this script by app.yaml starts a job.
application = webapp.WSGIApplication([
  ('/.*', ScheduleMapReduce),
], debug=True)


def main():
  util.run_wsgi_app(application)


if __name__ == "__main__":
  main()
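
The handler above trusts its input: a non-numeric processing_rate, for example, makes int() raise inside the request. If you want stricter checks, the defaults and the conversion can be pulled into a plain function that is easy to test outside AppEngine. This is only a sketch under my own assumptions: parse_schedule_params is a hypothetical helper, and treating name as required and processing_rate as a positive integer are choices I made here, not rules of the MapReduce library.

```python
# Defaults mirror the ones used in the ScheduleMapReduce handler.
DEFAULTS = {
    "reader_spec": "your_mapreduce.map",
    "reader_parameters": "mapreduce.input_readers.DatastoreInputReader",
    "entity_kind": "models.YourModel",
    "processing_rate": "100",
    "done_callback": None,
}


def parse_schedule_params(query):
    """Validate a dict of query parameters and fill in the defaults.

    Hypothetical helper: requiring "name" and a positive processing_rate
    are assumptions of this sketch.
    """
    if not query.get("name"):
        raise ValueError("missing required parameter: name")
    params = dict(DEFAULTS)
    # Empty strings count as "not given", so the default is kept.
    params.update((key, value) for key, value in query.items() if value != "")
    try:
        rate = int(params["processing_rate"])
    except (TypeError, ValueError):
        raise ValueError("processing_rate must be an integer")
    if rate <= 0:
        raise ValueError("processing_rate must be positive")
    params["processing_rate"] = rate
    return params


params = parse_schedule_params({"name": "Stats",
                                "entity_kind": "model.MyModel"})
print(params["processing_rate"], params["entity_kind"])
```

Inside the handler you would build the dict from self.request and return an error response when ValueError is raised, instead of letting the request fail.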


Cheers,
-Andrin
