00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034 #include "TransactionalStore.h"
00035 #include "RDFException.h"
00036 #include "Debug.h"
00037
00038 #include <QMutex>
00039 #include <QMutexLocker>
00040 #include <QUrl>
00041
00042 #include <iostream>
00043 #include <memory>
00044
00045 using std::auto_ptr;
00046
00047 namespace Dataquay
00048 {
00049
00050 class TransactionalStore::D
00051 {
00063 enum Context { TxContext, NonTxContext };
00064
00065 public:
00066 D(TransactionalStore *ts, Store *store, DirectWriteBehaviour dwb) :
00067 m_ts(ts),
00068 m_store(store),
00069 m_dwb(dwb),
00070 m_currentTx(NoTransaction),
00071 m_context(NonTxContext) {
00072 }
00073
00074 ~D() {
00075 if (m_currentTx != NoTransaction) {
00076 std::cerr << "WARNING: TransactionalStore deleted with transaction ongoing" << std::endl;
00077 }
00078 }
00079
00080 Transaction *startTransaction() {
00081 QMutexLocker locker(&m_mutex);
00082 DEBUG << "TransactionalStore::startTransaction" << endl;
00083 if (m_currentTx != NoTransaction) {
00084 throw RDFException("ERROR: Attempt to start transaction when another transaction from the same thread is already in train");
00085 }
00086 Transaction *tx = new TSTransaction(this);
00087 m_currentTx = tx;
00088 return tx;
00089 }
00090
00091 void commitTransaction(Transaction *tx) {
00092 {
00093 QMutexLocker locker(&m_mutex);
00094 DEBUG << "TransactionalStore::commitTransaction" << endl;
00095 if (tx != m_currentTx) {
00096 throw RDFException("Transaction integrity error");
00097 }
00098 enterTransactionContext();
00099
00100
00101
00102
00103
00104 m_currentTx = NoTransaction;
00105 m_context = NonTxContext;
00106 }
00107 m_ts->transactionCommitted();
00108 }
00109
00110 void rollbackTransaction(Transaction *tx) {
00111 QMutexLocker locker(&m_mutex);
00112 DEBUG << "TransactionalStore::rollbackTransaction" << endl;
00113 if (tx != m_currentTx) {
00114 throw RDFException("Transaction integrity error");
00115 }
00116 leaveTransactionContext();
00117
00118
00119 m_currentTx = NoTransaction;
00120 }
00121
00122 class Operation
00123 {
00124 public:
00125 Operation(const D *d, const Transaction *tx) :
00126 m_d(d), m_tx(tx) {
00127 m_d->startOperation(m_tx);
00128 }
00129 ~Operation() {
00130 m_d->endOperation(m_tx);
00131 }
00132 private:
00133 const D *m_d;
00134 const Transaction *m_tx;
00135 };
00136
00137 class NonTransactionalAccess
00138 {
00139 public:
00140 NonTransactionalAccess(D *d) :
00141 m_d(d) {
00142 m_d->startNonTransactionalAccess();
00143 }
00144 ~NonTransactionalAccess() {
00145 m_d->endNonTransactionalAccess();
00146 }
00147 private:
00148 D *m_d;
00149 };
00150
00151 bool add(Transaction *tx, Triple t) {
00152 Operation op(this, tx);
00153 bool result = m_store->add(t);
00154 return result;
00155 }
00156
00157 bool remove(Transaction *tx, Triple t) {
00158 Operation op(this, tx);
00159 bool result = m_store->remove(t);
00160 return result;
00161 }
00162
00163 bool contains(const Transaction *tx, Triple t) const {
00164 Operation op(this, tx);
00165 bool result = m_store->contains(t);
00166 return result;
00167 }
00168
00169 Triples match(const Transaction *tx, Triple t) const {
00170 Operation op(this, tx);
00171 Triples result = m_store->match(t);
00172 return result;
00173 }
00174
00175 ResultSet query(const Transaction *tx, QString sparql) const {
00176 Operation op(this, tx);
00177 ResultSet result = m_store->query(sparql);
00178 return result;
00179 }
00180
00181 Triple matchFirst(const Transaction *tx, Triple t) const {
00182 Operation op(this, tx);
00183 Triple result = m_store->matchFirst(t);
00184 return result;
00185 }
00186
00187 Node queryFirst(const Transaction *tx, QString sparql,
00188 QString bindingName) const {
00189 Operation op(this, tx);
00190 Node result = m_store->queryFirst(sparql, bindingName);
00191 return result;
00192 }
00193
00194 QUrl getUniqueUri(const Transaction *tx, QString prefix) const {
00195 Operation op(this, tx);
00196 QUrl result = m_store->getUniqueUri(prefix);
00197 return result;
00198 }
00199
00200 QUrl expand(QString uri) const {
00201 return m_store->expand(uri);
00202 }
00203
00204 bool hasWrap() const {
00205 return m_dwb == AutoTransaction;
00206 }
00207
00208 void startNonTransactionalAccess() {
00209
00210
00211
00212
00213 m_mutex.lock();
00214 DEBUG << "TransactionalStore::startNonTransactionalAccess" << endl;
00215 if (m_context == NonTxContext) DEBUG << "(note: already in non-tx context)" << endl;
00216 leaveTransactionContext();
00217
00218 }
00219
00220 void endNonTransactionalAccess() {
00221
00222
00223
00224
00225 DEBUG << "TransactionalStore::endNonTransactionalAccess" << endl;
00226 m_mutex.unlock();
00227 }
00228
00229 Store *getStore() { return m_store; }
00230 const Store *getStore() const { return m_store; }
00231
00232 private:
00233
00234
00235
00236
00237 TransactionalStore *m_ts;
00238 mutable Store *m_store;
00239 DirectWriteBehaviour m_dwb;
00240 mutable QMutex m_mutex;
00241 const Transaction *m_currentTx;
00242 mutable Context m_context;
00243
00244 void startOperation(const Transaction *tx) const {
00245
00246
00247
00248
00249
00250 m_mutex.lock();
00251 if (tx != m_currentTx) {
00252 throw RDFException("Transaction integrity error");
00253 }
00254 enterTransactionContext();
00255 }
00256
00257 void endOperation(const Transaction *tx) const {
00258 if (tx != m_currentTx) {
00259 throw RDFException("Transaction integrity error");
00260 }
00261 m_mutex.unlock();
00262 }
00263
00264 void enterTransactionContext() const {
00265
00266
00267 if (m_context == TxContext) {
00268 return;
00269 }
00270 if (m_currentTx == NoTransaction) {
00271 return;
00272 }
00273 ChangeSet cs = m_currentTx->getChanges();
00274 if (!cs.empty()) {
00275 DEBUG << "TransactionalStore::enterTransactionContext: replaying" << endl;
00276 try {
00277 m_store->change(cs);
00278 } catch (RDFException e) {
00279 throw RDFException(QString("Failed to enter transaction context. Has the store been modified non-transactionally while a transaction was in progress? Original error is: %1").arg(e.what()));
00280 }
00281 }
00282 m_context = TxContext;
00283 }
00284
00285 void leaveTransactionContext() const {
00286
00287
00288 if (m_context == NonTxContext) {
00289 return;
00290 }
00291 if (m_currentTx == NoTransaction) {
00292 m_context = NonTxContext;
00293 return;
00294 }
00295 ChangeSet cs = m_currentTx->getChanges();
00296 if (!cs.empty()) {
00297 try {
00298 m_store->revert(cs);
00299 } catch (RDFException e) {
00300 throw RDFException(QString("Failed to leave transaction context. Has the store been modified non-transactionally while a transaction was in progress? Original error is: %1").arg(e.what()));
00301 }
00302 }
00303 m_context = NonTxContext;
00304 }
00305 };
00306
00307 class TransactionalStore::TSTransaction::D
00308 {
00309 public:
00310 D(Transaction *tx, TransactionalStore::D *td) :
00311 m_tx(tx), m_td(td), m_abandoned(false) {
00312 }
00313 ~D() {
00314 if (m_abandoned) {
00315 m_td->rollbackTransaction(m_tx);
00316 } else {
00317 m_td->commitTransaction(m_tx);
00318 }
00319 }
00320
00321 void abandon() const {
00322 DEBUG << "TransactionalStore::TSTransaction::abandon: Auto-rollback triggered by exception" << endl;
00323 m_abandoned = true;
00324 }
00325
00326 void check() const {
00327 if (m_abandoned) {
00328 throw RDFException("Transaction abandoned");
00329 }
00330 }
00331
00332 bool add(Triple t) {
00333 check();
00334 try {
00335 if (m_td->add(m_tx, t)) {
00336 m_changes.push_back(Change(AddTriple, t));
00337 return true;
00338 } else {
00339 return false;
00340 }
00341 } catch (RDFException) {
00342 abandon();
00343 throw;
00344 }
00345 }
00346
00347 bool remove(Triple t) {
00348 check();
00349 try {
00350
00351
00352
00353
00354
00355 Triples tt;
00356 bool wild = false;
00357 if (t.a.type == Node::Nothing ||
00358 t.b.type == Node::Nothing ||
00359 t.c.type == Node::Nothing) {
00360 tt = m_td->match(m_tx, t);
00361 wild = true;
00362 } else {
00363 tt.push_back(t);
00364 }
00365 for (int i = 0; i < tt.size(); ++i) {
00366 if (m_td->remove(m_tx, tt[i])) {
00367 m_changes.push_back(Change(RemoveTriple, tt[i]));
00368 return true;
00369 } else if (wild) {
00370 throw RDFException("Failed to remove matched statement in remove() with wildcards");
00371 }
00372 }
00373 return false;
00374 } catch (RDFException) {
00375 abandon();
00376 throw;
00377 }
00378 }
00379
00380 void change(ChangeSet cs) {
00381
00382
00383 for (int i = 0; i < cs.size(); ++i) {
00384 ChangeType type = cs[i].first;
00385 switch (type) {
00386 case AddTriple:
00387 if (!add(cs[i].second)) {
00388 throw RDFException("Change add failed due to duplication");
00389 }
00390 break;
00391 case RemoveTriple:
00392 if (!remove(cs[i].second)) {
00393 throw RDFException("Change remove failed due to absence");
00394 }
00395 break;
00396 }
00397 }
00398 }
00399
00400 void revert(ChangeSet cs) {
00401
00402
00403 for (int i = cs.size()-1; i >= 0; --i) {
00404 ChangeType type = cs[i].first;
00405 switch (type) {
00406 case AddTriple:
00407 if (!remove(cs[i].second)) {
00408 throw RDFException("Change revert add failed due to absence");
00409 }
00410 break;
00411 case RemoveTriple:
00412 if (!add(cs[i].second)) {
00413 throw RDFException("Change revert remove failed due to duplication");
00414 }
00415 break;
00416 }
00417 }
00418 }
00419
00420 bool contains(Triple t) const {
00421 check();
00422 try {
00423 return m_td->contains(m_tx, t);
00424 } catch (RDFException) {
00425 abandon();
00426 throw;
00427 }
00428 }
00429
00430 Triples match(Triple t) const {
00431 check();
00432 try {
00433 return m_td->match(m_tx, t);
00434 } catch (RDFException) {
00435 abandon();
00436 throw;
00437 }
00438 }
00439
00440 ResultSet query(QString sparql) const {
00441 check();
00442 try {
00443 return m_td->query(m_tx, sparql);
00444 } catch (RDFException) {
00445 abandon();
00446 throw;
00447 }
00448 }
00449
00450 Triple matchFirst(Triple t) const {
00451 check();
00452 try {
00453 return m_td->matchFirst(m_tx, t);
00454 } catch (RDFException) {
00455 abandon();
00456 throw;
00457 }
00458 }
00459
00460 Node queryFirst(QString sparql, QString bindingName) const {
00461 check();
00462 try {
00463 return m_td->queryFirst(m_tx, sparql, bindingName);
00464 } catch (RDFException) {
00465 abandon();
00466 throw;
00467 }
00468 }
00469
00470 QUrl getUniqueUri(QString prefix) const {
00471 check();
00472 try {
00473 return m_td->getUniqueUri(m_tx, prefix);
00474 } catch (RDFException) {
00475 abandon();
00476 throw;
00477 }
00478 }
00479
00480 QUrl expand(QString uri) const {
00481 return m_td->expand(uri);
00482 }
00483
00484 ChangeSet getChanges() const {
00485 return m_changes;
00486 }
00487
00488 void rollback() {
00489 check();
00490 DEBUG << "TransactionalStore::TSTransaction::rollback: Abandoning" << endl;
00491 m_abandoned = true;
00492 }
00493
00494 private:
00495 Transaction *m_tx;
00496 TransactionalStore::D *m_td;
00497 ChangeSet m_changes;
00498 mutable bool m_abandoned;
00499 };
00500
00501 TransactionalStore::TransactionalStore(Store *store, DirectWriteBehaviour dwb) :
00502 m_d(new D(this, store, dwb))
00503 {
00504 }
00505
00506 TransactionalStore::~TransactionalStore()
00507 {
00508 delete m_d;
00509 }
00510
00511 Transaction *
00512 TransactionalStore::startTransaction()
00513 {
00514 return m_d->startTransaction();
00515 }
00516
00517 bool
00518 TransactionalStore::add(Triple t)
00519 {
00520 if (!m_d->hasWrap()) {
00521 throw RDFException("TransactionalStore::add() called without Transaction");
00522 }
00523
00524 auto_ptr<Transaction> tx(startTransaction());
00525 return tx->add(t);
00526 }
00527
00528 bool
00529 TransactionalStore::remove(Triple t)
00530 {
00531 if (!m_d->hasWrap()) {
00532 throw RDFException("TransactionalStore::remove() called without Transaction");
00533 }
00534 auto_ptr<Transaction> tx(startTransaction());
00535 return tx->remove(t);
00536 }
00537
00538 void
00539 TransactionalStore::change(ChangeSet cs)
00540 {
00541 if (!m_d->hasWrap()) {
00542 throw RDFException("TransactionalStore::change() called without Transaction");
00543 }
00544 auto_ptr<Transaction> tx(startTransaction());
00545 tx->change(cs);
00546 }
00547
00548 void
00549 TransactionalStore::revert(ChangeSet cs)
00550 {
00551 if (!m_d->hasWrap()) {
00552 throw RDFException("TransactionalStore::revert() called without Transaction");
00553 }
00554 auto_ptr<Transaction> tx(startTransaction());
00555 tx->revert(cs);
00556 }
00557
00558 bool
00559 TransactionalStore::contains(Triple t) const
00560 {
00561 D::NonTransactionalAccess ntxa(m_d);
00562 bool result = m_d->getStore()->contains(t);
00563 return result;
00564 }
00565
00566 Triples
00567 TransactionalStore::match(Triple t) const
00568 {
00569 D::NonTransactionalAccess ntxa(m_d);
00570 Triples result = m_d->getStore()->match(t);
00571 return result;
00572 }
00573
00574 ResultSet
00575 TransactionalStore::query(QString s) const
00576 {
00577 D::NonTransactionalAccess ntxa(m_d);
00578 ResultSet result = m_d->getStore()->query(s);
00579 return result;
00580 }
00581
00582 Triple
00583 TransactionalStore::matchFirst(Triple t) const
00584 {
00585 D::NonTransactionalAccess ntxa(m_d);
00586 Triple result = m_d->getStore()->matchFirst(t);
00587 return result;
00588 }
00589
00590 Node
00591 TransactionalStore::queryFirst(QString s, QString b) const
00592 {
00593 D::NonTransactionalAccess ntxa(m_d);
00594 Node result = m_d->getStore()->queryFirst(s, b);
00595 return result;
00596 }
00597
00598 QUrl
00599 TransactionalStore::getUniqueUri(QString prefix) const
00600 {
00601 D::NonTransactionalAccess ntxa(m_d);
00602 QUrl result = m_d->getStore()->getUniqueUri(prefix);
00603 return result;
00604 }
00605
00606 QUrl
00607 TransactionalStore::expand(QString uri) const
00608 {
00609 return m_d->expand(uri);
00610 }
00611
00612 TransactionalStore::TSTransaction::TSTransaction(TransactionalStore::D *td) :
00613 m_d(new D(this, td))
00614 {
00615 }
00616
00617 TransactionalStore::TSTransaction::~TSTransaction()
00618 {
00619 delete m_d;
00620 }
00621
00622 bool
00623 TransactionalStore::TSTransaction::add(Triple t)
00624 {
00625 return m_d->add(t);
00626 }
00627
00628 bool
00629 TransactionalStore::TSTransaction::remove(Triple t)
00630 {
00631 return m_d->remove(t);
00632 }
00633
00634 void
00635 TransactionalStore::TSTransaction::change(ChangeSet cs)
00636 {
00637 m_d->change(cs);
00638 }
00639
00640 void
00641 TransactionalStore::TSTransaction::revert(ChangeSet cs)
00642 {
00643 m_d->revert(cs);
00644 }
00645
00646 bool
00647 TransactionalStore::TSTransaction::contains(Triple t) const
00648 {
00649 return m_d->contains(t);
00650 }
00651
00652 Triples
00653 TransactionalStore::TSTransaction::match(Triple t) const
00654 {
00655 return m_d->match(t);
00656 }
00657
00658 ResultSet
00659 TransactionalStore::TSTransaction::query(QString sparql) const
00660 {
00661 return m_d->query(sparql);
00662 }
00663
00664 Triple
00665 TransactionalStore::TSTransaction::matchFirst(Triple t) const
00666 {
00667 return m_d->matchFirst(t);
00668 }
00669
00670 Node
00671 TransactionalStore::TSTransaction::queryFirst(QString sparql,
00672 QString bindingName) const
00673 {
00674 return m_d->queryFirst(sparql, bindingName);
00675 }
00676
00677 QUrl
00678 TransactionalStore::TSTransaction::getUniqueUri(QString prefix) const
00679 {
00680 return m_d->getUniqueUri(prefix);
00681 }
00682
00683 QUrl
00684 TransactionalStore::TSTransaction::expand(QString uri) const
00685 {
00686 return m_d->expand(uri);
00687 }
00688
00689 ChangeSet
00690 TransactionalStore::TSTransaction::getChanges() const
00691 {
00692 return m_d->getChanges();
00693 }
00694
00695 void
00696 TransactionalStore::TSTransaction::rollback()
00697 {
00698 m_d->rollback();
00699 }
00700
00701 }
00702