aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Vrátil <dvratil@kde.org>2016-03-20 10:59:13 (GMT)
committerDaniel Vrátil <dvratil@kde.org>2016-08-15 20:01:12 (GMT)
commit4a3d8d206c2a6b0032fba8793569b6e721fc757a (patch)
tree3edc44a33871f092bb7bb4b92b04f6bea5aa3a55
parent040ce64b962e738f726d3478f57783feb3ddb02c (diff)
Implement SubscriptionChangeNotification to monitor subscribers
-rw-r--r--src/core/CMakeLists.txt2
-rw-r--r--src/core/subscriptionmonitor.cpp247
-rw-r--r--src/core/subscriptionmonitor.h123
-rw-r--r--src/private/protocol.cpp443
-rw-r--r--src/private/protocol_p.h100
-rw-r--r--src/server/notificationmanager.cpp2
-rw-r--r--src/server/notificationmanager.h2
-rw-r--r--src/server/notificationsubscriber.cpp107
-rw-r--r--src/server/notificationsubscriber.h8
9 files changed, 1000 insertions, 34 deletions
diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt
index 366dd35..2dca11d 100644
--- a/src/core/CMakeLists.txt
+++ b/src/core/CMakeLists.txt
@@ -61,6 +61,7 @@ set(akonadicore_base_SRCS
sessionthread.cpp
specialcollectionattribute.cpp
specialcollections.cpp
+ subscriptionmonitor.cpp
tag.cpp
tagattribute.cpp
tagfetchscope.cpp
@@ -114,6 +115,7 @@ ecm_generate_headers(AkonadiCore_base_HEADERS
SpecialCollections
SpecialCollectionAttribute
Supertrait
+ SubscriptionMonitor
Tag
TagAttribute
TagFetchScope
diff --git a/src/core/subscriptionmonitor.cpp b/src/core/subscriptionmonitor.cpp
new file mode 100644
index 0000000..5b0bb84
--- /dev/null
+++ b/src/core/subscriptionmonitor.cpp
@@ -0,0 +1,247 @@
+/*
+ Copyright (c) 2016 Daniel Vrátil <dvratil@kde.org>
+
+ This library is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Library General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or (at your
+ option) any later version.
+
+ This library is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
+ License for more details.
+
+ You should have received a copy of the GNU Library General Public License
+ along with this library; see the file COPYING.LIB. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA.
+*/
+
+#include "subscriptionmonitor.h"
+#include "monitor_p.h"
+
+using namespace Akonadi;
+
+namespace Akonadi
+{
+
+class SubscriberPrivate : public QSharedData
+{
+public:
+ SubscriberPrivate()
+ : QSharedData()
+ , allMonitored(false)
+ , isExclusive(false)
+ {}
+
+ SubscriberPrivate(const SubscriberPrivate &other)
+ : QSharedData(other)
+ , name(other.name)
+ , sessionId(other.sessionId)
+ , collections(other.collections)
+ , items(other.items)
+ , tags(other.tags)
+ , types(other.types)
+ , mimeTypes(other.mimeTypes)
+ , resources(other.resources)
+ , ignoredSessions(other.ignoredSessions)
+ , allMonitored(other.allMonitored)
+ , isExclusive(other.isExclusive)
+ {}
+
+ QByteArray name;
+ QByteArray sessionId;
+ QSet<Collection::Id> collections;
+ QSet<Item::Id> items;
+ QSet<Tag::Id> tags;
+ QSet<Subscriber::Type> types;
+ QSet<QString> mimeTypes;
+ QSet<QByteArray> resources;
+ QSet<QByteArray> ignoredSessions;
+ bool allMonitored;
+ bool isExclusive;
+};
+
+class SubscriptionMonitorPrivate : public MonitorPrivate
+{
+public:
+ SubscriptionMonitorPrivate(SubscriptionMonitor *qq)
+ : MonitorPrivate(Q_NULLPTR, qq)
+ {
+ pendingModification.startMonitoringType(Protocol::ModifySubscriptionCommand::SubscriptionChanges);
+ }
+
+ void slotNotify(const Protocol::ChangeNotification &msg) Q_DECL_OVERRIDE
+ {
+ if (msg.type() == Protocol::Command::SubscriptionChangeNotification) {
+ emitSubscriptionNotification(msg);
+ } else {
+ MonitorPrivate::slotNotify(msg);
+ }
+ }
+
+ void emitSubscriptionNotification(const Protocol::SubscriptionChangeNotification &msg)
+ {
+ Q_Q(SubscriptionMonitor);
+
+ Subscriber subscriber;
+ subscriber.d->name = msg.subscriber();
+ if (!msg.isRemove()) {
+ // NOTE: I have slightly over-designed the change notifications to
+ // have the "added" and "removed" API without realizing we don't really
+ // need that in the client API. The server does not use the "removed"
+ // API either but if we need that in the future, we can easily change
+ // it to use it (i.e. to send incremental changes rather than current
+ // full state of subscription) and adjust the signal APIs in this class
+ subscriber.d->sessionId = msg.sessionId();
+ subscriber.d->collections = msg.addedCollections();
+ subscriber.d->items = msg.addedItems();
+ subscriber.d->tags = msg.addedTags();
+ Q_FOREACH (Protocol::ModifySubscriptionCommand::ChangeType type, msg.addedTypes()) {
+ subscriber.d->types.insert(static_cast<Subscriber::Type>(type));
+ }
+ subscriber.d->mimeTypes = msg.addedMimeTypes();
+ subscriber.d->resources = msg.addedResources();
+ subscriber.d->ignoredSessions = msg.addedIgnoredSessions();
+ subscriber.d->allMonitored = msg.isAllMonitored();
+ subscriber.d->isExclusive = msg.isExclusive();
+ }
+
+ switch (msg.operation()) {
+ case Protocol::SubscriptionChangeNotification::Add:
+ Q_EMIT q->subscriptionAdded(subscriber);
+ break;
+ case Protocol::SubscriptionChangeNotification::Modify:
+ Q_EMIT q->subscriptionChanged(subscriber);
+ break;
+ case Protocol::SubscriptionChangeNotification::Remove:
+ Q_EMIT q->subscriptionRemoved(subscriber.name());
+ break;
+ default:
+ Q_ASSERT_X(false, __FUNCTION__, "Unknown operation type");
+ break;
+ }
+ }
+
+private:
+ Q_DECLARE_PUBLIC(SubscriptionMonitor)
+};
+
+}
+
+Subscriber::Subscriber()
+ : d(new SubscriberPrivate)
+{
+}
+
+Subscriber::Subscriber(const Subscriber &other)
+ : d(other.d)
+{
+}
+
+Subscriber::~Subscriber()
+{
+}
+
+Subscriber &Subscriber::operator=(const Subscriber &other)
+{
+ if (*this == other) {
+ return *this;
+ }
+ d = other.d;
+ return *this;
+}
+
+bool Subscriber::operator==(const Subscriber &other) const
+{
+ return d->name == other.d->name
+ && d->sessionId == other.d->sessionId
+ && d->collections == other.d->collections
+ && d->items == other.d->items
+ && d->tags == other.d->tags
+ && d->types == other.d->types
+ && d->mimeTypes == other.d->mimeTypes
+ && d->resources == other.d->resources
+ && d->ignoredSessions == other.d->ignoredSessions
+ && d->allMonitored == other.d->allMonitored
+ && d->isExclusive == other.d->isExclusive;
+}
+
+QByteArray Subscriber::name() const
+{
+ return d->name;
+}
+
+QByteArray Subscriber::sessionId() const
+{
+ return d->sessionId;
+}
+
+QSet<Collection::Id> Subscriber::monitoredCollections() const
+{
+ return d->collections;
+}
+
+QSet<Item::Id> Subscriber::monitoredItems() const
+{
+ return d->items;
+}
+
+QSet<Tag::Id> Subscriber::monitoredTags() const
+{
+ return d->tags;
+}
+
+QSet<Subscriber::Type> Subscriber::monitoredTypes() const
+{
+ return d->types;
+}
+
+QSet<QString> Subscriber::monitoredMimeTypes() const
+{
+ return d->mimeTypes;
+}
+
+QSet<QByteArray> Subscriber::monitoredResources() const
+{
+ return d->resources;
+}
+
+QSet<QByteArray> Subscriber::ignoredSessions() const
+{
+ return d->ignoredSessions;
+}
+
+bool Subscriber::monitorsAll() const
+{
+ return d->allMonitored;
+}
+
+bool Subscriber::isExclusive() const
+{
+ return d->isExclusive;
+}
+
+
+
+
+SubscriptionMonitor::SubscriptionMonitor(QObject *parent)
+ : Monitor(new SubscriptionMonitorPrivate(this), parent)
+{
+}
+
+SubscriptionMonitor::~SubscriptionMonitor()
+{
+}
+
+void SubscriptionMonitor::setSession(Akonadi::Session *session)
+{
+ Monitor::setSession(session);
+}
+
+Session *SubscriptionMonitor::session() const
+{
+ return Monitor::session();
+}
+
+
diff --git a/src/core/subscriptionmonitor.h b/src/core/subscriptionmonitor.h
new file mode 100644
index 0000000..525977f
--- /dev/null
+++ b/src/core/subscriptionmonitor.h
@@ -0,0 +1,123 @@
+/*
+ Copyright (c) 2016 Daniel Vrátil <dvratil@kde.org>
+
+ This library is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Library General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or (at your
+ option) any later version.
+
+ This library is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
+ License for more details.
+
+ You should have received a copy of the GNU Library General Public License
+ along with this library; see the file COPYING.LIB. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA.
+*/
+
+#ifndef AKONADI_SUBSCRIPTIONMONITOR
+#define AKONADI_SUBSCRIPTIONMONITOR
+
+#include "monitor.h"
+#include "akonadicore_export.h"
+
+
+namespace Akonadi
+{
+
+class SubscriptionMonitorPrivate;
+class SubscriberPrivate;
+class AKONADICORE_EXPORT Subscriber
+{
+public:
+ enum Type {
+ InvalidType,
+ Collection,
+ Item,
+ Tag,
+ Relation,
+ Subscription
+ };
+
+ explicit Subscriber();
+ Subscriber(const Subscriber &other);
+ ~Subscriber();
+
+ Subscriber &operator=(const Subscriber &other);
+ bool operator==(const Subscriber &other) const;
+
+ QByteArray name() const;
+ QByteArray sessionId() const;
+
+ QSet<Collection::Id> monitoredCollections() const;
+ QSet<Item::Id> monitoredItems() const;
+ QSet<Tag::Id> monitoredTags() const;
+ QSet<Type> monitoredTypes() const;
+ QSet<QString> monitoredMimeTypes() const;
+ QSet<QByteArray> monitoredResources() const;
+ QSet<QByteArray> ignoredSessions() const;
+ bool monitorsAll() const;
+ bool isExclusive() const;
+
+private:
+ QSharedDataPointer<SubscriberPrivate> d;
+
+ friend class SubscriptionMonitorPrivate;
+};
+
+class SubscriptionMonitorPrivate;
+/**
+ * @short Monitors change notification subscription changes
+ *
+ * The SubscriptionMonitor is a specialized Monitor which monitors only for
+ * changes in notification subscriptions.
+ *
+ * This class is only useful for developer tools like Akonadi Console which
+ * want to show and introspect the notification system and should not ever be
+ * user in normal applications.
+ *
+ * @since 16.04
+ */
+class AKONADICORE_EXPORT SubscriptionMonitor : protected Monitor
+{
+ Q_OBJECT
+
+public:
+ explicit SubscriptionMonitor(QObject *parent = Q_NULLPTR);
+
+ ~SubscriptionMonitor();
+
+ void setSession(Session *session);
+
+ Session *session() const;
+
+Q_SIGNALS:
+ /**
+ * This signal is emitted when a new Monitor subscribes to notifications.
+ * Once this monitor is set up and registered to Akonadi it will also be
+ * emitted once for each existing subscriber so that applications can
+ * initially populate their list of subscribers.
+ */
+ void subscriptionAdded(const Subscriber &subscription);
+
+ /**
+ * This signal is emitted when an existing subscriber changes its subscription
+ * settings.
+ */
+ void subscriptionChanged(const Subscriber &subscription);
+
+ /**
+ * This signal is emitted when an existing subscriber unsubscribes from the
+ * server, i.e. when a Monitor is destroyed.
+ */
+ void subscriptionRemoved(const QByteArray &subscriber);
+
+private:
+ friend class SubscriptionMonitorPrivate;
+};
+
+}
+
+#endif
diff --git a/src/private/protocol.cpp b/src/private/protocol.cpp
index d0351f9..285df44 100644
--- a/src/private/protocol.cpp
+++ b/src/private/protocol.cpp
@@ -140,6 +140,8 @@ QDebug operator<<(QDebug _dbg, Akonadi::Protocol::Command::Type type)
return dbg << "TagChangeNotification";
case Akonadi::Protocol::Command::RelationChangeNotification:
return dbg << "RelationChangeNotification";
+ case Akonadi::Protocol::Command::SubscriptionChangeNotification:
+ return dbg << "SubscriptionChangeNotification";
case Akonadi::Protocol::Command::CreateSubscription:
return dbg << "CreateSubscription";
case Akonadi::Protocol::Command::ModifySubscription:
@@ -8232,6 +8234,8 @@ bool ChangeNotification::isRemove() const
return static_cast<const Protocol::TagChangeNotification*>(this)->operation() == TagChangeNotification::Remove;
case Command::RelationChangeNotification:
return static_cast<const Protocol::RelationChangeNotification*>(this)->operation() == RelationChangeNotification::Remove;
+ case Command::SubscriptionChangeNotification:
+ return static_cast<const Protocol::SubscriptionChangeNotification*>(this)->operation() == SubscriptionChangeNotification::Remove;
default:
Q_ASSERT_X(false, __FUNCTION__, "Unknown ChangeNotification type");
}
@@ -8250,6 +8254,7 @@ bool ChangeNotification::isMove() const
return static_cast<const Protocol::CollectionChangeNotification*>(this)->operation() == CollectionChangeNotification::Move;
case Command::TagChangeNotification:
case Command::RelationChangeNotification:
+ case Command::SubscriptionChangeNotification:
return false;
default:
Q_ASSERT_X(false, __FUNCTION__, "Unknown ChangeNotification type");
@@ -9834,5 +9839,443 @@ ModifySubscriptionResponse::ModifySubscriptionResponse(const Command &other)
}
+
+class SubscriptionChangeNotificationPrivate : public ChangeNotificationPrivate
+{
+public:
+ SubscriptionChangeNotificationPrivate()
+ : ChangeNotificationPrivate(Command::SubscriptionChangeNotification)
+ , modifiedParts(SubscriptionChangeNotification::None)
+ , operation(SubscriptionChangeNotification::InvalidOp)
+ , isAllMonitored(false)
+ , isExclusive(false)
+ {}
+
+ SubscriptionChangeNotificationPrivate(const SubscriptionChangeNotificationPrivate &other)
+ : ChangeNotificationPrivate(other)
+ , subscriber(other.subscriber)
+ , addedCollections(other.addedCollections)
+ , removedCollections(other.removedCollections)
+ , addedItems(other.addedItems)
+ , removedItems(other.removedItems)
+ , addedTags(other.addedTags)
+ , removedTags(other.removedTags)
+ , addedTypes(other.addedTypes)
+ , removedTypes(other.removedTypes)
+ , addedMimeTypes(other.addedMimeTypes)
+ , removedMimeTypes(other.removedMimeTypes)
+ , addedResources(other.addedResources)
+ , removedResources(other.removedResources)
+ , addedIgnoredSessions(other.addedIgnoredSessions)
+ , removedIgnoredSessions(other.removedIgnoredSessions)
+ , modifiedParts(other.modifiedParts)
+ , operation(other.operation)
+ , isAllMonitored(other.isAllMonitored)
+ , isExclusive(other.isExclusive)
+ {}
+
+ bool compare(const CommandPrivate *other) const Q_DECL_OVERRIDE
+ {
+ return ChangeNotificationPrivate::compare(other)
+ && COMPARE(modifiedParts)
+ && COMPARE(operation)
+ && COMPARE(subscriber)
+ && COMPARE(addedCollections)
+ && COMPARE(removedCollections)
+ && COMPARE(addedItems)
+ && COMPARE(removedItems)
+ && COMPARE(addedTags)
+ && COMPARE(removedTags)
+ && COMPARE(addedTypes)
+ && COMPARE(removedTypes)
+ && COMPARE(addedMimeTypes)
+ && COMPARE(removedMimeTypes)
+ && COMPARE(addedResources)
+ && COMPARE(removedResources)
+ && COMPARE(addedIgnoredSessions)
+ && COMPARE(removedIgnoredSessions)
+ && COMPARE(isAllMonitored)
+ && COMPARE(isExclusive);
+ }
+
+ void debugString(DebugBlock &blck) const Q_DECL_OVERRIDE
+ {
+ ChangeNotificationPrivate::debugString(blck);
+ blck.write("Subscriber", subscriber);
+ blck.write("Operation", operation);
+ blck.write("Modified parts", modifiedParts);
+ if (modifiedParts & SubscriptionChangeNotification::Collections) {
+ blck.write("Added cols", addedCollections);
+ blck.write("Removed cols", removedCollections);
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Items) {
+ blck.write("Added items", addedItems);
+ blck.write("Removed items", removedItems);
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Tags) {
+ blck.write("Added tags", addedTags);
+ blck.write("Removed tags", removedTags);
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Types) {
+ blck.write("Added types", addedTypes);
+ blck.write("Removed types", removedTypes);
+ }
+ if (modifiedParts & SubscriptionChangeNotification::MimeTypes) {
+ blck.write("Added mimetypes", addedMimeTypes);
+ blck.write("Removed mimetypes", removedMimeTypes);
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Resources) {
+ blck.write("Added resources", addedResources);
+ blck.write("Removed resources", removedResources);
+ }
+ if (modifiedParts & SubscriptionChangeNotification::IgnoredSessions) {
+ blck.write("Added ignored sessions", addedIgnoredSessions);
+ blck.write("Removed ignored sessions", removedIgnoredSessions);
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Monitored) {
+ blck.write("All monitored", isAllMonitored);
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Exclusive) {
+ blck.write("Is exclusive", isExclusive);
+ }
+ }
+
+ DataStream &serialize(DataStream &stream) const Q_DECL_OVERRIDE
+ {
+ ChangeNotificationPrivate::serialize(stream)
+ << subscriber
+ << operation
+ << modifiedParts;
+ if (modifiedParts & SubscriptionChangeNotification::Collections) {
+ stream << addedCollections
+ << removedCollections;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Items) {
+ stream << addedItems
+ << removedItems;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Tags) {
+ stream << addedTags
+ << removedTags;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Types) {
+ stream << addedTypes
+ << removedTypes;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::MimeTypes) {
+ stream << addedMimeTypes
+ << removedMimeTypes;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Resources) {
+ stream << addedResources
+ << removedResources;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::IgnoredSessions) {
+ stream << addedIgnoredSessions
+ << removedIgnoredSessions;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Monitored) {
+ stream << isAllMonitored;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Exclusive) {
+ stream << isExclusive;
+ }
+ return stream;
+ }
+
+ DataStream &deserialize(DataStream &stream) Q_DECL_OVERRIDE
+ {
+ ChangeNotificationPrivate::deserialize(stream)
+ >> subscriber
+ >> operation
+ >> modifiedParts;
+ if (modifiedParts & SubscriptionChangeNotification::Collections) {
+ stream >> addedCollections
+ >> removedCollections;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Items) {
+ stream >> addedItems
+ >> removedItems;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Tags) {
+ stream >> addedTags
+ >> removedTags;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Types) {
+ stream >> addedTypes
+ >> removedTypes;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::MimeTypes) {
+ stream >> addedMimeTypes
+ >> removedMimeTypes;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Resources) {
+ stream >> addedResources
+ >> removedResources;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::IgnoredSessions) {
+ stream >> addedIgnoredSessions
+ >> removedIgnoredSessions;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Monitored) {
+ stream >> isAllMonitored;
+ }
+ if (modifiedParts & SubscriptionChangeNotification::Exclusive) {
+ stream >> isExclusive;
+ }
+ return stream;
+ }
+
+ CommandPrivate *clone() const Q_DECL_OVERRIDE
+ {
+ return new SubscriptionChangeNotificationPrivate(*this);
+ }
+
+ QByteArray subscriber;
+ QSet<ChangeNotification::Id> addedCollections;
+ QSet<ChangeNotification::Id> removedCollections;
+ QSet<ChangeNotification::Id> addedItems;
+ QSet<ChangeNotification::Id> removedItems;
+ QSet<ChangeNotification::Id> addedTags;
+ QSet<ChangeNotification::Id> removedTags;
+ QSet<ModifySubscriptionCommand::ChangeType> addedTypes;
+ QSet<ModifySubscriptionCommand::ChangeType> removedTypes;
+ QSet<QString> addedMimeTypes;
+ QSet<QString> removedMimeTypes;
+ QSet<QByteArray> addedResources;
+ QSet<QByteArray> removedResources;
+ QSet<QByteArray> addedIgnoredSessions;
+ QSet<QByteArray> removedIgnoredSessions;
+ SubscriptionChangeNotification::ModifiedParts modifiedParts;
+ SubscriptionChangeNotification::Operation operation;
+ bool isAllMonitored;
+ bool isExclusive;
+};
+
+AKONADI_DECLARE_PRIVATE(SubscriptionChangeNotification)
+
+SubscriptionChangeNotification::SubscriptionChangeNotification()
+ : ChangeNotification(new SubscriptionChangeNotificationPrivate())
+{
+}
+
+SubscriptionChangeNotification::SubscriptionChangeNotification(const Command &other)
+ : ChangeNotification(other)
+{
+ checkCopyInvariant(SubscriptionChangeNotification);
+}
+
+SubscriptionChangeNotification::Operation SubscriptionChangeNotification::operation() const
+{
+ return d_func()->operation;
+}
+
+void SubscriptionChangeNotification::setOperation(Operation operation)
+{
+ d_func()->operation = operation;
+}
+
+SubscriptionChangeNotification::ModifiedParts SubscriptionChangeNotification::modifiedParts() const
+{
+ return d_func()->modifiedParts;
+}
+
+QByteArray SubscriptionChangeNotification::subscriber() const
+{
+ return d_func()->subscriber;
+}
+
+void SubscriptionChangeNotification::setSubscriber(const QByteArray &subscriber)
+{
+ d_func()->subscriber = subscriber;
+}
+
+QSet<ChangeNotification::Id> SubscriptionChangeNotification::addedCollections() const
+{
+ return d_func()->addedCollections;
+}
+
+void SubscriptionChangeNotification::setAddedCollections(const QSet<Id> &addedCols)
+{
+ d_func()->addedCollections = addedCols;
+ d_func()->modifiedParts |= Collections;
+}
+
+QSet<ChangeNotification::Id> SubscriptionChangeNotification::removedCollections() const
+{
+ return d_func()->removedCollections;
+}
+
+void SubscriptionChangeNotification::setRemovedCollections(const QSet<Id> &removedCols)
+{
+ d_func()->removedCollections = removedCols;
+ d_func()->modifiedParts|= Collections;
+}
+
+QSet<ChangeNotification::Id> SubscriptionChangeNotification::addedItems() const
+{
+ return d_func()->addedItems;
+}
+
+void SubscriptionChangeNotification::setAddedItems(const QSet<Id> &addedItems)
+{
+ d_func()->addedItems = addedItems;
+ d_func()->modifiedParts |= Items;
+}
+
+QSet<ChangeNotification::Id> SubscriptionChangeNotification::removedItems() const
+{
+ return d_func()->removedItems;
+}
+
+void SubscriptionChangeNotification::setRemovedItems(const QSet<Id> &removedItems)
+{
+ d_func()->removedItems = removedItems;
+ d_func()->modifiedParts |= Items;
+}
+
+QSet<ChangeNotification::Id> SubscriptionChangeNotification::addedTags() const
+{
+ return d_func()->addedTags;
+}
+
+void SubscriptionChangeNotification::setAddedTags(const QSet<Id> &addedTags)
+{
+ d_func()->addedTags = addedTags;
+ d_func()->modifiedParts |= Tags;
+}
+
+QSet<ChangeNotification::Id> SubscriptionChangeNotification::removedTags() const
+{
+ return d_func()->removedTags;
+}
+
+void SubscriptionChangeNotification::setRemovedTags(const QSet<Id> &removedTags)
+{
+ d_func()->removedTags = removedTags;
+ d_func()->modifiedParts |= Tags;
+}
+
+QSet<ModifySubscriptionCommand::ChangeType> SubscriptionChangeNotification::addedTypes() const
+{
+ return d_func()->addedTypes;
+}
+
+void SubscriptionChangeNotification::setAddedTypes(const QSet<ModifySubscriptionCommand::ChangeType> &addedTypes)
+{
+ d_func()->addedTypes = addedTypes;
+ d_func()->modifiedParts |= Types;
+}
+
+QSet<ModifySubscriptionCommand::ChangeType> SubscriptionChangeNotification::removedTypes() const
+{
+ return d_func()->removedTypes;
+}
+
+void SubscriptionChangeNotification::setRemovedTypes(const QSet<ModifySubscriptionCommand::ChangeType> &removedTypes)
+{
+ d_func()->removedTypes = removedTypes;
+ d_func()->modifiedParts |= Types;
+}
+
+QSet<QString> SubscriptionChangeNotification::addedMimeTypes() const
+{
+ return d_func()->addedMimeTypes;
+}
+
+void SubscriptionChangeNotification::setAddedMimeTypes(const QSet<QString> &addedMt)
+{
+ d_func()->addedMimeTypes = addedMt;
+ d_func()->modifiedParts |= MimeTypes;
+}
+
+QSet<QString> SubscriptionChangeNotification::removedMimeTypes() const
+{
+ return d_func()->removedMimeTypes;
+}
+
+void SubscriptionChangeNotification::setRemovedMimeTypes(const QSet<QString> &removedMt)
+{
+ d_func()->removedMimeTypes = removedMt;
+ d_func()->modifiedParts |= MimeTypes;
+}
+
+QSet<QByteArray> SubscriptionChangeNotification::addedResources() const
+{
+ return d_func()->addedResources;
+}
+
+void SubscriptionChangeNotification::setAddedResources(const QSet<QByteArray> &addedRes)
+{
+ d_func()->addedResources = addedRes;
+ d_func()->modifiedParts |= Resources;
+}
+
+QSet<QByteArray> SubscriptionChangeNotification::removedResources() const
+{
+ return d_func()->removedResources;
+}
+
+void SubscriptionChangeNotification::setRemovedResources(const QSet<QByteArray> &removedRes)
+{
+ d_func()->removedResources = removedRes;
+ d_func()->modifiedParts |= Resources;
+}
+
+QSet<QByteArray> SubscriptionChangeNotification::addedIgnoredSessions() const
+{
+ return d_func()->addedIgnoredSessions;
+}
+
+void SubscriptionChangeNotification::setAddedIgnoredSessions(const QSet<QByteArray> &addedSessions)
+{
+ d_func()->addedIgnoredSessions = addedSessions;
+ d_func()->modifiedParts |= IgnoredSessions;
+}
+
+QSet<QByteArray> SubscriptionChangeNotification::removedIgnoredSessions() const
+{
+ return d_func()->removedIgnoredSessions;
+}
+
+void SubscriptionChangeNotification::setRemovedIgnoredSessions(const QSet<QByteArray> &removedSessions)
+{
+ d_func()->removedIgnoredSessions = removedSessions;
+ d_func()->modifiedParts |= IgnoredSessions;
+}
+
+bool SubscriptionChangeNotification::isAllMonitored() const
+{
+ return d_func()->isAllMonitored;
+}
+
+void SubscriptionChangeNotification::setAllMonitored(bool allMonitored)
+{
+ d_func()->isAllMonitored = allMonitored;
+ d_func()->modifiedParts |= Monitored;
+}
+
+bool SubscriptionChangeNotification::isExclusive() const
+{
+ return d_func()->isExclusive;
+}
+
+void SubscriptionChangeNotification::setExclusive(bool isExclusive)
+{
+ d_func()->isExclusive = isExclusive;
+ d_func()->modifiedParts |= Exclusive;
+}
+
+DataStream &operator<<(DataStream &stream, const SubscriptionChangeNotification &ntf)
+{
+ return ntf.d_func()->serialize(stream);
+}
+
+DataStream &operator>>(DataStream &stream, SubscriptionChangeNotification &ntf)
+{
+ return ntf.d_func()->deserialize(stream);
+}
+
+
} // namespace Protocol
} // namespace Akonadi
diff --git a/src/private/protocol_p.h b/src/private/protocol_p.h
index 2ee5a36..bee1cad 100644
--- a/src/private/protocol_p.h
+++ b/src/private/protocol_p.h
@@ -135,8 +135,10 @@ public:
CollectionChangeNotification,
TagChangeNotification,
RelationChangeNotification,
+ SubscriptionChangeNotification,
CreateSubscription,
ModifySubscription,
+ FetchSubscriptions,
_ResponseBit = 0x80 // reserved
};
@@ -2291,7 +2293,7 @@ public:
};
explicit RelationChangeNotification();
- RelationChangeNotification(const Command& other);
+ RelationChangeNotification(const Command &other);
Operation operation() const;
void setOperation(Operation operation);
@@ -2317,6 +2319,7 @@ private:
+
class CreateSubscriptionCommandPrivate;
class AKONADIPRIVATE_EXPORT CreateSubscriptionCommand : public Command
{
@@ -2372,7 +2375,8 @@ public:
ItemChanges,
CollectionChanges,
TagChanges,
- RelationChanges
+ RelationChanges,
+ SubscriptionChanges
};
explicit ModifySubscriptionCommand();
@@ -2439,7 +2443,6 @@ private:
};
-
class ModifySubscriptionResponse;
class AKONADIPRIVATE_EXPORT ModifySubscriptionResponse : public Response
{
@@ -2453,6 +2456,97 @@ private:
};
+
+
+class SubscriptionChangeNotificationPrivate;
+class AKONADIPRIVATE_EXPORT SubscriptionChangeNotification : public ChangeNotification
+{
+public:
+ enum Operation {
+ InvalidOp,
+ Add,
+ Modify,
+ Remove
+ };
+
+ enum ModifiedPart {
+ None = 0,
+ Collections = 1 << 0,
+ Items = 1 << 1,
+ Tags = 1 << 2,
+ Types = 1 << 3,
+ MimeTypes = 1 << 4,
+ Resources = 1 << 5,
+ IgnoredSessions = 1 << 6,
+ Monitored = 1 << 7,
+ Exclusive = 1 << 8
+ };
+ Q_DECLARE_FLAGS(ModifiedParts, ModifiedPart)
+
+ explicit SubscriptionChangeNotification();
+ SubscriptionChangeNotification(const Command &other);
+
+ Operation operation() const;
+ void setOperation(Operation operation);
+
+ ModifiedParts modifiedParts() const;
+
+ QByteArray subscriber() const;
+ void setSubscriber(const QByteArray &subscriber);
+
+ QSet<Id> addedCollections() const;
+ void setAddedCollections(const QSet<Id> &addedCols);
+ QSet<Id> removedCollections() const;
+ void setRemovedCollections(const QSet<Id> &removedCols);
+
+ QSet<Id> addedItems() const;
+ void setAddedItems(const QSet<Id> &addedItems);
+ QSet<Id> removedItems() const;
+ void setRemovedItems(const QSet<Id> &removedItems);
+
+ QSet<Id> addedTags() const;
+ void setAddedTags(const QSet<Id> &addedTags);
+ QSet<Id> removedTags() const;
+ void setRemovedTags(const QSet<Id> &removedTags);
+
+ QSet<ModifySubscriptionCommand::ChangeType> addedTypes() const;
+ void setAddedTypes(const QSet<ModifySubscriptionCommand::ChangeType> &addedTypes);
+ QSet<ModifySubscriptionCommand::ChangeType> removedTypes() const;
+ void setRemovedTypes(const QSet<ModifySubscriptionCommand::ChangeType> &removedTypes);
+
+ QSet<QString> addedMimeTypes() const;
+ void setAddedMimeTypes(const QSet<QString> &addedMimeTypes);
+ QSet<QString> removedMimeTypes() const;
+ void setRemovedMimeTypes(const QSet<QString> &removedMimeTypes);
+
+ QSet<QByteArray> addedResources() const;
+ void setAddedResources(const QSet<QByteArray> &addedResources);
+ QSet<QByteArray> removedResources() const;
+ void setRemovedResources(const QSet<QByteArray> &removedResouces);
+
+ QSet<QByteArray> addedIgnoredSessions() const;
+ void setAddedIgnoredSessions(const QSet<QByteArray> &ignoredSessions);
+ QSet<QByteArray> removedIgnoredSessions() const;
+ void setRemovedIgnoredSessions(const QSet<QByteArray> &ignoredSessions);
+
+ bool isAllMonitored() const;
+ void setAllMonitored(bool allMonitored);
+
+ bool isExclusive() const;
+ void setExclusive(bool exclusive);
+
+private:
+ AKONADI_DECLARE_PRIVATE(SubscriptionChangeNotification)
+
+ friend DataStream &operator<<(DataStream &stream, const Akonadi::Protocol::SubscriptionChangeNotification &ntf);
+ friend DataStream &operator>>(DataStream &stream, Akonadi::Protocol::SubscriptionChangeNotification &ntf);
+};
+
+
+
+
+
+
} // namespace Protocol
} // namespace Akonadi
diff --git a/src/server/notificationmanager.cpp b/src/server/notificationmanager.cpp
index ff8d6fa..8c622a1 100644
--- a/src/server/notificationmanager.cpp
+++ b/src/server/notificationmanager.cpp
@@ -68,7 +68,7 @@ void NotificationManager::registerConnection(quintptr socketDescriptor)
{
Q_ASSERT(thread() == QThread::currentThread());
- NotificationSubscriber *subscriber = new NotificationSubscriber(socketDescriptor);
+ NotificationSubscriber *subscriber = new NotificationSubscriber(this, socketDescriptor);
qDebug() << "NotificationManager: new connection (registered as" << subscriber << ")";
connect(subscriber, &QObject::destroyed,
[this, subscriber]() {
diff --git a/src/server/notificationmanager.h b/src/server/notificationmanager.h
index bd2597c..44a701f 100644
--- a/src/server/notificationmanager.h
+++ b/src/server/notificationmanager.h
@@ -63,7 +63,7 @@ private:
QVector<NotificationSubscriber *> mSubscribers;
- friend class NotificationSource;
+ friend class NotificationSubscriber;
friend class ::NotificationManagerTest;
};
diff --git a/src/server/notificationsubscriber.cpp b/src/server/notificationsubscriber.cpp
index 2c94fec..c21a4f7 100644
--- a/src/server/notificationsubscriber.cpp
+++ b/src/server/notificationsubscriber.cpp
@@ -19,6 +19,7 @@
#include "notificationsubscriber.h"
#include "akonadiserver_debug.h"
+#include "notificationmanager.h"
#include "collectionreferencemanager.h"
#include <QLocalSocket>
@@ -32,8 +33,9 @@ using namespace Akonadi::Server;
QMimeDatabase NotificationSubscriber::sMimeDatabase;
-NotificationSubscriber::NotificationSubscriber()
+NotificationSubscriber::NotificationSubscriber(NotificationManager *manager)
: QObject()
+ , mManager(manager)
, mSocket(Q_NULLPTR)
, mAllMonitored(false)
, mExclusive(false)
@@ -41,8 +43,8 @@ NotificationSubscriber::NotificationSubscriber()
}
-NotificationSubscriber::NotificationSubscriber(quintptr socketDescriptor)
- : NotificationSubscriber()
+NotificationSubscriber::NotificationSubscriber(NotificationManager *manager, quintptr socketDescriptor)
+ : NotificationSubscriber(manager)
{
mSocket = new QLocalSocket(this);
connect(mSocket, &QLocalSocket::readyRead,
@@ -120,6 +122,14 @@ void NotificationSubscriber::socketDisconnected()
void NotificationSubscriber::disconnectSubscriber()
{
+ if (mManager) {
+ Protocol::SubscriptionChangeNotification changeNtf;
+ changeNtf.setSubscriber(mSubscriber);
+ changeNtf.setSessionId(mSession);
+ changeNtf.setOperation(Protocol::SubscriptionChangeNotification::Remove);
+ mManager->slotNotify({ changeNtf });
+ }
+
disconnect(mSocket, &QLocalSocket::readyRead,
this, &NotificationSubscriber::socketReadyRead);
disconnect(mSocket, &QLocalSocket::disconnected,
@@ -132,6 +142,14 @@ void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscripti
{
qDebug() << "Subscriber identified:" << command.subscriberName();
mSubscriber = command.subscriberName();
+
+ if (mManager) {
+ Protocol::SubscriptionChangeNotification changeNtf;
+ changeNtf.setSubscriber(mSubscriber);
+ changeNtf.setSessionId(mSession);
+ changeNtf.setOperation(Protocol::SubscriptionChangeNotification::Add);
+ mManager->slotNotify({ changeNtf });
+ }
}
void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscriptionCommand &command)
@@ -149,78 +167,101 @@ void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscripti
Q_FOREACH (const auto &entity, newItems) { \
set.insert(entity); \
}
+
#define REMOVE(set, items) \
Q_FOREACH (const auto &entity, items) { \
- set.remove(entity); \
+ set.insert(entity); \
}
- qCDebug(AKONADISERVER_LOG) << "Subscription for" << mSubscriber << "updated:";
if (START_MONITORING(Types)) {
APPEND(mMonitoredTypes, command.startMonitoringTypes())
- qCDebug(AKONADISERVER_LOG) << "\tStart monitoring types:" << command.startMonitoringTypes();
}
if (STOP_MONITORING(Types)) {
- REMOVE(mMonitoredTypes, command.stopMonitoringTypes());
- qCDebug(AKONADISERVER_LOG) << "\tStop monitoring types:" << command.stopMonitoringTypes();
+ REMOVE(mMonitoredTypes, command.stopMonitoringTypes())
}
if (START_MONITORING(Collections)) {
APPEND(mMonitoredCollections, command.startMonitoringCollections())
- qCDebug(AKONADISERVER_LOG) << "\tStart monitoring collections:" << command.startMonitoringCollections();
}
if (STOP_MONITORING(Collections)) {
REMOVE(mMonitoredCollections, command.stopMonitoringCollections())
- qCDebug(AKONADISERVER_LOG) << "\tStop monitoring collections:" << command.stopMonitoringCollections();
}
if (START_MONITORING(Items)) {
APPEND(mMonitoredItems, command.startMonitoringItems())
- qCDebug(AKONADISERVER_LOG) << "\tStart monitoring items:" << command.startMonitoringItems();
}
if (STOP_MONITORING(Items)) {
REMOVE(mMonitoredItems, command.stopMonitoringItems())
- qCDebug(AKONADISERVER_LOG) << "\tStop monitoring items:" << command.stopMonitoringItems();
}
if (START_MONITORING(Tags)) {
APPEND(mMonitoredTags, command.startMonitoringTags())
- qCDebug(AKONADISERVER_LOG) << "\tStart monitoring tags:" << command.startMonitoringTags();
}
if (STOP_MONITORING(Tags)) {
REMOVE(mMonitoredTags, command.stopMonitoringTags())
- qCDebug(AKONADISERVER_LOG) << "\tStop monitoring tags:" << command.stopMonitoringTags();
}
if (START_MONITORING(Resources)) {
APPEND(mMonitoredResources, command.startMonitoringResources())
- qCDebug(AKONADISERVER_LOG) << "\tStart monitoring resources:" << command.startMonitoringResources();
}
if (STOP_MONITORING(Resources)) {
REMOVE(mMonitoredResources, command.stopMonitoringResources())
- qCDebug(AKONADISERVER_LOG) << "\tStop monitoring resourceS:" << command.stopMonitoringResources();
}
if (START_MONITORING(MimeTypes)) {
APPEND(mMonitoredMimeTypes, command.startMonitoringMimeTypes())
- qCDebug(AKONADISERVER_LOG) << "\tStart monitoring mime types:" << command.startMonitoringMimeTypes();
}
if (STOP_MONITORING(MimeTypes)) {
REMOVE(mMonitoredMimeTypes, command.stopMonitoringMimeTypes())
- qCDebug(AKONADISERVER_LOG) << "\tStop monitoring mime types:" << command.stopMonitoringCollections();
}
if (START_MONITORING(Sessions)) {
APPEND(mIgnoredSessions, command.startIgnoringSessions())
- qCDebug(AKONADISERVER_LOG) << "\tStart ignoring sessions:" << command.startIgnoringSessions();
}
if (STOP_MONITORING(Sessions)) {
REMOVE(mIgnoredSessions, command.stopIgnoringSessions())
- qCDebug(AKONADISERVER_LOG) << "\tStop ignoring sessions:" << command.stopIgnoringSessions();
}
if (modifiedParts & Protocol::ModifySubscriptionCommand::AllFlag) {
mAllMonitored = command.allMonitored();
- qCDebug(AKONADISERVER_LOG) << "\tAll monitored:" << command.allMonitored();
}
if (modifiedParts & Protocol::ModifySubscriptionCommand::ExclusiveFlag) {
mExclusive = command.isExclusive();
- qCDebug(AKONADISERVER_LOG) << "\tExclusive:" << command.isExclusive();
+ }
+
+ if (mManager) {
+ // Did the caller just subscribed to subscription changes?
+ if ((modifiedParts & Protocol::ModifySubscriptionCommand::Types)
+ && command.startMonitoringTypes().contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges))
+ {
+ // If yes, then send them list of all existing subscribers
+ Protocol::ChangeNotification::List ntfs;
+ Q_FOREACH (const NotificationSubscriber *subscriber, mManager->mSubscribers) {
+ ntfs << subscriber->toChangeNotification();
+ }
+ // Send them back to caller
+ notify(ntfs);
+ }
+
+ // Emit subscription change notification
+ Protocol::SubscriptionChangeNotification changeNtf = toChangeNotification();
+ changeNtf.setOperation(Protocol::SubscriptionChangeNotification::Modify);
+ mManager->slotNotify({ changeNtf });
}
}
+Protocol::ChangeNotification NotificationSubscriber::toChangeNotification() const
+{
+ Protocol::SubscriptionChangeNotification ntf;
+ ntf.setSessionId(mSession);
+ ntf.setSubscriber(mSubscriber);
+ ntf.setOperation(Protocol::SubscriptionChangeNotification::Add);
+ ntf.setAddedCollections(mMonitoredCollections);
+ ntf.setAddedItems(mMonitoredItems);
+ ntf.setAddedTags(mMonitoredTags);
+ ntf.setAddedTypes(mMonitoredTypes);
+ ntf.setAddedMimeTypes(mMonitoredMimeTypes);
+ ntf.setAddedResources(mMonitoredResources);
+ ntf.setAddedIgnoredSessions(mIgnoredSessions);
+ ntf.setAllMonitored(mAllMonitored);
+ ntf.setExclusive(mExclusive);
+ return ntf;
+}
+
+
bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const
{
@@ -462,6 +503,15 @@ bool NotificationSubscriber::acceptsRelationNotification(const Protocol::Relatio
return true;
}
+bool NotificationSubscriber::acceptsSubscriptionNotification(const Protocol::SubscriptionChangeNotification &notification) const
+{
+ Q_UNUSED(notification);
+
+ // Unlike other types, subscription notifications must be explicitly enabled
+ // by caller and are excluded from "monitor all" as well
+ return mMonitoredTypes.contains(Protocol::ModifySubscriptionCommand::SubscriptionChanges);
+}
+
bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotification &notification) const
{
// session is ignored
@@ -469,15 +519,18 @@ bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotificat
return false;
}
- if (notification.type() == Protocol::Command::ItemChangeNotification) {
+ switch (notification.type()) {
+ case Protocol::Command::ItemChangeNotification:
return acceptsItemNotification(notification);
- } else if (notification.type() == Protocol::Command::CollectionChangeNotification) {
+ case Protocol::Command::CollectionChangeNotification:
return acceptsCollectionNotification(notification);
- } else if (notification.type() == Protocol::Command::TagChangeNotification) {
+ case Protocol::Command::TagChangeNotification:
return acceptsTagNotification(notification);
- } else if (notification.type() == Protocol::Command::RelationChangeNotification) {
+ case Protocol::Command::RelationChangeNotification:
return acceptsRelationNotification(notification);
- } else {
+ case Protocol::Command::SubscriptionChangeNotification:
+ return acceptsSubscriptionNotification(notification);
+ default:
qCDebug(AKONADISERVER_LOG) << "Received invalid change notification!";
return false;
}
diff --git a/src/server/notificationsubscriber.h b/src/server/notificationsubscriber.h
index f1f68e7..c5ae061 100644
--- a/src/server/notificationsubscriber.h
+++ b/src/server/notificationsubscriber.h
@@ -32,13 +32,14 @@ class QLocalSocket;
namespace Akonadi {
namespace Server {
+class NotificationManager;
class NotificationSubscriber : public QObject
{
Q_OBJECT
public:
- explicit NotificationSubscriber(quintptr socketDescriptor);
+ explicit NotificationSubscriber(NotificationManager *manager, quintptr socketDescriptor);
~NotificationSubscriber();
void notify(const Protocol::ChangeNotification::List &notifications);
@@ -58,18 +59,21 @@ private:
bool acceptsCollectionNotification(const Protocol::CollectionChangeNotification &notification) const;
bool acceptsTagNotification(const Protocol::TagChangeNotification &notification) const;
bool acceptsRelationNotification(const Protocol::RelationChangeNotification &notification) const;
+ bool acceptsSubscriptionNotification(const Protocol::SubscriptionChangeNotification &notification) const;
bool isCollectionMonitored(Entity::Id id) const;
bool isMimeTypeMonitored(const QString &mimeType) const;
bool isMoveDestinationResourceMonitored(const Protocol::ItemChangeNotification &msg) const;
bool isMoveDestinationResourceMonitored(const Protocol::CollectionChangeNotification &msg) const;
+ Protocol::ChangeNotification toChangeNotification() const;
protected:
- explicit NotificationSubscriber();
+ explicit NotificationSubscriber(NotificationManager *manager = Q_NULLPTR);
virtual void writeNotification(const Protocol::ChangeNotification &notification);
void writeCommand(qint64 tag, const Protocol::Command &cmd);
+ NotificationManager *mManager;
QLocalSocket *mSocket;
QByteArray mSubscriber;
QSet<Entity::Id> mMonitoredCollections;