#!/usr/bin/python '''zopesh: Zope Shell. Usage: zopesh [-u username] [-p password] [-h host] [-P port]. Then type 'help' for a list of commands. Copyright 2000 Stéfane Fermigier, www.fermigier.com. Use it at your own risk under the Python (www.python.org) license. Large portions of this code were added by, rewritten, heavily modified by Scott Parish . ''' __version__ = '$Revision: 1.4 $'[11:-2] __todo__ = ''' *doesn't work with ZPublisher.Client from Zope 2.2 *property sheet editing in vi *creation of objects *copy/move *exporting files .zexp *password not on commandline option *consider switch to soap (if ever implimented) or xmlrpc (if this: http://classic.zope.org:8080/Collector/803/view is ever fixed) or WebDAV *have method default check to see if a command is valid (error checking) *ls -l (meta_type, possibly size, creation of modified date) *figure out how to better allow working in zclass objects *allow work in zope versions *improve help/docs *uploading into dtml *importing .zexp *downloading files/images/dtml *completion support for 'l' commands ''' try: import readline except: pass import sys, getopt, cmd, os, tempfile, string, re, traceback, types import ZPublisher.Client import __main__ EDITOR = os.environ.get('EDITOR', 'vi') class Shell(cmd.Cmd): def __init__(self, base_url, username, password): self.base_url = base_url self.username = username self.password = password self.curr_folder = '/' self.prompt = self.curr_folder + "$ " self.client = ZPublisher.Client.Object( self.base_url, username=self.username, password=self.password ) #figure out what commands are options self.commands = [] for m in dir(Shell): if m[:3] == "do_": self.commands.append(m[3:]) #add completion support if __main__.__dict__.has_key('readline'): readline.set_completer(self._complete) readline.parse_and_bind("tab: complete") def emptyline(self): pass def onecmd(self, line): try: cmd.Cmd.onecmd(self, line) except SystemExit: sys.exit() except: traceback.print_exc() def _edit(self, data): # Edit using external editor. tempname = tempfile.mktemp() open(tempname, 'w').write(data) os.system('%s %s' % (EDITOR, tempname)) new_data = open(tempname).read() os.unlink(tempname) return new_data def _getClient(self, path=""): p = self.base_url if type(path) is types.StringType and len(path) == 0: p = p + self.curr_folder elif path[0] == "/": p = p + path else: p = p + self.curr_folder + '/' + path return ZPublisher.Client.Object(p, username=self.username, password=self.password) def _parsePath(self, path): """accepts a path/name and returns (client, name)""" x = string.rfind(path, "/") if x == -1: return self._getClient(""), path else: return self._getClient(path[:x]), path[x+1:] def _exists(self, id): """check for the existance of 'id' in this folder""" try: client, name = self._parsePath(id) id_list = eval(client.objectIds()[1]) except: if sys.exc_info()[0] == 'bci.ServerError': id_list = [] elif sys.exc_info()[0] == 'bci.NotFound': print "%s: No such file or directory" % path return None else: traceback.print_exc() return None if id in id_list: return 1 else: return 0 # # Basic commands. # def do_ls(self, arg): (path,) = string.split(arg, " ") try: client = self._getClient(path) l = eval(client.objectIds()[1]) except: if sys.exc_info()[0] == 'bci.ServerError': l = [] elif sys.exc_info()[0] == 'bci.NotFound': print "ls: %s: No such file or directory" % path return else: traceback.print_exc() return l.sort() for id in l: print id def do_cd(self, id): try: ls = self._getClient(id).objectIds()[1] except: if sys.exc_info()[0] == 'bci.ServerError': ls = [] elif sys.exc_info()[0] == 'bci.NotFound': print "cd: %s: No such file or directory" % id return else: traceback.print_exc() return if id[0] == "/": new_folder = id elif id == '..': new_folder = '/' + string.join( string.split(self.curr_folder, '/')[0:-1], '/') else: new_folder = self.curr_folder + '/' + id new_folder = os.path.normpath(new_folder) new_folder = re.sub('//+', '/', new_folder) try: client = ZPublisher.Client.Object( self.base_url + new_folder, username=self.username, password=self.password) self.client = client self.curr_folder = new_folder except: traceback.print_exc() self.prompt = self.curr_folder + "$ " def do_pwd(self, arg): print self.curr_folder def do_vi(self, id): # Check if document doesn't exists already if self._exists(id) == 0: client, name = self._parsePath(id) old_data = '' create_document = 1 else: client = self._getClient(id) old_data = client.document_src()[1] create_document = 0 new_data = self._edit(old_data) if create_document: try: #self.client.manage_addDTMLDocument(id=id) client.manage_addDTMLDocument(id=name) except: # There's an exception even in the regular case. (FIXME) pass client = self._getClient(id) try: client.manage_upload(file=new_data) except: traceback.print_exc() file = tempfile.mktemp() open(file, 'w').write(new_data) print "Error: your edited data was saved to: %s" % file def do_upload(self, args): """handle upload upload = File | Image """ if len(args) < 2: print "Usage: upload type file..." print " type should be one of 'File' or 'Image'" return args = string.split(args) type = args[0] names = args[1:] for name in names: id = name[string.rfind(name,"/")+1:] client, n = self._parsePath(id) # Check if id already exists if self._exists(id): client.manage_delObjects(ids=[id]) data = open(name,"r").read() try: add = getattr(client, "manage_add%s" % type)(id=id, file=data) except: # There is an exception even inthe regular case. (FIXME) pass do_emacs = do_joe = do_nedit = do_vi def do_more(self, id): client = self._getClient(id) data = client.document_src()[1] tempname = tempfile.mktemp() open(tempname, 'w').write(data) os.system('more %s' % tempname) os.unlink(tempname) def do_less(self, id): client = self._getClient(id) data = client.document_src()[1] tempname = tempfile.mktemp() open(tempname, 'w').write(data) os.system('less %s' % tempname) os.unlink(tempname) def do_rm(self, ids): ids = string.split(ids) for id in ids: client, name = self._parsePath(id) client.manage_delObjects(ids=[name]) def do_rename(self, arg): old_id, new_id = string.split(arg, ' ') self.client.manage_renameObject(id=old_id, new_id=new_id) def do_mkdir(self, id): client, name = self._parsePath(id) client.manage_addFolder(id=name) def do_quit(self, arg): print sys.exit() do_EOF = do_quit def do_restart(self, arg): print "Restarting..." self._getClient("/Control_Panel").manage_restart() def do_stop(self, arg): print "Stopping..." self._getClient("/Control_Panel").manage_shutdown() do_shutdown = do_stop # # Default: if the name given isn't a valid command, check to see if it # is a zope object, if so execute it, if not gripe # def default(self, arg): """try to run the method""" args = {} x = string.find(arg, " ") if x != -1: command = arg[:x] # when given something like: "1=2 3='4' 5="6" # r should have ['','1','2','','3',"'4'",'','5','"6"',''] r = re.compile("\s+(\w+)=([^\s\"']+|\".*?\"|'.*?')") arg_lst = r.split(arg[x:]) i = 1 l = len(arg_lst) - 2 while i < l: if arg_lst[i+1][0]=="'" or arg_lst[i+1][0]=='"': ali1 = arg_lst[i+1] args[arg_lst[i]] = ali1[1:len(ali1)-1] else: args[arg_lst[i]] = arg_lst[i+1] i = i + 3 else: command = arg client, name = self._parsePath(command) print apply(getattr(client, name),(),args)[1] # '*** Unknown command: %s' # # Toing with properties. # def do_lsprop(self, id): client = self._getClient(id) print client.propertyItems() l = eval(client.propertyItems()[1]) for k, v in l: print "%s: %s" % (k, str(v)) def do_editprop(self, arg): id, property_id = string.split(arg, ' ') client = self._getClient(id) old_data = client.getProperty(id=property_id)[1] new_data = self._edit(old_data) if old_data: apply(client.manage_changeProperties, (), {property_id: new_data}) else: try: client.manage_addProperty( id=property_id, value=new_data, type='string') except: apply(client.manage_changeProperties, (), {property_id: new_data}) # # local commands # def do_lpwd(self, arg): print os.getcwd() def do_lcd(self, arg): os.chdir(arg) def do_lls(self, arg): arg = arg or "." lst = os.listdir(arg) lst.sort for x in lst: print x def do_lmkdir(self, arg): os.mkdir(arg) def do_lrmdir(self, arg): os.rmdir(arg) def do_lrm(self, arg): os.unlink(arg) # #completion code # def _complete(self, text, state): """Returns completion possibilities for 'text'""" if state == 0: self.matches = self._get_matches(text) try: return self.matches[state] except IndexError: return None def _get_matches(self, text): matches = [] text = string.lstrip(readline.get_line_buffer()) l = len(text) self.client.objectIds # don't know why, but don't remove that client, name = self._parsePath(text[string.rfind(text, " ")+1:]) l2 = len(name) ls = eval(client.objectIds()[1]) for m in self.commands: if m[:l] == text: matches.append(m) for m in ls: if m[:l2] == name: # check against name, it has path rm'ed matches.append(m) return matches def main(): host = 'localhost' user = 'superuser' port = 80 password = '' opts, args = getopt.getopt(sys.argv[1:], 'P:p:u:h:') for name, value in opts: if name == '-p': password = value elif name == '-P': port = int(value) elif name == '-u': user = value elif name == '-h': host = value else: print __doc__ sys.exit(1) shell = Shell('http://%s:%d' % (host, port), user, password) shell.cmdloop() if __name__ == '__main__': main() # vim:ts=8