#!/usr/bin/python
# -*- coding: utf-8 -*-
'''zopesh: Zope Shell.

Usage: zopesh [-u username] [-p password] [-h host] [-P port].
Then type 'help' for a list of commands.

Copyright 2000 Stéfane Fermigier, www.fermigier.com.
Use it at your own risk under the Python (www.python.org) license.

Large portions of this code were added by, rewritten, heavily modified
by Scott Parish.
'''

__version__ = '$Revision: 1.4 $'[11:-2]

__todo__ = '''
*doesn't work with ZPublisher.Client from Zope 2.2
*property sheet editing in vi
*creation of objects
*copy/move
*exporting files .zexp
*password not on commandline option
*consider switch to soap (if ever implimented) or xmlrpc (if this: http://classic.zope.org:8080/Collector/803/view is ever fixed) or WebDAV
*have method default check to see if a command is valid (error checking)
*ls -l (meta_type, possibly size, creation of modified date)
*figure out how to better allow working in zclass objects
*allow work in zope versions
*improve help/docs
*uploading into dtml
*importing .zexp
*downloading files/images/dtml
*completion support for 'l' commands
'''

# readline is optional: without it the shell still works, minus completion.
try:
    import readline
except ImportError:
    readline = None

import sys
import getopt
import cmd
import os
import posixpath
import tempfile
import string
import re
import traceback
import types

# ZPublisher ships with Zope.  The import is guarded so that a missing
# installation is reported with a clear message when a Shell is created
# instead of an import-time traceback.
try:
    import ZPublisher.Client
except ImportError:
    ZPublisher = None

# string, types and __main__ are no longer used below; they stay imported so
# that anything relying on these module attributes keeps working.
import __main__

EDITOR = os.environ.get('EDITOR', 'vi')

# Values compared against sys.exc_info()[0] after a failed remote call.
# NOTE(review): this assumes ZPublisher.Client raises *string* exceptions
# with these names (as the original code did) -- confirm against the
# installed Zope version.
_SERVER_ERROR = 'bci.ServerError'
_NOT_FOUND = 'bci.NotFound'

# Matches one ` key=value` argument of a remote method call; the value is a
# bare word or a single/double quoted string.
_CALL_ARG = re.compile(r'''\s+(\w+)=([^\s"']+|".*?"|'.*?')''')


def _normalize(path):
    """Return `path` with '.'/'..' resolved and runs of '/' collapsed.

    posixpath is used (not os.path) because these are URL paths, which use
    '/' regardless of the local operating system.
    """
    return re.sub('//+', '/', posixpath.normpath(path))


def _save_temp(data):
    """Write `data` to a freshly created temporary file and return its name.

    The caller is responsible for removing the file.
    """
    handle, name = tempfile.mkstemp()
    with os.fdopen(handle, 'w') as out:
        out.write(data)
    return name


