# -*- coding: utf-8 # $Id$ # # Copyright 2001, 2002 by IVA Team and contributors # # This file is part of IVA. # # IVA is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # IVA is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with IVA; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """Contains class Cruft, which is used as a placeholder for miscellaneous utility functions that many objects need.""" __version__ = "$Revision$"[11:-2] from AccessControl import ClassSecurityInfo from AccessControl import getSecurityManager from urllib import quote from common import perm_view, perm_edit, perm_manage, perm_add_lo, translate import re import common import time from common import get_local_roles import string from DateTime import DateTime import calendar calendar.setfirstweekday(0) from Errors import FleError from Features import _FEATURES from interfaces import IWebtopItem, IUserManager, IStatistics from zope.component import getUtility, getAdapter class Cruft: """Class containing miscellaneous methods that are historical baggage, but possible be gotten rid of, or at least implemented differently or somewhere else.""" security = ClassSecurityInfo() security.declarePublic('find_URL_of_fle_root') def find_URL_of_fle_root(self, REQUEST): """Return URL of our FLE installation.""" import FLE from string import rfind url = REQUEST.URL1 obj = self while not isinstance(obj, FLE.FLE): obj = obj.parent() url = url[:rfind(url,'/')] return url def find_URL_of_user_info(self, REQUEST): """Return URL of UserInfo.""" import UserInfo from string import rfind url = REQUEST.URL1 obj = self while not isinstance(obj, UserInfo.UserInfo): obj = obj.parent() url = url[:rfind(url,'/')] return url security.declarePublic('find_URL_of_webtop') def find_URL_of_webtop(self, REQUEST): """Return URL of Webtop.""" import Webtop from string import rfind url = REQUEST.URL1 obj = self while not isinstance(obj, Webtop.Webtop): obj = obj.parent() url = url[:rfind(url,'/')] return url security.declarePublic('find_URL_of_group_folder') def find_URL_of_group_folder(self, REQUEST): """Return URL of toplevel group folder or group folder proxy.""" import GroupFolder from string import rfind url = REQUEST.URL1 obj = self while 1: if isinstance(obj, GroupFolder.GroupFolder): return url obj = obj.parent() url = url[:rfind(url,'/')] return url security.declarePublic('find_URL_of_thread_start_node') def find_URL_of_thread_start_node(self, REQUEST): """Return URL of starting note.""" import CourseContext from string import rfind url = REQUEST.URL1 obj = self while not isinstance(obj.parent(), CourseContext.CourseContext): obj = obj.parent() url = url[:rfind(url,'/')] return url security.declarePublic('find_URL_of_course') def find_URL_of_course(self, REQUEST): """Return URL of starting note.""" obj = self while not obj.meta_type == 'Course': obj = obj.parent() return obj.absolute_url() security.declarePublic('find_URL_of_course_context') def find_URL_of_course_context(self, REQUEST): """Return URL of starting note.""" import CourseContext from string import rfind url = REQUEST.URL1 obj = self while not isinstance(obj, CourseContext.CourseContext): obj = obj.parent() url = url[:rfind(url,'/')] return url security.declarePublic('is_student') def is_student(self, REQUEST,nr=None, uname=''): """ is give user is student in given course """ splitter = '/' if splitter not in REQUEST.PATH_TRANSLATED: splitter = '\\' if nr is None: nr = self.jooksva_kursuse_nr(REQUEST,uname) ic = string.rfind(REQUEST.PATH_TRANSLATED, 'fle_users') url = REQUEST.PATH_TRANSLATED if ic == -1: nr = self.fle_root().courses.get_course_id_from_req(REQUEST) else: i = ic plst = filter(lambda x:x, string.split(url[i:], splitter)) nr = plst[3][1:] if not hasattr(self.fle_root().courses, str(nr)): return 0 checkname = str(REQUEST.AUTHENTICATED_USER) if 'Student' in get_local_roles(getattr(self.fle_root().courses, nr),checkname): return 1 try: fm = getattr(self.fle_root().fle_users, checkname) if fm.has_any_role(('IVAAdmin','Manager')): return 1 except: return 0 return 0 security.declarePublic('kas_opetaja') def kas_opetaja(self,REQUEST,nr=None,uname=''): """ If user is teacher. Additional role checks """ if nr is None: nr = self.jooksva_kursuse_nr(REQUEST,uname) if not hasattr(self.fle_root().courses, str(nr)): return 0 if uname: pass #checkname = uname else: pass #checkname = str(REQUEST.AUTHENTICATED_USER) checkname = str(getSecurityManager().getUser()) if checkname in getattr(self.fle_root().courses, nr).get_teachers(): return 1 #try: #XXX: Should we put try except here? if 1==1: fm = getattr(self.fle_root().fle_users, checkname) #XXX: this sucks? if fm.has_any_role(('IVAAdmin','Manager')): return 1 return 0 security.declarePublic('jooksva_kursuse_nimi') def jooksva_kursuse_nimi(self,REQUEST,uname=''): """ jooksva kursuse nimi """ nr = str(self.jooksva_kursuse_nr(REQUEST,uname)) try: return str(getattr(self.fle_root().courses, nr).get_name()) except UnicodeEncodeError: return str(getattr(self.fle_root().courses, nr).get_name().encode('utf-8')) except AttributeError: return translate(self,"course not selected", target=self.giveLanguage(REQUEST)) try: return str(getattr(self.fle_root().courses, nr).get_name()) except: return translate(self,"course not selected", target=self.giveLanguage(REQUEST)) security.declarePublic('jooksva_kursuse_nr') def jooksva_kursuse_nr(self,REQUEST,uname=''): """ jooksva kursuse number """; try: if uname: res = str(getattr(self.fle_root().fle_users, uname).get_jooksev_kursus()) return res else: return str(getattr(self.fle_root().fle_users, self.get_current_user(REQUEST)).get_jooksev_kursus()) except: return 0 security.declarePublic('get_current_user') def get_current_user(self, REQUEST): """Extract current user's name from the REQUEST object.""" return str(REQUEST.AUTHENTICATED_USER) security.declarePublic('urlquote') def urlquote(self, text): """urlquote given text.""" return quote(text) def get_url_to_object(self,obj): # Make a list of parents without the Application object # (from obj to FLE) return obj.absolute_url()+'/' #objs = obj.aq_inner.aq_chain[:-1] #objs.reverse() #path='/'.join([x.getId() for x in objs])+'/' #return path def get_object_of_url(self,url,base,remove_last=1): #XXX: broken for VirtualHostMonster! path=url.split('/') if remove_last: path.pop() # remove last entry path.reverse() if path[-1]=='http:': # remove protocol path.pop() path.pop() path.pop() # remove first (root) entry obj = base.fle_root() #obj = base.getPhysicalRoot() try: while path: oname=path.pop() obj=getattr(obj,oname).__of__(obj) return obj except: #raise 'Internal link error, object not found',str(obj)+' - '+oname return None def kasutajaURList(self, REQUEST): "Kasutajanimi" nimi=self.kasutajaNimi(REQUEST) if nimi=="": return nimi if nimi==str(REQUEST.AUTHENTICATED_USER): return "" return "> " + ""+nimi+"" def algusURList(self, REQUEST, osa): "URL soovitud loiguni" x=REQUEST.URL0 if x.find(osa)==-1: return "" return x[:x.find(osa)+len(osa)] def kasLopeb(self, tekst, otsitav): "jah/ei" # return tekst+" ja " + otsitav return tekst.endswith(otsitav) def myOwnWebtop(self, REQUEST): "jah/ei" return self.kasutajaNimi(REQUEST)==str(REQUEST.AUTHENTICATED_USER) def kasutajaNimi(self, REQUEST): "nimi" aadress=REQUEST.URL0 nr=aadress.find('fle_users') if nr==-1: return "" ots=aadress.find('/', nr+10) nimi=aadress[nr+10:ots] nimi = re.sub('%20', ' ',nimi) return nimi def ajaMuutmisElemendid(self, elemendinimi, aeg=-1): "Valikute kogum" import YlTest from YlTest import LahendusLuba from types import StringType abi=LahendusLuba() if type(aeg)==StringType: aeg=float(aeg) if aeg==-1:aeg=time.time() m=time.localtime(aeg) tulemus=abi.looValik(elemendinimi+"Aasta", m[0]-5, m[0]+10, m[0]) tulemus=tulemus+"-"+\ abi.looValik(elemendinimi+"Kuu", 1, 12, m[1]) tulemus=tulemus+"-"+\ abi.looValik(elemendinimi+"Paev", 1, 31, m[2]) tulemus=tulemus+"   "+\ abi.looValik(elemendinimi+"Tund", 0, 23, m[3]) tulemus=tulemus+":"+\ abi.looValik(elemendinimi+"Minut", 0, 59, m[4], samm=5) return tulemus def set_undefined_page(self, message, REQUEST): """Set REQUEST.RESPONSE.redirect, so that we go to undefined page and give an explanation.""" message = common.quote_html_hack(message) REQUEST.RESPONSE.redirect("undefined_page?explanation=%s" % message) security.declarePublic('to_str_list') # Using eval would be simpler and more general, but # we want to be paranoid. def to_str_list(self, s): """Reverse str representation of list of strings back to list.""" return [x.strip() for x in s[1:-1].replace("'", "").split(',')] def kasutajaasukoht(self, REQUEST,url=''): "Kasutaja asukoht" if url: x = url else: x = REQUEST.URL0 if x.find('manage')>0: return "Management" if x.find('sharedDocs')>0: return "Organizer" if x.find('tracker_show')>0: return "Bookshelf" if x.find('tracker')>0: return "Management" if x.find('WelCome')>0: return "Bookshelf" #Course ZWiki if x.find('assignmentGrading')>0: return "Webtop" if x.find('feedmoot')>0: return "Organizer" if x.find('createMail')>0 or x.find('edit_user_form')>0 or x.find('show_user_info')>0: return 'Organizer' if x.find('kalendriAasta')>0 or x.find('organizer')>0 or x.find('logout')>0: return 'Organizer' if x.find('course_info_user')>0: return "Webtop" if x.find('course_index')>0 or x.find('course_info')>0: return 'Bookshelf' if x.find('tootubade_leht')>0 or x.find('workshop_')>0: return "Workshops" if x.find('/courses/')>0: if x.find('/testihaldus')>0 or x.find('/courses/index_html')>0 or self.kas_haldus(REQUEST,x): return "Management" if not x.find('/gf/')>0 and x.find('/testid/')>0: return "Management" if x.find('import_form_handler2')>0: return "Management" if not x.find('/gf/')>0: return "Workshops" if x.find('/fle_users/')>0 and x.find('/webtop/')>0: return "Webtop" if self.kas_haldus(REQUEST, x) or x.find('ootajateLoetelu')>0: return 'Management' else: return 'Bookshelf' def sisegrupiNimi(self, REQUEST): "URList loetud grupi nimi" #XXX: remove x=REQUEST.URL0 if x.find('/subgroups/')>0: algus=x.find('/subgroups/') ots=x.find('/', algus+11) if ots==-1: tulemus=x[algus+11:] else: tulemus=x[algus+11:ots] if tulemus == 'wt_index_html': return "listing" return tulemus else: return "" security.declarePublic('kas_haldus') def kas_haldus(self, REQUEST, veebiurl): "kas sobib halduseks" if re.match(".*/courses/[0-9]+/syndmused", veebiurl): if self.kas_opetaja(self, REQUEST): return 1 else: return 0 laused=( ".*haldusleht", ".*testihaldus", ".*muuda_vorm", ".*kysimuse_vorm", ".*add_course_form", ".*sisegrupihaldus", ".*courses/index_html", ".*ylesandedMuutmisTabelina", ".*ylesanneteTyypideLoetelu", ".*pealkirjaMuutmisVormHTML", ".*muutmisVorm", ".*lubadeViited", ".*koigiLuba", ".*isikuLubadeLisamisVorm", ".*grupiLubadeLisamisVorm", ".*/filtreerimisvorm2", ".*/kasutajateTulemused", ".*/typesets", ".*/lahendusteStatistika", ".*/kodutood.*", ".*/manage_.*", ".*qtCatalog.*", #".*", ) leitud=0 x=veebiurl for lause in laused: if re.match(lause, x): leitud=1 return leitud security.declarePublic('kas_kasutaja_haldus') def kas_kasutaja_haldus(self, veebiurl): "kas sobib halduseks" laused=( ".*kasutajate_haldamise_vorm", ".*kasutajateLisamisFailiVorm", ".*kasutajaLehelt1", ".*kasutajadFailist2", ".*kasutajadFailist1", ".*erialadeHalduseVorm", ".*/kasutajaotsing", ".*/kasutajateKustutus.*", #".*", ) leitud=0 x=veebiurl for lause in laused: if re.match(lause, x): leitud=1 return leitud security.declareProtected(perm_view, 'search_users') #XXX: is tthis right? def search_users(self, first_name='', last_name='', uname='', organization='', exclude=[], sort_on='get_last_name'): """ search users from zcatalog. exclude - excludes usernames, must be a list of username """ if organization == '-1': organization='' elif not organization: organization='' else: organization = self.fle_users.getAffiliation(int(organization)) query = { 'get_first_name':first_name, 'get_last_name':last_name, 'get_uname':uname, 'get_organization':organization, } results = self.fle_users.userinfo_zcatalog(query) tmp = [] if exclude: for x in results: if x.get_uname in exclude: continue tmp.append(x) else: tmp = results return tmp def zwiki_export(self,REQUEST,level,wiki_id): """ zwiki manage form handler """ from ExportIMS import Kirjutus from ExportIMS import kirjutaYldM import tempfile, os failinimi = tempfile.mktemp() if not level: level=99999 # number = self.get_course_id_from_req(REQUEST) number = 0 # nr = self.get_id() # try: # number = int(nr) # except: # number = int(nr[1:]) yld = kirjutaYldM() yld.kirjutaYldmanifest(number,'WIKI',base=str(number)+'/') yld.lisaFaili(failinimi) k = Kirjutus(self.fle_root(), number) k.looKursuseFail(0) wikikoht='' try: wikikoht = getattr(self,wiki_id) except: pass if wikikoht: k.kirjutaWiki(wikikoht,'ZWiki',wikikoht.aq_inner.get_name(),wikikoht.wikiKaustEsimeneLeht(),palju=level) # else: # k.kirjutaKasutajaWiki((self.get_user_info(str(REQUEST.AUTHENTICATED_USER)),)) k.pakiKursusFaili(failinimi) file = open(failinimi,"rb") export_data=file.read() file.close() os.remove(failinimi) REQUEST.RESPONSE.setHeader('Content-disposition','attachment; filename=zwikiexport.zip') REQUEST.RESPONSE.setHeader('content-type','application/zip') return export_data security.declareProtected(perm_view, 'saadaEmail') def saadaEmail(self,tomail,subject,message,saatja=''): """ proovime saata kirja """ p_saatja = '' if saatja: p_saatja = saatja if hasattr(self,'MFROM') and not p_saatja: p_saatja = self.MFROM if not p_saatja: return 0 try: mailhost = self.MailHost #message = "Content-Type:text/plain; charset=utf-8\n\n\n"+message #message = message+"\n\n\nThis message was sent to you in IVA and it was forwared to your email as you requested." #mailhost.secureSend(message,tomail,p_saatja,subject,encode='quoted-printable') mailhost.secureSend(message,tomail,p_saatja,subject,charset='utf-8') return 1 except: return 0 def leivaPuru(self,REQUEST): """ leivapuru """ #XXX:maybe we don't need this! if REQUEST.URL0.find('kirjad')>0: return REQUEST['L_ic_mailbox'] if self.kasLopeb(REQUEST.URL1, 'webtop/c'+str(self.jooksva_kursuse_nr(self, REQUEST))): return 'portfolio' else: return ""+'portfolio'+"" return "kala" def numberToText(self, REQUEST, arv): "Asendab punkti komaga, kui see vajalik" tekst=str(arv) eraldaja=translate(self, "decimal_separator", target=self.giveLanguage(REQUEST)) tekst=tekst.replace(".", eraldaja) return tekst security.declarePublic('giveLanguage') def giveLanguage(self, REQUEST): """ gives user language """ prev_lang = REQUEST.HTTP_ACCEPT_LANGUAGE isAnon = 0 try: if getattr(self.fle_root().fle_users, str(REQUEST.AUTHENTICATED_USER)).kasKeelBrauserist(): language = prev_lang else: language = getattr(self.fle_root().fle_users, str(REQUEST.AUTHENTICATED_USER)).get_language() except AttributeError: language = prev_lang isAnon = 1 #XXX: let's see if this works.. if language == 'et_ee': language = 'et' if language not in self.get_languages() and not isAnon: language = prev_lang language = self.default_language return language def firstAndLast(self,uname='',req=None): """ Give object author's first and last name """ if hasattr(uname, 'im_func'): uname = uname() abc = '' if not uname and not req: abc = self.getOwnerTuple()[1] if not uname and not abc and req: abc = str(req.AUTHENTICATED_USER) elif not uname and not abc and not req: abc = uname elif uname: abc = uname try: ui = self.fle_users.get_user_info(str(abc)) if ui and ui.get_first_name()!='' and ui.get_last_name()!='': nimi = ui.get_first_name()+" "+ui.get_last_name() else: nimi = abc except FleError: nimi = uname return nimi def firstAndLastNG(self, uname='', req=None, last_first = False): """Give object author's first and last name """ if hasattr(uname, 'im_func'): uname = uname() abc = '' if not uname and not req: abc = self.getOwnerTuple()[1] if not uname and not abc and req: abc = str(req.AUTHENTICATED_USER) elif not uname and not abc and not req: abc = uname elif uname: abc = uname # first first and last name nimi = str(abc) cat = getUtility(IUserManager).userinfo_zcatalog res = cat({'get_uname': str(abc)}) if len(res) == 1: if res[0].get_last_name.strip() == "" or res[0].get_first_name.strip() == "": return nimi if last_first: return res[0].get_last_name + ", " + res[0].get_first_name return res[0].get_first_name+" "+res[0].get_last_name else: for x in res: if x.get_uname == str(abc): if x.get_last_name.strip() == "" or x.get_first_name.strip() == "": return nimi if last_first: return x.get_last_name + ", " + x.get_first_name return x.get_first_name+" "+x.get_last_name return nimi def lastAndFirst(self,uname='',req=None): """ Give object author's first and last name """ return self.firstAndLastNG(uname, req, last_first = True) def is_courseContext(self): return 0 def getCalendar(self): """ return closest calendar """ return getattr(self, 'syndmused', None) def batch_previous(self, batch): """ generate links to next and previous pages """ result = [] while batch.previous: batch = batch.previous result.append(batch.start-1) result.reverse() return result def batch_next(self, batch): """ return an array of next batch staring points """ result = [] while batch.next: batch = batch.next result.append(batch.start-1) return result security.declareProtected(perm_manage, 'migratePersonalEvents') def migratePersonalEvents(self): """ from 0.7 to 0.8 disables creation of personal event method will go through all user's personal 'syndmused' folder and remove 'Add FLE LO' permission from everybody 'Add FLE LO' permission will be given to 'User' and 'Owner' on 'subgroups' folder. """ a = self.courses.courses_zcatalog.addColumn('getLogoThumbnailURL') for c in self.courses.get_courses(): g = c.getObject() subgroup = getattr(g.aq_inner, 'subgroups', None) if subgroup is None: continue subgroup.manage_permission('Add FLE LO', ('Owner', 'User'), 1) for u in self.fle_users.get_users(): s = getattr(u.aq_inner, 'syndmused', None) if s is None: continue s.manage_permission('Add FLE LO', (), 0) return "Removed 'Add FLE LO' permission from users. Added 'Add FLE LO' permission on subgroups. Added thumbnail url metadata to course_zcatalog." security.declareProtected(perm_manage, 'migrateTo072') def migrateTo072(self): """ migrates IVA to version 0.7.2 """ um = self.fle_users umz = um.userinfo_zcatalog umz.delIndex('get_organization') umz.addIndex('get_organization', 'FieldIndex') return "done" security.declareProtected(perm_view, 'format_date') def format_date(self, REQUEST, timestamp): """ format timestamp """ return time.strftime(translate(self,'short_date_format',default="%Y-%m-%d",target=self.giveLanguage(REQUEST)), time.localtime(timestamp)) security.declareProtected(perm_manage, 'getFeatures') def getFeatures(self, installed=1, not_installed=0): """ get all (not)installed features """ res = [] for x in _FEATURES.keys(): adapt = getAdapter(self, _FEATURES.get(x).get('interface')) if adapt.isInstalled(): if installed: res.append(x) else: if not_installed: res.append(x) return res def getFeatureDescription(self, feature, spec): """ return feature description """ return _FEATURES[feature][spec] security.declareProtected(perm_manage, 'enableFeature') def enableFeature(self, REQUEST=None, feature='_x'): """ enable/install module - usually it means setting up skins and possibly some objects """ if not _FEATURES.has_key(feature): raise 'Not available' adapt = getAdapter(self, _FEATURES.get(feature).get('interface')) adapt._install() if REQUEST: return REQUEST.RESPONSE.redirect('manage_iva_setup') return 0 security.declareProtected(perm_manage, 'disableFeature') def disableFeature(self, REQUEST=None, feature='_x'): """ disable/uninstall feature XXX: should add support for uninstall method? """ if not _FEATURES.has_key(feature): raise 'Not available' adapt = getAdapter(self, _FEATURES.get(feature).get('interface')) adapt._uninstall() if REQUEST: return REQUEST.RESPONSE.redirect('manage_iva_setup') return 0 def isFeatureEnabled(self, feature): """ is feature enabled/installed """ feat = _FEATURES.get(feature, None) if feat: adapt = getAdapter(self.fle_root(), _FEATURES.get(feature).get('interface')) if adapt.isInstalled(): return True def isFeatureVisible(self, feature): """ should feature be visible to users or not """ if not self.isFeatureEnabled(feature): return False adapt = getAdapter(self.fle_root(), _FEATURES.get(feature).get('interface')) return adapt.isVisible() security.declareProtected(perm_manage, 'featureManagement') def featureManagement(self, REQUEST, feature): """ return feature management page """ if not _FEATURES.has_key(feature): raise 'Not available' return REQUEST.RESPONSE.redirect('@@manage-'+feature) def DictPrinter(self, d): """ generated html from dictionary """ r = '
' for k, v in d.items(): r += '
%s: %s
' % (translate(self, k, target=self.giveLanguage(self.REQUEST)), v) r += '
' return r def isAnonymousUser(self): """ check if user is anonymous """ from Products.CMFCore.utils import _getAuthenticatedUser u = _getAuthenticatedUser(self) if u is None or u.getUserName() == 'Anonymous User': return 1 return 0 security.declareProtected(perm_manage, 'migrateTo13_step2') def migrateTo13_step2(self): """ stuff to do when upgrading to IVA 1.3 """ self.Statistics._makeSQLMethods() from config import IVA_VERSION self.IVA_VERSION = IVA_VERSION return "done" security.declareProtected(perm_manage, 'migrateTo13_step4') def migrateTo13_step4(self): """ stuff to do when upgrading to IVA 1.3 """ import MySQLdb import cPickle import os, Globals conf = self.getSQLSettings() table_prefix = conf.get('table_prefix') db = conf.get('database', None) host = conf.get('hostname', None) user = conf.get('username', None) passwd = conf.get('password', None) data_dir = os.path.join(Globals.INSTANCE_HOME,'var','iva_stat') path = '_'.join(self.fle_root().getPhysicalPath()[1:]).lower()+'_' if not db or not host: return "Invalid MySQL connection settings. Please revise your MySQL settings." db_conn = MySQLdb.connect(host=host, user=user, passwd=passwd, db=db) cursor = db_conn.cursor() #uc_stat -> iva_course_user #f = open('ivas_htkiva_iva_uc_stat.dat', 'r') fname = os.path.join(data_dir, path+'uc_stat.dat') f = open(fname, 'r') uc_stat = cPickle.load(f) f.close() for uname in uc_stat.keys(): for course in uc_stat[uname].keys(): vals = uc_stat[uname][course] lastAccess = str(time.strftime('%Y-%m-%d %H:%M', time.localtime(float(vals['lastAccess'])))) try: visit_kb = str(time.strftime('%Y-%m-%d %H:%M', time.localtime(float(vals['visit_knowledgebuilding'])))) except KeyError: visit_kb = 0 try: visit_jm = str(time.strftime('%Y-%m-%d %H:%M', time.localtime(float(vals['visit_jamming'])))) except KeyError: visit_jm = 0 cursor.execute("""insert into %s_course_user values ('%s', '%s', %s, '%s', %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, '%s', '%s') """ % (table_prefix, uname, str(course), vals['openPages'], lastAccess, vals['notesRead'], vals['postedNotes'], vals['postedArtefacts'], vals['postedArtefactAnnos'], vals['uploadedFiles'], vals['createdWikis'], vals['memosAdded'], vals['foldersOpened'], vals['testsSolved'], vals['numTimesEntered'], vals['linksAdded'], visit_kb, visit_jm, ) ) # c_stat -> iva_course_updates fname = os.path.join(data_dir, path+'c_stat.dat') f = open(fname, 'r') #f = open('ivas_htkiva_iva_c_stat.dat', 'r') c_stat = cPickle.load(f) f.close() for course in c_stat.keys(): vals = c_stat[course] try: mod_bs = str(time.strftime('%Y-%m-%d %H:%M', time.localtime(float(vals['modBookshelf'])))) except KeyError: mod_bs = '0000-00-00 00:00:00' try: mod_kb = str(time.strftime('%Y-%m-%d %H:%M', time.localtime(float(vals['modKnowledge'])))) except KeyError: mod_kb = '0000-00-00 00:00:00' try: mod_jm = str(time.strftime('%Y-%m-%d %H:%M', time.localtime(float(vals['modJamming'])))) except KeyError: mod_jm = '0000-00-00 00:00:00' cursor.execute("""insert into %s_course_updates values ('%s', 0,0,0,0,0) on duplicate key update course_id=course_id """ % (table_prefix, str(course))) q = """update %s_course_updates set modified_jamming='%s' where course_id=%s""" % ( table_prefix, mod_jm, str(course) ) cursor.execute(q) q = """update %s_course_updates set modified_knowledgebuilding='%s' where course_id=%s""" % ( table_prefix, mod_kb, str(course) ) cursor.execute(q) q = """update %s_course_updates set modified_bookshelf='%s' where course_id=%s""" % ( table_prefix, mod_bs, str(course) ) cursor.execute(q) # ug_stat -> iva_users fname = os.path.join(data_dir, path+'ug_stat.dat') f = open(fname, 'r') #f = open('ivas_htkiva_iva_ug_stat.dat', 'r') c_stat = cPickle.load(f) f.close() for uname in c_stat.keys(): try: tim = c_stat[uname]['lastAccessGlobal'] except: tim = '0000-00-00 00:00:00' lastAccess = str(time.strftime('%Y-%m-%d %H:%M', time.localtime(float(tim)))) p_open = c_stat[uname]['pagesOpenTotal'] cursor.execute("""insert into %s_users values ('%s', %s, '%s') on duplicate key update uname=uname""" % (table_prefix, uname, p_open, lastAccess)) # o_stat -> obj_history fname = os.path.join(data_dir, path+'o_stat.dat') f = open(fname, 'r') #f = open('ivas_htkiva_iva_o_stat.dat', 'r') o_stat = cPickle.load(f) f.close() for url in o_stat.keys(): for uname in o_stat[url].keys(): times = o_stat[url][uname]['times'] for t in o_stat[url][uname]['times']: acc_time = str(time.strftime('%Y-%m-%d %H:%M', time.localtime(float(t)))) cursor.execute("""insert into %s_obj_history (url, uname, count, time) values ( '%s', '%s', 1, '%s')""" % (table_prefix, url, uname, acc_time)) return "done" security.declareProtected(perm_manage, 'migrateTo13_step5') def migrateTo13_step5(self): """ set correct ZWiki editing permissions so that subgroup members can collaboratively work on one wiki """ self.courses._setupZWikipermissions() return "done" security.declareProtected(perm_manage, 'migrateTo13_step1') def migrateTo13_step1(self): """ stuff to do when upgrading to IVA 1.3 Give 'Change permissions' permission to Owner atleast on Blog folder. add fulltext index to blogs zcatalog and catalog_webtop_items """ for u in self.fle_users.objectValues('UserInfo'): b = u.Blog b.manage_permission('Change permissions', ('Owner',), 1) for c in self.courses.objectValues('Course'): b = c.Blog b.manage_permission('Change permissions', ('Teacher','IVAAdmin'), 1) from Products.ZCatalog.Catalog import CatalogError from OFS.ObjectManager import BadRequestException bz = self.blogs_zcatalog class largs: def __init__(self, **kw): self.__dict__.update(kw) try: bz.manage_addProduct['ZCTextIndex'].manage_addLexicon('lexicon', elements=[ largs(group= 'Case Normalizer' , name= 'Case Normalizer' ), largs(group= 'Stop Words', name= " Don't remove stop words" ), largs(group= 'Word Splitter' , name= "IVA Unicode Whitespace splitter" ), ] ) except BadRequestException: #id already in use pass class attrs: doc_attr = 'getTitle, getContent, getIntro' lexicon_id = 'lexicon' index_type = 'Cosine Measure' try: bz.addIndex('fulltext', 'ZCTextIndex', attrs()) bz.manage_reindexIndex(('fulltext',)) except CatalogError: pass try: bz.addColumn('absolute_url') except CatalogError: pass try: bz.addColumn('get_author') except CatalogError: pass try: bz.addColumn('get_timestamp') except CatalogError: pass wtz = self.catalog_webtop_items try: wtz.manage_addProduct['ZCTextIndex'].manage_addLexicon('lexicon', elements=[ largs(group= 'Case Normalizer' , name= 'Case Normalizer' ), largs(group= 'Stop Words', name= " Don't remove stop words" ), largs(group= 'Word Splitter' , name= "IVA Unicode Whitespace splitter" ), ] ) except BadRequestException: #id already in use pass wtz_extra = attrs() wtz_extra.doc_attr = 'get_name' try: wtz.addIndex('fulltext', 'ZCTextIndex', wtz_extra) #wtz.manage_reindexIndex(('fulltext',)) except CatalogError: pass try: wtz.addIndex('meta_type', 'KeywordIndex') except CatalogError: pass try: wtz.addIndex('getRelativeContentURL', 'PathIndex') except CatalogError: pass try: wtz.addColumn('absolute_url') except CatalogError: pass try: wtz.addColumn('getRelativeContentURL') except CatalogError: pass bz.refreshCatalog(clear=1) wtz.refreshCatalog(clear=1) from config import IVA_VERSION self.IVA_VERSION = IVA_VERSION return "permissions changed, indexes added" def migrateTo14_step1(self): """ register components""" from Products.MailHost.interfaces import IMailHost from interfaces import IFLE, ICourseManager from Products.Five.component import enableSite enableSite(self.fle_root()) sm = self.getSiteManager() sm.registerUtility(self.Statistics, IStatistics) sm.registerUtility(self, IFLE) sm.registerUtility(self.MailHost, IMailHost) sm.registerUtility(self.fle_users, IUserManager) sm.registerUtility(self.courses, ICourseManager) return "Step 1 completed." security.declareProtected(perm_manage, 'migrateTo14_step4') def migrateTo14_step2(self): """ migrate to version 1.4. Step 2 """ s = getUtility(IStatistics) dbpref = self.getSQLSettings().get('table_prefix') s._makeSQLMethods(prefix=dbpref) return "Step 2 completed." def migrateTo14_step3(self): """ migrate to version 1.4. Step 3 """ for ui in self.fle_users.objectValues('UserInfo'): wt = getattr(ui, 'webtop') try: wt._delObject('trash') except AttributeError: pass # TODO: del UserInfo.user_courselist_cache return "Step 3 completed." security.declareProtected(perm_manage, 'migrateTo14_step1') def migrateTo14_step4(self): """ migrate to version 1.4. Step 4 """ from Products.ZCatalog.Catalog import CatalogError from Products.ZCTextIndex.ZCTextIndex import manage_addLexicon class largs: def __init__(self, **kw): self.__dict__.update(kw) class attrs: doc_attr = 'getTitle, getContent, getIntro' lexicon_id = 'lexicon' index_type = 'Cosine Measure' # catalog_webtop_items catalog = self.catalog_webtop_items # del lexicon and fulltext index try: catalog._delObject('lexicon') except AttributeError: pass try: catalog.delIndex('fulltext') except CatalogError: pass try: catalog.delIndex('getRelativeContentURL') except CatalogError: pass # add lexicon and fulltext index manage_addLexicon(catalog, 'lexicon', elements=[ largs(group= 'Case Normalizer' , name= 'Case Normalizer' ), largs(group= 'Stop Words', name= " Don't remove stop words" ), largs(group= 'Word Splitter' , name= "IVA Unicode Whitespace splitter" ), ] ) params = attrs() params.doc_attr = 'get_name' try: catalog.addIndex('fulltext', 'ZCTextIndex', params) except CatalogError: pass try: catalog.addIndex('allowedRolesAndUsers', 'KeywordIndex') except CatalogError: pass try: catalog.addIndex('getObjPermission', 'FieldIndex') except CatalogError: pass try: catalog.addIndex('getWeight', 'FieldIndex') except CatalogError: pass try: catalog.addIndex('get_size', 'FieldIndex') except CatalogError: pass try: catalog.addIndex('get_timestamp', 'FieldIndex') except CatalogError: pass try: catalog.addIndex('path', 'IVAPathIndexNG') except CatalogError: pass class DRattrs: since_field='getStartTimeIndex' until_field='getEndTimeIndex' try: catalog.addIndex('visible', 'DateRangeIndex', DRattrs()) except CatalogError: pass # webtop_catalog metadata try: catalog.addColumn('getWeight') except CatalogError: pass try: catalog.addColumn('get_icon_path') except CatalogError: pass try: catalog.addColumn('get_id') except CatalogError: pass try: catalog.addColumn('get_size') except CatalogError: pass try: catalog.addColumn('kasWikiKaust') except CatalogError: pass try: catalog.addColumn('lastModification') except CatalogError: pass try: catalog.addColumn('meta_type') except CatalogError: pass try: catalog.addColumn('getTitle') except CatalogError: pass try: catalog.addColumn('get_timestamp') except CatalogError: pass # delete get_name # add get_name as fieldindex try: catalog.delIndex('get_name') except CatalogError: pass try: catalog.addIndex('get_name', 'FieldIndex') except CatalogError: pass # courses zcatalog catalog = self.courses.courses_zcatalog try: catalog.delIndex('get_all_users_id') except CatalogError: pass try: catalog.addIndex('get_all_users_id', 'KeywordIndex') except CatalogError: pass try: catalog.addIndex('isLocked', 'FieldIndex') except CatalogError: pass try: catalog.addColumn('isLocked') except CatalogError: pass try: catalog.addColumn('getSubgroupList') except CatalogError: pass try: catalog.addColumn('get_all_users_id') except CatalogError: pass # userinfo zcatalog um = self.fle_users umz = um.userinfo_zcatalog try: umz.addColumn('get_photo_tag') except CatalogError: pass # extra.indexed_attrs: get_uname try: umz.addIndex('userid', 'FieldIndex', extra = largs(indexed_attrs = 'get_uname')) except CatalogError: pass try: umz.addIndex('has_photo', 'FieldIndex') except CatalogError: pass # blogs catalog catalog = self.blogs_zcatalog catalog._delObject('lexicon') try: catalog.delIndex('fulltext') except CatalogError: pass manage_addLexicon(catalog, 'lexicon', elements=[ largs(group= 'Case Normalizer' , name= 'Case Normalizer' ), largs(group= 'Stop Words', name= " Don't remove stop words" ), largs(group= 'Word Splitter' , name= "IVA Unicode Whitespace splitter" ), ] ) try: catalog.addIndex('fulltext', 'ZCTextIndex', attrs()) except CatalogError: pass catalog.manage_reindexIndex(ids=['fulltext',]) # events zcatalog catalog = self.catalog_events try: catalog.addColumn('meta_type') except CatalogError: pass try: catalog.delIndex('path') except CatalogError: pass try: catalog.addIndex('path', 'IVAPathIndexNG') except CatalogError: pass catalog.manage_reindexIndex(ids=['path',]) from config import IVA_VERSION self.IVA_VERSION = IVA_VERSION return "Step 4 completed." security.declareProtected(perm_manage, 'migrateTo14_step5_1') def migrateTo14_step5_1(self): """ migrate to version 1.4. Step 5 """ cat = getattr(self, 'catalog_webtop_items') cat.manage_catalogClear() catalog = self.courses.courses_zcatalog #catalog.manage_reindexIndex(ids=['get_all_users_id']) catalog.refreshCatalog(clear=1) # refresh userinfo_catalog um = self.fle_users umz = um.userinfo_zcatalog umz.refreshCatalog(clear=1) return "Step 5.1 completed." security.declareProtected(perm_manage, 'migrateTo14_step5_2') def migrateTo14_step5_2(self): """ migrate to version 1.4. Step 5 """ print "TODO: courses:", len(self.courses.objectValues('Course')) #import transaction count = 1 for c in self.courses.objectValues('Course'): print count, c self._step5_walk(c) #if count in range(0, 1000, 150): # print "commit" # transaction.commit() count = count + 1 return "Step 5.2 completed." security.declareProtected(perm_manage, 'migrateTo14_step5_3') def migrateTo14_step5_3(self): """ migrate to version 1.4. Step 5 """ print "TODO: users:", len(self.fle_users.objectValues('UserInfo')) #import transaction count = 1 for c in self.fle_users.objectValues('UserInfo'): print count, c self._step5_walk(c.webtop) #if count in range(0, 8000, 1000): # print "commit" # transaction.commit() count = count + 1 return "Step 5.3 completed." security.declareProtected(perm_manage, 'migrateTo14_step6') def migrateTo14_step6(self): """ fix CourseContext permissions """ for c in self.courses.objectValues('Course'): for cc in c.objectValues('CourseContext'): cc.manage_permission('View', tuple(), 1) return "Step 6 done" security.declareProtected(perm_manage, 'migrateTo14_step7') def migrateTo14_step7(self): """ fix permissions """ for f in self.fle_users.objectValues('UserInfo'): w = f.webtop self._step7_walk(w) return "Step 7 done" def _step7_walk(self, obj): for x in obj.objectValues(): x.manage_permission('View', tuple(), 1) self._step7_walk(x) def _step5_walk(self, obj): """ step 5 walker """ for x in obj.objectValues(): if IWebtopItem.providedBy(x): try: #print "INDEXING:", x.absolute_url() x.index_object() except TypeError: print "TypeError" self._step5_walk(x) # EOF