aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Vrátil <dvratil@kde.org>2016-08-09 23:22:05 (GMT)
committerDaniel Vrátil <dvratil@kde.org>2016-08-10 00:18:14 (GMT)
commit816526018ba478fc896c892928535591c75e51b3 (patch)
tree6c88710345e524d231cdfcbf9df6d827b2527b37
parentd576724f77f76119ba8cc0b55c1313473ff23f2e (diff)
Introduce batch Resource Item retrieval (ABI break)
One of the bottlenecks in the current design is the individual Item retrieval via requestItemDelivery(). The Server does a request for the first Item, then waits for the Resource to deliver before requesting the next Item and so on. This patch changes the signature of the requestItemDelivery() DBus method to support passing multiple Items (Item IDs to be specific) at once. The patch also deprecates ResourceBase::retrieveItem() in favor of a newly introduces ResourceBase::retrieveItems() overload (which takes Item::List and set of part names as arguments) allowing resource implementations to process all the requested Items in a single batch and pass them back to the Server. This should be much faster than querying the Items one by one. This change does not remove the retrieveItem() method, but only deprecates it. When Resource only implements retrieveItem() and not the new retrieveItems() then retrieveItems() will split the ResourceScheduler Task into many single-Item Tasks and will call retrieveItem() for each of them. The next step is to get rid of DBus in here and use a dedicated command bus where the Server could request the Items via the Protocol and would be handed the Items back through the bus. Once all resources are ported to the new API, the old retrieveItem() method should be removed.
-rw-r--r--CMakeLists.txt2
-rw-r--r--autotests/libs/testresource/knutresource.cpp25
-rw-r--r--autotests/libs/testresource/knutresource.h3
-rw-r--r--src/agentbase/resourcebase.cpp166
-rw-r--r--src/agentbase/resourcebase.h33
-rw-r--r--src/agentbase/resourcescheduler.cpp104
-rw-r--r--src/agentbase/resourcescheduler_p.h34
-rw-r--r--src/core/jobs/itemfetchjob.cpp19
-rw-r--r--src/core/jobs/itemfetchjob.h6
-rw-r--r--src/interfaces/org.freedesktop.Akonadi.Resource.xml7
-rw-r--r--src/server/storage/itemretrievaljob.cpp5
-rw-r--r--src/server/storage/itemretrievalmanager.cpp19
-rw-r--r--src/server/storage/itemretrievalmanager.h3
-rw-r--r--src/server/storage/itemretrievalrequest.h8
-rw-r--r--src/server/storage/itemretriever.cpp46
15 files changed, 393 insertions, 87 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 11a8a83..4df0cc6 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -20,7 +20,7 @@ include(ECMQtDeclareLoggingCategory)
include(AkonadiMacros)
-set(PIM_VERSION "5.3.40")
+set(PIM_VERSION "5.3.41")
set(QT_REQUIRED_VERSION "5.5.0")
set(AKONADI_VERSION ${PIM_VERSION})
diff --git a/autotests/libs/testresource/knutresource.cpp b/autotests/libs/testresource/knutresource.cpp
index b7b2dcd..2542096 100644
--- a/autotests/libs/testresource/knutresource.cpp
+++ b/autotests/libs/testresource/knutresource.cpp
@@ -156,6 +156,7 @@ void KnutResource::retrieveItems(const Akonadi::Collection &collection)
itemsRetrieved(items);
}
+#ifdef DO_IT_THE_OLD_WAY
bool KnutResource::retrieveItem(const Item &item, const QSet<QByteArray> &parts)
{
Q_UNUSED(parts);
@@ -171,6 +172,30 @@ bool KnutResource::retrieveItem(const Item &item, const QSet<QByteArray> &parts)
itemRetrieved(i);
return true;
}
+#endif
+
+bool KnutResource::retrieveItems(const Item::List &items, const QSet<QByteArray> &parts)
+{
+ Q_UNUSED(parts);
+
+ Item::List results;
+ results.reserve(items.size());
+ for (const auto &item : items) {
+ const QDomElement itemElem = mDocument.itemElementByRemoteId(item.remoteId());
+ if (itemElem.isNull()) {
+ cancelTask(i18n("No item found for remoteid %1", item.remoteId()));
+ return false;
+ }
+
+ Item i = XmlReader::elementToItem(itemElem, true);
+ i.setParentCollection(item.parentCollection());
+ i.setId(item.id());
+ results.push_back(i);
+ }
+
+ itemsRetrieved(results);
+ return true;
+}
void KnutResource::collectionAdded(const Akonadi::Collection &collection, const Akonadi::Collection &parent)
{
diff --git a/autotests/libs/testresource/knutresource.h b/autotests/libs/testresource/knutresource.h
index b7f9548..d70c778 100644
--- a/autotests/libs/testresource/knutresource.h
+++ b/autotests/libs/testresource/knutresource.h
@@ -50,7 +50,10 @@ public Q_SLOTS:
protected:
void retrieveCollections() Q_DECL_OVERRIDE;
void retrieveItems(const Akonadi::Collection &collection) Q_DECL_OVERRIDE;
+#ifdef DO_IT_THE_OLD_WAY
bool retrieveItem(const Akonadi::Item &item, const QSet<QByteArray> &parts) Q_DECL_OVERRIDE;
+#endif
+ bool retrieveItems(const Akonadi::Item::List &items, const QSet<QByteArray> &parts) Q_DECL_OVERRIDE;
void collectionAdded(const Akonadi::Collection &collection, const Akonadi::Collection &parent) Q_DECL_OVERRIDE;
void collectionChanged(const Akonadi::Collection &collection) Q_DECL_OVERRIDE;
diff --git a/src/agentbase/resourcebase.cpp b/src/agentbase/resourcebase.cpp
index 4bb245b..761ea99 100644
--- a/src/agentbase/resourcebase.cpp
+++ b/src/agentbase/resourcebase.cpp
@@ -163,6 +163,9 @@ public:
void slotPrepareItemRetrieval(const Akonadi::Item &item);
void slotPrepareItemRetrievalResult(KJob *job);
+ void slotPrepareItemsRetrieval(const QVector<Akonadi::Item> &item);
+ void slotPrepareItemsRetrievalResult(KJob *job);
+
void changeCommittedResult(KJob *job);
void slotRecursiveMoveReplay(RecursiveMover *mover);
@@ -505,6 +508,8 @@ ResourceBase::ResourceBase(const QString &id)
SLOT(slotSynchronizeRelations()));
connect(d->scheduler, SIGNAL(executeItemFetch(Akonadi::Item,QSet<QByteArray>)),
SLOT(slotPrepareItemRetrieval(Akonadi::Item)));
+ connect(d->scheduler, SIGNAL(executeItemsFetch(QVector<Akonadi::Item>,QSet<QByteArray>)),
+ SLOT(slotPrepareItemsRetrieval(QVector<Akonadi::Item>)));
connect(d->scheduler, SIGNAL(executeResourceCollectionDeletion()),
SLOT(slotDeleteResourceCollection()));
connect(d->scheduler, SIGNAL(executeCacheInvalidation(Akonadi::Collection)),
@@ -620,8 +625,7 @@ void ResourceBase::itemRetrieved(const Item &item)
Q_D(ResourceBase);
Q_ASSERT(d->scheduler->currentTask().type == ResourceScheduler::FetchItem);
if (!item.isValid()) {
- d->scheduler->currentTask().sendDBusReplies(i18nc("@info", "Invalid item retrieved"));
- d->scheduler->taskDone();
+ d->scheduler->itemFetchDone(i18nc("@info", "Invalid item retrieved"));
return;
}
@@ -647,8 +651,7 @@ void ResourceBasePrivate::slotDeliveryDone(KJob *job)
if (job->error()) {
emit q->error(i18nc("@info", "Error while creating item: %1", job->errorString()));
}
- scheduler->currentTask().sendDBusReplies(job->error() ? job->errorString() : QString());
- scheduler->taskDone();
+ scheduler->itemFetchDone(QString());
}
void ResourceBase::collectionAttributesRetrieved(const Collection &collection)
@@ -775,7 +778,7 @@ void ResourceBase::changeCommitted(const Tag &tag)
connect(job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)));
}
-QString ResourceBase::requestItemDelivery(qint64 uid, const QString &remoteId, const QString &mimeType, const QByteArrayList &parts)
+QString ResourceBase::requestItemDelivery(const QList<qint64> &uids, const QByteArrayList &parts)
{
Q_D(ResourceBase);
if (!isOnline()) {
@@ -785,15 +788,15 @@ QString ResourceBase::requestItemDelivery(qint64 uid, const QString &remoteId, c
}
setDelayedReply(true);
- // FIXME: we need at least the revision number too
- Item item(uid);
- item.setMimeType(mimeType);
- item.setRemoteId(remoteId);
- d->scheduler->scheduleItemFetch(item, QSet<QByteArray>::fromList(parts), message());
+ Item::List items;
+ items.reserve(uids.size());
+ for (auto uid : uids) {
+ items.push_back(Item(uid));
+ }
+ d->scheduler->scheduleItemsFetch(items, QSet<QByteArray>::fromList(parts), message());
return QString();
-
}
void ResourceBase::collectionsRetrieved(const Collection::List &collections)
@@ -1005,16 +1008,16 @@ void ResourceBasePrivate::slotSynchronizeRelations()
QMetaObject::invokeMethod(q, "retrieveRelations");
}
-void ResourceBasePrivate::slotPrepareItemRetrieval(const Akonadi::Item &item)
+void ResourceBasePrivate::slotPrepareItemRetrieval(const Item &item)
{
Q_Q(ResourceBase);
- ItemFetchJob *fetch = new ItemFetchJob(item, this);
+ auto fetch = new ItemFetchJob(item, this);
fetch->fetchScope().setAncestorRetrieval(q->changeRecorder()->itemFetchScope().ancestorRetrieval());
fetch->fetchScope().setCacheOnly(true);
// copy list of attributes to fetch
const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
- foreach (const QByteArray &attribute, attributes) {
+ for (const auto &attribute : attributes) {
fetch->fetchScope().fetchAttribute(attribute);
}
@@ -1032,13 +1035,44 @@ void ResourceBasePrivate::slotPrepareItemRetrievalResult(KJob *job)
return;
}
ItemFetchJob *fetch = qobject_cast<ItemFetchJob *>(job);
- if (fetch->items().count() != 1) {
- q->cancelTask(i18n("The requested item no longer exists"));
+ const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
+ if (!q->retrieveItem(fetch->items().at(0), parts)) {
+ q->cancelTask();
+ }
+}
+
+void ResourceBasePrivate::slotPrepareItemsRetrieval(const QVector<Item> &items)
+{
+ Q_Q(ResourceBase);
+ ItemFetchJob *fetch = new ItemFetchJob(items, this);
+ fetch->fetchScope().setAncestorRetrieval(q->changeRecorder()->itemFetchScope().ancestorRetrieval());
+ fetch->fetchScope().setCacheOnly(true);
+ // It's possible that one or more items were removed before this task was
+ // executed, so ignore it and just handle the rest.
+ fetch->fetchScope().setIgnoreRetrievalErrors(true);
+
+ // copy list of attributes to fetch
+ const QSet<QByteArray> attributes = q->changeRecorder()->itemFetchScope().attributes();
+ for (const auto &attribute : attributes) {
+ fetch->fetchScope().fetchAttribute(attribute);
+ }
+
+ q->connect(fetch, SIGNAL(result(KJob*)), SLOT(slotPrepareItemsRetrievalResult(KJob*)));
+}
+
+void ResourceBasePrivate::slotPrepareItemsRetrievalResult(KJob *job)
+{
+ Q_Q(ResourceBase);
+ Q_ASSERT_X(scheduler->currentTask().type == ResourceScheduler::FetchItems,
+ "ResourceBasePrivate::slotPrepareItemsRetrievalResult()",
+ "Preparing items retrieval although no items retrieval is in progress");
+ if (job->error()) {
+ q->cancelTask(job->errorText());
return;
}
- const Item item = fetch->items().at(0);
+ ItemFetchJob *fetch = qobject_cast<ItemFetchJob *>(job);
const QSet<QByteArray> parts = scheduler->currentTask().itemParts;
- if (!q->retrieveItem(item, parts)) {
+ if (!q->retrieveItems(fetch->items(), parts)) {
q->cancelTask();
}
}
@@ -1073,6 +1107,9 @@ void ResourceBase::itemsRetrievalDone()
if (d->mItemSyncer) {
d->mItemSyncer->deliveryDone();
} else {
+ if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
+ d->scheduler->currentTask().sendDBusReplies(QString());
+ }
// user did the sync himself, we are done now
d->scheduler->taskDone();
}
@@ -1105,7 +1142,16 @@ Item ResourceBase::currentItem() const
Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItem,
"ResourceBase::currentItem()",
"Trying to access current item although no item retrieval is in progress");
- return d->scheduler->currentTask().item;
+ return d->scheduler->currentTask().items[0];
+}
+
+Item::List ResourceBase::currentItems() const
+{
+ Q_D(const ResourceBase);
+ Q_ASSERT_X(d->scheduler->currentTask().type == ResourceScheduler::FetchItems,
+ "ResourceBase::currentItems()",
+ "Trying to access current items although no items retrieval is in progress");
+ return d->scheduler->currentTask().items;
}
void ResourceBase::synchronizeCollectionTree()
@@ -1261,12 +1307,53 @@ void ResourceBase::setItemStreamingEnabled(bool enable)
}
}
+namespace {
+
+class UpdateItemsJob : public KCompositeJob
+{
+public:
+ explicit UpdateItemsJob(const Item::List &items, QObject *parent = Q_NULLPTR)
+ : KCompositeJob(parent)
+ , mItems(items)
+ {
+ }
+
+ void start() Q_DECL_OVERRIDE
+ {
+ Q_FOREACH (const Item &item, mItems) {
+ auto job = new ItemModifyJob(item, this);
+ addSubjob(job);
+ }
+ }
+
+ void slotResult(KJob *job) Q_DECL_OVERRIDE
+ {
+ KCompositeJob::slotResult(job);
+ if (!hasSubjobs()) {
+ emitResult();
+ }
+ }
+
+private:
+ Item::List mItems;
+};
+
+}
+
+
void ResourceBase::itemsRetrieved(const Item::List &items)
{
Q_D(ResourceBase);
- d->createItemSyncInstanceIfMissing();
- if (d->mItemSyncer) {
- d->mItemSyncer->setFullSyncItems(items);
+ if (d->scheduler->currentTask().type == ResourceScheduler::FetchItems) {
+ auto job = new UpdateItemsJob(items, this);
+ connect(job, SIGNAL(result(KJob*)),
+ this, SLOT(slotItemSyncDone(KJob*)));
+ job->start();
+ } else {
+ d->createItemSyncInstanceIfMissing();
+ if (d->mItemSyncer) {
+ d->mItemSyncer->setFullSyncItems(items);
+ }
}
}
@@ -1286,6 +1373,8 @@ void ResourceBasePrivate::slotItemSyncDone(KJob *job)
Q_Q(ResourceBase);
if (job->error() && job->error() != Job::UserCanceled) {
emit q->error(job->errorString());
+ } else if (scheduler->currentTask().type == ResourceScheduler::FetchItems) {
+ scheduler->currentTask().sendDBusReplies(QString());
}
scheduler->taskDone();
}
@@ -1361,6 +1450,39 @@ void ResourceBase::retrieveRelations()
d->scheduler->taskDone();
}
+bool ResourceBase::retrieveItem(const Akonadi::Item &item, const QSet<QByteArray> &parts)
+{
+ Q_UNUSED(item);
+ Q_UNUSED(parts);
+ // retrieveItem() can no longer be pure virtual, because then we could not mark
+ // it as deprecated (i.e. implementations would still be forced to implement it),
+ // so instead we assert here.
+ // NOTE: Don't change to Q_ASSERT_X here: while the macro can be disabled at
+ // compile time, we want to hit this assert *ALWAYS*.
+ qt_assert_x("Akonadi::ResourceBase::retrieveItem()",
+ "The base implementation of retrieveItem() must never be reached. "
+ "You must implement either retrieveItem() or retrieveItems(Akonadi::Item::List, QSet<QByteArray>) overload "
+ "to handle item retrieval requests.", __FILE__, __LINE__);
+ return false;
+}
+
+bool ResourceBase::retrieveItems(const Akonadi::Item::List& items, const QSet<QByteArray>& parts)
+{
+ Q_D(ResourceBase);
+
+ // If we reach this implementation of retrieveItems() then it means that the
+ // resource is still using the deprecated retrieveItem() method, so we explode
+ // this to a myriad of tasks in scheduler and let them be processed one by one
+
+ const qint64 id = d->scheduler->currentTask().serial;
+ for (const auto &item : items) {
+ d->scheduler->scheduleItemFetch(item, parts, d->scheduler->currentTask().dbusMsgs, id);
+ }
+ taskDone();
+ return true;
+}
+
+
void Akonadi::ResourceBase::abortActivity()
{
}
diff --git a/src/agentbase/resourcebase.h b/src/agentbase/resourcebase.h
index eb50453..70de245 100644
--- a/src/agentbase/resourcebase.h
+++ b/src/agentbase/resourcebase.h
@@ -347,8 +347,24 @@ protected Q_SLOTS:
* @param parts The item parts that should be retrieved.
* @return false if there is an immediate error when retrieving the item.
* @see itemRetrieved()
+ * @deprecated Use retrieveItems(const Akonadi::Item::List &, const QSet<QByteArray> &) instead.
*/
- virtual bool retrieveItem(const Akonadi::Item &item, const QSet<QByteArray> &parts) = 0;
+ AKONADIAGENTBASE_DEPRECATED virtual bool retrieveItem(const Akonadi::Item &item, const QSet<QByteArray> &parts);
+
+ /**
+ * Retrieve given @p items from the backend.
+ * Add the requested payload parts and call itemsRetrieved() when done.
+ * @param items The items whose payload should be retrieved. Use those objects
+ * when delivering the result instead of creating new items to ensure conflict
+ * detection will work.
+ * @param parts The item parts that should be retrieved.
+ * @return false if there is an immeidate error when retrieving the items.
+ * @see itemsRetrieved()
+ * @since 5.4
+ *
+ * @todo: Make this method pure virtual once retrieveItem() is gone
+ */
+ virtual bool retrieveItems(const Akonadi::Item::List &items, const QSet<QByteArray> &parts);
/**
* Abort any activity in progress in the backend. By default this method does nothing.
@@ -643,7 +659,14 @@ protected:
* @note Calling this method is only allowed during fetching a single item, that
* is directly or indirectly from retrieveItem().
*/
- Item currentItem() const;
+ AKONADIAGENTBASE_DEPRECATED Item currentItem() const;
+
+ /**
+ * Returns the items that are currently retrieved.
+ * @note Calling this method is only allowed during item fetch, that is
+ * directly or indirectly from retrieveItems(Akonadi::Item::List,QSet<QByteArray>)
+ */
+ Item::List currentItems() const;
/**
* This method is called whenever the resource should start synchronize all data.
@@ -820,7 +843,7 @@ private:
// dbus resource interface
friend class ::Akonadi__ResourceAdaptor;
- QString requestItemDelivery(qint64 uid, const QString &remoteId, const QString &mimeType, const QByteArrayList &parts);
+ QString requestItemDelivery(const QList<qint64> &uids, const QByteArrayList &parts);
private:
Q_DECLARE_PRIVATE(ResourceBase)
@@ -841,8 +864,10 @@ private:
Q_PRIVATE_SLOT(d_func(), void slotItemSyncDone(KJob *))
Q_PRIVATE_SLOT(d_func(), void slotPercent(KJob *, unsigned long))
Q_PRIVATE_SLOT(d_func(), void slotDelayedEmitProgress())
- Q_PRIVATE_SLOT(d_func(), void slotPrepareItemRetrieval(const Akonadi::Item &item))
+ Q_PRIVATE_SLOT(d_func(), void slotPrepareItemRetrieval(const Akonadi::Item &items))
Q_PRIVATE_SLOT(d_func(), void slotPrepareItemRetrievalResult(KJob *))
+ Q_PRIVATE_SLOT(d_func(), void slotPrepareItemsRetrieval(const QVector<Akonadi::Item> &items))
+ Q_PRIVATE_SLOT(d_func(), void slotPrepareItemsRetrievalResult(KJob *))
Q_PRIVATE_SLOT(d_func(), void changeCommittedResult(KJob *))
Q_PRIVATE_SLOT(d_func(), void slotSessionReconnected())
Q_PRIVATE_SLOT(d_func(), void slotRecursiveMoveReplay(RecursiveMover *))
diff --git a/src/agentbase/resourcescheduler.cpp b/src/agentbase/resourcescheduler.cpp
index 0bd4734..e4f181c 100644
--- a/src/agentbase/resourcescheduler.cpp
+++ b/src/agentbase/resourcescheduler.cpp
@@ -124,11 +124,30 @@ void ResourceScheduler::scheduleAttributesSync(const Collection &collection)
scheduleNext();
}
-void ResourceScheduler::scheduleItemFetch(const Item &item, const QSet<QByteArray> &parts, const QDBusMessage &msg)
+void ResourceScheduler::scheduleItemFetch(const Akonadi::Item &item, const QSet<QByteArray> &parts,
+ const QList<QDBusMessage> &msgs, qint64 parentId)
+
{
Task t;
t.type = FetchItem;
- t.item = item;
+ t.items << item;
+ t.itemParts = parts;
+ t.dbusMsgs = msgs;
+ t.argument = parentId;
+
+ TaskList &queue = queueForTaskType(t.type);
+ queue << t;
+
+ signalTaskToTracker(t, "FetchItem", QString::number(item.id()));
+ scheduleNext();
+}
+
+
+void ResourceScheduler::scheduleItemsFetch(const Item::List &items, const QSet<QByteArray> &parts, const QDBusMessage &msg)
+{
+ Task t;
+ t.type = FetchItems;
+ t.items = items;
t.itemParts = parts;
// if the current task does already fetch the requested item, break here but
@@ -148,7 +167,13 @@ void ResourceScheduler::scheduleItemFetch(const Item &item, const QSet<QByteArra
t.dbusMsgs << msg;
queue << t;
- signalTaskToTracker(t, "FetchItem", QString::number(item.id()));
+
+ QStringList ids;
+ ids.reserve(items.size());
+ for (const auto &item : items) {
+ ids.push_back(QString::number(item.id()));
+ }
+ signalTaskToTracker(t, "FetchItems", ids.join(QStringLiteral(", ")));
scheduleNext();
}
@@ -282,6 +307,44 @@ void ResourceScheduler::taskDone()
scheduleNext();
}
+void ResourceScheduler::itemFetchDone(const QString &msg)
+{
+ Q_ASSERT(mCurrentTask.type == FetchItem);
+
+ TaskList &queue = queueForTaskType(mCurrentTask.type);
+
+ const qint64 parentId = mCurrentTask.argument.toLongLong();
+ // msg is empty, there was no error
+ if (msg.isEmpty() && !queue.isEmpty()) {
+ Task &nextTask = queue[0];
+ // If the next task is FetchItem too...
+ if (nextTask.type != mCurrentTask.type || nextTask.argument.toLongLong() != parentId) {
+ // If the next task is not FetchItem or the next FetchItem task has
+ // different parentId then this was the last task in the series, so
+ // send the DBus replies.
+ mCurrentTask.sendDBusReplies(msg);
+ }
+ } else {
+ // msg was not empty, there was an error.
+ // remove all subsequent FetchItem tasks with the same parentId
+ auto iter = queue.begin();
+ while (iter != queue.end()) {
+ if (iter->type != mCurrentTask.type || iter->argument.toLongLong() == parentId) {
+ iter = queue.erase(iter);
+ continue;
+ } else {
+ break;
+ }
+ }
+
+ // ... and send DBus reply with the error message
+ mCurrentTask.sendDBusReplies(msg);
+ }
+
+ taskDone();
+}
+
+
void ResourceScheduler::deferTask()
{
if (mCurrentTask.type == Invalid) {
@@ -362,7 +425,10 @@ void ResourceScheduler::executeNext()
emit executeTagSync();
break;
case FetchItem:
- emit executeItemFetch(mCurrentTask.item, mCurrentTask.itemParts);
+ emit executeItemFetch(mCurrentTask.items.at(0), mCurrentTask.itemParts);
+ break;
+ case FetchItems:
+ emit executeItemsFetch(mCurrentTask.items, mCurrentTask.itemParts);
break;
case DeleteResourceCollection:
emit executeResourceCollectionDeletion();
@@ -415,6 +481,11 @@ ResourceScheduler::Task ResourceScheduler::currentTask() const
return mCurrentTask;
}
+ResourceScheduler::Task &ResourceScheduler::currentTask()
+{
+ return mCurrentTask;
+}
+
void ResourceScheduler::setOnline(bool state)
{
if (mOnline == state) {
@@ -432,9 +503,21 @@ void ResourceScheduler::setOnline(bool state)
}
// abort pending synchronous tasks, might take longer until the resource goes online again
TaskList &itemFetchQueue = queueForTaskType(FetchItem);
+ qint64 parentId = -1;
+ Task lastTask;
for (QList< Task >::iterator it = itemFetchQueue.begin(); it != itemFetchQueue.end();) {
if ((*it).type == FetchItem) {
- (*it).sendDBusReplies(i18nc("@info", "Job canceled."));
+ qint64 idx = it->argument.toLongLong();
+ if (parentId == -1) {
+ parentId = idx;
+ }
+ if (idx != parentId) {
+ // Only emit the DBus reply once we reach the last taskwith the
+ // same "idx"
+ lastTask.sendDBusReplies(i18nc("@info", "Job canceled."));
+ parentId = idx;
+ }
+ lastTask = (*it);
it = itemFetchQueue.erase(it);
if (s_resourcetracker) {
QList<QVariant> argumentList;
@@ -510,6 +593,7 @@ ResourceScheduler::QueueType ResourceScheduler::queueTypeForTaskType(TaskType ty
case RecursiveMoveReplay:
return ChangeReplayQueue;
case FetchItem:
+ case FetchItems:
case SyncCollectionAttributes:
return UserActionQueue;
default:
@@ -582,6 +666,7 @@ static const char s_taskTypes[][27] = {
"SyncCollectionAttributes",
"SyncTags",
"FetchItem",
+ "FetchItems",
"ChangeReplay",
"RecursiveMoveReplay",
"DeleteResourceCollection",
@@ -599,8 +684,13 @@ QTextStream &Akonadi::operator<<(QTextStream &d, const ResourceScheduler::Task &
if (task.collection.isValid()) {
d << "collection " << task.collection.id() << " ";
}
- if (task.item.id() != -1) {
- d << "item " << task.item.id() << " ";
+ if (!task.items.isEmpty()) {
+ QStringList ids;
+ ids.reserve(task.items.size());
+ for (const auto &item : task.items) {
+ ids.push_back(QString::number(item.id()));
+ }
+ d << "items " << ids.join(QStringLiteral(", ")) << " ";
}
if (!task.methodName.isEmpty()) {
d << task.methodName << " " << task.argument.toString();
diff --git a/src/agentbase/resourcescheduler_p.h b/src/agentbase/resourcescheduler_p.h
index 41bf222..d4fa78e 100644
--- a/src/agentbase/resourcescheduler_p.h
+++ b/src/agentbase/resourcescheduler_p.h
@@ -57,6 +57,7 @@ public:
SyncCollectionAttributes,
SyncTags,
FetchItem,
+ FetchItems,
ChangeReplay,
RecursiveMoveReplay,
DeleteResourceCollection,
@@ -81,7 +82,7 @@ public:
qint64 serial;
TaskType type;
Collection collection;
- Item item;
+ QVector<Item> items;
QSet<QByteArray> itemParts;
QList<QDBusMessage> dbusMsgs;
QObject *receiver;
@@ -94,7 +95,7 @@ public:
{
return type == other.type
&& (collection == other.collection || (!collection.isValid() && !other.collection.isValid()))
- && (item == other.item || (!item.isValid() && !other.item.isValid()))
+ && items == other.items
&& itemParts == other.itemParts
&& receiver == other.receiver
&& methodName == other.methodName
@@ -131,11 +132,28 @@ public:
/**
Schedules fetching of a single PIM item.
- @param item The item to fetch.
+
+ This task is only ever used if the resource still uses the old deprecated
+ retrieveItem() (instead of retrieveItems(Item::List)) method. This task has
+ a special meaning to the scheduler and instead of replying to the DBus message
+ after the single @p item is retrieved, the items are accumulated until all
+ tasks from the same messages are fetched.
+
+ @param items The items to fetch.
@param parts List of names of the parts of the item to fetch.
@param msg The associated D-Bus message.
+ @param parentId ID of the original ItemsFetch task that this task was created from.
+ We can use this ID to group the tasks together
*/
- void scheduleItemFetch(const Item &item, const QSet<QByteArray> &parts, const QDBusMessage &msg);
+ void scheduleItemFetch(const Item &item, const QSet<QByteArray> &parts, const QList<QDBusMessage> &msgs, const qint64 parentId);
+
+ /**
+ Schedules batch-fetching of PIM items.
+ @param items The items to fetch.
+ @param parts List of names of the parts of the item to fetch.
+ @param msg The associated D-Bus message.
+ */
+ void scheduleItemsFetch(const Item::List &item, const QSet<QByteArray> &parts, const QDBusMessage &msg);
/**
Schedules deletion of the resource collection.
@@ -180,6 +198,8 @@ public:
*/
Task currentTask() const;
+ Task &currentTask();
+
/**
Sets the online state.
*/
@@ -220,6 +240,11 @@ public Q_SLOTS:
void taskDone();
/**
+ Like taskDone(), but special case for ItemFetch task
+ */
+ void itemFetchDone(const QString &msg);
+
+ /**
The current task can't be finished now and will be rescheduled later
*/
void deferTask();
@@ -237,6 +262,7 @@ Q_SIGNALS:
void executeTagSync();
void executeRelationSync();
void executeItemFetch(const Akonadi::Item &item, const QSet<QByteArray> &parts);
+ void executeItemsFetch(const QVector<Akonadi::Item> &items, const QSet<QByteArray> &parts);
void executeResourceCollectionDeletion();
void executeCacheInvalidation(const Akonadi::Collection &collection);
void executeChangeReplay();
diff --git a/src/core/jobs/itemfetchjob.cpp b/src/core/jobs/itemfetchjob.cpp
index 2e2b841..69cf651 100644
--- a/src/core/jobs/itemfetchjob.cpp
+++ b/src/core/jobs/itemfetchjob.cpp
@@ -129,7 +129,7 @@ ItemFetchJob::ItemFetchJob(const Item &item, QObject *parent)
d->mRequestedItems.append(item);
}
-ItemFetchJob::ItemFetchJob(const Akonadi::Item::List &items, QObject *parent)
+ItemFetchJob::ItemFetchJob(const Item::List &items, QObject *parent)
: Job(new ItemFetchJobPrivate(this), parent)
{
Q_D(ItemFetchJob);
@@ -138,13 +138,26 @@ ItemFetchJob::ItemFetchJob(const Akonadi::Item::List &items, QObject *parent)
d->mRequestedItems = items;
}
-ItemFetchJob::ItemFetchJob(const QList<Akonadi::Item::Id> &items, QObject *parent)
+ItemFetchJob::ItemFetchJob(const QList<Item::Id> &items, QObject *parent)
: Job(new ItemFetchJobPrivate(this), parent)
{
Q_D(ItemFetchJob);
d->init();
- foreach (Item::Id id, items) {
+ d->mRequestedItems.reserve(items.size());
+ for (auto id : items) {
+ d->mRequestedItems.append(Item(id));
+ }
+}
+
+ItemFetchJob::ItemFetchJob(const QVector<Item::Id> &items, QObject *parent)
+ : Job(new ItemFetchJobPrivate(this), parent)
+{
+ Q_D(ItemFetchJob);
+
+ d->init();
+ d->mRequestedItems.reserve(items.size());
+ for (auto id : items) {
d->mRequestedItems.append(Item(id));
}
}
diff --git a/src/core/jobs/itemfetchjob.h b/src/core/jobs/itemfetchjob.h
index b5a24ec..be081a9 100644
--- a/src/core/jobs/itemfetchjob.h
+++ b/src/core/jobs/itemfetchjob.h
@@ -138,6 +138,12 @@ public:
explicit ItemFetchJob(const QList<Item::Id> &items, QObject *parent = Q_NULLPTR);
/**
+ * Convenience ctor equivalent to ItemFetchJob(const Item::List &items, QObject *parent = Q_NULLPTR)
+ * @since 5.4
+ */
+ explicit ItemFetchJob(const QVector<Item::Id> &items, QObject *parent = Q_NULLPTR);
+
+ /**
* Creates a new item fetch job that retrieves all items tagged with specified @p tag.
*
* @param tag The tag to fetch all items from.
diff --git a/src/interfaces/org.freedesktop.Akonadi.Resource.xml b/src/interfaces/org.freedesktop.Akonadi.Resource.xml
index 7999960..0976ce6 100644
--- a/src/interfaces/org.freedesktop.Akonadi.Resource.xml
+++ b/src/interfaces/org.freedesktop.Akonadi.Resource.xml
@@ -11,10 +11,9 @@
<signal name="collectionTreeSynchronized"/>
<method name="requestItemDelivery">
<arg type="s" direction="out"/>
- <arg name="uid" type="x" direction="in"/>
- <arg name="remoteId" type="s" direction="in"/>
- <arg name="mimeType" type="s" direction="in"/>
- <annotation name="org.qtproject.QtDBus.QtTypeName.In3" value="QByteArrayList"/>
+ <annotation name="org.qtproject.QtDBus.QtTypeName.In0" value="QList&lt;qlonglong&gt;"/>
+ <arg name="uids" type="ax" direction="in"/>
+ <annotation name="org.qtproject.QtDBus.QtTypeName.In1" value="QByteArrayList"/>
<arg name="parts" type="aay" direction="in"/>
</method>
<method name="synchronize">
diff --git a/src/server/storage/itemretrievaljob.cpp b/src/server/storage/itemretrievaljob.cpp
index 0b7d3cb..662cbee 100644
--- a/src/server/storage/itemretrievaljob.cpp
+++ b/src/server/storage/itemretrievaljob.cpp
@@ -35,14 +35,13 @@ ItemRetrievalJob::~ItemRetrievalJob()
void ItemRetrievalJob::start(org::freedesktop::Akonadi::Resource *interface)
{
Q_ASSERT(m_request);
- qCDebug(AKONADISERVER_LOG) << "processing retrieval request for item" << m_request->id << " parts:" << m_request->parts << " of resource:" << m_request->resourceId;
+ qCDebug(AKONADISERVER_LOG) << "processing retrieval request for item" << m_request->ids << " parts:" << m_request->parts << " of resource:" << m_request->resourceId;
m_interface = interface;
// call the resource
if (interface) {
m_active = true;
- auto reply = interface->requestItemDelivery(m_request->id, m_request->remoteId,
- m_request->mimeType, m_request->parts);
+ auto reply = interface->requestItemDelivery(m_request->ids, m_request->parts);
QDBusPendingCallWatcher *watcher = new QDBusPendingCallWatcher(reply, this);
connect(watcher, &QDBusPendingCallWatcher::finished,
this, &ItemRetrievalJob::callFinished);
diff --git a/src/server/storage/itemretrievalmanager.cpp b/src/server/storage/itemretrievalmanager.cpp
index 5d144fb..88e91b9 100644
--- a/src/server/storage/itemretrievalmanager.cpp
+++ b/src/server/storage/itemretrievalmanager.cpp
@@ -126,13 +126,10 @@ org::freedesktop::Akonadi::Resource *ItemRetrievalManager::resourceInterface(con
}
// called from any thread
-void ItemRetrievalManager::requestItemDelivery(qint64 uid, const QByteArray &remoteId, const QByteArray &mimeType,
- const QString &resource, const QVector<QByteArray> &parts)
+void ItemRetrievalManager::requestItemDelivery(qint64 uid, const QString &resource, const QVector<QByteArray> &parts)
{
ItemRetrievalRequest *req = new ItemRetrievalRequest();
- req->id = uid;
- req->remoteId = QString::fromUtf8(remoteId);
- req->mimeType = QString::fromUtf8(mimeType);
+ req->ids << uid;
req->resourceId = resource;
req->parts = parts.toList();
@@ -142,7 +139,7 @@ void ItemRetrievalManager::requestItemDelivery(qint64 uid, const QByteArray &rem
void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest *req)
{
mLock->lockForWrite();
- qCDebug(AKONADISERVER_LOG) << "posting retrieval request for item" << req->id << " there are "
+ qCDebug(AKONADISERVER_LOG) << "posting retrieval request for items" << req->ids << " there are "
<< mPendingRequests.size() << " queues and "
<< mPendingRequests[req->resourceId].size() << " items in mine";
mPendingRequests[req->resourceId].append(req);
@@ -159,14 +156,14 @@ void ItemRetrievalManager::requestItemDelivery(ItemRetrievalRequest *req)
const QString errorMsg = req->errorMsg;
mLock->unlock();
if (errorMsg.isEmpty()) {
- qCDebug(AKONADISERVER_LOG) << "request for item" << req->id << "succeeded";
+ qCDebug(AKONADISERVER_LOG) << "request for items" << req->ids << "succeeded";
return;
} else {
- qCDebug(AKONADISERVER_LOG) << "request for item" << req->id << req->remoteId << "failed:" << errorMsg;
+ qCDebug(AKONADISERVER_LOG) << "request for items" << req->ids << "failed:" << errorMsg;
throw ItemRetrieverException(errorMsg);
}
} else {
- qCDebug(AKONADISERVER_LOG) << "request for item" << req->id << "still pending - waiting";
+ qCDebug(AKONADISERVER_LOG) << "request for items" << req->ids << "still pending - waiting";
mWaitCondition->wait(mLock);
qCDebug(AKONADISERVER_LOG) << "continuing";
}
@@ -222,8 +219,8 @@ void ItemRetrievalManager::retrievalJobFinished(ItemRetrievalRequest *request, c
mCurrentJobs.remove(request->resourceId);
// TODO check if (*it)->parts is a subset of currentRequest->parts
for (QList<ItemRetrievalRequest *>::Iterator it = mPendingRequests[request->resourceId].begin(); it != mPendingRequests[request->resourceId].end();) {
- if ((*it)->id == request->id) {
- qCDebug(AKONADISERVER_LOG) << "someone else requested item" << request->id << "as well, marking as processed";
+ if ((*it)->ids == request->ids) {
+ qCDebug(AKONADISERVER_LOG) << "someone else requested item" << request->ids << "as well, marking as processed";
(*it)->errorMsg = errorMsg;
(*it)->processed = true;
it = mPendingRequests[request->resourceId].erase(it);
diff --git a/src/server/storage/itemretrievalmanager.h b/src/server/storage/itemretrievalmanager.h
index 9090b91..80d3fc4 100644
--- a/src/server/storage/itemretrievalmanager.h
+++ b/src/server/storage/itemretrievalmanager.h
@@ -47,8 +47,7 @@ public:
ItemRetrievalManager(QObject *parent = Q_NULLPTR);
~ItemRetrievalManager();
- void requestItemDelivery(qint64 uid, const QByteArray &remoteId, const QByteArray &mimeType,
- const QString &resource, const QVector<QByteArray> &parts);
+ void requestItemDelivery(qint64 uid, const QString &resource, const QVector<QByteArray> &parts);
/**
* Added for convenience. ItemRetrievalManager takes ownership over the
diff --git a/src/server/storage/itemretrievalrequest.h b/src/server/storage/itemretrievalrequest.h
index 4f34fd0..77b1ac4 100644
--- a/src/server/storage/itemretrievalrequest.h
+++ b/src/server/storage/itemretrievalrequest.h
@@ -22,6 +22,7 @@
#include <QByteArray>
#include <QString>
+#include <QList>
namespace Akonadi {
namespace Server {
@@ -31,14 +32,11 @@ class ItemRetrievalRequest
{
public:
ItemRetrievalRequest()
- : id(-1)
- , processed(false)
+ : processed(false)
{
}
- qint64 id;
- QString remoteId;
- QString mimeType;
+ QList<qint64> ids;
QString resourceId;
QByteArrayList parts; // list instead of vector to simplify client-side handling
QString errorMsg;
diff --git a/src/server/storage/itemretriever.cpp b/src/server/storage/itemretriever.cpp
index 7d03b58..f0220ab 100644
--- a/src/server/storage/itemretriever.cpp
+++ b/src/server/storage/itemretriever.cpp
@@ -199,9 +199,6 @@ bool ItemRetriever::exec()
verifyCache();
QSqlQuery query = buildQuery();
- ItemRetrievalRequest *lastRequest = 0;
- QList<ItemRetrievalRequest *> requests;
-
QByteArrayList parts;
Q_FOREACH (const QByteArray &part, mParts) {
if (part.startsWith(AKONADI_PARAM_PLD)) {
@@ -209,28 +206,35 @@ bool ItemRetriever::exec()
}
}
- QHash<qint64, QString> mimeTypeIdNameCache;
QHash<qint64, QString> resourceIdNameCache;
+ QVector<ItemRetrievalRequest *> requests;
+ QHash<QString /* resourceId */, ItemRetrievalRequest*> resRequests;
+ QHash<qint64, ItemRetrievalRequest*> itemRequests;
while (query.isValid()) {
const qint64 pimItemId = query.value(PimItemIdColumn).toLongLong();
- if (!lastRequest || lastRequest->id != pimItemId) {
- lastRequest = new ItemRetrievalRequest();
- lastRequest->id = pimItemId;
- lastRequest->remoteId = Utils::variantToString(query.value(PimItemRidColumn));
- const qint64 mtId = query.value(MimeTypeIdColumn).toLongLong();
- auto mtIter = mimeTypeIdNameCache.find(mtId);
- if (mtIter == mimeTypeIdNameCache.end()) {
- mtIter = mimeTypeIdNameCache.insert(mtId, MimeType::retrieveById(mtId).name());
- }
- lastRequest->mimeType = *mtIter;
- const qint64 resourceId = query.value(ResourceIdColumn).toLongLong();
- auto resIter = resourceIdNameCache.find(resourceId);
- if (resIter == resourceIdNameCache.end()) {
- resIter = resourceIdNameCache.insert(resourceId, Resource::retrieveById(resourceId).name());
+ const qint64 resourceId = query.value(ResourceIdColumn).toLongLong();
+ auto resIter = resourceIdNameCache.find(resourceId);
+ if (resIter == resourceIdNameCache.end()) {
+ resIter = resourceIdNameCache.insert(resourceId, Resource::retrieveById(resourceId).name());
+ }
+ ItemRetrievalRequest *lastRequest = Q_NULLPTR;
+ auto itemIter = itemRequests.constFind(pimItemId);
+ if (itemIter != itemRequests.constEnd()) {
+ lastRequest = *itemIter;
+ } else {
+ lastRequest = resRequests.value(*resIter);
+ if (!lastRequest || lastRequest->ids.size() > 100) {
+ lastRequest = new ItemRetrievalRequest();
+ lastRequest->ids.push_back(pimItemId);
+ lastRequest->resourceId = *resIter;
+ lastRequest->parts = parts;
+ resRequests.insert(*resIter, lastRequest);
+ itemRequests.insert(pimItemId, lastRequest);
+ requests << lastRequest;
+ } else {
+ lastRequest->ids.push_back(pimItemId);
+ itemRequests.insert(pimItemId, lastRequest);
}
- lastRequest->resourceId = *resIter;
- lastRequest->parts = parts;
- requests << lastRequest;
}
if (query.value(PartTypeNameColumn).isNull()) {