class Shell(cmd.Cmd):
    """Interactive shell that maps Unix-like commands onto a Zope server.

    Remote calls go through ZPublisher.Client.Object; the code consistently
    uses element [1] of each call's result as the response text (presumably
    the response body -- verify against ZPublisher.Client).
    """

    def __init__(self, base_url, username, password):
        """Create a shell rooted at `base_url` using the given credentials.

        Raises ImportError when ZPublisher.Client is not installed.
        """
        if ZPublisher is None:
            raise ImportError(
                "zopesh requires ZPublisher.Client (part of Zope)")
        cmd.Cmd.__init__(self)
        # Disable cmd.Cmd's own completer; cmdloop() would otherwise replace
        # the custom one installed below.
        self.completekey = None

        self.base_url = base_url
        self.username = username
        self.password = password
        self.curr_folder = '/'
        self.prompt = self.curr_folder + "$ "
        self.client = ZPublisher.Client.Object(
            self.base_url, username=self.username, password=self.password)
        self.matches = []

        # figure out what commands are options
        self.commands = [
            attr[3:] for attr in dir(self.__class__) if attr[:3] == "do_"]

        # add completion support
        if readline is not None:
            readline.set_completer(self._complete)
            readline.parse_and_bind("tab: complete")

    def emptyline(self):
        """Do nothing on an empty line (cmd.Cmd would repeat the last command)."""
        pass

    def onecmd(self, line):
        """Run one command line, printing a traceback instead of dying.

        SystemExit is let through so `quit` works.  A bare except is needed
        because failures may arrive as string exceptions, which
        `except Exception` would not catch.
        """
        try:
            return cmd.Cmd.onecmd(self, line)
        except SystemExit:
            raise
        except:
            traceback.print_exc()

    def _edit(self, data):
        """Open `data` in $EDITOR and return the edited text."""
        tempname = _save_temp(data)
        try:
            os.system('%s %s' % (EDITOR, tempname))
            with open(tempname) as edited:
                return edited.read()
        finally:
            os.unlink(tempname)

    def _page(self, pager, id):
        """Show the source of remote object `id` through the program `pager`."""
        data = self._getClient(id).document_src()[1]
        tempname = _save_temp(data)
        try:
            os.system('%s %s' % (pager, tempname))
        finally:
            os.unlink(tempname)

    def _getClient(self, path=""):
        """Return a client object for `path`.

        An empty path means the current folder, a path starting with '/' is
        absolute, anything else is relative to the current folder.
        """
        if not path:
            target = self.curr_folder
        elif path[0] == "/":
            target = path
        else:
            target = self.curr_folder + '/' + path
        # Joining onto the root folder '/' would otherwise yield '//name'.
        target = re.sub('//+', '/', target)
        return ZPublisher.Client.Object(
            self.base_url + target,
            username=self.username, password=self.password)

    def _parsePath(self, path):
        """accepts a path/name and returns (client, name)

        `client` addresses the folder part of `path` and `name` is the last
        path component.
        """
        cut = path.rfind("/")
        if cut == -1:
            return self._getClient(""), path
        # "/name" has an empty folder part, which must mean the root and not
        # the current folder.
        return self._getClient(path[:cut] or "/"), path[cut + 1:]

    def _objectIds(self, client, label):
        """Return the list of ids contained in `client`.

        Returns [] on a server error.  Returns None after printing
        "<label>: No such file or directory" when the object is not found,
        or after printing a traceback for any other failure.
        """
        try:
            # SECURITY: eval() of text received from the server.  Only use
            # this shell against servers you trust.
            return eval(client.objectIds()[1])
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            # Bare except: string exceptions are not Exception instances.
            kind = sys.exc_info()[0]
            if kind == _SERVER_ERROR:
                return []
            if kind == _NOT_FOUND:
                print("%s: No such file or directory" % label)
            else:
                traceback.print_exc()
            return None

    def _parseArgs(self, text):
        """Parse ` key=value key2="quoted value"` into a dict of strings.

        Surrounding single or double quotes are removed from values.
        """
        parsed = {}
        for key, value in _CALL_ARG.findall(text):
            if value[0] in "'\"":
                value = value[1:-1]
            parsed[key] = value
        return parsed

    def _exists(self, id):
        """check for the existance of 'id' in its folder

        Returns 1 if present, 0 if absent and None when the folder could not
        be listed (the problem has already been reported).
        """
        client, name = self._parsePath(id)
        id_list = self._objectIds(client, id)
        if id_list is None:
            return None
        if name in id_list:
            return 1
        return 0

    #
    # Basic commands.
    #
    def do_ls(self, arg):
        """ls [path...] -- list the ids in the current or the given folder(s)."""
        for path in arg.split() or [""]:
            ids = self._objectIds(self._getClient(path), "ls: %s" % path)
            if ids is None:
                continue
            ids.sort()
            for entry in ids:
                print(entry)

    def do_cd(self, id):
        """cd [path] -- change the current folder (no argument: go to '/')."""
        if not id:
            target = '/'
        elif id[0] == "/":
            target = id
        else:
            target = self.curr_folder + '/' + id
        target = _normalize(target)

        # Listing the target is only used to check that it can be reached.
        client = self._getClient(target)
        if self._objectIds(client, "cd: %s" % id) is None:
            return
        self.client = client
        self.curr_folder = target
        self.prompt = self.curr_folder + "$ "

    def do_pwd(self, arg):
        """pwd -- print the current folder."""
        print(self.curr_folder)

    def do_vi(self, id):
        """vi <id> -- edit a document in $EDITOR, creating it if needed."""
        present = self._exists(id)
        if present is None:
            # The folder could not be listed; the error was already printed.
            return
        if present:
            old_data = self._getClient(id).document_src()[1]
        else:
            old_data = ''

        new_data = self._edit(old_data)

        if not present:
            client, name = self._parsePath(id)
            try:
                client.manage_addDTMLDocument(id=name)
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                # There's an exception even in the regular case. (FIXME)
                pass

        try:
            self._getClient(id).manage_upload(file=new_data)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            traceback.print_exc()
            # Do not lose the user's work when the upload fails.
            saved = _save_temp(new_data)
            print("Error: your edited data was saved to: %s" % saved)

    def do_upload(self, args):
        """upload <type> <file>... -- upload local files into the current folder.

        <type> = File | Image
        """
        words = args.split()
        if len(words) < 2:
            print("Usage: upload type file...")
            print(" type should be one of 'File' or 'Image'")
            return
        kind = words[0]
        for filename in words[1:]:
            object_id = filename[filename.rfind("/") + 1:]
            # Read the local file first: if it cannot be read, an existing
            # remote object must not have been deleted already.
            with open(filename, "rb") as source:
                data = source.read()
            client, _unused = self._parsePath(object_id)
            # Check if id already exists
            if self._exists(object_id):
                client.manage_delObjects(ids=[object_id])
            try:
                getattr(client, "manage_add%s" % kind)(id=object_id, file=data)
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                # There is an exception even in the regular case. (FIXME)
                pass

    do_emacs = do_joe = do_nedit = do_vi

    def do_more(self, id):
        """more <id> -- page through a document's source with 'more'."""
        self._page('more', id)

    def do_less(self, id):
        """less <id> -- page through a document's source with 'less'."""
        self._page('less', id)

    def do_rm(self, ids):
        """rm <id>... -- delete the given objects."""
        for target in ids.split():
            client, name = self._parsePath(target)
            client.manage_delObjects(ids=[name])

    def do_rename(self, arg):
        """rename <old_id> <new_id> -- rename an object in the current folder."""
        words = arg.split()
        if len(words) != 2:
            print("Usage: rename old_id new_id")
            return
        old_id, new_id = words
        self.client.manage_renameObject(id=old_id, new_id=new_id)

    def do_mkdir(self, id):
        """mkdir <id> -- create a folder."""
        client, name = self._parsePath(id)
        client.manage_addFolder(id=name)

    def do_quit(self, arg):
        """quit -- leave the shell."""
        print("")
        sys.exit()

    do_EOF = do_quit

    def do_restart(self, arg):
        """restart -- restart the Zope server."""
        print("Restarting...")
        self._getClient("/Control_Panel").manage_restart()

    def do_stop(self, arg):
        """stop -- shut the Zope server down."""
        print("Stopping...")
        self._getClient("/Control_Panel").manage_shutdown()

    do_shutdown = do_stop

    #
    # Default: if the name given isn't a valid command, it is treated as the
    # path of a zope object/method and called; failures surface as a
    # traceback printed by onecmd().
    #
    def default(self, arg):
        """try to run the method

        `arg` is `path [key=value ...]`; the response text is printed.
        """
        space = arg.find(" ")
        if space == -1:
            command, args = arg, {}
        else:
            command = arg[:space]
            args = self._parseArgs(arg[space:])
        client, name = self._parsePath(command)
        print(getattr(client, name)(**args)[1])

    #
    # Toying with properties.
    #
    def do_lsprop(self, id):
        """lsprop [id] -- list the properties of an object."""
        client = self._getClient(id)
        # SECURITY: eval() of text received from the server (see _objectIds).
        for key, value in eval(client.propertyItems()[1]):
            print("%s: %s" % (key, str(value)))

    def do_editprop(self, arg):
        """editprop <id> <property> -- edit (or add) a string property."""
        words = arg.split()
        if len(words) != 2:
            print("Usage: editprop id property_id")
            return
        target, property_id = words
        client = self._getClient(target)
        old_data = client.getProperty(id=property_id)[1]
        new_data = self._edit(old_data)
        if old_data:
            client.manage_changeProperties(**{property_id: new_data})
            return
        # An empty value may mean "no such property": try to add it and fall
        # back to changing it when adding fails.
        try:
            client.manage_addProperty(
                id=property_id, value=new_data, type='string')
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            client.manage_changeProperties(**{property_id: new_data})

    #
    # local commands
    #
    def do_lpwd(self, arg):
        """lpwd -- print the local working directory."""
        print(os.getcwd())

    def do_lcd(self, arg):
        """lcd <dir> -- change the local working directory."""
        os.chdir(arg)

    def do_lls(self, arg):
        """lls [dir] -- list a local directory, sorted by name."""
        for entry in sorted(os.listdir(arg or ".")):
            print(entry)

    def do_lmkdir(self, arg):
        """lmkdir <dir> -- create a local directory."""
        os.mkdir(arg)

    def do_lrmdir(self, arg):
        """lrmdir <dir> -- remove a local directory."""
        os.rmdir(arg)

    def do_lrm(self, arg):
        """lrm <file> -- remove a local file."""
        os.unlink(arg)

    #
    # completion code
    #
    def _complete(self, text, state):
        """Returns completion possibilities for 'text'

        Called by readline with state 0, 1, 2, ... until None is returned;
        the candidates are computed once, when state is 0.
        """
        if state == 0:
            self.matches = self._get_matches(text)
        try:
            return self.matches[state]
        except IndexError:
            return None

    def _get_matches(self, text):
        """Return command names and remote ids matching the input line.

        The `text` argument is ignored in favour of readline's whole line
        buffer: command names are matched against the complete line, remote
        ids against the last path component of the last word.
        """
        line = readline.get_line_buffer().lstrip()
        self.client.objectIds  # don't know why, but don't remove that
        client, name = self._parsePath(line[line.rfind(" ") + 1:])
        try:
            # SECURITY: eval() of text received from the server.
            remote_ids = eval(client.objectIds()[1])
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            # Still offer command names when the folder cannot be listed.
            remote_ids = []

        matches = [c for c in self.commands if c[:len(line)] == line]
        # check against name, it has path rm'ed
        matches.extend([i for i in remote_ids if i[:len(name)] == name])
        return matches


def main(argv=None):
    """Parse the command line and run the interactive shell.

    `argv` defaults to sys.argv[1:].  Prints the usage text and exits with
    status 1 on an unknown option or a non-numeric port.
    """
    if argv is None:
        argv = sys.argv[1:]

    host = 'localhost'
    user = 'superuser'
    port = 80
    password = ''

    try:
        opts, args = getopt.getopt(argv, 'P:p:u:h:')
        for name, value in opts:
            if name == '-p':
                password = value
            elif name == '-P':
                port = int(value)
            elif name == '-u':
                user = value
            elif name == '-h':
                host = value
    except (getopt.error, ValueError):
        # getopt raises on unknown options; int() raises on a bad port.
        print(__doc__)
        sys.exit(1)

    shell = Shell('http://%s:%d' % (host, port), user, password)
    shell.cmdloop()


if __name__ == '__main__':
    main()

# vim:ts=8