#!/usr/bin/python
"""Lemmatizer daemon.

``Runner`` polls ``docrefs`` for fresh words (wstat=0), analyses each one
with ``morfo`` and records the outcome.  ``UndecCopier`` copies manually
ticked ``undecided`` options into ``words``.  Both loops run in a single
forked Twisted reactor process.

wstat values written to ``docrefs`` by this script:
    1  claimed by Runner (stays 1 if processing the word fails)
    2  morfo returned no analysis (None)
    3  single direct analysis stored in ``words``
    4  candidate analyses stored in ``undecided``

Usage: <script> start|stop
"""
import errno
import os
import sys
import popen2
import signal
import logging
import logging.handlers

from twisted.internet import reactor
from twisted.enterprise import adbapi

import morfo


def _logFailure(failure, what):
    """Errback: log a Twisted Failure with its traceback, then swallow it.

    Returning None turns the Deferred chain back into success, so any
    callbacks added after this errback (such as rescheduling) still run.
    """
    logger.error("%s failed:\n%s", what, failure.getTraceback())
    return None


def _parseAnalysis(raw):
    """Split one morfo analysis line into (prim, lemma, category, gram).

    The line is split on single spaces.  Fields 1, 5, 6 and 3 are used,
    each with its first character dropped (presumably a one-character
    field tag -- verify against morfo's output format).  Raises IndexError
    when the line has fewer than seven fields.
    """
    fields = raw.split(" ")
    return fields[1][1:], fields[5][1:], fields[6][1:], fields[3][1:]


def _postSuccessWordData(txn, word, prim, lemma, category, gram, language, raw):
    """Interaction: insert an analysis into ``words`` unless one exists.

    Existence is checked on (word, language).  Values are passed as query
    parameters so the driver quotes them.  Interpolating them into the SQL
    text broke on words or analyses containing an apostrophe.
    """
    txn.execute('SELECT count("ID") FROM words WHERE word=%s AND language=%s',
                (word, language))
    if txn.fetchone()[0] == 0:
        txn.execute('INSERT INTO words '
                    '("ID", word, prim, lemma, category, gram, language, raw) '
                    "VALUES (NEXTVAL('widseq'), %s, %s, %s, %s, %s, %s, %s)",
                    (word, prim, lemma, category, gram, language, raw))


