__doc__=""" Free Transactions This is an attempt to associate zodb transactions on a per connection basis. the major benefits is by disassociating the transaction to thread association we can have multiple connections in a single thread and hence multiple dbs open while maintaining per connection transaction integrity. functional changes to application code Application code can continue to use get_transaction().register(ob) as long as ob is a persistent object or a zodb connection. Application code that tries to integrate non-persistent objects for transactional control needs to call get_transaction() with a persistent object or connection as an arg so that it can be registered with an appropiate connection. Transitioning between transaction states cannot be done via get_transaction(), a persistent object or connection must be passed in so the proper transaction is returned. what doesn't work - versions... i've never really used versions, so i'm not sure about their implementation, other than it looks strange;) - mounted storages. this is an alternative to this approach for multi db systems, but requires? treating multiple dbs as one logical unit. it seems more a zopish thing at the moment. - probably most advanced usages... some things i've seen like db.open with a transaction. basically use of the full range of ZODB features (ie Zope) will not work. otoh, most basic and common usages as a POS will probably work. other caveats multi-threaded code shouldn't have share connections, its probably better for such code to use and return conns to the db as needed. installation import this module and call the install_free_txn function, which will hotpatch the connection and transaction classes and install the get_transaction accessor in globals. - transaction.py abort free transaction - connection.py sync method, pass in self tests i did some basic tests. if it breaks you get to keep both pieces license Zope Public License (ZPL) ZPL 1.0 or a later version of the ZPL at your discretion. author: kapil thangavelu """ from ZODB.DB import DB from ZODB.Connection import Connection from ZODB.Transaction import Transaction import ZODB.Transaction as TxnModule class TxnConnException(Exception): pass class TxnGuard: """ this is a wrapper around a transaction it mainly acts as a guard against legacy interaction, while still permitting the form get_transaction().register(obj). where obj is a persistent object. non persistent objects need to explicitly get a transaction with a persistent object """ def __init__(self): pass def register(self, object): return get_transaction(object).register(object) def __getattr__(self, name): if name not in ('register',): raise AttributeError ( name ) return self.register ################################# # storage and id generation _t={} def get_ob_connection(ob): conn = getattr(ob, '_p_jar', None) if conn and isinstance(conn,Connection): return conn if isinstance(ob, Connection): return ob raise TxnConnException ( " No Connection Found, this is bad:( " ) def txn_id_generator(conn): return id(conn) ################################# # external accessors def get_transaction(pobj=None, _t=_t, get=_t.get, None=None, txn_id=txn_id_generator, get_conn=get_ob_connection): if pobj is None: return TxnGuard() id=txn_id(get_conn(pobj)) t=get(id, None) if t is None: _t[id]=t=Transaction(id) return t def free_transaction(id, _t=_t): try: del _t[id] except KeyError: pass # thats it. below are hotpatches and installation functions ################################# # non patch patches aka HotPatch # patch finally to call free_transaction with self._id def txn_abort(self, subtransaction=0, freeme=1): '''Abort the transaction. This is called from the application. This means that we haven\'t entered two-phase commit yet, so no tpc_ messages are sent. ''' if subtransaction and (self._non_st_objects is not None): raise POSException.TransactionError, ( """Attempted to abort a sub-transaction, but a participating data manager doesn't support partial abort. """) t=v=tb=None subj=self._sub subjars=() if not subtransaction: # Must add in any non-subtransaction supporting objects that # may have been stowed away from previous subtransaction # commits. if self._non_st_objects is not None: append=self._objects.append for object in self._non_st_objects: append(object) self._non_st_objects = None if subj is not None: # Abort of top-level transaction after commiting # subtransactions. subjars=subj.values() self._sub=None try: # Abort the objects for o in self._objects: try: j=getattr(o, '_p_jar', o) if j is not None: j.abort(o, self) except: if t is None: t,v,tb=sys.exc_info() # Ugh, we need to abort work done in sub-transactions. while subjars: j=subjars.pop() j.abort_sub(self) # This should never fail if t is not None: raise t,v,tb finally: tb=None del self._objects[:] # Clear registered if not subtransaction and freeme: if self._id is not None: free_transaction(self._id) else: self._init() # patch finally to call free_transaction with self._id # hmm.. change hosed to TxnModule.hosed def txn_commit(self, subtransaction=None): 'Finalize the transaction' global hosed objects=self._objects jars={} jarsv = None subj=self._sub subjars=() if subtransaction: if subj is None: self._sub=subj={} else: if subj is not None: if objects: # Do an implicit sub-transaction commit: self.commit(1) objects=[] subjars=subj.values() self._sub=None # If not a subtransaction, then we need to add any non- # subtransaction-supporting objects that may have been # stowed away during subtransaction commits to _objects. if (subtransaction is None) and (self._non_st_objects is not None): append=objects.append for object in self._non_st_objects: append(object) self._non_st_objects = None t=v=tb=None if (objects or subjars) and TxnModule.hosed: # Something really bad happened and we don't # trust the system state. raise POSException.TransactionError, ( """A serious error, which was probably a system error, occurred in a previous database transaction. This application may be in an invalid state and must be restarted before database updates can be allowed. Beware though that if the error was due to a serious system problem, such as a disk full condition, then the application may not come up until you deal with the system problem. See your application log for information on the error that lead to this problem. """) try: # It's important that: # # - Every object in self._objects is either committed # or aborted. # # - For each object that is committed # we call tpc_begin on it's jar at least once # # - For every jar for which we've called tpc_begin on, # we either call tpc_abort or tpc_finish. It is OK # to call these multiple times, as the storage is # required to ignore these calls if tpc_begin has not # been called. ncommitted=0 try: for o in objects: j=getattr(o, '_p_jar', o) if j is not None: i=id(j) if not jars.has_key(i): jars[i]=j if subtransaction: # If a jar does not support subtransactions, # we need to save it away to be committed in # the outer transaction. try: j.tpc_begin(self, subtransaction) except TypeError: j.tpc_begin(self) if hasattr(j, 'commit_sub'): subj[i]=j else: if self._non_st_objects is None: self._non_st_objects = [] self._non_st_objects.append(o) continue else: j.tpc_begin(self) j.commit(o,self) ncommitted=ncommitted+1 # Commit work done in subtransactions while subjars: j=subjars.pop() i=id(j) if not jars.has_key(i): jars[i]=j j.commit_sub(self) jarsv = jars.values() for jar in jarsv: if not subtransaction: try: jar=jar.tpc_vote except: pass else: jar(self) # last chance to bail try: # Try to finish one jar, since we may be able to # recover if the first one fails. if jarsv: jarsv[-1].tpc_finish(self) # This should never fail jarsv.pop() # It didn't, so it's taken care of. except: # Bug if it does, we need to keep track of it LOG('ZODB', ERROR, "A storage error occurred in the last phase of a " "two-phase commit. This shouldn\'t happen. ", error=sys.exc_info()) raise try: while jarsv: jarsv[-1].tpc_finish(self) # This should never fail jarsv.pop() # It didn't, so it's taken care of. except: # Bug if it does, we need to yell FIRE! # Someone finished, so don't allow any more # work without at least a restart! TxnModule.hosed=1 LOG('ZODB', PANIC, "A storage error occurred in the last phase of a " "two-phase commit. This shouldn\'t happen. " "The application may be in a hosed state, so " "transactions will not be allowed to commit " "until the site/storage is reset by a restart. ", error=sys.exc_info()) raise except: t,v,tb=sys.exc_info() # Ugh, we got an got an error during commit, so we # have to clean up. # First, we have to abort any uncommitted objects. for o in objects[ncommitted:]: try: j=getattr(o, '_p_jar', o) if j is not None: j.abort(o, self) except: pass # Then, we unwind TPC for the jars that began it. if jarsv is None: jarsv = jars.values() for j in jarsv: try: j.tpc_abort(self) # This should never fail except: LOG('ZODB', ERROR, "A storage error occured during object abort " "This shouldn\'t happen. ", error=sys.exc_info()) # Ugh, we need to abort work done in sub-transactions. while subjars: j=subjars.pop() j.abort_sub(self) # This should never fail raise t,v,tb finally: tb=None del objects[:] # clear registered if not subtransaction and self._id is not None: free_transaction(self._id) # patch get_transaction to call with self def conn_sync(self): get_transaction(self).abort() sync=getattr(self._storage, 'sync', 0) if sync != 0: sync() self._cache.invalidate(self._invalidated) self._incrgc() # This is a good time to do some GC ################################# # installation def install_free_txn(): # replace txn functions TxnModule.free_transaction=free_transaction TxnModule.get_transaction=get_transaction ## Hot Patches # replace txn methods Transaction.abort = txn_abort Transaction.commit = txn_commit # replace conn methods Connection.sync = conn_sync # install globally import __main__ __main__.__builtins__.get_transaction = get_transaction