# -*- coding: utf-8 -*-
# $Id: Statistics.py 517 2006-02-27 16:35:41Z vahur $
#
# Copyright 2006 by IVA Team and contributors
#
# This file is part of IVA.
#
# IVA is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# IVA is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with IVA; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
""" SQI interface for IVA """
# The literal has the "$Revision: N $" keyword form; the [11:-2] slice drops
# the 11-character "$Revision: " prefix and the trailing " $", leaving only
# the revision number as a string.
__version__ = "$Revision: 0 $"[11:-2]
import Globals
from Globals import Persistent
from OFS import Folder
from AccessControl import ClassSecurityInfo
from Products.iva.Cruft import Cruft
from Products.iva.common import translate, perm_edit, perm_view, perm_manage, mkTime
from Products.SQICore.SQICore import SQICore
from Products.SQICore.SQISessionManager import SQISessionManager
from Products.SQICore.interfaces.SQIInterface import ISQIInterface
from Products.SQICore.interfaces.SQIInterface import ISQISessionManager
from zope.interface import implements
import time
from Products.iva.ImportExport import get_text
class SQITool(Folder.Folder,
              SQICore,
              Persistent,
              Cruft
              ):
    """SQI tool: answers SQI queries from the local catalogs and
    forwards keyword searches to the configured remote repositories."""

    implements(ISQIInterface)
    meta_type = 'SQITool'
    security = ClassSecurityInfo()
    security.declareObjectPublic()

    def __init__(self):
        """Create the tool under the fixed id 'sqitool' and add a
        session manager object as its child."""
        self.id = 'sqitool'
        manager = SQISessionManager()
        self._setObject(manager.id, manager)
        # NOTE(review): SQICore.__init__(self) is not invoked here (the call
        # was disabled in the original code); confirm SQICore needs no setup.

    def standard_error_message(self, error_value='', error_traceback='',
                               error_tb='', error_type='', error_message='',
                               error_log_url=''):
        """Return the error details as a tuple instead of rendering a page.

        The result is (error_traceback, error_type, error_message,
        error_value); error_tb and error_log_url are accepted but ignored.
        """
        return error_traceback, error_type, error_message, error_value

    def _processQuery(self, sessID, queryID):
        """Parse the query of a session, execute it and return the results.

        The parser is chosen from the session's query language ('vsql' or
        'keywords', compared case-insensitively).  Criteria with empty
        values are removed from the parsed query before it is executed.
        The result set is also stored on the session via setResultSet().

        Returns the search results, or None for an 'object_uid' query
        (object queries are not implemented yet).

        Raises ValueError when the query language is not supported or when
        the parsed query contains no non-empty criteria.
        """
        sess = self._getSession(sessID)
        language = sess._queryLanguage.lower()
        if language == 'vsql':
            from Products.SQICore.Parsers import VSQI as parser
        elif language == 'keywords':
            from Products.SQICore.Parsers import Keywords as parser
        else:
            # Previously parser stayed None here and calling it failed with
            # an unhelpful "'NoneType' object is not callable".
            raise ValueError('unsupported query language: %r'
                             % (sess._queryLanguage,))
        query = parser(self, sessID, queryID)

        # Iterate over a snapshot of the keys because entries are deleted
        # from the mapping inside the loop.
        has_criteria = False
        for key in list(query.keys()):
            if query[key]:
                has_criteria = True
            else:
                del query[key]

        results = None
        if 'object_uid' in query:
            # TODO: implement object query here
            results = None
        elif has_criteria:
            results = self._execQuery(sessID, query)
            sess.setResultSet(results, queryID)
        else:
            # String exceptions ("raise 'Panic 91'") are rejected by
            # Python >= 2.6, so raise a real exception with the same text.
            raise ValueError('Panic 91: query has no non-empty criteria')
        return results

    def _execQuery(self, sessID, query):
        """Search both catalogs with query and return the combined results.

        blogs_zcatalog is searched first, then the results of
        catalog_webtop_items are appended.  sessID is not used.
        """
        results = self.blogs_zcatalog.search(query)
        results += self.catalog_webtop_items.search(query)
        return results

    # NOTE(review): getFormatterConf has no docstring in the original.
    # ZPublisher only publishes objects that carry a docstring, so this
    # method is documented with comments to keep its web visibility as is.
    #
    # Returns the formatter configuration dict for result format rf
    # ('RSS', 'SingleObject' or 'Objecti'); returns None for any other value.
    def getFormatterConf(self, rf):
        if rf == 'RSS':
            return {'timestamp': 'get_timestamp',
                    'title': 'getTitle',
                    'intro': 'getIntro',
                    'channel_title': 'IVA',
                    'channel_link': self.fle_root().absolute_url(),
                    'channel_desc': 'Channel description here',
                    'channel_generator': 'IVA SQI',
                    'channel_lang': 'en'}
        elif rf == 'SingleObject':
            return {}
        elif rf == 'Objecti':
            return {'timestamp': 'get_timestamp',
                    'title': 'getTitle',
                    'modified': 'get_timestamp'}
        return None

    #
    # configured repositories
    #
    def getRepositories(self):
        """Return the mapping of configured repositories.

        Keys are the ids generated by setRepositories(); values are dicts
        with 'title', 'main_wsdl' and 'sess_wsdl'.  The mapping is created
        empty on first access.
        """
        if not hasattr(self, 'repositories_list'):
            self.repositories_list = {}
            self._p_changed = True
        return self.repositories_list

    security.declareProtected(perm_manage, 'setRepositories')
    def setRepositories(self, REQUEST):
        """Add a repository described by the request.

        Reads 'new_repos_title', 'new_main_wsdl_location' and
        'new_session_wsdl_location' from REQUEST.  Returns 1 when any of
        them is missing or empty (nothing is stored), 0 on success.
        """
        title = REQUEST.get('new_repos_title', '')
        main_wsdl = REQUEST.get('new_main_wsdl_location', '')
        sess_wsdl = REQUEST.get('new_session_wsdl_location', '')
        if not (title and main_wsdl and sess_wsdl):
            return 1
        if not hasattr(self, 'repositories_list'):
            self.repositories_list = {}
        # The current timestamp serves as the repository id.
        repo_id = str(time.time())
        self.repositories_list[repo_id] = {'title': title,
                                           'main_wsdl': main_wsdl,
                                           'sess_wsdl': sess_wsdl}
        # repositories_list is a plain dict, so the change has to be
        # reported to the persistence machinery explicitly.
        self._p_changed = True
        return 0

    security.declareProtected(perm_manage, 'delRepository')
    def delRepository(self, req):
        """Remove the repositories whose ids are listed in
        req['to_del_repo'].

        Ids that are not (or no longer) configured are skipped.
        """
        repositories = self.getRepositories()
        removed = False
        for repo_id in req.get('to_del_repo', []):
            if repo_id in repositories:
                del repositories[repo_id]
                removed = True
        if removed:
            # In-place mutation of a plain dict is invisible to ZODB;
            # without this flag the deletion may never be saved.
            self._p_changed = True

    #
    # methods for making SOAP/SQI query
    #
    def repository_search(self, keyword):
        """Run a keyword query against every configured repository.

        Repositories whose WSDL cannot be loaded, or which reject the
        'Objecti' results format, are skipped.  Returns the concatenated
        list of result dicts produced by parseResults().
        """
        from SOAPpy import WSDL
        from SOAPpy.wstools.WSDLTools import WSDLError
        objs = []
        # getRepositories() creates the mapping when it does not exist yet,
        # so a search before any repository was configured returns [].
        for repo in self.getRepositories().values():
            try:
                server = WSDL.Proxy(repo['main_wsdl'])
            except WSDLError:
                continue
            try:
                auth = WSDL.Proxy(repo['sess_wsdl'])
            except WSDLError:
                continue
            sessID = auth.createAnonymousSession()
            res = server.setResultsFormat(sessID, 'Objecti')
            if not res:
                continue
            # NOTE(review): the string literals below are empty or a bare
            # '%s'; it looks as if markup around the keywords was lost from
            # this source.  They are reproduced unchanged - confirm against
            # the query format the remote side expects.
            query = ''
            query += ''
            for k in keyword.split():
                query += '%s' % k
            query += ''
            res = server.synchronousQuery(sessID, query, 0)
            objs += self.parseResults(res, repo['main_wsdl'],
                                      repo['sess_wsdl'])
        return objs

    def parseResults(self, xmlstring, main_wsdl=None, sess_wsdl=None):
        """Parse results that are in Objecti format.

        Every <item> element yields a dict with 'title', 'link', 'pub',
        'mod' (falls back to the publication date when there is no
        <modDate>), 'guid' and 'time_stored', plus 'main_wsdl' /
        'sess_wsdl' when those arguments are given.  Each dict is also
        stored in self.objects_cache under its guid.

        Returns the list of dicts in document order.
        """
        from xml.dom.minidom import parseString
        if not hasattr(self, 'objects_cache'):
            self.objects_cache = {}

        def text_of(node, tag):
            # Text of the first <tag> descendant; IndexError when absent.
            return get_text(node.getElementsByTagName(tag)[0])

        doc = parseString(xmlstring.encode('utf-8'))
        doc.normalize()
        results = []
        for item in doc.getElementsByTagName('item'):
            entry = {}
            entry['title'] = text_of(item, 'title')
            entry['link'] = text_of(item, 'link')
            entry['pub'] = text_of(item, 'pubDate')
            try:
                entry['mod'] = text_of(item, 'modDate')
            except IndexError:
                entry['mod'] = entry['pub']
            entry['guid'] = text_of(item, 'guid')
            if main_wsdl:
                entry['main_wsdl'] = main_wsdl
            if sess_wsdl:
                entry['sess_wsdl'] = sess_wsdl
            results.append(entry)
            # cache: the very same dict is stored, so the returned entry
            # carries 'time_stored' as well
            entry['time_stored'] = time.time()
            self.objects_cache[entry['guid']] = entry
        if results:
            # objects_cache is a plain dict mutated in place; flag the
            # change so addProxyObject() can find the entries later.
            self._p_changed = True
        return results

    def addProxyObject(self, context, guid):
        """Create a ProxyFile for the cached search result guid and add it
        to context.

        Raises KeyError when guid is not present in the objects cache.
        """
        from ProxyFile import ProxyFile
        cached = self.objects_cache[guid]
        proxy = ProxyFile(context, cached.get('title'), cached)
        context._setObject(proxy.id, proxy)

    def isVisible(self):
        """Return whether the tool is visible to users; the flag defaults
        to False the first time it is read."""
        if not hasattr(self, 'is_visible'):
            self.is_visible = False
        return self.is_visible

    security.declareProtected(perm_manage, 'setVisible')
    def setVisible(self, val):
        """Store the visibility flag as int(val)."""
        self.is_visible = int(val)

    #
    # not used
    #
    def make_query(self, REQUEST):
        """Send a VSQI query to a hard-coded local WordPress SQI endpoint
        and return the raw XML response.

        Returns the string "couldn't log in" when no session could be
        created.  The name/value pairs found under <seq> elements are
        printed for debugging.
        """
        # NOTE(review): endpoint and credentials are hard-coded, and the
        # method has a docstring, which makes it publishable; restrict or
        # remove it if it really is unused.
        hostname = 'http://localhost/wordpress'
        username = 'admin'
        password = 'admin'
        from SOAPpy import WSDL
        auth = WSDL.Proxy(hostname + '/wp-sqi/sqisession.php?wsdl')
        service = WSDL.Proxy(hostname + '/wp-sqi/sqihandler.php?wsdl')
        sessionID = auth.createSession(username, password)
        if sessionID == '0':
            return "couldn't log in"
        # logic here
        service.setQueryLanguage(sessionID, 'VSQI')
        # NOTE(review): the query text is empty in this source - confirm.
        query = """"""
        res = service.synchronousQuery(sessionID, query)
        auth.destroySession(sessionID)
        REQUEST.RESPONSE.setHeader('Content-Type', 'text/xml')
        from xml.dom.minidom import parseString
        dom = parseString(res)
        dom.normalize()
        for seq in dom.getElementsByTagName('seq'):
            for child in seq.childNodes:
                if child.nodeType != 1:  # 1 == ELEMENT_NODE
                    continue
                name = child.nodeName
                value = ''
                for part in child.childNodes:
                    if part.nodeType == 4:  # 4 == CDATA_SECTION_NODE
                        value = part.nodeValue
                # Single-argument call form: valid as a Python 2 print
                # statement and as a print() call alike.
                print("%s \t\t%s" % (name, value))
        return res
# Hand the finished class to Zope's class initialisation; presumably this is
# what applies the ClassSecurityInfo declarations made in the class body
# (see the AccessControl documentation).
Globals.InitializeClass(SQITool)
class SQISessionManager(Folder.Folder,
                        SQISessionManager,
                        Persistent,
                        Cruft
                        ):
    """Folder-based SQI session manager.

    The second base class is the SQISessionManager imported from
    Products.SQICore; this class rebinds the same name at module level.
    """

    implements(ISQISessionManager)
    meta_type = 'SQISessionManager'
    security = ClassSecurityInfo()
    security.declareObjectPublic()
    # Reuse the management tabs of a plain Folder.
    manage_options = Folder.Folder.manage_options

    def __init__(self):
        """Create the session manager under the fixed id 'sessionmanager'."""
        self.id = 'sessionmanager'

    def _doUserCheck(self, username, password):
        """Check the credentials against acl_users.

        Returns 1 when acl_users.authenticate() yields a true value for
        the given username and password, otherwise 0.
        """
        user = self.acl_users.authenticate(username, password, None)
        if not user:
            return 0
        return 1