class Runner:
    """Poll ``docrefs`` for fresh words and lemmatize them with morfo."""

    # FRESH words
    WORDS_SQL = 'SELECT "ID", word, language FROM docrefs WHERE wstat=0 LIMIT 100'
    # Seconds to wait before the next poll.  The index advances after every
    # empty poll (capped at the last entry) and resets after a non-empty one.
    delays = [0, 1, 5, 10, 15, 20, 20, 20, 40, 40, 40, 60]
    delstep = 0

    def __init__(self, reactor):
        self.dbpool = adbapi.ConnectionPool('psycopg2', user='evkk', database='evkk')
        self.reactor = reactor
        logger.debug("importing morfo...")
        self.morfo = morfo.Morfo()
        logger.debug("... morfo import done")

    def handleFreshWords(self, txn):
        """Interaction: claim up to 100 fresh ``docrefs`` rows.

        Runs in an adbapi database thread.  Every fetched row is marked
        wstat=1 inside this transaction.  Also advances or resets the
        back-off step.  The next poll is scheduled by queryWords in the
        reactor thread, because reactor.callLater is not thread-safe.

        Returns a list of {'id', 'word', 'lang'} dicts.
        """
        txn.execute(self.WORDS_SQL)
        rows = txn.fetchall()
        if rows:
            logger.debug("Runner got %s words", len(rows))
            self.delstep = 0
        else:
            self.delstep = min(self.delstep + 1, len(self.delays) - 1)
            logger.debug("Runner: Sleeping %s seconds", self.delays[self.delstep])
        ids = []
        for r in rows:
            txn.execute('UPDATE docrefs SET wstat=1 WHERE "ID"=%s', (r[0],))
            ids.append({'id': r[0], 'word': r[1], 'lang': r[2]})
        return ids

    def printErr(self, failure):
        logger.debug(failure)
        print(failure)
        self.reactor.stop()

    def _postUndecided(self, txn, word, lang, options):
        """Interaction: store every candidate option for (word, lang) in
        ``undecided``, unless rows for that pair already exist."""
        txn.execute('SELECT count("ID") FROM undecided WHERE word=%s AND language=%s',
                    (word, lang))
        if txn.fetchone()[0] == 0:
            for option in options:
                txn.execute('INSERT INTO undecided ("ID", word, option, language) '
                            "VALUES (NEXTVAL('unidseq'), %s, %s, %s)",
                            (word, option, lang))

    def _processWords(self, ids):
        """Callback: analyse every claimed word and record the outcome.

        A failure on one word is logged and that row is left at wstat=1.
        The rest of the batch is still processed instead of being abandoned.
        """
        for wdata in ids:
            try:
                self._processWord(wdata)
            except Exception:
                logger.exception("Runner: failed to process %r", wdata)

    def _processWord(self, wdata):
        """Analyse one word, store the result and update its docrefs wstat."""
        # Decode the UTF-8 bytes returned for the word before handing it to morfo.
        word = unicode(wdata['word'], 'utf-8')
        lang = wdata['lang']
        morf = self.morfo.getinfo(word)
        if morf is None:
            wstat = 2
        elif len(morf) == 1 and not morf[0].startswith('#') \
                and not morf[0].startswith('?'):
            # direct match: insert into words table
            wstat = 3
            raw = morf[0]
            prim, lemma, cat, gram = _parseAnalysis(raw)
            d = self.dbpool.runInteraction(_postSuccessWordData, word, prim,
                                           lemma, cat, gram, lang, raw)
            d.addErrback(_logFailure, "Runner: words insert")
        else:
            # multiple (or '#'/'?'-prefixed) matches: insert into undecided table
            wstat = 4
            d = self.dbpool.runInteraction(self._postUndecided, word, lang, morf)
            d.addErrback(_logFailure, "Runner: undecided insert")
        # update word status in docrefs table
        d = self.dbpool.runOperation('UPDATE docrefs SET wstat=%s WHERE "ID"=%s',
                                     (wstat, wdata['id']))
        d.addErrback(_logFailure, "Runner: docrefs update")

    def _scheduleNext(self, _result):
        """Callback: schedule the next poll after the current back-off delay."""
        self.reactor.callLater(self.delays[self.delstep], self.start)

    def queryWords(self):
        """Run one poll: claim words, process them, then schedule the next poll.

        Failures are logged and the next poll is still scheduled, so a
        transient database error does not stop the loop.
        """
        d = self.dbpool.runInteraction(self.handleFreshWords)
        d.addCallback(self._processWords)
        d.addErrback(_logFailure, "Runner poll")
        d.addCallback(self._scheduleNext)

    def start(self):
        """Schedule a poll on the next reactor iteration."""
        self.reactor.callLater(0, self.queryWords)


