TwitterBot on GAEにReplyさせる(その1)
前回
http://d.hatena.ne.jp/intheflight/20090611/p1
やっぱりBotらしく、話しかけられたら反応しなくちゃね!というわけでReply機能を追加します。まず必要なのがBot宛にReplyがあるか調べることです。これは定期的にチェックしなければいけません。そしてやはり反応は早いほうがいい。以上のことから、10分間隔でcronを用いてチェックすることにします。前回はデータを送信するという機能でしたが、今回はデータを受け取ってそれを処理しやすいように加工しなければいけません。具体的にはTwitterから送られたXMLデータをパースしてDatastoreに格納します。
最初にデータモデルを定義します。データモデルとは、GAE用のデータベースであるDatastoreに格納するときのデータ構造の定義です。いわゆるRDBのテーブル定義みたいなものです。これはTwitter APIを見ながら必要そうな項目から作ります。
http://watcher.moe-nifty.com/memo/2007/04/twitter_api.html
class Status(db.Model): id = db.IntegerProperty() user = db.StringProperty() text = db.StringProperty(multiline=True) created_at = db.DateTimeProperty() replied = db.BooleanProperty()
データモデルはdb.Modelを継承します。Twitter APIとは別に独自に追加した項目はrepliedで、これは既にReply済みか判断するフラグです。これでデータモデルは一応完成。
次はXMLパーサです。今回はパースと一緒にDatastoreへの格納も同時に行っているので、その2つの機能は分割したほうがいいんだろうけど面倒なのでしてません。
class XMLHandler: def __init__(self): self.status = None self.text = "" self.user_flg = False self.id_flg = False self.screen_name_flg = False self.text_flg = False self.created_at_flg = False def start_element(self, name, attr): if name == "status": self.status = Status() self.status.replied = False elif name == "user": self.user_flg = True elif name == "screen_name": self.screen_name_flg = True elif name == "text": self.text_flg = True elif self.user_flg == False and name == "created_at": self.created_at_flg = True elif self.user_flg == False and name == "id": self.id_flg = True def end_element(self, name): if name == "status": st = db.Query(Status) st.filter("id =", self.status.id); if st.count() == 0: self.status.put() self.status = None elif name == "user": self.user_flg = False elif name == "screen_name": self.screen_name_flg = False elif name == "text": self.status.text = self.text self.text = "" self.text_flg = False elif self.user_flg == False and name == "created_at": self.created_at_flg = False elif self.user_flg == False and name == "id": self.id_flg = False def char_data(self, data): if self.screen_name_flg == True: self.status.user = data elif self.text_flg == True: self.text += data elif self.created_at_flg == True: self.status.created_at = self.change_date(data) elif self.id_flg == True: self.status.id = int(data) def change_date(self, data): arr = data.split(" ") year = arr[5] month = 0 if arr[1] == "Jan": month = 1 elif arr[1] == "Feb": month = 2 elif arr[1] == "Mar": month = 3 elif arr[1] == "Apr": month = 4 elif arr[1] == "May": month = 5 elif arr[1] == "Jun": month = 6 elif arr[1] == "Jul": month = 7 elif arr[1] == "Aug": month = 8 elif arr[1] == "Sep": month = 9 elif arr[1] == "Oct": month = 10 elif arr[1] == "Nov": month = 11 elif arr[1] == "Dec": month = 12 day = arr[2] arr2 = arr[3].split(":") hour = arr2[0] minute = arr2[1] second = arr2[2] dt = datetime(int(year), int(month), int(day), int(hour), int(minute), int(second)) td = timedelta(hours=9) return dt + td class TwitterParser: def __init__(self): self.handler = XMLHandler() self.parser = xml.parsers.expat.ParserCreate() self.parser.StartElementHandler = self.handler.start_element self.parser.EndElementHandler = self.handler.end_element self.parser.CharacterDataHandler = self.handler.char_data def parse(self, text): self.parser.Parse(text)
時刻の変換とかもっと簡単なやり方があったら教えてください。end_elementでは既にDatastoreに格納済みのデータなら格納しないという処理をしてます。あと、必要なモジュールをインポートします。
import xml.parsers.expat from google.appengine.ext import db from datetime import *
準備ができたので、Twitterのタイムラインを取得する部分を作ります。とりあえず以下のコードです。パラメータはAPI仕様を見ながら調べてください。
http://watcher.moe-nifty.com/memo/2007/04/twitter_api.html
class NkskReply(webapp.RequestHandler): def get(self): # 受信部分 tw = TwitterParser() statuses = db.GqlQuery("SELECT * FROM Status ORDER BY created_at DESC LIMIT 1") url = "http://twitter.com/statuses/mentions.xml" if statuses.count() > 0: url += "?since_id=" url += str(statuses[0].id + 1) base64string =b64encode("%s:%s" % (username, password)) headers = {"Authorization": "Basic %s" % base64string} xml_result = urlfetch.fetch(url, method=urlfetch.GET, headers=headers).content.decode("utf-8") tw.parse(xml_result) # 送信部分 url = "http://twitter.com/statuses/update.xml" statuses2 = db.Query(Status) statuses2.filter("replied =", False) for status in statuses2: status.replied = True db.put(status) message = u"@" + status.user + u" キュフフ♪" payload = urllib.urlencode({'status': message.encode("utf-8")}) base64string =b64encode("%s:%s" % (username, password)) headers = {"Authorization": "Basic %s" % base64string} urlfetch.fetch(url, payload=payload, method=urlfetch.POST, headers=headers)
NkskReplyクラスのget()は受信部分と送信部分の2パートに分かれてます。まず、Datastoreから最新のStatusを取り出して、それよりも最新のReplyをTwitterから受け取ります。それをパーサに渡してDatastoreに格納します。ここまでが受信部分。送信部分では一旦格納したデータの中からReplyしてないデータを取り出してReplyします。Datastoreからのデータの取出しには2種類の方法があります。まずGQLを使う方法。
statuses = db.GqlQuery("SELECT * FROM Status") for status in statuses: status.replied = True
もうひとつはQuery()を使う方法。
statuses = db.Query(Status) statuses.filter("replied =", False) for status in statuses: status.replied = True
使いやすいほうを使ってください。詳細は以下のリンクで。
http://code.google.com/intl/ja/appengine/docs/python/datastore/
最後にURLマッピングしてnkskbot.pyの編集は終わりです。
application = webapp.WSGIApplication( [('/', MainPage)], [('/post', NkskPost)], [('/reply', NkskReply)], debug=True)
あとはapp.yamlとcron.yamlの編集です。app.yamlのhandlersに以下のコードを追加します。
- url: /reply script: nkskbot.py login: admin
cron.yamlにも以下のコードを追加します。
- description: nksk reply job url: /reply schedule: every 10 minutes
これでテストして大丈夫だったらアップロード。こんな感じです。