# Standard library.
# `choice` is no longer used in this module but is kept so that any code doing
# `from <this module> import choice` keeps working.
from random import SystemRandom, choice
import time

# Zope / project imports.
from zope.interface import implements
from OFS.SimpleItem import SimpleItem
from interfaces.SQIInterface import ISQISessionManager


# One SQI session, stored as a child object of the session manager under its
# session id.  It carries the per-session settings plus the queries and result
# sets registered for that session, both keyed by query id.
#
# NOTE(review): this class and several of its methods have no docstring in the
# original.  In Zope 2 the publisher refuses to publish objects without a
# docstring, so the absence is presumably intentional -- documentation is
# therefore written as comments and no docstring has been added or removed.
class sqiSession(SimpleItem):

    meta_type = 'sqi_session'

    def __init__(self, sessionID, isAnon=0, username=''):
        """ create session object """
        # The session id doubles as the Zope object id.
        self.id = sessionID
        self.sessionID = sessionID
        self.isAnon = isAnon
        self.username = username

        # TODO: implement session timeout based on these values
        # Use a single clock reading so both timestamps start out identical.
        now = time.time()
        self.started = now
        self.lastAccess = now

        # Per-session defaults.
        self._queryLanguage = 'vsql'
        self._resultsFormat = 'RSS'
        self._maxQueryResults = 100
        self._maxDuration = 100
        self._resultsSetSize = 25
        self._sourceLocation = ''

        # Plain dicts keyed by query id.  They are mutated in place, so every
        # method that changes them must flag the session as changed (see
        # _markChanged) or the ZODB will not store the modification.
        self._queries = {}
        self._results = {}

    def _markChanged(self):
        # In-place mutation of a plain dict attribute is invisible to the
        # persistence machinery; setting _p_changed tells the ZODB to write
        # this object at the next commit.
        self._p_changed = 1

    # --- simple settings accessors -------------------------------------

    def _setQueryLanguage(self, ql):
        self._queryLanguage = ql

    def _setResultsFormat(self, rf):
        self._resultsFormat = rf

    def _setMaxQueryResults(self, max):
        # The parameter name shadows the builtin but is kept for callers that
        # pass it by keyword.
        self._maxQueryResults = max

    def _setMaxDuration(self, dur):
        self._maxDuration = dur

    def _setResultsSetSize(self, size):
        self._resultsSetSize = size

    def _getResultsSetSize(self):
        return self._resultsSetSize

    def _setSourceLocation(self, location):
        self._sourceLocation = location

    def _getSourceLocation(self):
        return self._sourceLocation

    # --- queries --------------------------------------------------------

    def addQuery(self, query, queryID):
        # Log the query and store it under queryID.
        # queryID is 0 when it's syncQuery
        from zLOG import LOG
        LOG('SQICore', 0, 'Received query', query)
        self._queries[queryID] = query
        self._markChanged()

    def getQuery(self, queryID):
        """ return the query stored under queryID (KeyError if unknown) """
        return self._queries[queryID]

    def delQuery(self, queryID):
        """ delete the query stored under queryID (KeyError if unknown) """
        del self._queries[queryID]
        self._markChanged()

    def getQueries(self):
        """ return the mapping of query id to query """
        # This is the internal dict itself, not a copy.
        return self._queries

    # --- results --------------------------------------------------------

    def addResults(self, results, queryID):
        # Same operation as setResultSet; kept as a separate name for
        # existing callers.
        # queryID is 0 when it's syncQuery
        self.setResultSet(results, queryID)

    def setResultSet(self, results, queryID):
        # Store (or replace) the result set for queryID.
        self._results[queryID] = results
        self._markChanged()

    def getResultSet(self, queryID):
        """ return the result set stored under queryID (KeyError if unknown) """
        return self._results[queryID]

    def getResults(self):
        """ return the mapping of query id to result set """
        # This is the internal dict itself, not a copy.
        return self._results


# Mixin implementing ISQISessionManager.  It calls self._setObject,
# self._delObject, self.objectValues and acquisition wrapping (__of__), none
# of which are defined here -- presumably it is combined with an OFS
# ObjectManager-style container elsewhere; verify against the concrete class.
class SQISessionManager:

    implements(ISQISessionManager)

    def __init__(self):
        """ init """
        pass

    def createAnonymousSession(self):
        """ create anonymous session and return its session id """
        return self._createNewSession(isAnon=1)

    def createSession(self, username, password):
        """ create session. I don't care what the username and password is.

        Returns the new session id, or 0 when the optional _doUserCheck
        hook rejects the credentials.
        """
        # Accept everybody unless a _doUserCheck hook is available.
        result = 1

        # we could add zope's internal user authentication here
        if hasattr(self, '_doUserCheck'):
            result = self._doUserCheck(username, password)

        if result:
            return self._createNewSession(isAnon=0, username=username)

        # FIXME: raise an SQIFault!
        return 0

    def destroySession(self, sessionID):
        """ destroy session eg. logout of something """
        # Sessions are stored under their session id.
        self._delObject(sessionID)

    def _createNewSession(self, isAnon=1, username=''):
        """ create session and return session id """
        # ascii_letters is locale-independent (string.letters is not, and
        # does not exist on Python 3).
        from string import ascii_letters, digits
        alphabet = ascii_letters + digits

        # The session id acts as a bearer token, so draw it from the OS
        # entropy source rather than the predictable default generator.
        rng = SystemRandom()
        sessID = ''.join([rng.choice(alphabet) for _ in range(15)])

        session = sqiSession(sessID, isAnon, username)
        self._setObject(session.id, session)
        return sessID

    def _getSession(self, sessID):
        """ get session object for sessID, or None if there is none """
        for candidate in self.objectValues('sqi_session'):
            if candidate.sessionID == sessID:
                return candidate
        return None

    def wsdl(self):
        """ return wsdl file """
        from Products.PageTemplates.PageTemplateFile import PageTemplateFile
        template = PageTemplateFile('sqiSessionManagement.wsdl.pt', globals())
        # Wrap in our acquisition context before rendering.
        return template.__of__(self)()