aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Vrátil <dvratil@kde.org>2016-08-15 12:04:01 (GMT)
committerDaniel Vrátil <dvratil@kde.org>2016-08-15 12:18:01 (GMT)
commit39bcc7d5bb101188f14c4327c002f374e92d8ffa (patch)
tree15defd7c2a5d1fd974afbd742da148be483e0c27
parent77da41c2431ed9403779aab24f7b6d9abdc74a49 (diff)
Fix a loop in ItemRetriever and make it interactive, add a unit test
Fix a loop in ItemRetriever when we have more parts available than requested as well as when only some requested parts need to be retrieved. Make ItemRetriever interactive by emitting a signal with list of already retrieved items and using QEventLoop to block inside ItemRetriever::exec() instead of blocking on a QWaitCondition inside ItemRetrievalManager. Add a unit-test with several different scenarios.
-rw-r--r--autotests/server/dbinitializer.cpp22
-rw-r--r--autotests/server/dbinitializer.h1
-rw-r--r--autotests/server/itemretrievertest.cpp276
-rw-r--r--src/server/storage/itemretrievaljob.cpp18
-rw-r--r--src/server/storage/itemretrievaljob.h37
-rw-r--r--src/server/storage/itemretrievalmanager.cpp49
-rw-r--r--src/server/storage/itemretrievalmanager.h19
-rw-r--r--src/server/storage/itemretriever.cpp104
-rw-r--r--src/server/storage/itemretriever.h11
9 files changed, 477 insertions, 60 deletions
diff --git a/autotests/server/dbinitializer.cpp b/autotests/server/dbinitializer.cpp
index 029bd4e..8b34790 100644
--- a/autotests/server/dbinitializer.cpp
+++ b/autotests/server/dbinitializer.cpp
@@ -21,6 +21,7 @@
#include <storage/querybuilder.h>
#include <storage/datastore.h>
+#include <storage/parttypehelper.h>
using namespace Akonadi;
using namespace Akonadi::Server;
@@ -76,6 +77,22 @@ PimItem DbInitializer::createItem(const char *name, const Collection &parent)
return item;
}
+Part DbInitializer::createPart(qint64 pimItem, const QByteArray &partName, const QByteArray &partData)
+{
+ auto partType = PartTypeHelper::parseFqName(QString::fromLatin1(partName));
+ PartType type = PartType::retrieveByFQNameOrCreate(partType.first, partType.second);
+
+ Part part;
+ part.setPimItemId(pimItem);
+ part.setPartTypeId(type.id());
+ part.setData(partData);
+ part.setDatasize(partData.size());
+ const bool ret = part.insert();
+ Q_ASSERT(ret);
+ Q_UNUSED(ret);
+ return part;
+}
+
QByteArray DbInitializer::toByteArray(bool enabled)
{
if (enabled) {
@@ -178,7 +195,7 @@ void DbInitializer::cleanup()
if (DataStore::self()->database().isOpen()) {
{
- QueryBuilder qb( Relation::tableName(), QueryBuilder::Delete );
+ QueryBuilder qb(Relation::tableName(), QueryBuilder::Delete);
qb.exec();
}
{
@@ -191,6 +208,9 @@ void DbInitializer::cleanup()
}
}
+ Q_FOREACH(Part part, Part::retrieveAll()) {
+ part.remove();
+ }
Q_FOREACH(PimItem item, PimItem::retrieveAll()) {
item.remove();
}
diff --git a/autotests/server/dbinitializer.h b/autotests/server/dbinitializer.h
index b7da20b..ec25d24 100644
--- a/autotests/server/dbinitializer.h
+++ b/autotests/server/dbinitializer.h
@@ -30,6 +30,7 @@ public:
Akonadi::Server::Collection createCollection(const char *name,
const Akonadi::Server::Collection &parent = Akonadi::Server::Collection());
Akonadi::Server::PimItem createItem(const char *name, const Akonadi::Server::Collection &parent);
+ Akonadi::Server::Part createPart(qint64 pimitemId, const QByteArray &partname, const QByteArray &data);
QByteArray toByteArray(bool enabled);
QByteArray toByteArray(Akonadi::Tristate tristate);
Akonadi::Protocol::FetchCollectionsResponse listResponse(const Akonadi::Server::Collection &col,
diff --git a/autotests/server/itemretrievertest.cpp b/autotests/server/itemretrievertest.cpp
index 2a73e7a..bd79e83 100644
--- a/autotests/server/itemretrievertest.cpp
+++ b/autotests/server/itemretrievertest.cpp
@@ -19,15 +19,158 @@
#include <QObject>
#include <QtTest/QTest>
-#include <QtCore/QBuffer>
+#include <QTimer>
#include "storage/itemretriever.h"
+#include "storage/itemretrievaljob.h"
+#include "storage/itemretrievalmanager.h"
+#include "storage/itemretrievalrequest.h"
+
+#include "fakeakonadiserver.h"
+#include "dbinitializer.h"
+
+#include <aktest.h>
using namespace Akonadi::Server;
+struct JobResult
+{
+ qint64 pimItemId;
+ QByteArray partname;
+ QByteArray partdata;
+ QString error;
+};
+
+class FakeItemRetrievalJob : public AbstractItemRetrievalJob
+{
+ Q_OBJECT
+public:
+ FakeItemRetrievalJob(ItemRetrievalRequest *req, DbInitializer &dbInitializer,
+ const QVector<JobResult> &results, QObject *parent)
+ : AbstractItemRetrievalJob(req, parent)
+ , mDbInitializer(dbInitializer)
+ , mResults(results)
+ {
+ }
+
+ void start() Q_DECL_OVERRIDE
+ {
+ Q_FOREACH (const JobResult &res, mResults) {
+ if (res.error.isEmpty()) {
+ // This is analogous to what STORE/MERGE does
+ const PimItem item = PimItem::retrieveById(res.pimItemId);
+ const auto parts = item.parts();
+ // Try to find the part by name
+ auto it = std::find_if(parts.begin(), parts.end(),
+ [res](const Part &part) {
+ return part.partType().name().toLatin1() == res.partname;
+ });
+ if (it == parts.end()) {
+ // Does not exist, create it
+ mDbInitializer.createPart(res.pimItemId, "PLD:" + res.partname, res.partdata);
+ } else {
+ // Exist, update it
+ Part part(*it);
+ part.setData(res.partdata);
+ part.setDatasize(res.partdata.size());
+ part.update();
+ }
+ } else {
+ mError = res.error;
+ break;
+ }
+ }
+
+ QTimer::singleShot(0, this, [this]() {
+ Q_EMIT requestCompleted(m_request, mError);
+ });
+ }
+
+ void kill() Q_DECL_OVERRIDE
+ {
+ // TODO?
+ }
+
+private:
+ DbInitializer &mDbInitializer;
+ QVector<JobResult> mResults;
+ QString mError;
+};
+
+class FakeItemRetrievalJobFactory : public AbstractItemRetrievalJobFactory
+{
+public:
+ FakeItemRetrievalJobFactory(DbInitializer &initializer)
+ : mJobsCount(0)
+ , mDbInitializer(initializer)
+ {
+ }
+
+ void addJobResult(qint64 itemId, const QByteArray &partname, const QByteArray &partdata)
+ {
+ mJobResults.insert(itemId, JobResult{ itemId, partname, partdata, QString() });
+ }
+
+ void addJobResult(qint64 itemId, const QString &error)
+ {
+ mJobResults.insert(itemId, JobResult{ itemId, QByteArray(), QByteArray(), error });
+ }
+
+ AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest *request, QObject *parent) Q_DECL_OVERRIDE
+ {
+ QVector<JobResult> results;
+ Q_FOREACH (auto id, request->ids) {
+ auto it = mJobResults.constFind(id);
+ while (it != mJobResults.constEnd() && it.key() == id) {
+ if (request->parts.contains(it->partname)) {
+ results << *it;
+ }
+ ++it;
+ }
+ }
+
+ ++mJobsCount;
+ return new FakeItemRetrievalJob(request, mDbInitializer, results, parent);
+ }
+
+ int jobsCount() const
+ {
+ return mJobsCount;
+ }
+
+private:
+ int mJobsCount;
+ DbInitializer &mDbInitializer;
+ QMultiHash<qint64, JobResult> mJobResults;
+};
+
class ItemRetrieverTest : public QObject
{
Q_OBJECT
+
+
+ using ExistingParts = QVector<QPair<QByteArray /* name */, QByteArray /* data */>>;
+ using AvailableParts = QVector<QPair<QByteArray /* name */, QByteArray /* data */>>;
+ using RequestedParts = QVector<QByteArray /* FQ name */>;
+
+public:
+ ItemRetrieverTest()
+ : QObject()
+ {
+ try {
+ FakeAkonadiServer::instance()->setPopulateDb(false);
+ FakeAkonadiServer::instance()->init();
+ } catch (const FakeAkonadiServerException &e) {
+ qWarning() << "Server exception: " << e.what();
+ qFatal("Fake Akonadi Server failed to start up, aborting test");
+ }
+ }
+
+ ~ItemRetrieverTest()
+ {
+ FakeAkonadiServer::instance()->quit();
+ }
+
private Q_SLOTS:
void testFullPayload()
{
@@ -38,8 +181,137 @@ private Q_SLOTS:
r1.setRetrieveParts({ "PLD:FOO" });
QCOMPARE(r1.retrieveParts().size(), 2);
}
+
+ void testRetrieval_data()
+ {
+ QTest::addColumn<ExistingParts>("existingParts");
+ QTest::addColumn<AvailableParts>("availableParts");
+ QTest::addColumn<RequestedParts>("requestedParts");
+ QTest::addColumn<int>("expectedRetrievalJobs");
+ QTest::addColumn<int>("expectedSignals");
+ QTest::addColumn<int>("expectedParts");
+
+ QTest::newRow("should retrieve missing payload part")
+ << ExistingParts()
+ << AvailableParts{ { "RFC822", "somedata" } }
+ << RequestedParts{ "PLD:RFC822" }
+ << 1 << 1 << 1;
+
+ QTest::newRow("should retrieve multiple missing payload parts")
+ << ExistingParts()
+ << AvailableParts{ { "RFC822", "somedata" }, { "HEAD", "head" } }
+ << RequestedParts{ "PLD:HEAD", "PLD:RFC822" }
+ << 1 << 1 << 2;
+
+ QTest::newRow("should not retrieve existing payload part")
+ << ExistingParts{ { "PLD:RFC822", "somedata" } }
+ << AvailableParts()
+ << RequestedParts{ "PLD:RFC822" }
+ << 0 << 1 << 1;
+
+ QTest::newRow("should not retrieve multiple existing payload parts")
+ << ExistingParts{ { "PLD:RFC822", "somedata" }, { "PLD:HEAD", "head" } }
+ << AvailableParts()
+ << RequestedParts{ "PLD:RFC822", "PLD:HEAD" }
+ << 0 << 1 << 2;
+
+ QTest::newRow("should retrieve missing but not existing payload part")
+ << ExistingParts{ { "PLD:HEAD", "head" } }
+ << AvailableParts{ { "RFC822", "somedata" } }
+ << RequestedParts{ "PLD:HEAD", "PLD:RFC822" }
+ << 1 << 1 << 2;
+
+ QTest::newRow("should retrieve expired payload part")
+ << ExistingParts{ { "PLD:RFC822", QByteArray() } }
+ << AvailableParts{ { "RFC822", "somedata" } }
+ << RequestedParts{ "PLD:RFc822" }
+ << 1 << 1 << 1;
+
+ QTest::newRow("should not retrieve one out of multiple existing payload parts")
+ << ExistingParts{ { "PLD:RFC822", "somedata" }, { "PLD:HEAD", "head" }, { "PLD:ENVELOPE", "envelope" } }
+ << AvailableParts()
+ << RequestedParts{ "PLD:RFC822", "PLD:HEAD" }
+ << 0 << 1 << 3;
+
+ QTest::newRow("should retrieve missing payload part and ignore attributes")
+ << ExistingParts{ { "ATR:MYATTR", "myattrdata" } }
+ << AvailableParts{ { "RFC822", "somedata" } }
+ << RequestedParts{ "PLD:RFC822" }
+ << 1 << 1 << 2;
+ }
+
+ void testRetrieval()
+ {
+ QFETCH(ExistingParts, existingParts);
+ QFETCH(AvailableParts, availableParts);
+ QFETCH(RequestedParts, requestedParts);
+ QFETCH(int, expectedRetrievalJobs);
+ QFETCH(int, expectedSignals);
+ QFETCH(int, expectedParts);
+
+
+ // Setup
+ DbInitializer dbInitializer;
+ FakeItemRetrievalJobFactory factory(dbInitializer);
+ ItemRetrievalManager mgr(&factory);
+ QTest::qWait(100);
+
+ // Given a PimItem with existing parts
+ Resource res = dbInitializer.createResource("testresource");
+ Collection col = dbInitializer.createCollection("col1");
+ PimItem item = dbInitializer.createItem("1", col);
+ Q_FOREACH (const auto &existingPart, existingParts) {
+ dbInitializer.createPart(item.id(), existingPart.first, existingPart.second);
+ }
+
+ Q_FOREACH (const auto &availablePart, availableParts) {
+ factory.addJobResult(item.id(), availablePart.first, availablePart.second);
+ }
+
+ // ItemRetriever should...
+ ItemRetriever retriever;
+ retriever.setItem(item.id());
+ retriever.setRetrieveParts(requestedParts);
+ QSignalSpy spy(&retriever, &ItemRetriever::itemsRetrieved);
+
+ // Succeed
+ QVERIFY(retriever.exec());
+ // Run exactly one retrieval job
+ QCOMPARE(factory.jobsCount(), expectedRetrievalJobs);
+ // Emit exactly one signal ...
+ QCOMPARE(spy.count(), expectedSignals);
+ // ... with that one item
+ if (expectedSignals > 0) {
+ QCOMPARE(spy.at(0).at(0).value<QList<qint64>>(), QList<qint64>{ item.id() });
+ }
+
+ // and the part exists in the DB
+ const auto parts = item.parts();
+ QCOMPARE(parts.count(), expectedParts);
+ Q_FOREACH (const Part &dbPart, item.parts()) {
+ const QString fqname = dbPart.partType().ns() + QLatin1Char(':') + dbPart.partType().name();
+ if (!requestedParts.contains(fqname.toLatin1())) {
+ continue;
+ }
+
+ auto it = std::find_if(availableParts.constBegin(), availableParts.constEnd(),
+ [dbPart](const QPair<QByteArray, QByteArray> &p) {
+ return dbPart.partType().name().toLatin1() == p.first;
+ });
+ if (it == availableParts.constEnd()) {
+ it = std::find_if(existingParts.constBegin(), existingParts.constEnd(),
+ [fqname](const QPair<QByteArray, QByteArray> &p) {
+ return fqname.toLatin1() == p.first;
+ });
+ QVERIFY(it != existingParts.constEnd());
+ }
+
+ QCOMPARE(dbPart.data(), it->second);
+ QCOMPARE(dbPart.datasize(), it->second.size());
+ }
+ }
};
-QTEST_MAIN(ItemRetrieverTest)
+AKTEST_FAKESERVER_MAIN(ItemRetrieverTest)
#include "itemretrievertest.moc"
diff --git a/src/server/storage/itemretrievaljob.cpp b/src/server/storage/itemretrievaljob.cpp
index 662cbee..c699faa 100644
--- a/src/server/storage/itemretrievaljob.cpp
+++ b/src/server/storage/itemretrievaljob.cpp
@@ -27,21 +27,31 @@
using namespace Akonadi::Server;
+AbstractItemRetrievalJob::AbstractItemRetrievalJob(ItemRetrievalRequest *req, QObject *parent)
+ : QObject(parent)
+ , m_request(req)
+{
+}
+
+AbstractItemRetrievalJob::~AbstractItemRetrievalJob()
+{
+}
+
+
ItemRetrievalJob::~ItemRetrievalJob()
{
Q_ASSERT(!m_active);
}
-void ItemRetrievalJob::start(org::freedesktop::Akonadi::Resource *interface)
+void ItemRetrievalJob::start()
{
Q_ASSERT(m_request);
qCDebug(AKONADISERVER_LOG) << "processing retrieval request for item" << m_request->ids << " parts:" << m_request->parts << " of resource:" << m_request->resourceId;
- m_interface = interface;
// call the resource
- if (interface) {
+ if (m_interface) {
m_active = true;
- auto reply = interface->requestItemDelivery(m_request->ids, m_request->parts);
+ auto reply = m_interface->requestItemDelivery(m_request->ids, m_request->parts);
QDBusPendingCallWatcher *watcher = new QDBusPendingCallWatcher(reply, this);
connect(watcher, &QDBusPendingCallWatcher::finished,
this, &ItemRetrievalJob::callFinished);
diff --git a/src/server/storage/itemretrievaljob.h b/src/server/storage/itemretrievaljob.h
index 1909899..8bf01d7 100644
--- a/src/server/storage/itemretrievaljob.h
+++ b/src/server/storage/itemretrievaljob.h
@@ -31,30 +31,49 @@ namespace Server {
class ItemRetrievalRequest;
+class AbstractItemRetrievalJob : public QObject
+{
+ Q_OBJECT
+public:
+ AbstractItemRetrievalJob(ItemRetrievalRequest *req, QObject *parent);
+ virtual ~AbstractItemRetrievalJob();
+
+ virtual void start() = 0;
+ virtual void kill() = 0;
+
+Q_SIGNALS:
+ void requestCompleted(ItemRetrievalRequest *request, const QString &errorMsg);
+
+protected:
+ ItemRetrievalRequest *m_request;
+};
+
+
/// Async D-Bus retrieval, no modification of the request (thus no need for locking)
-class ItemRetrievalJob : public QObject
+class ItemRetrievalJob : public AbstractItemRetrievalJob
{
Q_OBJECT
public:
ItemRetrievalJob(ItemRetrievalRequest *req, QObject *parent)
- : QObject(parent)
- , m_request(req)
+ : AbstractItemRetrievalJob(req, parent)
, m_active(false)
, m_interface(0)
{
}
- ~ItemRetrievalJob();
- void start(OrgFreedesktopAkonadiResourceInterface *interface);
- void kill();
-Q_SIGNALS:
- void requestCompleted(ItemRetrievalRequest *req, const QString &errorMsg);
+ void setInterface(OrgFreedesktopAkonadiResourceInterface *interface)
+ {
+ m_interface = interface;
+ }
+
+ ~ItemRetrievalJob() Q_DECL_OVERRIDE;
+ void start() Q_DECL_OVERRIDE;
+ void kill() Q_DECL_OVERRIDE;
private Q_SLOTS:
void callFinished(QDBusPendingCallWatcher *watcher);
private:
- ItemRetrievalRequest *m_request;
bool m_active;
OrgFreedesktopAkonadiResourceInterface *m_interface;
diff --git a/src/server/storage/itemretrievalmanager.cpp b/src/server/storage/itemretrievalmanager.cpp
index 88e91b9..65f5e78 100644
--- a/src/server/storage/itemretrievalmanager.cpp
+++ b/src/server/storage/itemretrievalmanager.cpp
@@ -39,8 +39,23 @@ using namespace Akonadi::Server;
ItemRetrievalManager *ItemRetrievalManager::sInstance = 0;
+class ItemRetrievalJobFactory : public AbstractItemRetrievalJobFactory
+{
+ AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest *request, QObject *parent) Q_DECL_OVERRIDE
+ {
+ return new ItemRetrievalJob(request, parent);
+ }
+};
+
+
ItemRetrievalManager::ItemRetrievalManager(QObject *parent)
+ : ItemRetrievalManager(new ItemRetrievalJobFactory, parent)
+{
+}
+
+ItemRetrievalManager::ItemRetrievalManager(AbstractItemRetrievalJobFactory *factory, QObject *parent)
: AkThread(QThread::HighPriority, parent)
+ , mJobFactory(factory)
{
qDBusRegisterMetaType<QByteArrayList>();
@@ -126,16 +141,6 @@ org::freedesktop::Akonadi::Resource *ItemRetrievalManager::resourceInterface(con
}
// called from any thread
-void ItemRetrievalManager::requestItemDelivery(qint64 uid, const QString &resource, const QVector<QByteArray> &parts)
-{
- ItemRetrievalRequest *req = new ItemRetrievalRequest();
- req->ids << uid;
- req->resourceId = resource;
- req->parts = parts.toList();
-
- requestItemDelivery(req);
-}
-
void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest *req)
{
mLock->lockForWrite();
@@ -146,7 +151,7 @@ void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest *req)
mLock->unlock();
Q_EMIT requestAdded();
-
+#if 0
mLock->lockForRead();
Q_FOREVER {
//qCDebug(AKONADISERVER_LOG) << "checking if request for item" << req->id << "has been processed...";
@@ -170,16 +175,16 @@ void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest *req)
}
throw ItemRetrieverException("WTF?");
+#endif
}
// called within the retrieval thread
void ItemRetrievalManager::processRequest()
{
- QVector<QPair<ItemRetrievalJob *, QString> > newJobs;
-
+ QVector<QPair<AbstractItemRetrievalJob *, QString> > newJobs;
mLock->lockForWrite();
// look for idle resources
- for (QHash< QString, QList< ItemRetrievalRequest *> >::iterator it = mPendingRequests.begin(); it != mPendingRequests.end();) {
+ for (auto it = mPendingRequests.begin(); it != mPendingRequests.end();) {
if (it.value().isEmpty()) {
it = mPendingRequests.erase(it);
continue;
@@ -188,11 +193,12 @@ void ItemRetrievalManager::processRequest()
// TODO: check if there is another one for the same uid with more parts requested
ItemRetrievalRequest *req = it.value().takeFirst();
Q_ASSERT(req->resourceId == it.key());
- ItemRetrievalJob *job = new ItemRetrievalJob(req, this);
- connect(job, &ItemRetrievalJob::requestCompleted, this, &ItemRetrievalManager::retrievalJobFinished);
+ AbstractItemRetrievalJob *job = mJobFactory->retrievalJob(req, this);
+ connect(job, &AbstractItemRetrievalJob::requestCompleted, this, &ItemRetrievalManager::retrievalJobFinished);
mCurrentJobs.insert(req->resourceId, job);
// delay job execution until after we unlocked the mutex, since the job can emit the finished signal immediately in some cases
newJobs.append(qMakePair(job, req->resourceId));
+ qCDebug(AKONADISERVER_LOG) << "ItemRetrievalJob" << job << "started for request" << req;
}
++it;
}
@@ -201,17 +207,20 @@ void ItemRetrievalManager::processRequest()
mLock->unlock();
if (nothingGoingOn) { // someone asked as to process requests although everything is done already, he might still be waiting
- mWaitCondition->wakeAll();
return;
}
- for (QVector<QPair<ItemRetrievalJob *, QString> >::const_iterator it = newJobs.constBegin(); it != newJobs.constEnd(); ++it) {
- (*it).first->start(resourceInterface((*it).second));
+ for (auto it = newJobs.constBegin(), end = newJobs.constEnd(); it != end; ++it) {
+ if (ItemRetrievalJob *j = qobject_cast<ItemRetrievalJob*>((*it).first)) {
+ j->setInterface(resourceInterface((*it).second));
+ }
+ (*it).first->start();
}
}
void ItemRetrievalManager::retrievalJobFinished(ItemRetrievalRequest *request, const QString &errorMsg)
{
+ qCDebug(AKONADISERVER_LOG) << "ItemRetrievalJob finished for request" << request << ", error:" << errorMsg;
mLock->lockForWrite();
request->errorMsg = errorMsg;
request->processed = true;
@@ -228,8 +237,8 @@ void ItemRetrievalManager::retrievalJobFinished(ItemRetrievalRequest *request, c
++it;
}
}
- mWaitCondition->wakeAll();
mLock->unlock();
+ Q_EMIT requestFinished(request);
Q_EMIT requestAdded(); // trigger processRequest() again, in case there is more in the queues
}
diff --git a/src/server/storage/itemretrievalmanager.h b/src/server/storage/itemretrievalmanager.h
index 80d3fc4..529d550 100644
--- a/src/server/storage/itemretrievalmanager.h
+++ b/src/server/storage/itemretrievalmanager.h
@@ -38,6 +38,16 @@ namespace Server {
class Collection;
class ItemRetrievalJob;
class ItemRetrievalRequest;
+class AbstractItemRetrievalJob;
+
+class AbstractItemRetrievalJobFactory
+{
+public:
+ explicit AbstractItemRetrievalJobFactory() {}
+ virtual ~AbstractItemRetrievalJobFactory() {}
+
+ virtual AbstractItemRetrievalJob *retrievalJob(ItemRetrievalRequest *request, QObject *parent) = 0;
+};
/** Manages and processes item retrieval requests. */
class ItemRetrievalManager : public AkThread
@@ -45,10 +55,9 @@ class ItemRetrievalManager : public AkThread
Q_OBJECT
public:
ItemRetrievalManager(QObject *parent = Q_NULLPTR);
+ ItemRetrievalManager(AbstractItemRetrievalJobFactory *factory, QObject *parent = Q_NULLPTR);
~ItemRetrievalManager();
- void requestItemDelivery(qint64 uid, const QString &resource, const QVector<QByteArray> &parts);
-
/**
* Added for convenience. ItemRetrievalManager takes ownership over the
* pointer and deletes it when the request is processed.
@@ -58,6 +67,7 @@ public:
static ItemRetrievalManager *instance();
Q_SIGNALS:
+ void requestFinished(ItemRetrievalRequest *request);
void requestAdded();
private:
@@ -74,6 +84,9 @@ private Q_SLOTS:
private:
static ItemRetrievalManager *sInstance;
+
+ AbstractItemRetrievalJobFactory *mJobFactory;
+
/// Protects mPendingRequests and every Request object posted to it
QReadWriteLock *mLock;
/// Used to let requesting threads wait until the request has been processed
@@ -81,7 +94,7 @@ private:
/// Pending requests queues, one per resource
QHash<QString, QList<ItemRetrievalRequest *> > mPendingRequests;
/// Currently running jobs, one per resource
- QHash<QString, ItemRetrievalJob *> mCurrentJobs;
+ QHash<QString, AbstractItemRetrievalJob *> mCurrentJobs;
// resource dbus interface cache
QHash<QString, OrgFreedesktopAkonadiResourceInterface *> mResourceInterfaces;
diff --git a/src/server/storage/itemretriever.cpp b/src/server/storage/itemretriever.cpp
index 1d04501..22994af 100644
--- a/src/server/storage/itemretriever.cpp
+++ b/src/server/storage/itemretriever.cpp
@@ -33,6 +33,9 @@
#include <private/protocol_p.h>
+#include <QMetaObject>
+#include <QEventLoop>
+
#include "akonadiserver_debug.h"
using namespace Akonadi;
@@ -187,6 +190,18 @@ QSqlQuery ItemRetriever::buildQuery() const
return qb.query();
}
+namespace {
+static bool hasAllParts(ItemRetrievalRequest *req, const QSet<QByteArray> &availableParts)
+{
+ Q_FOREACH (const auto &part, req->parts) {
+ if (!availableParts.contains(part)) {
+ return false;
+ }
+ }
+ return true;
+}
+}
+
bool ItemRetriever::exec()
{
if (mParts.isEmpty() && !mFullPayload) {
@@ -207,22 +222,38 @@ bool ItemRetriever::exec()
QVector<ItemRetrievalRequest *> requests;
QHash<qint64 /* collection */, ItemRetrievalRequest*> colRequests;
QHash<qint64 /* item */, ItemRetrievalRequest*> itemRequests;
+ QVector<qint64> readyItems;
qint64 prevPimItemId = -1;
+ QSet<QByteArray> availableParts;
+ ItemRetrievalRequest *lastRequest = Q_NULLPTR;
while (query.isValid()) {
const qint64 pimItemId = query.value(PimItemIdColumn).toLongLong();
const qint64 collectionId = query.value(CollectionIdColumn).toLongLong();
const qint64 resourceId = query.value(ResourceIdColumn).toLongLong();
- ItemRetrievalRequest *lastRequest = Q_NULLPTR;
const auto itemIter = itemRequests.constFind(pimItemId);
- if (pimItemId == prevPimItemId && query.value(PartTypeNameColumn).isNull()) {
- // This is not the first part of the Item we saw, but LEFT JOIN PartTable
- // returned a null row - that means the row is an ATR part
- // which we don't care about
- query.next();
- continue;
+ if (pimItemId == prevPimItemId) {
+ if (query.value(PartTypeNameColumn).isNull()) {
+ // This is not the first part of the Item we saw, but LEFT JOIN PartTable
+ // returned a null row - that means the row is an ATR part
+ // which we don't care about
+ query.next();
+ continue;
+ }
+ } else {
+ if (lastRequest) {
+ if (hasAllParts(lastRequest, availableParts)) {
+ // We went through all parts of a single item, if we have all
+ // parts available in the DB and they are not expired, then
+ // exclude this item from the retrieval
+ lastRequest->ids.removeOne(prevPimItemId);
+ itemRequests.remove(prevPimItemId);
+ readyItems.push_back(prevPimItemId);
+ }
+ }
+ availableParts.clear();
+ prevPimItemId = pimItemId;
}
- prevPimItemId = pimItemId;
if (itemIter != itemRequests.constEnd()) {
lastRequest = *itemIter;
@@ -261,36 +292,69 @@ bool ItemRetriever::exec()
lastRequest->parts << partName;
}
} else {
- // This particular item already has this particular part. If that's
- // the only part we are requesting so far, then just remove the item
- // from the queue.
- if (lastRequest->parts.size() == 1 && lastRequest->parts.at(0) == partName) {
- // TODO: Is this too expensive? Should we use a QSet?
- lastRequest->ids.removeOne(pimItemId);
- itemRequests.remove(pimItemId);
- }
+ // add the part to list of available parts, we will compare it with
+ // the list of request parts once we handle all parts of this item
+ availableParts.insert(partName);
}
query.next();
}
- //qCDebug(AKONADISERVER_LOG) << "Closing queries and sending out requests.";
+ // Post-check in case we only queried one item thus did not reach the check
+ // at the beginning of the while() loop above
+ if (lastRequest && hasAllParts(lastRequest, availableParts)) {
+ lastRequest->ids.removeOne(prevPimItemId);
+ readyItems.push_back(prevPimItemId);
+ // No need to update the hashtable at this point
+ }
+ //qCDebug(AKONADISERVER_LOG) << "Closing queries and sending out requests.";
query.finish();
- Q_FOREACH (ItemRetrievalRequest *request, requests) {
+ if (!readyItems.isEmpty()) {
+ Q_EMIT itemsRetrieved(readyItems.toList());
+ }
+
+ QEventLoop eventLoop;
+ connect(ItemRetrievalManager::instance(), &ItemRetrievalManager::requestFinished,
+ this, [&](ItemRetrievalRequest *finishedRequest) {
+ if (!finishedRequest->errorMsg.isEmpty()) {
+ mLastError = finishedRequest->errorMsg.toUtf8();
+ eventLoop.exit(1);
+ } else {
+ requests.removeOne(finishedRequest);
+ Q_EMIT itemsRetrieved(finishedRequest->ids);
+ if (requests.isEmpty()) {
+ eventLoop.quit();
+ }
+ }
+ }, Qt::UniqueConnection);
+
+ auto it = requests.begin();
+ while (it != requests.end()) {
+ auto request = (*it);
if ((!mFullPayload && request->parts.isEmpty()) || request->ids.isEmpty()) {
+ it = requests.erase(it);
delete request;
continue;
}
// TODO: how should we handle retrieval errors here? so far they have been ignored,
// which makes sense in some cases, do we need a command parameter for this?
try {
+ // Request is deleted inside ItemRetrievalManager, so we need to take
+ // a copy here
+ const auto ids = request->ids;
ItemRetrievalManager::instance()->requestItemDelivery(request);
} catch (const ItemRetrieverException &e) {
qCCritical(AKONADISERVER_LOG) << e.type() << ": " << e.what();
mLastError = e.what();
return false;
}
+ ++it;
+ }
+ if (!requests.isEmpty()) {
+ if (eventLoop.exec(QEventLoop::ExcludeSocketNotifiers)) {
+ return false;
+ }
}
// retrieve items in child collections if requested
@@ -301,6 +365,8 @@ bool ItemRetriever::exec()
retriever.setCollection(col, mRecursive);
retriever.setRetrieveParts(mParts);
retriever.setRetrieveFullPayload(mFullPayload);
+ connect(&retriever, &ItemRetriever::itemsRetrieved,
+ this, &ItemRetriever::itemsRetrieved);
result = retriever.exec();
if (!result) {
break;
@@ -313,7 +379,7 @@ bool ItemRetriever::exec()
void ItemRetriever::verifyCache()
{
- if (!connection()->verifyCacheOnRetrieval()) {
+ if (!connection() || !connection()->verifyCacheOnRetrieval()) {
return;
}
diff --git a/src/server/storage/itemretriever.h b/src/server/storage/itemretriever.h
index a1665a8..8f44f7f 100644
--- a/src/server/storage/itemretriever.h
+++ b/src/server/storage/itemretriever.h
@@ -20,6 +20,8 @@
#ifndef ITEMRETRIEVER_H
#define ITEMRETRIEVER_H
+#include <QObject>
+
#include "../exception.h"
#include "entities.h"
@@ -43,10 +45,12 @@ class QueryBuilder;
@todo make usable for Fetch by allowing to share queries
*/
-class ItemRetriever
+class ItemRetriever : public QObject
{
+ Q_OBJECT
+
public:
- ItemRetriever(Connection *connection);
+ ItemRetriever(Connection *connection = Q_NULLPTR);
Connection *connection() const;
@@ -68,6 +72,9 @@ public:
QByteArray lastError() const;
+Q_SIGNALS:
+ void itemsRetrieved(const QList<qint64> &ids);
+
private:
QSqlQuery buildQuery() const;