class UndecCopier:
    """Copy manually ticked ``undecided`` options into ``words``."""

    # Seconds to wait before the next poll; see _processTicked for the back-off.
    delays = [2, 5, 10, 15, 20, 20, 20, 40, 40, 40, 60]
    delstep = 0

    def __init__(self, reactor):
        self.dbpool = adbapi.ConnectionPool('psycopg2', user='evkk', database='evkk')
        self.reactor = reactor

    def getTicked(self, txn):
        """ interaction getting all ticked and not processed words

        Returns a list of {'id', 'word', 'option', 'language'} dicts.
        """
        txn.execute('SELECT "ID", word, option, language '
                    'FROM undecided WHERE tick=true and processed=false')
        rows = txn.fetchall()
        if rows:
            logger.debug("Ticker got %s decisions.", len(rows))
        return [{'id': r[0], 'word': r[1], 'option': r[2], 'language': r[3]}
                for r in rows]

    def _processTicked(self, data):
        """Callback: copy each ticked option into ``words`` and mark it processed.

        An option that cannot be parsed is logged and left unprocessed.  The
        back-off step resets only when at least one option was copied.  A row
        that keeps failing therefore cannot make the ticker poll (and log)
        at the shortest delay forever.
        """
        copied = 0
        for item in data:
            raw = item['option']
            lang = item['language']
            word = item['word']
            try:
                prim, lemma, cat, gram = _parseAnalysis(raw)
            except (IndexError, AttributeError):
                logger.error("Ticker: cannot parse option %r of word %r", raw, word)
                continue
            d = self.dbpool.runInteraction(_postSuccessWordData, word, prim,
                                           lemma, cat, gram, lang, raw)
            d.addErrback(_logFailure, "Ticker: words insert")
            # Mark this word processed.  _postUndecided keys rows by
            # (word, language), so rows for other languages are left alone.
            d = self.dbpool.runOperation(
                'UPDATE undecided SET processed=true WHERE word=%s AND language=%s',
                (word, lang))
            d.addErrback(_logFailure, "Ticker: undecided update")
            copied += 1
        if copied == 0:
            self.delstep = min(self.delstep + 1, len(self.delays) - 1)
            logger.debug("Ticker: Sleeping %s seconds", self.delays[self.delstep])
        else:
            self.delstep = 0

    def _scheduleNext(self, _result):
        """Callback: schedule the next poll after the current back-off delay."""
        self.reactor.callLater(self.delays[self.delstep], self.start)

    def queryTicked(self):
        """Run one poll and always schedule the next one, even after a failure."""
        d = self.dbpool.runInteraction(self.getTicked)
        d.addCallback(self._processTicked)
        d.addErrback(_logFailure, "Ticker poll")
        d.addCallback(self._scheduleNext)

    def start(self):
        """Schedule a poll on the next reactor iteration."""
        self.reactor.callLater(0, self.queryTicked)


# logging
LOGFILE = 'lemmatize2.log'
logger = logging.getLogger('lemmatize')
logger.setLevel(logging.DEBUG)
loghandler = logging.handlers.RotatingFileHandler(
    LOGFILE, maxBytes=1000000, backupCount=10)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
loghandler.setFormatter(formatter)
logger.addHandler(loghandler)

command = None
if len(sys.argv) > 1 and sys.argv[1] in ('start', 'stop'):
    command = sys.argv[1]
if command is None:
    print("Usage: %s start|stop" % sys.argv[0])
    sys.exit()

# The pid file sits next to the script: "./lemmatize2.py" -> "./lemmatize2.pid".
# (Cutting at the first '.' turned "./lemmatize2.py" into ".pid".)
pidname = os.path.splitext(sys.argv[0])[0] + '.pid'

if command == "start":
    if os.path.exists(pidname):
        pif = open(pidname, 'r')
        try:
            pidtext = pif.read()
        finally:
            pif.close()
        try:
            extpid = int(pidtext)
        except ValueError:
            # unreadable pid file: treat it as stale
            os.remove(pidname)
        else:
            # ps prints nothing when no process has that pid -> stale pid file
            stdout, stdin = popen2.popen2('ps -o pid= %s' % extpid)
            alive = stdout.read()
            stdout.close()
            stdin.close()
            if not alive:
                os.remove(pidname)
    if os.path.exists(pidname):
        print("Already running!")
        sys.exit(1)
    pid = os.fork()
    if not pid:
        # child: run both pollers in the Twisted reactor until it stops
        runner = Runner(reactor)
        runner.start()
        undec = UndecCopier(reactor)
        undec.start()
        reactor.run()
    else:
        pif = open(pidname, 'w')
        try:
            pif.write(str(pid))
        finally:
            pif.close()
        print("Success")
elif command == "stop":
    try:
        pif = open(pidname, 'r')
        try:
            pid = int(pif.read())
        finally:
            pif.close()
    except (IOError, ValueError):
        print("Script not running?")
        sys.exit()
    try:
        os.kill(pid, signal.SIGTERM)
    except OSError:
        if sys.exc_info()[1].errno != errno.ESRCH:
            raise
        # The process is already gone; still drop the stale pid file.
        print("Script not running?")
    os.remove(pidname)