aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Vrátil <dvratil@kde.org>2016-03-31 16:36:26 (GMT)
committerDaniel Vrátil <dvratil@kde.org>2016-08-15 20:03:14 (GMT)
commitf43ab1cfc55282d1b4bf38a6cc1d3dd896999afb (patch)
treea06e1bb2b223e1d8240114fe86b4906968b18a38
parentce10712749bef7c422ac18f55b8a83f976fa2fb2 (diff)
NtfManager: process outgoing notifications in parallel
Use a QThreadPool to run NotificationSubcriber::notify() for multiple instances in parallel. This might provide a small speed up with large amount of subscribers, but nothing big. It will be more visible once we start sending notification payloads.
-rw-r--r--src/server/notificationmanager.cpp33
-rw-r--r--src/server/notificationmanager.h2
-rw-r--r--src/server/notificationsubscriber.cpp37
-rw-r--r--src/server/notificationsubscriber.h7
4 files changed, 74 insertions, 5 deletions
diff --git a/src/server/notificationmanager.cpp b/src/server/notificationmanager.cpp
index 8c622a1..f3c1010 100644
--- a/src/server/notificationmanager.cpp
+++ b/src/server/notificationmanager.cpp
@@ -30,6 +30,8 @@
#include <QLocalSocket>
#include <QSettings>
#include <QCoreApplication>
+#include <QThreadPool>
+#include <QPointer>
using namespace Akonadi;
using namespace Akonadi::Server;
@@ -55,6 +57,9 @@ void NotificationManager::init()
mTimer->setSingleShot(true);
connect(mTimer, &QTimer::timeout,
this, &NotificationManager::emitPendingNotifications);
+
+ mNotifyThreadPool = new QThreadPool(this);
+ mNotifyThreadPool->setMaxThreadCount(5);
}
void NotificationManager::quit()
@@ -99,6 +104,32 @@ void NotificationManager::slotNotify(const Protocol::ChangeNotification::List &m
}
}
+class NotifyRunnable : public QRunnable
+{
+public:
+ explicit NotifyRunnable(NotificationSubscriber *subscriber,
+ const Protocol::ChangeNotification::List &notifications)
+ : mSubscriber(subscriber)
+ , mNotifications(notifications)
+ {
+ }
+
+ ~NotifyRunnable()
+ {
+ }
+
+ void run() Q_DECL_OVERRIDE
+ {
+ if (mSubscriber) {
+ mSubscriber->notify(mNotifications);
+ }
+ }
+
+private:
+ QPointer<NotificationSubscriber> mSubscriber;
+ Protocol::ChangeNotification::List mNotifications;
+};
+
void NotificationManager::emitPendingNotifications()
{
if (mNotifications.isEmpty()) {
@@ -106,7 +137,7 @@ void NotificationManager::emitPendingNotifications()
}
Q_FOREACH (NotificationSubscriber *subscriber, mSubscribers) {
- subscriber->notify(mNotifications);
+ mNotifyThreadPool->start(new NotifyRunnable(subscriber, mNotifications));
}
mNotifications.clear();
diff --git a/src/server/notificationmanager.h b/src/server/notificationmanager.h
index 44a701f..69bc0e4 100644
--- a/src/server/notificationmanager.h
+++ b/src/server/notificationmanager.h
@@ -28,6 +28,7 @@
class NotificationManagerTest;
class QLocalSocket;
+class QThreadPool;
namespace Akonadi {
namespace Server {
@@ -61,6 +62,7 @@ private:
Protocol::ChangeNotification::List mNotifications;
QTimer *mTimer;
+ QThreadPool *mNotifyThreadPool;
QVector<NotificationSubscriber *> mSubscribers;
friend class NotificationSubscriber;
diff --git a/src/server/notificationsubscriber.cpp b/src/server/notificationsubscriber.cpp
index 5317280..96cb384 100644
--- a/src/server/notificationsubscriber.cpp
+++ b/src/server/notificationsubscriber.cpp
@@ -122,6 +122,8 @@ void NotificationSubscriber::socketDisconnected()
void NotificationSubscriber::disconnectSubscriber()
{
+ QMutexLocker locker(&mLock);
+
if (mManager) {
Protocol::SubscriptionChangeNotification changeNtf;
changeNtf.setSubscriber(mSubscriber);
@@ -140,6 +142,8 @@ void NotificationSubscriber::disconnectSubscriber()
void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscriptionCommand &command)
{
+ QMutexLocker locker(&mLock);
+
qDebug() << "Subscriber" << this << "identified as" << command.subscriberName();
mSubscriber = command.subscriberName();
@@ -154,6 +158,8 @@ void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscripti
void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscriptionCommand &command)
{
+ QMutexLocker locker(&mLock);
+
const auto modifiedParts = command.modifiedParts();
#define START_MONITORING(type) \
@@ -245,6 +251,8 @@ void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscripti
Protocol::ChangeNotification NotificationSubscriber::toChangeNotification() const
{
+ // Assumes mLock being locked by caller
+
Protocol::SubscriptionChangeNotification ntf;
ntf.setSessionId(mSession);
ntf.setSubscriber(mSubscriber);
@@ -265,6 +273,8 @@ Protocol::ChangeNotification NotificationSubscriber::toChangeNotification() cons
bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const
{
+ // Assumes mLock being locked by caller
+
if (id < 0) {
return false;
} else if (mMonitoredCollections.contains(id)) {
@@ -277,6 +287,8 @@ bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const
bool NotificationSubscriber::isMimeTypeMonitored(const QString &mimeType) const
{
+ // Assumes mLock being locked by caller
+
const QMimeType mt = sMimeDatabase.mimeTypeForName(mimeType);
if (mMonitoredMimeTypes.contains(mimeType)) {
return true;
@@ -293,6 +305,8 @@ bool NotificationSubscriber::isMimeTypeMonitored(const QString &mimeType) const
bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::ItemChangeNotification &msg) const
{
+ // Assumes mLock being locked by caller
+
if (msg.operation() != Protocol::ItemChangeNotification::Move) {
return false;
}
@@ -301,6 +315,8 @@ bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::
bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const
{
+ // Assumes mLock being locked by caller
+
if (msg.operation() != Protocol::CollectionChangeNotification::Move) {
return false;
}
@@ -310,13 +326,15 @@ bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::
bool NotificationSubscriber::acceptsItemNotification(const Protocol::ItemChangeNotification &notification) const
{
+ // Assumes mLock being locked by caller
+
if (notification.items().count() == 0) {
return false;
}
- //We always want notifications that affect the parent resource (like an item added to a referenced collection)
- const bool notificationForParentResource = (mSession == notification.resource());
if (CollectionReferenceManager::instance()->isReferenced(notification.parentCollection())) {
+ //We always want notifications that affect the parent resource (like an item added to a referenced collection)
+ const bool notificationForParentResource = (mSession == notification.resource());
return (mExclusive
|| isCollectionMonitored(notification.parentCollection())
|| isMoveDestinationResourceMonitored(notification)
@@ -364,6 +382,8 @@ bool NotificationSubscriber::acceptsItemNotification(const Protocol::ItemChangeN
bool NotificationSubscriber::acceptsCollectionNotification(const Protocol::CollectionChangeNotification &notification) const
{
+ // Assumes mLock being locked by caller
+
if (notification.id() < 0) {
return false;
}
@@ -435,6 +455,8 @@ bool NotificationSubscriber::acceptsCollectionNotification(const Protocol::Colle
bool NotificationSubscriber::acceptsTagNotification(const Protocol::TagChangeNotification &notification) const
{
+ // Assumes mLock being locked by caller
+
if (notification.id() < 0) {
return false;
}
@@ -491,6 +513,8 @@ bool NotificationSubscriber::acceptsTagNotification(const Protocol::TagChangeNot
bool NotificationSubscriber::acceptsRelationNotification(const Protocol::RelationChangeNotification &notification) const
{
+ // Assumes mLock being locked by caller
+
Q_UNUSED(notification);
if (mAllMonitored) {
@@ -506,6 +530,8 @@ bool NotificationSubscriber::acceptsRelationNotification(const Protocol::Relatio
bool NotificationSubscriber::acceptsSubscriptionNotification(const Protocol::SubscriptionChangeNotification &notification) const
{
+ // Assumes mLock being locked by caller
+
Q_UNUSED(notification);
// Unlike other types, subscription notifications must be explicitly enabled
@@ -515,6 +541,8 @@ bool NotificationSubscriber::acceptsSubscriptionNotification(const Protocol::Sub
bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotification &notification) const
{
+ // Assumes mLock being locked
+
// Uninitialized subscriber gets nothing
if (mSubscriber.isEmpty()) {
return false;
@@ -544,9 +572,12 @@ bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotificat
void NotificationSubscriber::notify(const Protocol::ChangeNotification::List &notifications)
{
+ QMutexLocker locker(&mLock);
+
Q_FOREACH (const auto &notification, notifications) {
if (acceptsNotification(notification)) {
- writeNotification(notification);
+ QMetaObject::invokeMethod(this, "writeNotification", Qt::QueuedConnection,
+ Q_ARG(Akonadi::Protocol::ChangeNotification, notification));
}
}
}
diff --git a/src/server/notificationsubscriber.h b/src/server/notificationsubscriber.h
index c5ae061..2e3af8e 100644
--- a/src/server/notificationsubscriber.h
+++ b/src/server/notificationsubscriber.h
@@ -23,6 +23,7 @@
#include <QObject>
#include <QByteArray>
#include <QMimeDatabase>
+#include <QMutex>
#include <private/protocol_p.h>
#include "entities.h"
@@ -67,12 +68,16 @@ private:
bool isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const;
Protocol::ChangeNotification toChangeNotification() const;
+
+protected Q_SLOTS:
+ virtual void writeNotification(const Akonadi::Protocol::ChangeNotification &notification);
+
protected:
explicit NotificationSubscriber(NotificationManager *manager = Q_NULLPTR);
- virtual void writeNotification(const Protocol::ChangeNotification &notification);
void writeCommand(qint64 tag, const Protocol::Command &cmd);
+ mutable QMutex mLock;
NotificationManager *mManager;
QLocalSocket *mSocket;
QByteArray mSubscriber;