# -*- coding: utf-8 # $Id$ # # Copyright 2001, 2002 by IVA Team and contributors # # This file is part of IVA. # # IVA is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # IVA is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with IVA; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA """Contains class Course, which represents one course, which has one or more CourseContexts, which in turn contain knowledge building conversations.""" __version__ = '$Revision$'[11:-2] import time, strptime import string, types import OFS, Globals from Globals import Persistent, Acquisition import AccessControl from AccessControl import ClassSecurityInfo, getSecurityManager import copy import re import Kodutoo import Blog try: from Products.ZWiki.ZWikiPage import ZWikiPage USE_ZWIKI = 1 except: USE_ZWIKI = 0 from Jamming import Jamming from TraversableWrapper import Traversable from common import intersect_bool, make_action, get_roles, get_local_roles, translate from input_checks import strip_all, is_valid_title from CourseContext import CourseContext #from ThinkingTypeSetManager import ThinkingTypeSetManager as TTSM from Thread import Thread from Cruft import Cruft from TempObjectManager import TempObjectManager from input_checks import render, normal_entry_tags_and_link from common import perm_view, perm_edit, perm_manage, perm_add_lo from Products.ZCatalog.CatalogPathAwareness import CatalogAware import Subgroups import Kalender import WordMap from QuizManager import QuizManager from zope.interface import implements from interfaces import ICourse, IStatistics, IUserManager, IFLE from zope.component import adapter, getUtility, queryUtility from zope.app.container.interfaces import IObjectAddedEvent from OFS.interfaces import IObjectWillBeRemovedEvent from Shared.DC.ZRDB.DA import DatabaseError import logging logger = logging.getLogger('IVA') # Each Course object contains (usually) one or more CourseContexts, which # represent different aspects of the course. # A Course contains information and services on one course implementation. class Course( Persistent, Traversable, Cruft, OFS.Folder.Folder, AccessControl.Role.RoleManager, OFS.SimpleItem.Item, Thread, CatalogAware, ): """Course, contained within CourseManager, represents one course.""" meta_type = 'Course' implements(ICourse) security = ClassSecurityInfo() security.declareObjectPublic() # Parameters: # #- parent: should be a CourseManager # #- name: name of the Course # #- tts: list of ThinkingTypeSets (copies are made in manage_afterAdd) # #- teachers: list of users (of class UserInfo?) that are granted #Teacher privileges # #- etc: other textual information def __init__( self, parent, # Whatever you do, dont bind this to self. name, teachers, description='', organisation='', methods='', starting_date='', ending_date='', uniq_id='' ): """Constructor of the course.""" # Overriding all_meta_types is not not beautiful...but, hey!, it works! #self.all_meta_types = ( # {'name': 'ThinkingTypeSet', # 'action': 'get_id'},) # Cache active members. self.active_memb_cache = [] # Remove all HTML tags from parameters name = strip_all(name) description = strip_all(description) methods = strip_all(methods) organisation = strip_all(organisation) Thread.__init__(self, parent) # Takes care of id and title. if teachers != ('',): self.add_teachers(teachers) self.__name = name # name of the course self.__organisation = organisation self.__description = description self.__methods = methods self.__starting_date = starting_date self.__ending_date = ending_date self.__credits = '' self.__courseID = '' self.__quote = '' self.course_category = 0 self.course_locked = 0 self.default_catalog = 'courses_zcatalog' # course should have uniq id since self.id tends to change self.setUniqId(uniq_id) # This is for group folder path listings - show path up to course. self.toplevel = 1 security.declareProtected(perm_edit, 'add_course_form_handler') def add_course_form_handler(self, REQUEST, course_id, my_name, desc, organisation, methods, start_date, end_date, cancel='', # submit buttons add='', # tekst='', logo_upload=None, staatus=0, regStatus=0, cCat = 0, credits='', courseID='', delete_logo='', lock_course=0, ): """ modify course info, override CourseManager method. """ if cancel: return REQUEST.RESPONSE.redirect('course_info.html?course_id=%s' % course_id) elif not add: raise 'IVA Error', 'Inknown button' if course_id != self.get_id() or not course_id: return action=apply( make_action, ['manage_course_info.html'] + [(x, eval(x)) for x in ('my_name', 'desc', 'organisation', 'methods', 'start_date', 'end_date')]) action += '&course_id=' + course_id my_name=my_name.strip() if not is_valid_title(my_name): return self.restrictedTraverse('message_dialog_error.html')( self, REQUEST, title='Invalid name', message='Give valid name', action=action) if my_name != self.get_name(): if my_name in [x.get_name for x in self.get_courses()]: return self.restrictedTraverse('message_dialog_error.html')( self, REQUEST, title='Invalid name', message="Name '%s' taken" % my_name, action=action) from common import convert_date # convert dates to time.time()-format errors = [] if not start_date: starting_date = 0 else: try: time_tuple = strptime.strptime(start_date, translate(self,'short_date_format',default="%Y-%m-%d")) starting_date = convert_date(str(time_tuple[2]), # day str(time_tuple[1]), # month str(time_tuple[0])) # year except: errors.append(translate(self,'Starting date:',default="%Y-%m-%d"),) if not end_date: ending_date = 0 else: try: time_tuple = strptime.strptime(end_date, translate(self,'short_date_format',default="%Y-%m-%d")) ending_date = convert_date(str(time_tuple[2]), # day str(time_tuple[1]), # month str(time_tuple[0])) # year except: errors.append(translate(self,'Ending date:',default="%Y-%m-%d"),) organisation = organisation.strip() if organisation and not is_valid_title(organisation): errors.append(translate(self,'Organization')) if len(errors) > 0: return self.restrictedTraverse('message_dialog_error.html')( self, REQUEST, title='Invalid input', message=translate(self,'Invalid fields') + ": '" + \ "' , '".join(errors) + "'", action=action) logo = None try: if len(logo_upload.filename)>0: logo=logo_upload.read() else: logo = None except AttributeError: pass self.update( my_name, desc, organisation, methods, starting_date, ending_date, tekst, logo, staatus, regStatus, cCat, credits, courseID, delete_logo, lock_course, ) return REQUEST.RESPONSE.redirect(self.absolute_url()+'/course_info.html') def is_courseContext(self): """ is course context? """ return def kasTeadmuspajasKontekste(self): "jah/ei" return len(self.objectValues('CourseContext'))>0 def kasMeediapajasProjekte(self): "jah/ei" return len(self.jamming.objectValues('JamSession'))>0 security.declareProtected(perm_manage, 'resetPenders') def resetPenders(self): """ reset penders """ self.ootajad = [] return "done" def kysiOotajad(self, REQUEST=None): "Kursusele registreerunud, keda pole veel kinnitatud" return getattr(self, 'ootajad', []) def isLocked(self): """ is course locked """ try: return self.course_locked except AttributeError: return False security.declareProtected(perm_edit,'manage_pendingUsers_handler') def manage_pendingUsers_handler(self, REQUEST, addtocourse='',deletefromlist='',pender=''): """Add or remove user from pending list""" #from Kirjakast import Kirjakast temp = getUtility(IUserManager) if not pender: return self.restrictedTraverse('message_dialog_error.html')(self, REQUEST, title="Error", message='Please select some users first', action='manage_pendingUsers.html') if type(pender)==types.StringType: pender=(pender,) penders=self.kysiOotajad() new_penders = [] if deletefromlist: for x in pender: for y in penders: if y == x: continue new_penders.append(y) if addtocourse: for x in pender: puser = temp.get_user_info(x) self.add_student(puser.get_uname()) mess_title = translate(self,'Added to course') mess_body = translate(self,'You have been added to course ').encode('utf-8')+' '+self.get_name() puser.kirjad.saadaKiri(REQUEST,puser.get_uname(),puser.get_uname(),mess_title,mess_body,999999) for x in penders: if x not in pender: new_penders.append(x) self.reindex_object() self.ootajad=new_penders return REQUEST.RESPONSE.redirect('management_index.html') def _ootajastKursuseLiikmeks(self, REQUEST, kasutaja): """Lisamine""" print "_ootajastKursuseLiikmeks" self.add_student(kasutaja) return 1 def lisaOotaja(self, REQUEST, ootaja): """ Kasutaja lisab end ootama """ print "lisan ootaja...", ootaja, self.getRegStatus() if self.getRegStatus()=='2': #automaatregistreerumine self._ootajastKursuseLiikmeks(REQUEST, ootaja) else: m=self.kysiOotajad(REQUEST) if ootaja not in m: m.append(ootaja) elif ootaja in m: m.remove(ootaja) self.ootajad=m self.reindex_object() return 0 def kasOotaja(self, nimi): "Kas jooksev kasutaja on kursusele ootaja" m=self.kysiOotajad() kas=0 for x in m: if nimi==x: kas=1 return kas security.declareProtected(perm_view, 'get_printable_name') # No additional comments. def get_printable_name(self): """Return name of the course.""" return self.__name security.declareProtected(perm_view, 'get_bg_colour_name') def get_bg_colour_name(self): """...""" return 'gr' # security.declareProtected(perm_view, 'get_name') security.declarePublic('get_name') # No additional comments. def get_name(self): """Get course name.""" #return unicode(self.__name, 'utf-8') return self.__name security.declareProtected(perm_view, 'get_organisation') # No additional comments. def get_organisation(self): """Get organisation name.""" return self.__organisation security.declareProtected(perm_view, 'get_description') # No additional comments. def get_description(self): """Get description.""" return self.__description security.declareProtected(perm_view,'render_description') def render_description(self): """Render description.""" return render( self.get_description(), legal_tags=normal_entry_tags_and_link) security.declareProtected(perm_view, 'get_methods') # No additional comments. def get_methods(self): """Get info on methods.""" return self.__methods security.declareProtected(perm_view,'render_methods') def render_methods(self): """Render methods.""" return render( self.get_methods(), legal_tags=normal_entry_tags_and_link) def get_courseID(self): """ get course id """ try: return self.__courseID except AttributeError: return "" def set_courseID(self,courseID): """ set course id(the other one) """ self.__courseID = courseID def get_credits(self): """ get credits info """ try: return self.__credits except AttributeError: return "" def set_credits(self,credits): """ set credits """ self.__credits = credits # security.declareProtected(perm_view, 'get_teachers') # No additional comments. security.declarePublic('get_teachers') def get_teachers(self): """Get teachers.""" principals = self.acl_users.groups.listAssignedPrincipals(self.getId()+'_teachers') return [ x[0] for x in principals] def add_teachers(self, teachers): grname = self.getId()+'_teachers' fle = getUtility(IFLE) fle.acl_users.groups.manage_addPrincipalsToGroup(grname, teachers) def get_students(self): """Get students.""" principals = self.acl_users.groups.listAssignedPrincipals(self.getId()) return [ x[0] for x in principals] security.declareProtected(perm_view, 'get_start_dd') # No additional comments. def get_start_dd(self): """Return starting day.""" if self.__starting_date: return time.localtime(self.__starting_date)[2] else: return '' security.declareProtected(perm_view, 'get_start_mm') # No additional comments. def get_start_mm(self): """Return starting month.""" if self.__starting_date: return time.localtime(self.__starting_date)[1] else: return '' security.declareProtected(perm_view, 'get_start_yyyy') # No additional comments. def get_start_yyyy(self): """Return starting year.""" if self.__starting_date: return time.localtime(self.__starting_date)[0] else: return '' security.declareProtected(perm_view, 'get_end_dd') # No additional comments. def get_end_dd(self): """Return ending day.""" if self.__ending_date: return time.localtime(self.__ending_date)[2] else: return '' security.declareProtected(perm_view, 'get_end_mm') # No additional comments. def get_end_mm(self): """Return ending month.""" if self.__ending_date: return time.localtime(self.__ending_date)[1] else: return '' security.declareProtected(perm_view, 'get_end_yyyy') # No additional comments. def get_end_yyyy(self): """Return ending year.""" if self.__ending_date: return time.localtime(self.__ending_date)[0] else: return '' # security.declareProtected(perm_view, 'get_printable_starting_date') security.declarePublic('get_printable_starting_date') #FIX:security check # No additional comments. def get_printable_starting_date(self, REQUEST): """Get starting date.""" if not self.__starting_date or self.__starting_date == 0: return '' return time.strftime(translate(self,'short_date_format',default="%Y-%m-%d"), time.localtime(self.__starting_date)) # security.declareProtected(perm_view, 'get_printable_ending_date') security.declarePublic('get_printable_ending_date') #FIX:security check # No additional comments. def get_printable_ending_date(self, REQUEST): """Get ending date.""" if not self.__ending_date or self.__ending_date == 0: return '' return time.strftime(translate(self,'short_date_format',default="%Y-%m-%d"), time.localtime(self.__ending_date)) security.declarePrivate('get_start_date') def get_start_date(self): return self.__starting_date security.declarePrivate('get_end_date') def get_end_date(self): return self.__ending_date security.declarePublic('get_n_users') def get_n_users(self): return len(self.get_all_users_id()) security.declareProtected(perm_view, 'get_all_users') def get_all_users(self): """Return a list of all users on course.""" rv = [] for uname in self.get_all_users_id(): try: rv.append(self.fle_users.get_user_info(uname)) except Exception: pass return rv security.declareProtected(perm_edit, 'kasutajateKodutoodeArhiiv') def kasutajateKodutoodeArhiiv(self, REQUEST): """ Arhiiv, kuhu paigutatakse kasutaja esitatud materjalid """ import tempfile import zipfile failinimi=tempfile.mktemp() arhiiv=zipfile.ZipFile(failinimi,"w",zipfile.ZIP_DEFLATED) kasutajad=self.get_all_users() for x in kasutajad: try: kc=getattr(x.webtop, 'c'+self.id) except AttributeError: continue for too in self.kodutood.objectValues(): if too.tyyp==1: #materjal if hasattr(kc, 'portfolio'): if hasattr(kc.portfolio, too.id): tookataloog=getattr(kc.portfolio, too.id) tookataloog.looArhiiv(REQUEST, arhiiv, x.id+'/') if too.tyyp==2: #grupp mitu=0 punkte=-1 for kg in x.kysiKursuseSisegrupid(self): if kg.id in too.grupid: #XXX:TODO: check this, might be incorrect esitluskaust = getattr(kg.portfolio, too.id) esitluskaust.looArhiiv(REQUEST, arhiiv, kg.id+'/') arhiiv.close() file = open(failinimi,"rb") export_data=file.read() file.close() import os os.remove(failinimi) REQUEST.RESPONSE.setHeader('Content-type','application/zip') REQUEST.RESPONSE.setHeader('Content-disposition','attachment; filename=tooarhiiv.zip') return export_data def fotogaKasutajad(self): """return list of users with photo""" users = self.get_all_users_id() um = getUtility(IUserManager) #for u in users: q = {'userid': users, 'has_photo':1} res = um.userinfo_zcatalog(q) return res def fototaKasutajad(self): """return a list of users without a photo uploaded""" users = self.get_all_users_id() um = getUtility(IUserManager) q = {'userid': users, 'has_photo': 0} res = um.userinfo_zcatalog(q) return res security.declareProtected(perm_view, 'get_all_users_id') def get_all_users_id(self): """Return a list of all users on this course. Same thing as get_all_users, but this one returns a list of id's of UserInfo objects, not the UserInfo object reference.""" return self.get_teachers()+self.get_students() def get_sorted_student_list(self): nimed = [] rolliga = self.get_students() for aaa in rolliga: nimed.append([self.lastAndFirst(aaa), aaa]) nimed.sort() return [ x[1] for x in nimed ] security.declareProtected(perm_edit, 'add_student') #The person is added with Student role access. # NOTE: This method is no longer needed, except in the test cases! def add_student(self, name): """Add person to the course.""" # Check that user exists... Raises exception if not. #XXX: is this really needed!? now commented out print "adding student..." self.acl_users.groups.manage_addPrincipalsToGroup(str(self.getId()), (name,)) security.declareProtected(perm_manage, 'remove_person') def remove_person(self, person): """Remove person from the course.""" kasutaja = getUtility(IUserManager).get_user_info(person) kasutaja.jooksev_kursus = 0 self.acl_users.groups.manage_removePrincipalsFromGroup(self.getId(), (person,)) security.declareProtected(perm_view, 'has_role') # No additional comments. def has_role(self, person, role): """Return whether the user is in the specified role.""" return role in get_roles(self,person) security.declareProtected(perm_edit, 'add_users_form_handler') def add_users_form_handler(self, REQUEST, users_None=[], # checkboxes users_Teacher=[], users_Student=[], None_to_Teacher='', # submit buttons Teacher_to_None='', None_to_Student='', Student_to_None=''): """Adding and removing users from a given course.""" temp = getattr(self.fle_root(), 'fle_users') grname = str(self.getId()) if None_to_Student: # add students to course if not self.isCourseReady(self): return self.restrictedTraverse('message_dialog_error.html')( title="Error", message="Students can be added to the course when all required course information is submitted and course status is open(not preparing, not finished). Adding teachers is OK.", action="manage_course_info.html") self.acl_users.groups.manage_addPrincipalsToGroup(grname, users_None) for user in users_None: try: koht = getattr(temp, user) koht2 = getattr(koht, 'kirjad') uinf = getUtility(IUserManager).get_user_info(user) tlang = uinf.get_language() teade = translate(self,'You have been added to course ',target=tlang).encode('utf-8') koht2.saadaKiri(REQUEST, uinf.get_uname(), uinf.get_uname(), teade,'\n\n\n'+\ teade+": "+\ str(self.get_name()),9999) except AttributeError: pass except KeyError: pass elif Student_to_None: # remote students from course self.acl_users.groups.manage_removePrincipalsFromGroup(grname, users_Student) elif None_to_Teacher: # give users teacher role grname += '_teachers' self.acl_users.groups.manage_addPrincipalsToGroup(grname, users_None) elif Teacher_to_None: # remove teacher role from users grname += '_teachers' self.acl_users.groups.manage_removePrincipalsFromGroup(grname, users_Teacher) else: return self.restrictedTraverse('message_dialog_error.html')( title='Error', message='Please select some users first', action='manage_participants.html') self.reindex_object() return REQUEST.RESPONSE.redirect('manage_participants.html') # FIXME: input_checks: tt_set_name not checked # FIXME: input_checks: two course contexts can have identical name. security.declareProtected(perm_add_lo, 'add_course_context') # Handler for add_course_context_form def add_course_context( self, my_name, description, tt_set_name, description_long, REQUEST, use_roleplay='', # New in ZPTIVA show_only_roleplay=0, # Submit buttons. publish='', cancel='', ): """Add CourseContext object.""" if publish: error_fields = [] errors = [] my_name = my_name.strip() if not is_valid_title(my_name): error_fields.append(translate(self,'title of context')) if my_name in [x[1] for x in self.get_course_context_names()]: errors.append(translate(self,"Name '%s' taken") % my_name) # Variables 'description' and 'description_long' are not checked # because render_description() and render_long_description() # methods in CourseContext filter out unwanted HTML tags. if len(error_fields) > 0 or len(errors) > 0: msg = ", ".join(errors) if len(error_fields) > 0: msg = msg + "
" + 'Invalid fields' + \ ": '" + "' , '".join(error_fields) + "'" return self.restrictedTraverse('message_dialog_error.html')( self, REQUEST, title='Invalid input', message=msg, action=apply( make_action, ['add_course_context_form'] + [(x, eval(x)) for x in ('my_name', 'description', 'tt_set_name', 'description_long')])) uname=str(REQUEST.AUTHENTICATED_USER) obj = CourseContext( self, my_name, description, description_long, tt_set_name, uname,) obj.public = 1 if int(show_only_roleplay)==1: #obj.__roleplay_only = 1 obj.set_roleplay_only(1) id = obj.get_id() self._setObject(id, obj) #XXX: statistics - lisaSyndmus(konteksteLisatud. should it be somewhere? #self.lisaSyndmus(REQUEST, 'konteksteLisatud') try: # XXX:TODO: check this getUser... it fails sometimes obj.changeOwnership(self.acl_users.getUserById(id=uname).__of__(self.acl_users)) obj.manage_setLocalRoles(uname,('Owner',)) except: pass if REQUEST: pagename="" if use_roleplay: pagename="course_setup_roleplay_form.html" REQUEST.RESPONSE.redirect('%s/%s' % (str(id),pagename)) elif cancel: return REQUEST.RESPONSE.redirect(REQUEST.URL1) else: raise "add_course_context called without 'publish' or 'cancel'" security.declareProtected(perm_view, 'get_course_context_names') # No additional comments. def get_course_context_names(self): """Return a list of CourseContext names.""" retval = {} for e in self.get_children('CourseContext'): id = e.get_id() name = e.get_name() retval[id] = name return retval.items() #security.declareProtected(perm_view, 'get_n_notes') def get_n_notes(self): """Returns a sum of all notes in all contexts.""" count = 0 for cc in self.get_course_contexts(): count += cc.get_n_notes() return count security.declareProtected(perm_view, 'get_n_unread_notes') def get_n_unread_notes(self,uname): """Returns true if course as unread notes in KB.""" # TODO: figure out how many messages are unread uname = str(getSecurityManager().getUser()) st = getUtility(IStatistics).__of__(getUtility(IFLE)) lv = st.getUserCourseStat(self.getId(), 'visit_knowledgebuilding', uname) lu = st.getCourseStat(self.getId(), 'modified_knowledgebuilding') if lu>lv: return True # count = 0 # for cc in self.get_course_contexts(): # count += cc.get_n_unread_notes(uname) # return count return False security.declarePublic('update') # Parameters are received from the form (apparently). def update( self, name, description, organisation, methods, starting_date, ending_date, tekst, logo=None, status=0, regStatus = 0, cCat = 0, credits='', courseID='', delete_logo = '', lock_course=0, ): """Edit course information.""" self.__name = name self.__description = description self.__organisation = organisation self.__methods = methods self.__starting_date = starting_date self.__ending_date = ending_date self.setQuote(tekst) if logo: self.setLogo(logo) self.setStatus(status) self.setRegStatus(regStatus) if cCat: self.setCourseCategory(int(cCat)) self.set_credits(credits) self.set_courseID(courseID) self.course_locked = lock_course if delete_logo: self.removeLogo() #XXX: remove self.default_catalog = 'courses_zcatalog' self.reindex_object() security.declareProtected(perm_view, 'get_course_contexts') def get_course_contexts(self): """Return a list of all course contexts in this course.""" for abc in self.get_children('CourseContext'): if getattr(abc, 'public', None) is None: abc.public = 1 return self.get_children('CourseContext') security.declareProtected(perm_view, 'get_course_context_ids_in_order') def get_course_context_ids_in_order(self, id_list): """Return a list of ids of all course contexts in this course.""" return [o.get_id() for o in self.get_course_contexts_in_order(id_list)] security.declareProtected(perm_edit, 'delete_course_context') def context_form_handler(self,REQUEST,cc_id,staatus='', kustuta=''): """ delete or change status """ self.hide_show_course_context(cc_id) return REQUEST.RESPONSE.redirect(self.absolute_url()+'/kb_index.html') security.declareProtected(perm_edit, 'delete_course_context') def delete_course_context(self,id): for c in self.get_children('CourseContext'): if c.get_id() == id: self._delObject(id) return security.declareProtected(perm_edit, 'hide_show_course_context') def hide_show_course_context(self,id): for c in self.get_children('CourseContext'): if c.get_id() == id: if c.public == 1: c.public = 0 else: c.public = 1 c._p_changed = 1 break return # FIXME: See the random comment below, random is not # FIXME: probably the order that we really want. security.declareProtected(perm_view, 'get_course_contexts_in_order') def get_course_contexts_in_order(self, id_list): """Return a list of all course contexts in this course.""" contexts = {} for c in self.get_children('CourseContext'): contexts[c.get_id()] = c retval = [] if id_list and id_list != ['']: if type(id_list) == types.StringType: id_list = (id_list, ) # Return courses contexts in a given order. for identifier in id_list: try: retval.append(contexts[identifier]) del contexts[identifier] except KeyError: # invalid id_list pass # If we still course contexts left (id_list is shorter # than the actual number of course_contexts), append # them to list in a random order. for key in contexts.keys(): retval.append(contexts[key]) return retval security.declarePublic('may_view_course') def may_view_course(self, REQUEST): """Return boolean depending on wether user may or may not view the course.""" from AccessControl.PermissionRole import rolesForPermissionOn return intersect_bool( get_roles(self,str(REQUEST.AUTHENTICATED_USER)), rolesForPermissionOn(perm_view,self)) security.declareProtected(perm_view, 'may_add_course_context') def may_add_course_context(self, person): """Return boolean depending on wether user may or may not add a course context to the course.""" from AccessControl.PermissionRole import rolesForPermissionOn return intersect_bool( get_roles(self,person), rolesForPermissionOn(perm_add_lo,self)) security.declareProtected(perm_view, 'may_edit_course') def may_edit_course(self, person): """Return boolean depending on whether person can edit the course or not.""" from AccessControl.PermissionRole import rolesForPermissionOn return intersect_bool( get_roles(self,person), rolesForPermissionOn(perm_edit,self)) security.declareProtected(perm_view, 'has_group_folder') def has_group_folder(self): """Return whether course has a group folder or not.""" return len(self.objectIds('GroupFolder'))>0 def add_folder(self, my_name): from GroupFolder import GroupFolder new_id = 'gf' fol = GroupFolder(None,my_name) fol.id=new_id self._setObject(new_id,fol) fol=fol.__of__(self) security.declareProtected(perm_edit, 'teacher_import_handler') def teacher_import_handler(self, REQUEST, file='', to_import=''): """ Form handler for teacher """ kursusenr = self.get_course_id_from_req(REQUEST) if not self.kas_opetaja(REQUEST): return "Keelatud" auser = getSecurityManager().getUser() uname = str(auser) from ImportIMS import Importer from ImportExport import Exporter from StringIO import StringIO out = StringIO() import sys out = sys.stdout import tempfile, os filename = tempfile.mktemp() f = open(filename,"w+b") f.write(file.read()) f.close() import_data=None imported = Importer(uname,self.fle_root(),'',getattr(self.fle_root().courses, kursusenr),kasuta_jooksvat=1, out=out) if auser.has_role(('Manager',)): x = REQUEST.get('restoreMembership', False) if x: x = True imported.manager_importing = x imported.loadZip(filename) element = imported.loadFile('imsmanifest.xml') imported.processFile(self,element) return REQUEST.RESPONSE.redirect(self.absolute_url()+'/'+str(kursusenr)+'/course_info.html') security.declareProtected(perm_edit, 'teacher_export') def teacher_export(self,REQUEST): """ teacher export """ return self.iva_ims_export(REQUEST) security.declareProtected(perm_edit, 'iva_ims_export') def iva_ims_export(self, REQUEST=None, testinimi='',teacherExporting=1): """ Course or a quiz export. XML format. """ from ExportIMS import Kirjutus from ExportIMS import kirjutaYldM import tempfile, os failinimi = tempfile.mktemp() # self.fle_users.export_users(REQUEST,failinimi='ivaexport.zip',kursusega='jah',palju='self.fle_root().fle_users.get_users()') if testinimi: yld = kirjutaYldM() yld.kirjutaYldmanifest(self.get_id(),'QTI test','imsqti_xmlv1p1','','qti.xml') yld.lisaFaili(failinimi) k= Kirjutus(self.fle_root(), self.get_id()) #k.looKursuseFail() #k.exportQTI(failinimi,'') parent = k.looTestiFail() k.kirjutaTestid(parent,testinimi, users='None', teacherExporting=teacherExporting) k.kirjutaTestidZipi(failinimi,'') #k.pakiKursusFaili(failinimi) # k.pakiTulemusFaili(failinimi,testsonly="testsonly") else: yld = kirjutaYldM() yld.kirjutaYldmanifest(self.get_id(), self.get_description(),base=str(self.get_id())+"/") yld.lisaFaili(failinimi) k = Kirjutus(self.fle_root(), self.get_id()) k.looKursuseFail() k.kirjutavCal(self.syndmused,'kursus','GROUP') k.kirjutaWebtop(self.gf,tiitel='RAAMATURIIUL') #for sisegr in self.subgroups.objectValues('GroupFolder'): for sisegr in self.subgroups.objectValues('Subgroup'): k.kirjutaWebtop(sisegr,'SISEGRUPP:'+sisegr.get_name()) try: k.kirjutavCal(sisegr.syndmused,sisegr.get_name(),'GROUP') except: pass if not teacherExporting: k.kirjutaKasutajaWebtop(self.get_all_users()) k.kirjutaKasutajaBlog(self.get_all_users()) #k.kirjutaKasutajaSyndmus(self.get_all_users()) self.fle_users.export_users(REQUEST,failinimi=failinimi,kursusega=self.get_id(),palju='kursus.get_all_users()') k.kirjutaItem(k.organization, tiitel='KURSUSE KASUTAJAD',parameters='KURSUSE KASUTAJAD') k.kirjutaResource(k.resources, 'imsent_xmlv1p1','users.xml') k.kirjutaBlog(self.Blog, k.organization, k.resources) k.exportQTI(failinimi,str(self.get_id())+"/",teacherExporting=teacherExporting) k.exportPajad(failinimi,str(self.get_id())+"/",startonly=teacherExporting) k.exportKodutood(self.kodutood,failinimi,str(self.get_id())+"/") k.pakiKursusFaili(failinimi) if REQUEST: file = open(failinimi,"rb") export_data=file.read() file.close() os.remove(failinimi) REQUEST.RESPONSE.setHeader('Content-disposition','attachment; filename=ivaexport.zip') REQUEST.RESPONSE.setHeader('content-type','application/zip') return export_data else: import os import shutil name='iva_'+re.sub('[^a-zA-Z0-9]', '_', self.get_name()) name2 = name number = 1 searched=0 #print os.path.isfile(os.path.join(Globals.INSTANCE_HOME,'var',name+'.zip')) #print os.path.join(Globals.INSTANCE_HOME,'var',name+'.zip') while os.path.isfile(os.path.join(Globals.INSTANCE_HOME,'var',name+'.zip')): number = number + 1 name = name2+'_ver'+str(number) searched=1 name += '.zip' shutil.move(failinimi,os.path.join(Globals.INSTANCE_HOME,'var',name)) return self.get_name() + ' - ' + os.path.join(Globals.INSTANCE_HOME,'var',name) security.declareProtected(perm_manage, 'export_only_subgroups') def export_only_subgroups(self): """ export only subgroups area """ fname = 'iva_'+str(self.getId()) fname += re.sub('[^a-zA-Z0-9]', '_', self.get_name()) fname += '_subgroups.zip' from ExportIMS import Kirjutus, kirjutaYldM, User_export from ExportIMS import kirjutaYldM import tempfile, os fdeskr, failinimi = tempfile.mkstemp() yld = kirjutaYldM() yld.kirjutaYldmanifest(self.get_id(), self.get_description(),base=str(self.get_id())+"/") yld.lisaFaili(failinimi) k = Kirjutus(self.fle_root(), self.get_id()) ex = User_export() k.looKursuseFail() for sisegr in self.subgroups.objectValues('Subgroup'): k.kirjutaWebtop(sisegr,'SISEGRUPP:'+sisegr.get_name()) ex.kirjutaSisegrupid(self, sisegr) ex.kirjutaMembership(sisegr, 'krupp') try: k.kirjutavCal(sisegr.syndmused,sisegr.get_name(),'GROUP') except: pass ex.lisaMembership() ex.pakiFaili(str(self.getId())+'/', failinimi) k.kirjutaItem(k.organization, tiitel='KURSUSE KASUTAJAD',parameters='KURSUSE KASUTAJAD') k.kirjutaResource(k.resources, 'imsent_xmlv1p1','users.xml') k.pakiKursusFaili(failinimi) import shutil shutil.move(failinimi, os.path.join(Globals.INSTANCE_HOME, 'var', fname)) os.close(fdeskr) return "exported:" + fname def write_touchgraph_data(self,obj,type,children_types=None,extra_links=None): links = '' if children_types: links = ' '.join(obj.objectIds(children_types)) if extra_links: links = ' '.join((links,extra_links)) if not links: links=' ' return obj.get_id()+'\t'+\ type+"\t"+\ self.REQUEST.BASE0+self.get_url_to_object(obj)+'\t'+\ obj.get_name()+'\t'\ +links+'\t' # But should we protect it somehow? # The Java client would then need authentication def touchgraph_data(self): """This is a public method.""" data=self.write_touchgraph_data(self,"COURSE","CourseContext") for ctx in self.objectValues("CourseContext"): data=data+ctx.touchgraph_data() return data+"[END DATA]" def getQuote(self): "Kursuse uudis/moto. Vastab tyhja kui pole" return self.__quote def setQuote(self, tekst): """ set quote """ self.__quote=tekst security.declareProtected(perm_edit, 'removeLogo') def removeLogo(self): """ remove logo """ imgp = getattr(self.fle_root().images, 'courses_imgs', None) try: imgp._delObject('course_'+str(self.get_id())+'_image') except AttributeError: pass security.declareProtected(perm_edit, 'setLogo') def setLogo(self, img): """ add course logo """ resize = 1 try: from PIL import Image except ImportError: resize = 0 import cStringIO imgp = getattr(self.fle_root().images, 'courses_imgs', None) if imgp is None: return 0 try: s = cStringIO.StringIO(img) if resize: im = Image.open(s) (width, height) = im.size if height == 80 and im.format == 'JPEG': pass else: if im.mode != 'RGB': im = im.convert('RGB') if height > 300: mod = float(300)/float(height) width = width*mod height = height*mod im = im.resize((int(width),int(height))) if width > 300: mod = float(300)/float(width) width = width*mod height = height*mod im = im.resize((int(width),int(height))) s = cStringIO.StringIO() try: im.save(s, "JPEG", quality=100) except KeyError: im.save(s, "GIF", quality=100) s.seek(0) img = s.read() try: imgp._delObject('course_'+str(self.get_id())+'_image') except AttributeError: pass imgp.manage_addImage('course_'+str(self.get_id())+'_image', img, 'image') img = getattr(imgp, 'course_'+str(self.get_id())+'_image') img.ZCacheable_setManagerId('HTTPCache') img.ZCacheable_invalidate() except: return 0 self.setLogoThumbnail() return 1 security.declarePrivate('setLogoThumbnail') def setLogoThumbnail(self): """ creates thumbnail from uploaded course logo """ resize = 1 try: from PIL import Image except ImportError: resize = 0 if not resize: return 1 import cStringIO imgp = getattr(self.fle_root().images, 'courses_imgs', None) if imgp is None: return 1 oimg = getattr(imgp, 'course_'+str(self.get_id())+'_image', None) try: s = cStringIO.StringIO(oimg.data) except TypeError: s = cStringIO.StringIO(str(oimg.data)) if s is None: return 1 im = Image.open(s) (width, height) = im.size # resize image 59x59 if height > 59: mod = float(59)/float(height) width = width*mod height = height*mod im = im.resize((int(width), int(height))) if width > 59: mod = float(59)/float(width) width = width*mod height = height*mod im = im.resize((int(width), int(height))) s = cStringIO.StringIO() try: im.save(s, "JPEG", quality=100) except KeyError: im.save(s, "GIF", quality=100) s.seek(0) img = s.read() try: imgp._delObject('course_'+str(self.get_id())+'_image_thumbnail') except AttributeError: pass imgp.manage_addImage('course_'+str(self.get_id())+'_image_thumbnail', img, 'image') img = getattr(imgp, 'course_'+str(self.get_id())+'_image_thumbnail') img.ZCacheable_setManagerId('HTTPCache') img.ZCacheable_invalidate() def hasLogo(self): """ if course has its own logo """ # XXX:weird hack if not getattr(self.images.courses_imgs, 'course_'+str(self.get_id())+'_image', None): return 0 return 1 def hasLogoThumbnail(self): """ if course logo has thumbnail """ if not self.hasLogo(): return 0 if not getattr(self.images.courses_imgs, 'course_'+str(self.get_id())+'_image_thumbnail', None): self.setLogoThumbnail() return 0 return 1 def getLogoTag(self, REQUEST=None): """ ' def getLogoURL(self): """ URL to course logo """ if self.hasLogo(): return getattr(self.images.courses_imgs, 'course_'+str(self.get_id())+'_image', None).absolute_url(1) return '++resource++images/course_default.gif' def getLogoThumbnailURL(self): """ URL to course logo thumbnail """ if self.hasLogoThumbnail(): return "images/courses_imgs/course_%s_image_thumbnail" % str(self.get_id()) #return self.fle_url()+"/images/courses_imgs/course_%s_image_thumbnail" % str(self.get_id()) #return getattr(self.images.courses_imgs, 'course_'+str(self.get_id())+'_image_thumbnail', None).absolute_url(1) return '++resource++images/course_default.gif' def setStatus(self, sttus): """ set course statuse """ self.staatus=sttus def setRegStatus(self, regStatus): """ set registration status """ self.regStatus = regStatus def getStatus(self): """ get course status """ if getattr(self, 'staatus', None) is None: return 0 else: return self.staatus def getRegStatus(self): """ return how students get to course """ return getattr(self, 'regStatus', 0) security.declareProtected(perm_view,'getCourseCategory') def getCourseCategory(self): return self.course_category security.declarePrivate('setcourseCategory') def setCourseCategory(self, id): self.course_category = id self._p_changed = 1 def get_n_artefacts(self): """ number of artefacts on a course """ total = 0 for x in self.jamming.objectValues('JamSession'): total += x.get_n_artefacts() return total def total_n_unseen_jams(self,REQUEST): """ returns True if Jamming has been updated after user's last visit """ uname = str(getSecurityManager().getUser()) st = getUtility(IStatistics).__of__(getUtility(IFLE)) lv = st.getUserCourseStat(self.getId(), 'visit_jamming', uname) lu = st.getCourseStat(self.getId(), 'modified_jamming') if lu>lv: return True #total = 0 #for x in self.jamming.objectValues('JamSession'): # total += x.get_n_unread_artefacts(REQUEST) #return total return False def isCourseAlive(self): """ when was last input, new objects, artefacts? """ #XXX: isCourseAlive ZZZ, this is serious and need a rewrite! curtime = time.time() aeg = curtime-86400*30 #XXX: go away! if self.lastModify() time: time = tmp return float(time) def lastModify_printable(self,REQUEST): """ returns printable last modification date """ return time.strftime(translate(self,'short_date_format',default="%Y-%m-%d"),time.localtime(self.lastModify())) def lastActive(self): """ return last visit to course """ stool = getUtility(IStatistics, context=self) fle = getUtility(IFLE, context = self) stool = stool.__of__(fle) last = stool.getCourseStatWOUser(int(self.get_id()), 'max(lastAccess)') if last is None: return 0 return last def lastActive_printable(self,REQUEST): """ return last visit to course """ return time.strftime(translate(self,'short_date_format',default="%Y-%m-%d"),time.localtime(self.lastActive())) security.declareProtected(perm_manage, 'getUniqId') def getUniqId(self): """ return uniq id. we take it with us when exporting and put it back when importing should be random enough to avoid collision """ return self.__uniq_id security.declarePrivate('setUniqId') def setUniqId(self, uniq_id): """ set course uniq id. don't change it. method is here only for beauty """ if not uniq_id: raise 'IVA error:', 'uniq id not passed' self.__uniq_id = uniq_id return 1 security.declareProtected(perm_view, 'hasWordmaps') def hasWordmaps(self): """ if course has wordmaps """ return getattr(self, 'wordmaps', None) security.declareProtected(perm_edit, 'createWordmaps') def createWordmaps(self): """ create wordmaps """ import WordMapTools wf=WordMapTools.WordmapFolder() wf.id="wordmaps" self._setObject(wf.id, wf) security.declarePrivate('createTracker') def createTracker(self, REQUEST=None): """ create tracker. Trackers came in IVA 1.0 """ from Tracker import Tracker tracker = Tracker() try: self._setObject(tracker.getId(), tracker) except: self._delObject(tracker.getId()) self._setObject(tracker.getId(), tracker) if REQUEST: return self.restrictedTraverse('message_dialog.html')(message='Tracker created', action='tracker') else: return 0 security.declareProtected(perm_view, 'getSubgroupList') def getSubgroupList(self): """ return subgroups and list of participants """ res = {} for x in self.subgroups.getSubgroups(): res[x.get_id()] = x.subgroup_members() return res security.declareProtected(perm_view, 'getExistingWebtops') def getExistingWebtops(self): """ return a list of usernames who have webtop for this course """ urls = [] fler = '/'.join(self.fle_root().getPhysicalPath()) for x in self.get_all_users_id(): urls.append(fler+'/fle_users/'+x+'/webtop/c'+str(self.getId())) q = {'path': urls, 'meta_type': 'Portfolio'} res = self.catalog_webtop_items(q) # get_author can be someone else. it is safer to use url to get username return [ r.getRelativeContentURL.split('/')[1] for r in res ] Globals.default__class_init__(Course) @adapter(ICourse, IObjectWillBeRemovedEvent) def removing(obj, event): obj.unindex_object() stat = queryUtility(IStatistics).__of__(getUtility(IFLE)) if stat is not None: stat._delCourseHistory(int(obj.get_id())) @adapter(ICourse, IObjectAddedEvent) def added(obj, event): #Each course should have its own copy of the ThinkingTypeSets, #as the course administrator (teacher) should be able to edit them, #create new ones and so forth. These changes must not propagate to #other courses: hence the copying of the set. # give teacher and student roles to groups obj.manage_setLocalRoles(obj.id, ('Student',)) obj.manage_setLocalRoles(obj.id+'_teachers', ('Teacher',)) # add subgroups manager sg = Subgroups.SubgroupManager() obj._setObject(sg.id, sg) # add quizes qm = QuizManager() obj._setObject(qm.id, qm) # add jamming obj._setObject('jamming', Jamming('jamming')) # add wordmaps obj.createWordmaps() # add assignments k=Kodutoo.KodutoodeKataloog() k.id='kodutood' obj._setObject(k.id, k) obj.index_object() # add blog b = Blog.Blog() b.id = "Blog" obj._setObject(b.id,b) # add calendar cal = Kalender.KalendriSyndmusteKataloog() obj._setObject('syndmused', cal) # add tracker obj.createTracker() # add SQL row from Statistics import DummySQLMethods sta = getUtility(IFLE) if not isinstance(sta.iva_sql_methods, DummySQLMethods): try: sta.iva_sql_methods._sql_populate_course_table(course_list='('+str(obj.getId())+')') except DatabaseError: logger.error('MySQL database connection is broken!') # EOF