aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Vrátil <dvratil@kde.org>2016-01-21 01:20:06 (GMT)
committerDaniel Vrátil <dvratil@kde.org>2016-08-15 19:55:44 (GMT)
commitf257797f1c8e10507d51696aaf9da8655c79da52 (patch)
tree9ecfaed0f0b733ab41de085bafdcbce063909d1a
parent0e478e2b0fb7ed01ddf1fd4f856c3157ed9076c9 (diff)
Implement notification subscription management via Protocol
NotificationManager now lives in separete thread. It creates NotificationSubscriber object for each new connection on NotificationServer. Each connection does not have its own thread as in case of Connection, because the communication there is minimal and we don't mind if the NotificationManager thread gets blocked for a short moment. Each Subscriber keeps state of the subscription. When a notification is delivered to NotificationManager, it will pass them to all subscribers which will then decide whether they accept the notification and eventually send it to subscribed client. Currently this breaks notification and subscriber debugging because we completely removed the DBus introspection. The plan is to introduce a subscription change notification (a type of Protocol::ChangeNotification) that clients like AkonadiConsole could subscribe to to get information about new or removed subscribers and their subscription state.
-rw-r--r--autotests/libs/fakeentitycache.h24
-rw-r--r--autotests/libs/inspectablechangerecorder.h9
-rw-r--r--autotests/libs/inspectablemonitor.h9
-rw-r--r--autotests/libs/monitornotificationtest.cpp9
-rw-r--r--autotests/private/protocoltest.cpp2
-rw-r--r--autotests/server/notificationmanagertest.cpp94
-rw-r--r--src/core/CMakeLists.txt4
-rw-r--r--src/core/changenotificationdependenciesfactory.cpp55
-rw-r--r--src/core/changenotificationdependenciesfactory_p.h5
-rw-r--r--src/core/connection.cpp (renamed from src/core/connectionthread.cpp)69
-rw-r--r--src/core/connection_p.h (renamed from src/core/connectionthread_p.h)24
-rw-r--r--src/core/monitor.cpp81
-rw-r--r--src/core/monitor.h7
-rw-r--r--src/core/monitor_p.cpp135
-rw-r--r--src/core/monitor_p.h14
-rw-r--r--src/core/notificationbus_p.cpp86
-rw-r--r--src/core/session.cpp57
-rw-r--r--src/core/session_p.h9
-rw-r--r--src/core/sessionthread.cpp86
-rw-r--r--src/core/sessionthread_p.h (renamed from src/core/notificationbus_p.h)39
-rw-r--r--src/private/protocol.cpp2
-rw-r--r--src/private/protocol_p.h1
-rw-r--r--src/server/CMakeLists.txt4
-rw-r--r--src/server/akonadi.cpp78
-rw-r--r--src/server/akonadi.h2
-rw-r--r--src/server/connection.cpp45
-rw-r--r--src/server/connection.h5
-rw-r--r--src/server/handler/login.cpp4
-rw-r--r--src/server/notificationmanager.cpp200
-rw-r--r--src/server/notificationmanager.h78
-rw-r--r--src/server/notificationsource.h156
-rw-r--r--src/server/notificationsubscriber.cpp (renamed from src/server/notificationsource.cpp)402
-rw-r--r--src/server/notificationsubscriber.h88
-rw-r--r--src/server/storage/datastore.cpp6
34 files changed, 807 insertions, 1082 deletions
diff --git a/autotests/libs/fakeentitycache.h b/autotests/libs/fakeentitycache.h
index b06f29e..b12b214 100644
--- a/autotests/libs/fakeentitycache.h
+++ b/autotests/libs/fakeentitycache.h
@@ -22,13 +22,12 @@
#include "monitor_p.h"
#include "notificationsource_p.h"
-#include "notificationbus_p.h"
-
#include "collectionfetchscope.h"
#include "itemfetchscope.h"
#include "akonaditestfake_export.h"
#include "private/protocol_p.h"
+
template<typename T, typename Cache>
class FakeEntityCache : public Cache
{
@@ -122,25 +121,27 @@ public Q_SLOTS:
}
};
-class AKONADITESTFAKE_EXPORT FakeNotificationBus : public QObject
+class AKONADITESTFAKE_EXPORT FakeNotificationConnection : public Akonadi::Connection
{
Q_OBJECT
public:
- explicit FakeNotificationBus(QObject *parent = Q_NULLPTR)
- : QObject(parent)
+ explicit FakeNotificationConnection(QObject *parent = Q_NULLPTR)
+ : Connection(Connection::NotificationConnection, "testConn", parent)
{}
- virtual ~FakeNotificationBus()
+ virtual ~FakeNotificationConnection()
{}
void emitNotify(const Akonadi::Protocol::ChangeNotification &ntf)
{
- Q_EMIT notify(ntf);
+ Q_EMIT commandReceived(3, ntf);
}
+ /*
Q_SIGNALS:
void notify(const Akonadi::Protocol::ChangeNotification &ntf);
+ */
};
class FakeMonitorDependeciesFactory : public Akonadi::ChangeNotificationDependenciesFactory
@@ -154,13 +155,8 @@ public:
{
}
- Akonadi::NotificationSource *createNotificationSource(QObject *parent) Q_DECL_OVERRIDE {
- return new Akonadi::NotificationSource(new FakeNotificationSource(parent));
- }
-
- QObject *createNotificationBus(QObject *parent, Akonadi::NotificationSource *source) Q_DECL_OVERRIDE {
- Q_UNUSED(source);
- return new FakeNotificationBus(parent);
+ Akonadi::Connection *createNotificationConnection(Akonadi::Session *parent) Q_DECL_OVERRIDE {
+ return new FakeNotificationConnection(parent);
}
Akonadi::CollectionCache *createCollectionCache(int maxCapacity, Akonadi::Session *session) Q_DECL_OVERRIDE {
diff --git a/autotests/libs/inspectablechangerecorder.h b/autotests/libs/inspectablechangerecorder.h
index 2f3e606..a742350 100644
--- a/autotests/libs/inspectablechangerecorder.h
+++ b/autotests/libs/inspectablechangerecorder.h
@@ -50,14 +50,9 @@ class AKONADITESTFAKE_EXPORT InspectableChangeRecorder : public Akonadi::ChangeR
public:
InspectableChangeRecorder(FakeMonitorDependeciesFactory *dependenciesFactory, QObject *parent = Q_NULLPTR);
- FakeNotificationSource *notifier() const
+ FakeNotificationConnection *notificationConnection() const
{
- return qobject_cast<FakeNotificationSource *>(d_ptr->notificationSource->source());
- }
-
- FakeNotificationBus *notificationBus() const
- {
- return qobject_cast<FakeNotificationBus *>(d_ptr->notificationBus);
+ return qobject_cast<FakeNotificationConnection *>(d_ptr->ntfConnection);
}
QQueue<Akonadi::Protocol::ChangeNotification> pendingNotifications() const
diff --git a/autotests/libs/inspectablemonitor.h b/autotests/libs/inspectablemonitor.h
index 0528cf3..68c6567 100644
--- a/autotests/libs/inspectablemonitor.h
+++ b/autotests/libs/inspectablemonitor.h
@@ -50,14 +50,9 @@ class AKONADITESTFAKE_EXPORT InspectableMonitor : public Akonadi::Monitor
public:
InspectableMonitor(FakeMonitorDependeciesFactory *dependenciesFactory, QObject *parent = Q_NULLPTR);
- FakeNotificationSource *notifier() const
+ FakeNotificationConnection *notificationConnection() const
{
- return qobject_cast<FakeNotificationSource *>(d_ptr->notificationSource->source());
- }
-
- FakeNotificationBus *notificationBus() const
- {
- return qobject_cast<FakeNotificationBus *>(d_ptr->notificationBus);
+ return qobject_cast<FakeNotificationConnection *>(d_ptr->ntfConnection);
}
QQueue<Akonadi::Protocol::ChangeNotification> pendingNotifications() const
diff --git a/autotests/libs/monitornotificationtest.cpp b/autotests/libs/monitornotificationtest.cpp
index 983c5b4..397a2bc 100644
--- a/autotests/libs/monitornotificationtest.cpp
+++ b/autotests/libs/monitornotificationtest.cpp
@@ -93,6 +93,9 @@ void MonitorNotificationTest::testSingleMessage_impl(MonitorImpl *monitor, FakeC
{
Q_UNUSED(itemCache)
+ // Workaround for the QTimer::singleShot() in fake monitors to happen
+ QTest::qWait(10);
+
monitor->setSession(m_fakeSession);
monitor->fetchCollection(true);
@@ -116,7 +119,7 @@ void MonitorNotificationTest::testSingleMessage_impl(MonitorImpl *monitor, FakeC
QVERIFY(monitor->pipeline().isEmpty());
QVERIFY(monitor->pendingNotifications().isEmpty());
- monitor->notificationBus()->emitNotify(msg);
+ monitor->notificationConnection()->emitNotify(msg);
QCOMPARE(monitor->pipeline().size(), 1);
QVERIFY(monitor->pendingNotifications().isEmpty());
@@ -185,7 +188,7 @@ void MonitorNotificationTest::testFillPipeline_impl(MonitorImpl *monitor, FakeCo
QVERIFY(monitor->pendingNotifications().isEmpty());
Q_FOREACH (const Protocol::ChangeNotification &ntf, list) {
- monitor->notificationBus()->emitNotify(ntf);
+ monitor->notificationConnection()->emitNotify(ntf);
}
QCOMPARE(monitor->pipeline().size(), 5);
@@ -272,7 +275,7 @@ void MonitorNotificationTest::testMonitor_impl(MonitorImpl *monitor, FakeCollect
QVERIFY(monitor->pendingNotifications().isEmpty());
Q_FOREACH (const Protocol::ChangeNotification &ntf, list) {
- monitor->notificationBus()->emitNotify(ntf);
+ monitor->notificationConnection()->emitNotify(ntf);
}
// Collection 6 is not notified, because Collection 5 has held up the pipeline
diff --git a/autotests/private/protocoltest.cpp b/autotests/private/protocoltest.cpp
index 17bb636..0398e87 100644
--- a/autotests/private/protocoltest.cpp
+++ b/autotests/private/protocoltest.cpp
@@ -488,13 +488,11 @@ void ProtocolTest::testLoginCommand()
QVERIFY(!in.isResponse());
QVERIFY(in.isValid());
in.setSessionId("MySession-123-notifications");
- in.setSessionMode(LoginCommand::NotificationBus);
const LoginCommand out = serializeAndDeserialize(in);
QVERIFY(out.isValid());
QVERIFY(!out.isResponse());
QCOMPARE(out.sessionId(), QByteArray("MySession-123-notifications"));
- QCOMPARE(out.sessionMode(), LoginCommand::NotificationBus);
QCOMPARE(out, in);
const bool notEquals = (out != in);
QVERIFY(!notEquals);
diff --git a/autotests/server/notificationmanagertest.cpp b/autotests/server/notificationmanagertest.cpp
index 7fed294..8a6efd7 100644
--- a/autotests/server/notificationmanagertest.cpp
+++ b/autotests/server/notificationmanagertest.cpp
@@ -21,7 +21,7 @@
#include "entities.h"
#include "notificationmanager.h"
-#include "notificationsource.h"
+#include "notificationsubscriber.h"
#include <QtCore/QObject>
#include <QtTest/QTest>
@@ -33,11 +33,77 @@ using namespace Akonadi::Server;
Q_DECLARE_METATYPE(QVector<QString>)
+class TestableNotificationSubscriber : public NotificationSubscriber
+{
+public:
+ TestableNotificationSubscriber()
+ : NotificationSubscriber()
+ {
+ }
+
+ void setAllMonitored(bool allMonitored)
+ {
+ mAllMonitored = allMonitored;
+ }
+
+ void setMonitoredCollection(qint64 collection, bool monitored)
+ {
+ if (monitored) {
+ mMonitoredCollections.insert(collection);
+ } else {
+ mMonitoredCollections.remove(collection);
+ }
+ }
+
+ void setMonitoredItem(qint64 item, bool monitored)
+ {
+ if (monitored) {
+ mMonitoredItems.insert(item);
+ } else {
+ mMonitoredItems.remove(item);
+ }
+ }
+
+ void setMonitoredResource(const QByteArray &resource, bool monitored)
+ {
+ if (monitored) {
+ mMonitoredResources.insert(resource);
+ } else {
+ mMonitoredResources.remove(resource);
+ }
+ }
+
+ void setMonitoredMimeType(const QString &mimeType, bool monitored)
+ {
+ if (monitored) {
+ mMonitoredMimeTypes.insert(mimeType);
+ } else {
+ mMonitoredMimeTypes.remove(mimeType);
+ }
+ }
+
+ void setIgnoredSession(const QByteArray &session, bool ignored)
+ {
+ if (ignored) {
+ mIgnoredSessions.insert(session);
+ } else {
+ mIgnoredSessions.remove(session);
+ }
+ }
+
+ void writeNotification(const Protocol::ChangeNotification &notification) Q_DECL_OVERRIDE
+ {
+ emittedNotifications << notification;
+ }
+
+ Protocol::ChangeNotification::List emittedNotifications;
+};
+
class NotificationManagerTest : public QObject
{
Q_OBJECT
- typedef QList<NotificationSource *> NSList;
+ typedef QList<NotificationSubscriber *> NSList;
private Q_SLOTS:
void testSourceFilter_data()
@@ -267,37 +333,33 @@ private Q_SLOTS:
QFETCH(Protocol::ChangeNotification, notification);
QFETCH(bool, accepted);
- NotificationManager mgr;
- NotificationSource source(QStringLiteral("testSource"), QString(), &mgr);
- mgr.registerSource(&source);
+ TestableNotificationSubscriber subscriber;
- source.setAllMonitored(allMonitored);
+ subscriber.setAllMonitored(allMonitored);
Q_FOREACH (Entity::Id id, monitoredCollections) {
- source.setMonitoredCollection(id, true);
+ subscriber.setMonitoredCollection(id, true);
}
Q_FOREACH (Entity::Id id, monitoredItems) {
- source.setMonitoredItem(id, true);
+ subscriber.setMonitoredItem(id, true);
}
Q_FOREACH (const QByteArray &res, monitoredResources) {
- source.setMonitoredResource(res, true);
+ subscriber.setMonitoredResource(res, true);
}
Q_FOREACH (const QString &mimeType, monitoredMimeTypes) {
- source.setMonitoredMimeType(mimeType, true);
+ subscriber.setMonitoredMimeType(mimeType, true);
}
Q_FOREACH (const QByteArray &session, ignoredSessions) {
- source.setIgnoredSession(session, true);
+ subscriber.setIgnoredSession(session, true);
}
- QSignalSpy spy(&source, SIGNAL(notify(Akonadi::Protocol::Command)));
Protocol::ChangeNotification::List list;
list << notification;
- mgr.slotNotify(list);
- mgr.emitPendingNotifications();
+ subscriber.notify(list);
- QCOMPARE(spy.count(), accepted ? 1 : 0);
+ QCOMPARE(subscriber.emittedNotifications.count(), accepted ? 1 : 0);
if (accepted) {
- Protocol::ChangeNotification ntf = spy.at(0).at(0).value<Protocol::Command>();
+ const Protocol::ChangeNotification ntf = subscriber.emittedNotifications.at(0);
QVERIFY(ntf.isValid());
}
}
diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt
index 2e3496b..366dd35 100644
--- a/src/core/CMakeLists.txt
+++ b/src/core/CMakeLists.txt
@@ -11,7 +11,7 @@ set(akonadicore_base_SRCS
changenotificationdependenciesfactory.cpp
changerecorder.cpp
changerecorder_p.cpp
- connectionthread.cpp
+ connection.cpp
collection.cpp
collectioncolorattribute.cpp
collectionfetchscope.cpp
@@ -46,7 +46,6 @@ set(akonadicore_base_SRCS
monitor.cpp
monitor_p.cpp
newmailnotifierattribute.cpp
- notificationbus_p.cpp
notificationsource_p.cpp
partfetcher.cpp
pastehelper.cpp
@@ -59,6 +58,7 @@ set(akonadicore_base_SRCS
searchquery.cpp
servermanager.cpp
session.cpp
+ sessionthread.cpp
specialcollectionattribute.cpp
specialcollections.cpp
tag.cpp
diff --git a/src/core/changenotificationdependenciesfactory.cpp b/src/core/changenotificationdependenciesfactory.cpp
index d9baf40..5f039fe 100644
--- a/src/core/changenotificationdependenciesfactory.cpp
+++ b/src/core/changenotificationdependenciesfactory.cpp
@@ -18,70 +18,25 @@
*/
#include "changenotificationdependenciesfactory_p.h"
-#include "KDBusConnectionPool"
-#include "notificationsource_p.h"
-#include "notificationbus_p.h"
-#include "notificationsourceinterface.h"
-#include "notificationmanagerinterface.h"
+#include "sessionthread_p.h"
+#include "connection_p.h"
#include "changemediator_p.h"
#include "servermanager.h"
#include "akonadicore_debug.h"
+#include "session_p.h"
#include <KRandom>
#include <qdbusextratypes.h>
using namespace Akonadi;
-NotificationSource *ChangeNotificationDependenciesFactory::createNotificationSource(QObject *parent)
+Connection *ChangeNotificationDependenciesFactory::createNotificationConnection(Session *session)
{
if (!Akonadi::ServerManager::self()->isRunning()) {
return 0;
}
- org::freedesktop::Akonadi::NotificationManager *manager =
- new org::freedesktop::Akonadi::NotificationManager(
- ServerManager::serviceName(Akonadi::ServerManager::Server),
- QStringLiteral("/notifications"),
- KDBusConnectionPool::threadConnection());
-
- if (!manager) {
- // :TODO: error handling
- return 0;
- }
-
- const QString name =
- QStringLiteral("%1_%2_%3").arg(
- QCoreApplication::applicationName(),
- QString::number(QCoreApplication::applicationPid()),
- KRandom::randomString(6));
- QDBusObjectPath p = manager->subscribe(name, false);
- const bool validError = manager->lastError().isValid();
- if (validError) {
- qCWarning(AKONADICORE_LOG) << manager->lastError().name() << manager->lastError().message();
- // :TODO: What to do?
- delete manager;
- return 0;
- }
- delete manager;
- org::freedesktop::Akonadi::NotificationSource *notificationSource =
- new org::freedesktop::Akonadi::NotificationSource(
- ServerManager::serviceName(Akonadi::ServerManager::Server),
- p.path(),
- KDBusConnectionPool::threadConnection(), parent);
-
- if (!notificationSource) {
- // :TODO: error handling
- return 0;
- }
- return new NotificationSource(notificationSource);
-}
-
-QObject *ChangeNotificationDependenciesFactory::createNotificationBus(QObject *parent, NotificationSource *source)
-{
- NotificationBusPrivate *priv = new NotificationBusPrivate;
- Session *session = new Session(priv, source->identifier().toLatin1(), parent);
- priv->setParent(session);
- return priv;
+ return session->d->sessionThread()->createConnection(Connection::NotificationConnection, session->sessionId());
}
QObject *ChangeNotificationDependenciesFactory::createChangeMediator(QObject *parent)
diff --git a/src/core/changenotificationdependenciesfactory_p.h b/src/core/changenotificationdependenciesfactory_p.h
index 414984f..141a679 100644
--- a/src/core/changenotificationdependenciesfactory_p.h
+++ b/src/core/changenotificationdependenciesfactory_p.h
@@ -26,7 +26,7 @@
namespace Akonadi
{
-class NotificationSource;
+class Connection;
/**
* This class exists so that we can create a fake notification source in
@@ -38,8 +38,7 @@ public:
virtual ~ChangeNotificationDependenciesFactory()
{
}
- virtual NotificationSource *createNotificationSource(QObject *parent);
- virtual QObject *createNotificationBus(QObject *parent, NotificationSource *source);
+ virtual Connection *createNotificationConnection(Session *parent);
virtual QObject *createChangeMediator(QObject *parent);
virtual Akonadi::CollectionCache *createCollectionCache(int maxCapacity, Session *session);
diff --git a/src/core/connectionthread.cpp b/src/core/connection.cpp
index baaff78..04a40eb 100644
--- a/src/core/connectionthread.cpp
+++ b/src/core/connection.cpp
@@ -17,7 +17,7 @@
* 02110-1301, USA.
*/
-#include "connectionthread_p.h"
+#include "connection_p.h"
#include "session_p.h"
#include "servermanager_p.h"
#include "akonadicore_debug.h"
@@ -37,8 +37,9 @@
using namespace Akonadi;
-ConnectionThread::ConnectionThread(const QByteArray &sessionId, QObject *parent)
+Connection::Connection(ConnectionType connType, const QByteArray &sessionId, QObject *parent)
: QObject(parent)
+ , mConnectionType(connType)
, mSocket(Q_NULLPTR)
, mLogFile(Q_NULLPTR)
, mSessionId(sessionId)
@@ -46,10 +47,6 @@ ConnectionThread::ConnectionThread(const QByteArray &sessionId, QObject *parent)
qRegisterMetaType<Protocol::Command>();
qRegisterMetaType<QAbstractSocket::SocketState>();
- QThread *thread = new QThread();
- moveToThread(thread);
- thread->start();
-
const QByteArray sessionLogFile = qgetenv("AKONADI_SESSION_LOGFILE");
if (!sessionLogFile.isEmpty()) {
mLogFile = new QFile(QStringLiteral("%1.%2.%3").arg(QString::fromLatin1(sessionLogFile),
@@ -63,44 +60,24 @@ ConnectionThread::ConnectionThread(const QByteArray &sessionId, QObject *parent)
}
}
-ConnectionThread::~ConnectionThread()
+Connection::~Connection()
{
- if (QCoreApplication::instance()) {
- QMetaObject::invokeMethod(this, "doThreadQuit");
- } else {
- // QCoreApplication already destroyed -> invokeMethod would just not get the message delivered
- // We leak the socket, but at least we don't block for 10s
- qWarning() << "Akonadi ConnectionThread deleted after QCoreApplication is destroyed. Clean up your sessions earlier!";
- thread()->quit();
- }
- if (!thread()->wait(10 * 1000)) {
- thread()->terminate();
- // Make sure to wait until it's done, otherwise it can crash when the pthread callback is called
- thread()->wait();
- }
delete mLogFile;
- delete thread();
-}
-
-void ConnectionThread::doThreadQuit()
-{
- Q_ASSERT(QThread::currentThread() == thread());
if (mSocket) {
mSocket->disconnect(this);
mSocket->close();
delete mSocket;
}
- thread()->quit();
}
-void ConnectionThread::reconnect()
+void Connection::reconnect()
{
const bool ok = QMetaObject::invokeMethod(this, "doReconnect", Qt::QueuedConnection);
Q_ASSERT(ok);
Q_UNUSED(ok)
}
-void ConnectionThread::doReconnect()
+void Connection::doReconnect()
{
Q_ASSERT(QThread::currentThread() == thread());
@@ -146,9 +123,15 @@ void ConnectionThread::doReconnect()
<< XdgBaseDirs::systemPathList("config");
}
const QSettings connectionSettings(connectionConfigFile, QSettings::IniFormat);
-
const QString defaultSocketDir = StandardDirs::saveDir("data");
- serverAddress = connectionSettings.value(QStringLiteral("Data/UnixPath"), QString(defaultSocketDir + QStringLiteral("/akonadiserver.socket"))).toString();
+
+ if (mConnectionType == CommandConnection) {
+ const QString defaultSocketPath = defaultSocketDir % QStringLiteral("akonadiserver-cmd.socket");
+ serverAddress = connectionSettings.value(QStringLiteral("Data/UnixPath"), defaultSocketPath).toString();
+ } else if (mConnectionType == NotificationConnection) {
+ const QString defaultSocketPath = defaultSocketDir % QStringLiteral("akonadiserver-ntf.socket");
+ serverAddress = connectionSettings.value(QStringLiteral("Notifications/UnixPath"), defaultSocketPath).toString();
+ }
}
// create sockets if not yet done, note that this does not yet allow changing socket types on the fly
@@ -157,11 +140,12 @@ void ConnectionThread::doReconnect()
mSocket = new QLocalSocket(this);
connect(mSocket, static_cast<void(QLocalSocket::*)(QLocalSocket::LocalSocketError)>(&QLocalSocket::error), this,
[this](QLocalSocket::LocalSocketError) {
+ qCWarning(AKONADICORE_LOG) << mSocket->errorString();
Q_EMIT socketError(mSocket->errorString());
Q_EMIT socketDisconnected();
});
- connect(mSocket, &QLocalSocket::disconnected, this, &ConnectionThread::socketDisconnected);
- connect(mSocket, &QLocalSocket::readyRead, this, &ConnectionThread::dataReceived);
+ connect(mSocket, &QLocalSocket::disconnected, this, &Connection::socketDisconnected);
+ connect(mSocket, &QLocalSocket::readyRead, this, &Connection::dataReceived);
}
// actually do connect
@@ -171,7 +155,7 @@ void ConnectionThread::doReconnect()
Q_EMIT reconnected();
}
-void ConnectionThread::forceReconnect()
+void Connection::forceReconnect()
{
const bool ok = QMetaObject::invokeMethod(this, "doForceReconnect",
Qt::QueuedConnection);
@@ -179,7 +163,7 @@ void ConnectionThread::forceReconnect()
Q_UNUSED(ok)
}
-void ConnectionThread::doForceReconnect()
+void Connection::doForceReconnect()
{
Q_ASSERT(QThread::currentThread() == thread());
@@ -191,14 +175,14 @@ void ConnectionThread::doForceReconnect()
mSocket = Q_NULLPTR;
}
-void ConnectionThread::disconnect()
+void Connection::closeConnection()
{
- const bool ok = QMetaObject::invokeMethod(this, "doDisconnect", Qt::QueuedConnection);
+ const bool ok = QMetaObject::invokeMethod(this, "doCloseConnection", Qt::QueuedConnection);
Q_ASSERT(ok);
Q_UNUSED(ok)
}
-void ConnectionThread::doDisconnect()
+void Connection::doCloseConnection()
{
Q_ASSERT(QThread::currentThread() == thread());
@@ -208,13 +192,12 @@ void ConnectionThread::doDisconnect()
}
}
-void ConnectionThread::dataReceived()
+void Connection::dataReceived()
{
Q_ASSERT(QThread::currentThread() == thread());
QElapsedTimer timer;
timer.start();
-
while (mSocket->bytesAvailable() > 0) {
QDataStream stream(mSocket);
qint64 tag;
@@ -263,13 +246,13 @@ void ConnectionThread::dataReceived()
// to the jobs through event loop. That will be overall slower but should
// result in much more responsive applications.
if (timer.elapsed() > 50) {
- QThread::currentThread()->eventDispatcher()->processEvents(QEventLoop::ExcludeSocketNotifiers);
+ thread()->eventDispatcher()->processEvents(QEventLoop::ExcludeSocketNotifiers);
timer.restart();
}
}
}
-void ConnectionThread::sendCommand(qint64 tag, const Protocol::Command &cmd)
+void Connection::sendCommand(qint64 tag, const Protocol::Command &cmd)
{
const bool ok = QMetaObject::invokeMethod(this, "doSendCommand",
Qt::QueuedConnection,
@@ -279,7 +262,7 @@ void ConnectionThread::sendCommand(qint64 tag, const Protocol::Command &cmd)
Q_UNUSED(ok)
}
-void ConnectionThread::doSendCommand(qint64 tag, const Protocol::Command &cmd)
+void Connection::doSendCommand(qint64 tag, const Protocol::Command &cmd)
{
Q_ASSERT(QThread::currentThread() == thread());
diff --git a/src/core/connectionthread_p.h b/src/core/connection_p.h
index 3f221a1..9724a20 100644
--- a/src/core/connectionthread_p.h
+++ b/src/core/connection_p.h
@@ -27,23 +27,33 @@
#include <private/protocol_p.h>
+#include "akonadicore_export.h"
+
class QAbstractSocket;
class QFile;
namespace Akonadi
{
-class ConnectionThread : public QObject
+class SessionThread;
+
+class AKONADICORE_EXPORT Connection : public QObject
{
Q_OBJECT
public:
- explicit ConnectionThread(const QByteArray &sessionId, QObject *parent = Q_NULLPTR);
- ~ConnectionThread();
+ enum ConnectionType {
+ CommandConnection,
+ NotificationConnection
+ };
+ Q_ENUMS(ConnectionType)
+
+ explicit Connection(ConnectionType connType, const QByteArray &sessionId, QObject *parent = Q_NULLPTR);
+ ~Connection();
Q_INVOKABLE void reconnect();
void forceReconnect();
- void disconnect();
+ void closeConnection();
void sendCommand(qint64 tag, const Protocol::Command &command);
Q_SIGNALS:
@@ -54,10 +64,9 @@ Q_SIGNALS:
void socketError(const QString &message);
private Q_SLOTS:
- void doThreadQuit();
void doReconnect();
void doForceReconnect();
- void doDisconnect();
+ void doCloseConnection();
void doSendCommand(qint64 tag, const Akonadi::Protocol::Command &command);
void dataReceived();
@@ -66,6 +75,7 @@ private:
bool handleCommand(qint64 tag, const Protocol::Command &cmd);
+ ConnectionType mConnectionType;
QLocalSocket *mSocket;
QFile *mLogFile;
QByteArray mSessionId;
@@ -75,6 +85,8 @@ private:
Protocol::Command cmd;
};
QQueue<Command> mOutQueue;
+
+ friend class Akonadi::SessionThread;
};
}
diff --git a/src/core/monitor.cpp b/src/core/monitor.cpp
index 258cbef..2ac8e60 100644
--- a/src/core/monitor.cpp
+++ b/src/core/monitor.cpp
@@ -67,15 +67,13 @@ void Monitor::setCollectionMonitored(const Collection &collection, bool monitore
Q_D(Monitor);
if (!d->collections.contains(collection) && monitored) {
d->collections << collection;
- if (d->notificationSource) {
- d->notificationSource->setMonitoredCollection(collection.id(), true);
- }
+ d->pendingModification.startMonitoringCollection(collection.id());
+ d->scheduleSubscriptionUpdate();
} else if (!monitored) {
if (d->collections.removeAll(collection)) {
d->cleanOldNotifications();
- if (d->notificationSource) {
- d->notificationSource->setMonitoredCollection(collection.id(), false);
- }
+ d->pendingModification.stopMonitoringCollection(collection.id());
+ d->scheduleSubscriptionUpdate();
}
}
@@ -87,15 +85,13 @@ void Monitor::setItemMonitored(const Item &item, bool monitored)
Q_D(Monitor);
if (!d->items.contains(item.id()) && monitored) {
d->items.insert(item.id());
- if (d->notificationSource) {
- d->notificationSource->setMonitoredItem(item.id(), true);
- }
+ d->pendingModification.startMonitoringItem(item.id());
+ d->scheduleSubscriptionUpdate();
} else if (!monitored) {
if (d->items.remove(item.id())) {
d->cleanOldNotifications();
- if (d->notificationSource) {
- d->notificationSource->setMonitoredItem(item.id(), false);
- }
+ d->pendingModification.stopMonitoringItem(item.id());
+ d->scheduleSubscriptionUpdate();
}
}
@@ -107,15 +103,13 @@ void Monitor::setResourceMonitored(const QByteArray &resource, bool monitored)
Q_D(Monitor);
if (!d->resources.contains(resource) && monitored) {
d->resources.insert(resource);
- if (d->notificationSource) {
- d->notificationSource->setMonitoredResource(resource, true);
- }
+ d->pendingModification.startMonitoringResource(resource);
+ d->scheduleSubscriptionUpdate();
} else if (!monitored) {
if (d->resources.remove(resource)) {
d->cleanOldNotifications();
- if (d->notificationSource) {
- d->notificationSource->setMonitoredResource(resource, false);
- }
+ d->pendingModification.stopMonitoringResource(resource);
+ d->scheduleSubscriptionUpdate();
}
}
@@ -127,15 +121,13 @@ void Monitor::setMimeTypeMonitored(const QString &mimetype, bool monitored)
Q_D(Monitor);
if (!d->mimetypes.contains(mimetype) && monitored) {
d->mimetypes.insert(mimetype);
- if (d->notificationSource) {
- d->notificationSource->setMonitoredMimeType(mimetype, true);
- }
+ d->pendingModification.startMonitoringMimeType(mimetype);
+ d->scheduleSubscriptionUpdate();
} else if (!monitored) {
if (d->mimetypes.remove(mimetype)) {
d->cleanOldNotifications();
- if (d->notificationSource) {
- d->notificationSource->setMonitoredMimeType(mimetype, false);
- }
+ d->pendingModification.stopMonitoringMimeType(mimetype);
+ d->scheduleSubscriptionUpdate();
}
}
@@ -147,15 +139,13 @@ void Monitor::setTagMonitored(const Akonadi::Tag &tag, bool monitored)
Q_D(Monitor);
if (!d->tags.contains(tag.id()) && monitored) {
d->tags.insert(tag.id());
- if (d->notificationSource) {
- d->notificationSource->setMonitoredTag(tag.id(), true);
- }
+ d->pendingModification.startMonitoringTag(tag.id());
+ d->scheduleSubscriptionUpdate();
} else if (!monitored) {
if (d->tags.remove(tag.id())) {
d->cleanOldNotifications();
- if (d->notificationSource) {
- d->notificationSource->setMonitoredTag(tag.id(), false);
- }
+ d->pendingModification.stopMonitoringTag(tag.id());
+ d->scheduleSubscriptionUpdate();
}
}
@@ -167,15 +157,13 @@ void Monitor::setTypeMonitored(Monitor::Type type, bool monitored)
Q_D(Monitor);
if (!d->types.contains(type) && monitored) {
d->types.insert(type);
- if (d->notificationSource) {
- d->notificationSource->setMonitoredType(static_cast<Protocol::ChangeNotification::Type>(type), true);
- }
+ d->pendingModification.startMonitoringType(static_cast<Protocol::ChangeNotification::Type>(type));
+ d->scheduleSubscriptionUpdate();
} else if (!monitored) {
if (d->types.remove(type)) {
d->cleanOldNotifications();
- if (d->notificationSource) {
- d->notificationSource->setMonitoredType(static_cast<Protocol::ChangeNotification::Type>(type), false);
- }
+ d->pendingModification.stopMonitoringType(static_cast<Protocol::ChangeNotification::Type>(type));
+ d->scheduleSubscriptionUpdate();
}
}
@@ -195,9 +183,8 @@ void Akonadi::Monitor::setAllMonitored(bool monitored)
d->cleanOldNotifications();
}
- if (d->notificationSource) {
- d->notificationSource->setAllMonitored(monitored);
- }
+ d->pendingModification.setAllMonitored(monitored);
+ d->scheduleSubscriptionUpdate();
emit allMonitored(monitored);
}
@@ -206,9 +193,8 @@ void Monitor::setExclusive(bool exclusive)
{
Q_D(Monitor);
d->exclusive = exclusive;
- if (d->notificationSource) {
- d->notificationSource->setExclusive(exclusive);
- }
+ d->pendingModification.setExclusive(exclusive);
+ d->scheduleSubscriptionUpdate();
}
bool Monitor::exclusive() const
@@ -224,9 +210,8 @@ void Monitor::ignoreSession(Session *session)
if (!d->sessions.contains(session->sessionId())) {
d->sessions << session->sessionId();
connect(session, SIGNAL(destroyed(QObject*)), this, SLOT(slotSessionDestroyed(QObject*)));
- if (d->notificationSource) {
- d->notificationSource->setIgnoredSession(session->sessionId(), true);
- }
+ d->pendingModification.startIgnoringSession(session->sessionId());
+ d->scheduleSubscriptionUpdate();
}
}
@@ -368,9 +353,9 @@ void Monitor::setSession(Akonadi::Session *session)
d->itemCache->setSession(d->session);
d->collectionCache->setSession(d->session);
- if (d->notificationSource) {
- d->notificationSource->setSession(d->session->sessionId());
- }
+
+ // Reconnect with a new session
+ d->connectToNotificationManager();
}
Session *Monitor::session() const
diff --git a/src/core/monitor.h b/src/core/monitor.h
index a15ec84..19b9143 100644
--- a/src/core/monitor.h
+++ b/src/core/monitor.h
@@ -41,7 +41,7 @@ class TagFetchScope;
namespace Protocol
{
-class ChangeNotification;
+class Command;
}
/**
@@ -733,6 +733,8 @@ Q_SIGNALS:
*/
void typeMonitored(const Akonadi::Monitor::Type type, bool monitored);
+ void monitorReady();
+
protected:
//@cond PRIVATE
friend class EntityTreeModel;
@@ -748,7 +750,8 @@ private:
Q_PRIVATE_SLOT(d_ptr, void slotSessionDestroyed(QObject *))
Q_PRIVATE_SLOT(d_ptr, void slotStatisticsChangedFinished(KJob *))
Q_PRIVATE_SLOT(d_ptr, void slotFlushRecentlyChangedCollections())
- Q_PRIVATE_SLOT(d_ptr, void slotNotify(const Akonadi::Protocol::ChangeNotification &))
+ Q_PRIVATE_SLOT(d_ptr, void slotUpdateSubscription())
+ Q_PRIVATE_SLOT(d_ptr, void commandReceived(qint64 tag, const Akonadi::Protocol::Command &))
Q_PRIVATE_SLOT(d_ptr, void dataAvailable())
Q_PRIVATE_SLOT(d_ptr, void serverStateChanged(Akonadi::ServerManager::State))
Q_PRIVATE_SLOT(d_ptr, void invalidateCollectionCache(qint64))
diff --git a/src/core/monitor_p.cpp b/src/core/monitor_p.cpp
index b20d8d4..c2f9945 100644
--- a/src/core/monitor_p.cpp
+++ b/src/core/monitor_p.cpp
@@ -38,8 +38,7 @@ static const int PipelineSize = 5;
MonitorPrivate::MonitorPrivate(ChangeNotificationDependenciesFactory *dependenciesFactory_, Monitor *parent)
: q_ptr(parent)
, dependenciesFactory(dependenciesFactory_ ? dependenciesFactory_ : new ChangeNotificationDependenciesFactory)
- , notificationSource(0)
- , notificationBus(0)
+ , ntfConnection(Q_NULLPTR)
, monitorAll(false)
, exclusive(false)
, mFetchChangedOnly(false)
@@ -47,6 +46,8 @@ MonitorPrivate::MonitorPrivate(ChangeNotificationDependenciesFactory *dependenci
, collectionCache(0)
, itemCache(0)
, tagCache(0)
+ , pendingModificationTimer(Q_NULLPTR)
+ , monitorReady(false)
, fetchCollection(false)
, fetchCollectionStatistics(false)
, collectionMoveTranslationEnabled(true)
@@ -86,62 +87,28 @@ void MonitorPrivate::init()
bool MonitorPrivate::connectToNotificationManager()
{
- delete notificationSource;
+ if (ntfConnection) {
+ ntfConnection->deleteLater();
+ ntfConnection = Q_NULLPTR;
+ }
- notificationSource = dependenciesFactory->createNotificationSource(q_ptr);
- if (!notificationSource) {
+ if (!session) {
return false;
}
- notificationSource->setSession(session->sessionId());
-
- if (notificationBus) {
- // HACK: Implementation detail: notificationBus is SessionPrivate subclass,
- // so we cannot delete it directly, but we need to delete the owning
- // Session instead, otherwise it will dereference a deleted d_ptr.
- delete notificationBus->parent();
- }
- notificationBus = dependenciesFactory->createNotificationBus(q_ptr, notificationSource);
- if (!notificationBus) {
- delete notificationSource;
- notificationSource = 0;
+ ntfConnection = dependenciesFactory->createNotificationConnection(session);
+ if (!ntfConnection) {
return false;
}
- QObject::connect(notificationBus, SIGNAL(notify(Akonadi::Protocol::ChangeNotification)),
- q_ptr, SLOT(slotNotify(Akonadi::Protocol::ChangeNotification)));
-
+ q_ptr->connect(ntfConnection, SIGNAL(commandReceived(qint64,Akonadi::Protocol::Command)),
+ q_ptr, SLOT(commandReceived(qint64,Akonadi::Protocol::Command)));
return true;
}
void MonitorPrivate::serverStateChanged(ServerManager::State state)
{
if (state == ServerManager::Running) {
- if (connectToNotificationManager()) {
- notificationSource->setAllMonitored(monitorAll);
- notificationSource->setSession(session->sessionId());
- Q_FOREACH (const Collection &col, collections) {
- notificationSource->setMonitoredCollection(col.id(), true);
- }
- Q_FOREACH (const Item::Id id, items) {
- notificationSource->setMonitoredItem(id, true);
- }
- Q_FOREACH (const QByteArray &resource, resources) {
- notificationSource->setMonitoredResource(resource, true);
- }
- Q_FOREACH (const QByteArray &session, sessions) {
- notificationSource->setIgnoredSession(session, true);
- }
- Q_FOREACH (const QString &mimeType, mimetypes) {
- notificationSource->setMonitoredMimeType(mimeType, true);
- }
- Q_FOREACH (Tag::Id tagId, tags) {
- notificationSource->setMonitoredTag(tagId, true);
- }
- Q_FOREACH (Monitor::Type type, types) {
- notificationSource->setMonitoredType(
- static_cast<Protocol::ChangeNotification::Type>(type), true);
- }
- }
+ connectToNotificationManager();
}
}
@@ -165,6 +132,31 @@ int MonitorPrivate::pipelineSize() const
return PipelineSize;
}
+void MonitorPrivate::scheduleSubscriptionUpdate()
+{
+ if (pendingModificationTimer || !monitorReady) {
+ return;
+ }
+
+ pendingModificationTimer = new QTimer();
+ pendingModificationTimer->setSingleShot(true);
+ pendingModificationTimer->setInterval(0);
+ pendingModificationTimer->start();
+ q_ptr->connect(pendingModificationTimer, SIGNAL(timeout()),
+ q_ptr, SLOT(slotUpdateSubscription()));
+}
+
+void MonitorPrivate::slotUpdateSubscription()
+{
+ delete pendingModificationTimer;
+ pendingModificationTimer = Q_NULLPTR;
+
+ if (ntfConnection) {
+ ntfConnection->sendCommand(3, pendingModification);
+ pendingModification = Protocol::ModifySubscriptionCommand();
+ }
+}
+
bool MonitorPrivate::isLazilyIgnored(const Protocol::ChangeNotification &msg, bool allowModifyFlagsConversion) const
{
Protocol::ChangeNotification::Operation op = msg.operation();
@@ -616,9 +608,8 @@ void MonitorPrivate::slotSessionDestroyed(QObject *object)
Session *objectSession = qobject_cast<Session *>(object);
if (objectSession) {
sessions.removeAll(objectSession->sessionId());
- if (notificationSource) {
- notificationSource->setIgnoredSession(objectSession->sessionId(), false);
- }
+ pendingModification.stopIgnoringSession(objectSession->sessionId());
+ scheduleSubscriptionUpdate();
}
}
@@ -713,6 +704,52 @@ int MonitorPrivate::translateAndCompress(QQueue< Protocol::ChangeNotification >
return split.count();
}
+void MonitorPrivate::commandReceived(qint64 tag, const Protocol::Command &command)
+{
+ Q_UNUSED(tag);
+ if (command.isResponse()) {
+ switch (command.type()) {
+ case Protocol::Command::Hello: {
+ Protocol::HelloResponse hello(command);
+ qCDebug(AKONADICORE_LOG) << q_ptr << "Connected to notification bus";
+ Protocol::CreateSubscriptionCommand subCmd(session->sessionId() + QByteArray::number(quintptr(this)));
+ ntfConnection->sendCommand(2, subCmd);
+ break;
+ }
+
+ case Protocol::Command::CreateSubscription: {
+ Protocol::ModifySubscriptionCommand msubCmd = pendingModification;
+ pendingModification = Protocol::ModifySubscriptionCommand();
+ ntfConnection->sendCommand(3, msubCmd);
+ break;
+ }
+
+ case Protocol::Command::ModifySubscription:
+ // TODO: Handle errors
+ if (!monitorReady) {
+ monitorReady = true;
+ Q_EMIT q_ptr->monitorReady();
+ }
+ break;
+
+ default:
+ // TODO
+ break;
+ }
+ } else {
+ switch (command.type()) {
+ case Protocol::Command::ChangeNotification: {
+ Protocol::ChangeNotification ntf(command);
+ slotNotify(ntf);
+ break;
+ }
+
+ default:
+ break;
+ }
+ }
+}
+
/*
server notification --> ?accepted --> pendingNotifications --> ?dataAvailable --> emit
diff --git a/src/core/monitor_p.h b/src/core/monitor_p.h
index a94d0f8..87ae610 100644
--- a/src/core/monitor_p.h
+++ b/src/core/monitor_p.h
@@ -32,7 +32,7 @@
#include "entitycache_p.h"
#include "servermanager.h"
#include "changenotificationdependenciesfactory_p.h"
-#include "notificationsource_p.h"
+#include "connection_p.h"
#include "private/protocol_p.h"
@@ -60,8 +60,7 @@ public:
Monitor *q_ptr;
Q_DECLARE_PUBLIC(Monitor)
ChangeNotificationDependenciesFactory *dependenciesFactory;
- NotificationSource *notificationSource;
- QObject *notificationBus;
+ Connection *ntfConnection;
Collection::List collections;
QSet<QByteArray> resources;
QSet<Item::Id> items;
@@ -81,6 +80,10 @@ public:
TagListCache *tagCache;
QMimeDatabase mimeDatabase;
+ Protocol::ModifySubscriptionCommand pendingModification;
+ QTimer *pendingModificationTimer;
+ bool monitorReady;
+
// The waiting list
QQueue<Protocol::ChangeNotification> pendingNotifications;
// The messages for which data is currently being fetched
@@ -142,6 +145,8 @@ public:
*/
int translateAndCompress(QQueue<Protocol::ChangeNotification> &notificationQueue, const Protocol::ChangeNotification &msg);
+ void commandReceived(qint64 tag, const Protocol::Command &command);
+
virtual void slotNotify(const Protocol::ChangeNotification &msg);
/**
@@ -178,6 +183,9 @@ public:
*/
void invalidateTagCache(qint64 tagId);
+ void scheduleSubscriptionUpdate();
+ void slotUpdateSubscription();
+
/**
@brief Class used to determine when to purge items in a Collection
diff --git a/src/core/notificationbus_p.cpp b/src/core/notificationbus_p.cpp
deleted file mode 100644
index 8b50080..0000000
--- a/src/core/notificationbus_p.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- Copyright (c) 2015 Daniel Vrátil <dvratil@redhat.com>
-
- This library is free software; you can redistribute it and/or modify it
- under the terms of the GNU Library General Public License as published by
- the Free Software Foundation; either version 2 of the License, or (at your
- option) any later version.
-
- This library is distributed in the hope that it will be useful, but WITHOUT
- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
- License for more details.
-
- You should have received a copy of the GNU Library General Public License
- along with this library; see the file COPYING.LIB. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- 02110-1301, USA.
-*/
-
-#include "notificationbus_p.h"
-#include "session_p.h"
-#include "connectionthread_p.h"
-#include "akonadicore_debug.h"
-
-#include "private/protocol_p.h"
-
-#include <QTimer>
-
-using namespace Akonadi;
-
-NotificationBusPrivate::NotificationBusPrivate(Session *parent)
- : QObject(parent)
- , SessionPrivate(parent)
-{
-}
-
-NotificationBusPrivate::~NotificationBusPrivate()
-{
-}
-
-bool NotificationBusPrivate::handleCommand(qint64 tag, const Protocol::Command &cmd)
-{
- Q_UNUSED(tag);
-
- if (cmd.type() == Protocol::Command::Hello) {
- Protocol::HelloResponse hello(cmd);
- if (hello.isError()) {
- qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage();
- connThread->disconnect();
- QTimer::singleShot(1000, mParent, SLOT(reconnect()));
- return false;
- }
-
- qCDebug(AKONADICORE_LOG) << "Connected to" << hello.serverName() << ", using protocol version" << hello.protocolVersion();
- qCDebug(AKONADICORE_LOG) << "Server says:" << hello.message();
- // Version mismatch is handled in SessionPrivate::startJob() so that
- // we can report the error out via KJob API
- protocolVersion = hello.protocolVersion();
-
- Protocol::LoginCommand login(sessionId, Protocol::LoginCommand::NotificationBus);
- sendCommand(nextTag(), login);
- return true;
- }
-
- if (cmd.type() == Protocol::Command::Login) {
- Protocol::LoginResponse login(cmd);
- if (login.isError()) {
- qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage();
- connThread->disconnect();
- QTimer::singleShot(1000, mParent, SLOT(reconnect()));
- return false;
- }
-
- connected = true;
- startNext();
- return true;
- }
-
- if (cmd.type() == Protocol::Command::ChangeNotification) {
- Q_EMIT notify(cmd);
- return true;
- }
-
- qCWarning(AKONADICORE_LOG) << "Recieved invalid command on NotificationBus" << sessionId;
- return false;
-}
diff --git a/src/core/session.cpp b/src/core/session.cpp
index bfebdc4..7545acc 100644
--- a/src/core/session.cpp
+++ b/src/core/session.cpp
@@ -25,7 +25,7 @@
#include "servermanager.h"
#include "servermanager_p.h"
#include "protocolhelper_p.h"
-#include "connectionthread_p.h"
+#include "sessionthread_p.h"
#include "private/standarddirs_p.h"
#include "private/protocol_p.h"
@@ -63,21 +63,20 @@ void SessionPrivate::startNext()
void SessionPrivate::reconnect()
{
- if (!connThread) {
- connThread = new ConnectionThread(sessionId);
- mParent->connect(connThread, &ConnectionThread::reconnected, mParent, &Session::reconnected,
- Qt::QueuedConnection);
- mParent->connect(connThread, SIGNAL(commandReceived(qint64,Akonadi::Protocol::Command)),
- mParent, SLOT(handleCommand(qint64, Akonadi::Protocol::Command)),
- Qt::QueuedConnection);
- mParent->connect(connThread, SIGNAL(socketDisconnected()), mParent, SLOT(socketDisconnected()),
- Qt::QueuedConnection);
- mParent->connect(connThread, SIGNAL(socketError(QString)), mParent, SLOT(socketError(QString)),
- Qt::QueuedConnection);
-
+ if (!connection) {
+ connection = sessionThread()->createConnection(Connection::CommandConnection, sessionId);
+ mParent->connect(connection, &Connection::reconnected, mParent, &Session::reconnected,
+ Qt::QueuedConnection);
+ mParent->connect(connection, SIGNAL(commandReceived(qint64,Akonadi::Protocol::Command)),
+ mParent, SLOT(handleCommand(qint64, Akonadi::Protocol::Command)),
+ Qt::QueuedConnection);
+ mParent->connect(connection, SIGNAL(socketDisconnected()), mParent, SLOT(socketDisconnected()),
+ Qt::QueuedConnection);
+ mParent->connect(connection, SIGNAL(socketError(QString)), mParent, SLOT(socketError(QString)),
+ Qt::QueuedConnection);
}
- connThread->reconnect();
+ connection->reconnect();
}
QString SessionPrivate::connectionFile()
@@ -106,8 +105,8 @@ bool SessionPrivate::handleCommand(qint64 tag, const Protocol::Command &cmd)
Protocol::HelloResponse hello(cmd);
if (hello.isError()) {
qCWarning(AKONADICORE_LOG) << "Error when establishing connection with Akonadi server:" << hello.errorMessage();
- connThread->disconnect();
- QTimer::singleShot(1000, connThread, &ConnectionThread::reconnect);
+ connection->closeConnection();
+ QTimer::singleShot(1000, connection, &Connection::reconnect);
return false;
}
@@ -128,7 +127,7 @@ bool SessionPrivate::handleCommand(qint64 tag, const Protocol::Command &cmd)
Protocol::LoginResponse login(cmd);
if (login.isError()) {
qCWarning(AKONADICORE_LOG) << "Unable to login to Akonadi server:" << login.errorMessage();
- connThread->disconnect();
+ connection->closeConnection();
QTimer::singleShot(1000, mParent, SLOT(reconnect()));
return false;
}
@@ -260,7 +259,7 @@ qint64 SessionPrivate::nextTag()
void SessionPrivate::sendCommand(qint64 tag, const Protocol::Command &command)
{
- connThread->sendCommand(tag, command);
+ connection->sendCommand(tag, command);
}
void SessionPrivate::serverStateChanged(ServerManager::State state)
@@ -275,8 +274,8 @@ void SessionPrivate::serverStateChanged(ServerManager::State state)
job->kill(KJob::EmitResult);
}
} else if (state == ServerManager::Stopping) {
- delete connThread;
- connThread = Q_NULLPTR;
+ delete connection;
+ connection = Q_NULLPTR;
}
}
@@ -293,25 +292,25 @@ void SessionPrivate::itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevisi
SessionPrivate::SessionPrivate(Session *parent)
: mParent(parent)
- , thread(0)
- , connThread(0)
+ , mSessionThread(new SessionThread)
+ , connection(Q_NULLPTR)
, protocolVersion(0)
- , currentJob(0)
+ , currentJob(Q_NULLPTR)
{
// Shutdown the thread before QApplication event loop quits - the
- // thread()->wait() mechanism in ConnectionThread dtor crashes sometimes
+ // thread()->wait() mechanism in Connection dtor crashes sometimes
// when called from QApplication destructor
connThreadCleanUp = QObject::connect(qApp, &QCoreApplication::aboutToQuit,
[this]() {
- delete connThread;
- connThread = Q_NULLPTR;
+ delete mSessionThread;
+ mSessionThread = Q_NULLPTR;
});
}
SessionPrivate::~SessionPrivate()
{
QObject::disconnect(connThreadCleanUp);
- delete connThread;
+ delete mSessionThread;
}
void SessionPrivate::init(const QByteArray &id)
@@ -341,8 +340,8 @@ void SessionPrivate::forceReconnect()
{
jobRunning = false;
connected = false;
- if (connThread) {
- connThread->forceReconnect();
+ if (connection) {
+ connection->forceReconnect();
}
QMetaObject::invokeMethod(mParent, "reconnect", Qt::QueuedConnection);
}
diff --git a/src/core/session_p.h b/src/core/session_p.h
index 1be448e..a55ad79 100644
--- a/src/core/session_p.h
+++ b/src/core/session_p.h
@@ -36,7 +36,8 @@ class QIODevice;
namespace Akonadi
{
-class ConnectionThread;
+class SessionThread;
+class Connection;
namespace Protocol
{
@@ -55,6 +56,8 @@ public:
virtual void init(const QByteArray &sessionId);
+ SessionThread *sessionThread() const { return mSessionThread; }
+
void startNext();
/// Disconnects a previously existing connection and tries to reconnect
void forceReconnect();
@@ -124,8 +127,8 @@ public:
static QString connectionFile();
Session *mParent;
- QThread *thread;
- ConnectionThread *connThread;
+ SessionThread *mSessionThread;
+ Connection *connection;
QMetaObject::Connection connThreadCleanUp;
QByteArray sessionId;
bool connected;
diff --git a/src/core/sessionthread.cpp b/src/core/sessionthread.cpp
new file mode 100644
index 0000000..a946ef7
--- /dev/null
+++ b/src/core/sessionthread.cpp
@@ -0,0 +1,86 @@
+/*
+ Copyright (c) 2016 Daniel Vrátil <dvratil@kde.org>
+
+ This library is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Library General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or (at your
+ option) any later version.
+
+ This library is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
+ License for more details.
+
+ You should have received a copy of the GNU Library General Public License
+ along with this library; see the file COPYING.LIB. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA.
+*/
+
+#include "sessionthread_p.h"
+
+#include <QThread>
+
+Q_DECLARE_METATYPE(Akonadi::Connection::ConnectionType)
+Q_DECLARE_METATYPE(Akonadi::Connection *)
+
+using namespace Akonadi;
+
+SessionThread::SessionThread(QObject *parent)
+ : QObject(parent)
+{
+ qRegisterMetaType<Connection::ConnectionType>();
+ qRegisterMetaType<Connection*>();
+
+ QThread *thread = new QThread();
+ moveToThread(thread);
+ thread->start();
+}
+
+SessionThread::~SessionThread()
+{
+ QMetaObject::invokeMethod(this, "doThreadQuit");
+ if (!thread()->wait(10 * 1000)) {
+ thread()->terminate();
+ // Make sure to wait until it's done, otherwise it can crash when the pthread callback is called
+ thread()->wait();
+ }
+ delete thread();
+}
+
+Connection *SessionThread::createConnection(Connection::ConnectionType connectionType,
+ const QByteArray &sessionId)
+{
+ Connection *conn = Q_NULLPTR;
+ const bool invoke = QMetaObject::invokeMethod(this, "doCreateConnection",
+ Qt::BlockingQueuedConnection,
+ Q_RETURN_ARG(Akonadi::Connection*, conn),
+ Q_ARG(Akonadi::Connection::ConnectionType, connectionType),
+ Q_ARG(QByteArray, sessionId));
+ Q_ASSERT(invoke); Q_UNUSED(invoke);
+ return conn;
+}
+
+Connection *SessionThread::doCreateConnection(Connection::ConnectionType connType,
+ const QByteArray& sessionId)
+{
+ Q_ASSERT(thread() == QThread::currentThread());
+
+ Connection *conn = new Connection(connType, sessionId);
+ conn->moveToThread(thread());
+ connect(conn, &QObject::destroyed,
+ this, [this](QObject *obj) {
+ mConnections.removeOne(static_cast<Connection*>(obj));
+ });
+ conn->doReconnect(); // immediately try to connect
+ return conn;
+}
+
+
+void SessionThread::doThreadQuit()
+{
+ Q_FOREACH (Connection *conn, mConnections) {
+ conn->closeConnection();
+ delete conn;
+ }
+}
diff --git a/src/core/notificationbus_p.h b/src/core/sessionthread_p.h
index 9c568ca..2e37c4b 100644
--- a/src/core/notificationbus_p.h
+++ b/src/core/sessionthread_p.h
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2015 Daniel Vrátil <dvratil@redhat.com>
+ Copyright (c) 2016 Daniel Vrátil <dvratil@kde.org>
This library is free software; you can redistribute it and/or modify it
under the terms of the GNU Library General Public License as published by
@@ -17,33 +17,38 @@
02110-1301, USA.
*/
-#ifndef AKONADI_NOTIFICATIONBUS_P_H
-#define AKONADI_NOTIFICATIONBUS_P_H
+#ifndef SESSIONTHREAD_P_H
+#define SESSIONTHREAD_P_H
-#include "session_p.h"
+#include <QObject>
+#include <QVector>
-namespace Akonadi
-{
+#include "connection_p.h"
-namespace Protocol
+namespace Akonadi
{
-class ChangeNotification;
-}
-class NotificationBusPrivate : public QObject,
- public Akonadi::SessionPrivate
+class SessionThread : public QObject
{
Q_OBJECT
public:
- explicit NotificationBusPrivate(Session *parent = Q_NULLPTR);
- ~NotificationBusPrivate();
+ explicit SessionThread(QObject *parent = Q_NULLPTR);
+ ~SessionThread();
- bool handleCommand(qint64 tag, const Protocol::Command &cmd) Q_DECL_OVERRIDE;
+ Connection *createConnection(Connection::ConnectionType connType, const QByteArray &sessionId);
-Q_SIGNALS:
- void notify(const Akonadi::Protocol::ChangeNotification &ntf);
+private:
+ Q_INVOKABLE Akonadi::Connection *doCreateConnection(Akonadi::Connection::ConnectionType connType,
+ const QByteArray &sessionId);
+
+ Q_INVOKABLE void doThreadQuit();
+
+private:
+ QVector<Connection *> mConnections;
};
+
}
-#endif // AKONADI_NOTIFICATIONBUS_P_H
+
+#endif
diff --git a/src/private/protocol.cpp b/src/private/protocol.cpp
index a64c2b4..cf83e2d 100644
--- a/src/private/protocol.cpp
+++ b/src/private/protocol.cpp
@@ -8712,7 +8712,7 @@ public:
, startMimeTypes(other.startMimeTypes)
, stopMimeTypes(other.stopMimeTypes)
, startSessions(other.startSessions)
- , stopSessions(other.startSessions)
+ , stopSessions(other.stopSessions)
, monitorAll(other.monitorAll)
, exclusive(other.exclusive)
{
diff --git a/src/private/protocol_p.h b/src/private/protocol_p.h
index 15e9049..0a66d0b 100644
--- a/src/private/protocol_p.h
+++ b/src/private/protocol_p.h
@@ -2259,7 +2259,6 @@ class AKONADIPRIVATE_EXPORT ModifySubscriptionResponse : public Response
{
public:
explicit ModifySubscriptionResponse();
- explicit ModifySubscriptionResponse(const QByteArray &subscriptionId);
ModifySubscriptionResponse(const Command &other);
private:
diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt
index 853e732..53624ab 100644
--- a/src/server/CMakeLists.txt
+++ b/src/server/CMakeLists.txt
@@ -125,7 +125,7 @@ set(libakonadiserver_SRCS
dbustracer.cpp
filetracer.cpp
notificationmanager.cpp
- notificationsource.cpp
+ notificationsubscriber.cpp
resourcemanager.cpp
cachecleaner.cpp
debuginterface.cpp
@@ -142,9 +142,7 @@ qt5_generate_dbus_interface(debuginterface.h org.freedesktop.Akonadi.DebugInterf
qt5_add_dbus_adaptor(libakonadiserver_SRCS ${Akonadi_SOURCE_DIR}/src/interfaces/org.freedesktop.Akonadi.TracerNotification.xml dbustracer.h Akonadi::Server::DBusTracer)
qt5_add_dbus_adaptor(libakonadiserver_SRCS ${Akonadi_SOURCE_DIR}/src/interfaces/org.freedesktop.Akonadi.Tracer.xml tracer.h Akonadi::Server::Tracer)
-qt5_add_dbus_adaptor(libakonadiserver_SRCS ${Akonadi_SOURCE_DIR}/src/interfaces/org.freedesktop.Akonadi.NotificationManager.xml notificationmanager.h Akonadi::Server::NotificationManager)
qt5_add_dbus_adaptor(libakonadiserver_SRCS ${Akonadi_SOURCE_DIR}/src/interfaces/org.freedesktop.Akonadi.Server.xml akonadi.h Akonadi::Server::AkonadiServer)
-qt5_add_dbus_adaptor(libakonadiserver_SRCS ${Akonadi_SOURCE_DIR}/src/interfaces/org.freedesktop.Akonadi.NotificationSource.xml notificationsource.h Akonadi::Server::NotificationSource)
qt5_add_dbus_adaptor(libakonadiserver_SRCS ${Akonadi_SOURCE_DIR}/src/interfaces/org.freedesktop.Akonadi.StorageDebugger.xml storage/storagedebugger.h Akonadi::Server::StorageDebugger)
qt5_add_dbus_adaptor(libakonadiserver_SRCS ${CMAKE_CURRENT_BINARY_DIR}/org.freedesktop.Akonadi.DebugInterface.xml debuginterface.h Akonadi::Server::DebugInterface)
qt5_add_dbus_adaptor(libakonadiserver_SRCS ${Akonadi_SOURCE_DIR}/src/interfaces/org.freedesktop.Akonadi.ResourceManager.xml resourcemanager.h Akonadi::Server::ResourceManager)
diff --git a/src/server/akonadi.cpp b/src/server/akonadi.cpp
index 8683264..5a5d54f 100644
--- a/src/server/akonadi.cpp
+++ b/src/server/akonadi.cpp
@@ -57,16 +57,6 @@
#include <QtDBus/QDBusServiceWatcher>
#include <QtNetwork/QLocalServer>
-#ifdef HAVE_UNISTD_H
-# include <unistd.h>
-#endif
-#include <stdlib.h>
-
-#ifdef Q_OS_WIN
-#include <windows.h>
-#include <Sddl.h>
-#endif
-
using namespace Akonadi;
using namespace Akonadi::Server;
@@ -90,6 +80,7 @@ AkonadiServer::AkonadiServer(QObject *parent)
qRegisterMetaType<Protocol::Command>();
qRegisterMetaType<Protocol::ChangeNotification>();
qRegisterMetaType<Protocol::ChangeNotification::List>();
+ qRegisterMetaType<quintptr>("quintptr");
qRegisterMetaType<Protocol::ChangeNotification::Type>();
qDBusRegisterMetaType<Protocol::ChangeNotification::Type>();
@@ -123,6 +114,9 @@ bool AkonadiServer::init()
s_instance = this;
+ const QString connectionSettingsFile = StandardDirs::connectionConfigFile(XdgBaseDirs::WriteOnly);
+ QSettings connectionSettings(connectionSettingsFile, QSettings::IniFormat);
+
mCmdServer = new AkLocalServer(this);
connect(mCmdServer, static_cast<void(AkLocalServer::*)(quintptr)>(&AkLocalServer::newConnection),
this, &AkonadiServer::newCmdConnection);
@@ -134,12 +128,6 @@ bool AkonadiServer::init()
connect(mNtfServer, static_cast<void(AkLocalServer::*)(quintptr)>(&AkLocalServer::newConnection),
mNotificationManager, &NotificationManager::registerConnection);
-
-
- const QString connectionSettingsFile = StandardDirs::connectionConfigFile(XdgBaseDirs::WriteOnly);
- QSettings connectionSettings(connectionSettingsFile, QSettings::IniFormat);
-
- QString pipeName;
#ifdef Q_OS_WIN
HANDLE hToken = NULL;
PSID sid;
@@ -173,36 +161,49 @@ bool AkonadiServer::init()
free(sid);
}
- pipeName = QStringLiteral("Akonadi-%1").arg(userID);
-#else
- uid_t uid = getuid();
- pipeName = QStringLiteral("Akonadi-%1").arg(uid);
-#endif
-
- if (Instance::hasIdentifier()) {
- pipeName += QStringLiteral("-") % Instance::identifier();
- }
-
- const QString cmdPipeName = pipeName + QStringLiteral("-Cmd");
- if (!mCmdServer->listen(cmdPipeName)) {
- qCCritical(AKONADISERVER_LOG) << "Unable to listen on named pipe" << cmdPipeName;
+ const QString defaultCmdPipe = QStringLiteral("Akonadi-Cmd-") % userID;
+ const QString cmdPipe = settings.value(QStringLiteral("Connection/NamedPipe"), defaultCmdPipe).toString();
+ if (!mCmdServer->listen(cmdPipe)) {
+ qCCritical(AKONADISERVER_LOG) << "Unable to listen on Named Pipe" << cmdPipe;
quit();
return false;
}
- const QString ntfPipeName = pipeName + QStringLiteral("-Ntf");
- if (!mNtfServer->listen(ntfPipeName)) {
- qCCritical(AKONADISERVER_LOG) << "Unable to listen on named pipe" << ntfPipeName;
+ const QString defaultNtfPipe = QStringLiteral("Akonadi-Ntf-") % userID;
+ const QString ntfPipe = settings.value(QStringLiteral("Connection/NtfNamedPipe"), defaultNtfPipe).toString();
+ if (!mNtfServer->listen(ntfPipe)) {
+ qCCritical(AKONADISERVER_LOG) << "Unable to listen on Named Pipe" << ntfPipe;
quit();
return false;
}
connectionSettings.setValue(QStringLiteral("Data/Method"), QStringLiteral("NamedPipe"));
- connectionSettings.setValue(QStringLiteral("Data/NamedPipe"), cmdPipeName);
-
+ connectionSettings.setValue(QStringLiteral("Data/NamedPipe"), cmdPipe);
connectionSettings.setValue(QStringLiteral("Notifications/Method"), QStringLiteral("NamedPipe"));
- connectionSettings.setValue(QStringLiteral("Nottifications/NamedPipe"), ntfPipeName);
+ connectionSettings.setValue(QStringLiteral("Notifications/NamedPipe"), ntfPipe);
+#else
+ const QString socketDir = Utils::preferredSocketDirectory(StandardDirs::saveDir("data"));
+ const QString cmdSocketFile = socketDir % QStringLiteral("/akonadiserver-cmd.socket");
+ QFile::remove(cmdSocketFile);
+ if (!mCmdServer->listen(cmdSocketFile)) {
+ akError() << "Unable to listen on Unix socket" << cmdSocketFile;
+ quit();
+ return false;
+ }
+
+ const QString ntfSocketFile = socketDir % QStringLiteral("/akonadiserver-ntf.socket");
+ QFile::remove(ntfSocketFile);
+ if (!mNtfServer->listen(ntfSocketFile)) {
+ akError() << "Unable to listen on Unix socket" << ntfSocketFile;
+ quit();
+ return false;
+ }
+ connectionSettings.setValue(QStringLiteral("Data/Method"), QStringLiteral("UnixPath"));
+ connectionSettings.setValue(QStringLiteral("Data/UnixPath"), cmdSocketFile);
+ connectionSettings.setValue(QStringLiteral("Notifications/Method"), QStringLiteral("UnixPath"));
+ connectionSettings.setValue(QStringLiteral("Notifications/UnixPath"), ntfSocketFile);
+#endif
// initialize the database
DataStore *db = DataStore::self();
@@ -331,13 +332,6 @@ bool AkonadiServer::quit()
QSettings settings(StandardDirs::serverConfigFile(), QSettings::IniFormat);
const QString connectionSettingsFile = StandardDirs::connectionConfigFile(XdgBaseDirs::WriteOnly);
-#ifndef Q_OS_WIN
- const QString socketDir = Utils::preferredSocketDirectory(StandardDirs::saveDir("data"));
-
- if (!QDir::home().remove(socketDir + QLatin1String("/akonadiserver.socket"))) {
- qCCritical(AKONADISERVER_LOG) << "Failed to remove Unix socket";
- }
-#endif
if (!QDir::home().remove(connectionSettingsFile)) {
qCCritical(AKONADISERVER_LOG) << "Failed to remove runtime connection config file";
}
diff --git a/src/server/akonadi.h b/src/server/akonadi.h
index 9c3e19d..389862b 100644
--- a/src/server/akonadi.h
+++ b/src/server/akonadi.h
@@ -84,6 +84,8 @@ private:
bool startDatabaseProcess();
bool createDatabase();
void stopDatabaseProcess();
+ uint userId() const;
+
protected:
AkonadiServer(QObject *parent = Q_NULLPTR);
diff --git a/src/server/connection.cpp b/src/server/connection.cpp
index 8eee50c..5a16a35 100644
--- a/src/server/connection.cpp
+++ b/src/server/connection.cpp
@@ -51,7 +51,6 @@ Connection::Connection(QObject *parent)
, m_socket(0)
, m_currentHandler(0)
, m_connectionState(NonAuthenticated)
- , m_isNotificationBus(false)
, m_backend(0)
, m_verifyCacheOnRetrieval(false)
, m_idleTimer(Q_NULLPTR)
@@ -112,7 +111,6 @@ void Connection::quit()
{
Tracer::self()->endConnection(m_identifier, QString());
collectionReferenceManager()->removeSession(m_sessionId);
- NotificationManager::self()->unregisterConnection(this);
delete m_socket;
m_socket = 0;
@@ -170,11 +168,6 @@ void Connection::slotConnectionIdle()
void Connection::slotNewData()
{
- if (m_isNotificationBus) {
- qWarning() << "Connection" << sessionId() << ": received data when in NotificationBus mode!";
- return;
- }
-
m_idleTimer->stop();
// will only open() a previously idle backend.
@@ -326,28 +319,6 @@ QByteArray Connection::sessionId() const
return m_sessionId;
}
-void Connection::setIsNotificationBus(bool on)
-{
- if (m_isNotificationBus == on) {
- return;
- }
-
- m_isNotificationBus = on;
- if (m_isNotificationBus) {
- qCDebug(AKONADISERVER_LOG()) << "New notification bus:" << m_sessionId;
- NotificationManager::self()->registerConnection(this);
- } else {
- NotificationManager::self()->unregisterConnection(this);
- }
-}
-
-bool Connection::isNotificationBus() const
-{
- return m_isNotificationBus;
-}
-
-
-
bool Connection::isOwnerResource(const PimItem &item) const
{
if (context()->resource().isValid() && item.collection().resourceId() == context()->resource().id()) {
@@ -402,11 +373,8 @@ void Connection::reportTime() const
void Connection::sendResponse(qint64 tag, const Protocol::Command &response)
{
- // Notifications have their own debugging system
- if (!m_isNotificationBus) {
- if (Tracer::self()->currentTracer() != QLatin1String("null")) {
- Tracer::self()->connectionOutput(m_identifier, QByteArray::number(tag) + ' ' + response.debugString().toUtf8());
- }
+ if (Tracer::self()->currentTracer() != QLatin1String("null")) {
+ Tracer::self()->connectionOutput(m_identifier, QByteArray::number(tag) + ' ' + response.debugString().toUtf8());
}
QDataStream stream(m_socket);
stream << tag;
@@ -415,13 +383,8 @@ void Connection::sendResponse(qint64 tag, const Protocol::Command &response)
void Connection::sendResponse(const Protocol::Command &response)
{
- if (m_isNotificationBus) {
- // FIXME: Don't hardcode the tag for notifications
- sendResponse(4, response);
- } else {
- Q_ASSERT(m_currentHandler);
- sendResponse(m_currentHandler->tag(), response);
- }
+ Q_ASSERT(m_currentHandler);
+ sendResponse(m_currentHandler->tag(), response);
}
Protocol::Command Connection::readCommand()
diff --git a/src/server/connection.h b/src/server/connection.h
index 810d277..3e4a874 100644
--- a/src/server/connection.h
+++ b/src/server/connection.h
@@ -67,13 +67,9 @@ public:
void setSessionId(const QByteArray &id);
QByteArray sessionId() const;
- void setIsNotificationBus(bool on);
- bool isNotificationBus() const;
-
/** Returns @c true if permanent cache verification is enabled. */
bool verifyCacheOnRetrieval() const;
-
Protocol::Command readCommand();
public Q_SLOTS:
@@ -105,7 +101,6 @@ protected:
QLocalSocket *m_socket;
QPointer<Handler> m_currentHandler;
ConnectionState m_connectionState;
- bool m_isNotificationBus;
mutable DataStore *m_backend;
QList<QByteArray> m_statusMessageQueue;
QString m_identifier;
diff --git a/src/server/handler/login.cpp b/src/server/handler/login.cpp
index 96b55f4..f7d07a0 100644
--- a/src/server/handler/login.cpp
+++ b/src/server/handler/login.cpp
@@ -33,10 +33,6 @@ bool Login::parseStream()
}
connection()->setSessionId(cmd.sessionId());
- if (cmd.sessionMode() == Protocol::LoginCommand::NotificationBus) {
- connection()->setIsNotificationBus(true);
- }
-
Q_EMIT connectionStateChange(Server::Authenticated);
return successResponse<Protocol::LoginResponse>();
diff --git a/src/server/notificationmanager.cpp b/src/server/notificationmanager.cpp
index 88a3421..6d197b3 100644
--- a/src/server/notificationmanager.cpp
+++ b/src/server/notificationmanager.cpp
@@ -19,112 +19,79 @@
*/
#include "notificationmanager.h"
-#include "notificationmanageradaptor.h"
-#include "notificationsource.h"
+#include "notificationsubscriber.h"
+#include "storage/notificationcollector.h"
#include "tracer.h"
-#include "storage/datastore.h"
-#include "connection.h"
#include "akonadiserver_debug.h"
#include <private/standarddirs_p.h>
#include <private/xdgbasedirs_p.h>
-#include <QDBusConnection>
+#include <QLocalSocket>
#include <QSettings>
+#include <QCoreApplication>
using namespace Akonadi;
using namespace Akonadi::Server;
-NotificationManager *NotificationManager::mSelf = 0;
-
-Q_DECLARE_METATYPE(QVector<qint64>)
-
NotificationManager::NotificationManager()
- : QObject(0)
- , mDebug(false)
+ : AkThread()
{
- qRegisterMetaType<QVector<QByteArray>>();
- qDBusRegisterMetaType<QVector<QByteArray>>();
- qRegisterMetaType<Protocol::ChangeNotification::Type>();
- qDBusRegisterMetaType<Protocol::ChangeNotification::Type>();
- qRegisterMetaType<QVector<qint64>>();
- qDBusRegisterMetaType<QVector<qint64>>();
-
- new NotificationManagerAdaptor(this);
- QDBusConnection::sessionBus().registerObject(QStringLiteral("/notifications"),
- this, QDBusConnection::ExportAdaptors);
- QDBusConnection::sessionBus().registerObject(QStringLiteral("/notifications/debug"),
- this, QDBusConnection::ExportScriptableSlots |
- QDBusConnection::ExportScriptableSignals);
-
- const QString serverConfigFile = StandardDirs::serverConfigFile(XdgBaseDirs::ReadWrite);
- QSettings settings(serverConfigFile, QSettings::IniFormat);
-
- mTimer.setInterval(settings.value(QStringLiteral("NotificationManager/Interval"), 50).toInt());
- mTimer.setSingleShot(true);
- connect(&mTimer, &QTimer::timeout, this, &NotificationManager::emitPendingNotifications);
}
NotificationManager::~NotificationManager()
{
}
-NotificationManager *NotificationManager::self()
+void NotificationManager::init()
{
- if (!mSelf) {
- mSelf = new NotificationManager();
- }
+ AkThread::init();
- return mSelf;
+ const QString serverConfigFile = StandardDirs::serverConfigFile(XdgBaseDirs::ReadWrite);
+ QSettings settings(serverConfigFile, QSettings::IniFormat);
+
+ mTimer = new QTimer(this);
+ mTimer->setInterval(settings.value(QStringLiteral("NotificationManager/Interval"), 50).toInt());
+ mTimer->setSingleShot(true);
+ connect(mTimer, &QTimer::timeout,
+ this, &NotificationManager::emitPendingNotifications);
}
-void NotificationManager::connectNotificationCollector(NotificationCollector *collector)
+void NotificationManager::quit()
{
- connect(collector, &NotificationCollector::notify,
- this, &NotificationManager::slotNotify);
+ AkThread::quit();
+
+ qDeleteAll(mSubscribers);
}
-void NotificationManager::registerConnection(Connection *connection)
+void NotificationManager::registerConnection(quintptr socketDescriptor)
{
- QMutexLocker locker(&mSourcesLock);
- auto source = std::find_if(mNotificationSources.cbegin(), mNotificationSources.cend(),
- [connection](NotificationSource *source) {
- return connection->sessionId() == source->dbusPath().path().toLatin1();
- });
- if (source == mNotificationSources.cend()) {
- qWarning() << "Received request to register Notification bus connection, but there's no such subscriber";
- return;
- }
+ Q_ASSERT(thread() == QThread::currentThread());
- connect(const_cast<NotificationSource*>(*source), &NotificationSource::notify,
- connection, static_cast<void(Connection::*)(const Protocol::Command &)>(&Connection::sendResponse),
- Qt::QueuedConnection);
+ NotificationSubscriber *subscriber = new NotificationSubscriber(socketDescriptor);
+ qDebug() << "NotificationManager: new connection (registered as" << subscriber << ")";
+ connect(subscriber, &QObject::destroyed,
+ [this, subscriber]() {
+ mSubscribers.removeOne(subscriber);
+ });
+
+ mSubscribers.push_back(subscriber);
}
-void NotificationManager::unregisterConnection(Connection *connection)
+void NotificationManager::connectNotificationCollector(NotificationCollector *collector)
{
- QMutexLocker locker(&mSourcesLock);
- auto source = std::find_if(mNotificationSources.cbegin(), mNotificationSources.cend(),
- [connection](NotificationSource *source) {
- return connection->sessionId() == source->dbusPath().path().toLatin1();
- });
- if (source != mNotificationSources.cend()) {
- (*source)->disconnect(connection);
- }
+ connect(collector, &NotificationCollector::notify,
+ this, &NotificationManager::slotNotify);
}
-
-
void NotificationManager::slotNotify(const Akonadi::Protocol::ChangeNotification::List &msgs)
{
- //qCDebug(AKONADISERVER_LOG) << Q_FUNC_INFO << "Appending" << msgs.count() << "notifications to current list of " << mNotifications.count() << "notifications";
Q_FOREACH (const Protocol::ChangeNotification &msg, msgs) {
Protocol::ChangeNotification::appendAndCompress(mNotifications, msg);
}
- //qCDebug(AKONADISERVER_LOG) << Q_FUNC_INFO << "We have" << mNotifications.count() << "notifications queued in total after appendAndCompress()";
- if (!mTimer.isActive()) {
- mTimer.start();
+ if (!mTimer->isActive()) {
+ mTimer->start();
}
}
@@ -134,104 +101,9 @@ void NotificationManager::emitPendingNotifications()
return;
}
- if (mDebug) {
- QVector<QByteArray> bas;
- bas.reserve(mNotifications.size());
- QBuffer buffer;
- buffer.open(QIODevice::WriteOnly);
- Q_FOREACH (const Protocol::ChangeNotification &notification, mNotifications) {
- Tracer::self()->signal("NotificationManager::notify", notification.debugString());
- Protocol::serialize(&buffer, notification);
- bas << buffer.data();
- buffer.buffer().clear();
- buffer.seek(0);
- }
- Q_EMIT debugNotify(bas);
- } else {
- Q_FOREACH (const Protocol::ChangeNotification &notification, mNotifications) {
- Tracer::self()->signal("NotificationManager::notify", notification.debugString());
- }
- }
-
- Q_FOREACH (NotificationSource *source, mNotificationSources) {
- Protocol::ChangeNotification::List acceptedNotifications;
- Q_FOREACH (const Protocol::ChangeNotification &notification, mNotifications) {
- if (source->acceptsNotification(notification)) {
- acceptedNotifications << notification;
- }
- }
-
- if (!acceptedNotifications.isEmpty()) {
- source->emitNotification(acceptedNotifications);
- }
+ Q_FOREACH (NotificationSubscriber *subscriber, mSubscribers) {
+ subscriber->notify(mNotifications);
}
mNotifications.clear();
}
-
-QDBusObjectPath NotificationManager::subscribe(const QString &identifier, bool exclusive)
-{
- qCDebug(AKONADISERVER_LOG) << Q_FUNC_INFO << this << identifier << exclusive;
- NotificationSource *source = mNotificationSources.value(identifier);
- if (source) {
- qCDebug(AKONADISERVER_LOG) << "Known subscriber" << identifier << "subscribes again";
- source->addClientServiceName(message().service());
- } else {
- source = new NotificationSource(identifier, message().service(), this);
- }
-
- registerSource(source);
- source->setExclusive(exclusive);
-
- // FIXME KF5: Emit the QDBusObjectPath instead of the identifier
- Q_EMIT subscribed(source->dbusPath());
-
- return source->dbusPath();
-}
-
-void NotificationManager::registerSource(NotificationSource *source)
-{
- // Protect write operations because of registerConnection()
- QMutexLocker locker(&mSourcesLock);
- mNotificationSources.insert(source->identifier(), source);
-}
-
-void NotificationManager::unsubscribe(const QString &identifier)
-{
- NotificationSource *source = mNotificationSources.value(identifier);
- if (source) {
- unregisterSource(source);
- source->deleteLater();
- Q_EMIT unsubscribed(source->dbusPath());
- } else {
- qCDebug(AKONADISERVER_LOG) << "Attempt to unsubscribe unknown subscriber" << identifier;
- }
-}
-
-void NotificationManager::unregisterSource(NotificationSource *source)
-{
- // Protect write operations because of registerConnection()
- QMutexLocker locker(&mSourcesLock);
- mNotificationSources.remove(source->identifier());
-}
-
-QList<QDBusObjectPath> NotificationManager::subscribers() const
-{
- QList<QDBusObjectPath> identifiers;
- identifiers.reserve(mNotificationSources.count());
- Q_FOREACH (NotificationSource *source, mNotificationSources) {
- identifiers << source->dbusPath();
- }
-
- return identifiers;
-}
-
-void NotificationManager::enableDebug(bool enable)
-{
- mDebug = enable;
-}
-
-bool NotificationManager::debugEnabled() const
-{
- return mDebug;
-}
diff --git a/src/server/notificationmanager.h b/src/server/notificationmanager.h
index 079f52e..bd2597c 100644
--- a/src/server/notificationmanager.h
+++ b/src/server/notificationmanager.h
@@ -20,98 +20,48 @@
#ifndef AKONADI_NOTIFICATIONMANAGER_H
#define AKONADI_NOTIFICATIONMANAGER_H
+#include "akthread.h"
+
#include <private/protocol_p.h>
-#include "storage/entity.h"
-#include <QtCore/QHash>
-#include <QtCore/QObject>
#include <QtCore/QTimer>
-#include <QtCore/QMutex>
-#include <QtDBus/QDBusContext>
-#include <QtDBus/QDBusObjectPath>
class NotificationManagerTest;
+class QLocalSocket;
namespace Akonadi {
namespace Server {
class NotificationCollector;
-class NotificationSource;
-class Connection;
+class NotificationSubscriber;
-/**
- Notification manager D-Bus interface.
-*/
-class NotificationManager : public QObject, protected QDBusContext
+class NotificationManager : public AkThread
{
Q_OBJECT
- Q_CLASSINFO("D-Bus Interface", "org.freedesktop.Akonadi.NotificationManager")
public:
- static NotificationManager *self();
-
+ explicit NotificationManager();
virtual ~NotificationManager();
void connectNotificationCollector(NotificationCollector *collector);
- void registerConnection(Connection *connection);
- void unregisterConnection(Connection *connection);
-
public Q_SLOTS:
- Q_SCRIPTABLE void emitPendingNotifications();
-
- /**
- * Subscribe to notifications emitted by this manager.
- *
- * @param identifier Identifier to use for our subscription.
- * @param exclusive Exclusive subscribers also receive notifications on referenced collections
- * @return The path we got assigned. Contains identifier.
- */
- QDBusObjectPath subscribe(const QString &identifier, bool exclusive);
-
- /**
- * Unsubscribe from this manager.
- *
- * This method is for your inconvenience only. It's advisable to use the unsubscribe method
- * provided by the NotificationSource.
- *
- * @param identifier The identifier used for subscription.
- */
- void unsubscribe(const QString &identifier);
-
- /**
- * Returns identifiers of currently subscribed sources
- */
- Q_SCRIPTABLE QList<QDBusObjectPath> subscribers() const;
-
- Q_SCRIPTABLE void enableDebug(bool enable);
- Q_SCRIPTABLE bool debugEnabled() const;
-
-Q_SIGNALS:
- Q_SCRIPTABLE void debugNotify(const QVector<QByteArray> &msg);
-
- void subscribed(const QDBusObjectPath &path);
- void unsubscribed(const QDBusObjectPath &path);
+ void registerConnection(quintptr socketDescriptor);
+
+ void emitPendingNotifications();
private Q_SLOTS:
void slotNotify(const Akonadi::Protocol::ChangeNotification::List &msgs);
-private:
- NotificationManager();
+protected:
+ void init() Q_DECL_OVERRIDE;
+ void quit() Q_DECL_OVERRIDE;
private:
- void registerSource(NotificationSource *source);
- void unregisterSource(NotificationSource *source);
-
- static NotificationManager *mSelf;
Protocol::ChangeNotification::List mNotifications;
- QTimer mTimer;
-
- //! One message source for each subscribed process
- QMutex mSourcesLock;
- QHash<QString, NotificationSource *> mNotificationSources;
+ QTimer *mTimer;
- bool mDebug;
+ QVector<NotificationSubscriber *> mSubscribers;
friend class NotificationSource;
friend class ::NotificationManagerTest;
diff --git a/src/server/notificationsource.h b/src/server/notificationsource.h
deleted file mode 100644
index 72acc98..0000000
--- a/src/server/notificationsource.h
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- Copyright (c) 2010 Michael Jansen <kde@michael-jansen>
-
- This library is free software; you can redistribute it and/or modify it
- under the terms of the GNU Library General Public License as published by
- the Free Software Foundation; either version 2 of the License, or (at your
- option) any later version.
-
- This library is distributed in the hope that it will be useful, but WITHOUT
- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
- License for more details.
-
- You should have received a copy of the GNU Library General Public License
- along with this library; see the file COPYING.LIB. If not, write to the
- Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- 02110-1301, USA.
-*/
-#ifndef AKONADI_NOTIFICATIONSOURCE_H
-#define AKONADI_NOTIFICATIONSOURCE_H
-
-#include <QtCore/QObject>
-#include <QtCore/QVector>
-#include <QtDBus/QtDBus>
-
-#include "entities.h"
-
-#include <private/protocol_p.h>
-
-namespace Akonadi {
-namespace Server {
-
-class Connection;
-class NotificationManager;
-
-class NotificationSource : public QObject
-{
- Q_OBJECT
- Q_CLASSINFO("D-Bus Interface", "org.freedesktop.Akonadi.NotificationSource")
-
-public:
-
- /**
- * Construct a NotificationSource.
- *
- * @param identifier The identifier of this notification source, defined by the client
- * @param clientServiceName The D-Bus service name of the client, used to clean up if the client does not unsubscribe correctly.
- * @param parent The parent object.
- */
- NotificationSource(const QString &identifier, const QString &clientServiceName, NotificationManager *parent);
-
- /**
- * Destroy the NotificationSource.
- */
- virtual ~NotificationSource();
-
- /**
- * Emit the given notifications
- *
- * @param notifications List of notifications to emit.
- */
- void emitNotification(const Protocol::ChangeNotification::List &notifications);
-
- /**
- * Return the dbus path this message source uses.
- */
- QDBusObjectPath dbusPath() const;
-
- /**
- * Return the identifier for this message source
- */
- QString identifier() const;
-
- /**
- * Add another client service to watch for. Auto-unsubscription only happens if
- * all watched client services have been stopped.
- */
- void addClientServiceName(const QString &clientServiceName);
-
- bool acceptsNotification(const Protocol::ChangeNotification &notification);
-
-public Q_SLOTS:
- /**
- * Unsubscribe from the message source.
- *
- * This will delete the message source and make the used dbus path unavailable.
- */
- Q_SCRIPTABLE void unsubscribe();
-
- Q_SCRIPTABLE void setMonitoredCollection(Entity::Id id, bool monitored);
- Q_SCRIPTABLE QVector<Entity::Id> monitoredCollections() const;
- Q_SCRIPTABLE void setMonitoredItem(Entity::Id id, bool monitored);
- Q_SCRIPTABLE QVector<Entity::Id> monitoredItems() const;
- Q_SCRIPTABLE void setMonitoredTag(Entity::Id id, bool monitored);
- Q_SCRIPTABLE QVector<Entity::Id> monitoredTags() const;
- Q_SCRIPTABLE void setMonitoredResource(const QByteArray &resource, bool monitored);
- Q_SCRIPTABLE QVector<QByteArray> monitoredResources() const;
- Q_SCRIPTABLE void setMonitoredMimeType(const QString &mimeType, bool monitored);
- Q_SCRIPTABLE QStringList monitoredMimeTypes() const;
- Q_SCRIPTABLE void setAllMonitored(bool allMonitored);
- Q_SCRIPTABLE bool isAllMonitored() const;
- Q_SCRIPTABLE void setSession( const QByteArray &sessionId );
- Q_SCRIPTABLE void setIgnoredSession(const QByteArray &sessionId, bool ignored);
- Q_SCRIPTABLE QVector<QByteArray> ignoredSessions() const;
- Q_SCRIPTABLE void setMonitoredType(Protocol::ChangeNotification::Type type, bool monitored);
- Q_SCRIPTABLE QVector<Protocol::ChangeNotification::Type> monitoredTypes() const;
- Q_SCRIPTABLE void setExclusive( bool exclusive );
- Q_SCRIPTABLE bool isExclusive() const;
-
-Q_SIGNALS:
- // Internal, not exported to DBus
- void notify(const Akonadi::Protocol::Command &response);
-
- Q_SCRIPTABLE void monitoredCollectionsChanged();
- Q_SCRIPTABLE void monitoredItemsChanged();
- Q_SCRIPTABLE void monitoredTagsChanged();
- Q_SCRIPTABLE void monitoredResourcesChanged();
- Q_SCRIPTABLE void monitoredMimeTypesChanged();
- Q_SCRIPTABLE void isAllMonitoredChanged();
- Q_SCRIPTABLE void ignoredSessionsChanged();
- Q_SCRIPTABLE void monitoredTypesChanged();
-
-private Q_SLOTS:
- void serviceUnregistered(const QString &serviceName);
-
-private:
- bool isCollectionMonitored(Entity::Id id) const;
- bool isMimeTypeMonitored(const QString &mimeType) const;
- bool isMoveDestinationResourceMonitored(const Protocol::ChangeNotification &msg) const;
-
-private:
- NotificationManager *mManager;
- QString mIdentifier;
- QString mDBusIdentifier;
- QDBusServiceWatcher *mClientWatcher;
-
- QPointer<Connection> mConnection;
-
- bool mAllMonitored;
- bool mExclusive;
- QSet<Entity::Id> mMonitoredCollections;
- QSet<Entity::Id> mMonitoredItems;
- QSet<Entity::Id> mMonitoredTags;
- // TODO: Make this a bitflag
- QSet<Protocol::ChangeNotification::Type> mMonitoredTypes;
- QSet<QString> mMonitoredMimeTypes;
- QSet<QByteArray> mMonitoredResources;
- QSet<QByteArray> mIgnoredSessions;
- QByteArray mSession;
-
-}; // class NotificationSource
-
-} // namespace Server
-} // namespace Akonadi
-
-#endif // #define AKONADI_NOTIFICATIONSOURCE_H
diff --git a/src/server/notificationsource.cpp b/src/server/notificationsubscriber.cpp
index 14986d6..4f43a43 100644
--- a/src/server/notificationsource.cpp
+++ b/src/server/notificationsubscriber.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2010 Michael Jansen <kde@michael-jansen>
+ Copyright (c) 2015 Daniel Vrátil <dvratil@kde.org>
This library is free software; you can redistribute it and/or modify it
under the terms of the GNU Library General Public License as published by
@@ -17,245 +17,212 @@
02110-1301, USA.
*/
-#include "notificationsource.h"
+#include "notificationsubscriber.h"
#include "akonadiserver_debug.h"
-#include "notificationsourceadaptor.h"
-#include "notificationmanager.h"
#include "collectionreferencemanager.h"
-#include "connection.h"
+
+#include <QLocalSocket>
+#include <QDataStream>
+
+#include <private/protocol_p.h>
+#include <private/protocol_exception_p.h>
using namespace Akonadi;
using namespace Akonadi::Server;
-template<typename T>
-QVector<T> setToVector(const QSet<T> &set)
-{
- QVector<T> v;
- v.reserve(set.size());
- Q_FOREACH (const T &val, set) {
- v << val;
- }
- return v;
-}
+QMimeDatabase NotificationSubscriber::sMimeDatabase;
-NotificationSource::NotificationSource(const QString &identifier, const QString &clientServiceName, NotificationManager *parent)
- : QObject(parent)
- , mManager(parent)
- , mIdentifier(identifier)
- , mDBusIdentifier(identifier)
- , mClientWatcher(0)
+NotificationSubscriber::NotificationSubscriber()
+ : QObject()
+ , mSocket(Q_NULLPTR)
, mAllMonitored(false)
, mExclusive(false)
{
- new NotificationSourceAdaptor(this);
+}
- // Clean up for dbus usage: any non-alphanumeric char should be turned into '_'
- const int len = mDBusIdentifier.length();
- for (int i = 0; i < len; ++i) {
- if (!mDBusIdentifier[i].isLetterOrNumber()) {
- mDBusIdentifier[i] = QLatin1Char('_');
- }
- }
- QDBusConnection::sessionBus().registerObject(
- dbusPath().path(),
- this,
- QDBusConnection::ExportAdaptors);
+NotificationSubscriber::NotificationSubscriber(quintptr socketDescriptor)
+ : NotificationSubscriber()
+{
+ mSocket = new QLocalSocket(this);
+ connect(mSocket, &QLocalSocket::readyRead,
+ this, &NotificationSubscriber::socketReadyRead);
+ connect(mSocket, &QLocalSocket::disconnected,
+ this, &NotificationSubscriber::socketDisconnected);
+ mSocket->setSocketDescriptor(socketDescriptor);
- mClientWatcher = new QDBusServiceWatcher(clientServiceName, QDBusConnection::sessionBus(), QDBusServiceWatcher::WatchForUnregistration, this);
- connect(mClientWatcher, &QDBusServiceWatcher::serviceUnregistered, this, &NotificationSource::serviceUnregistered);
+ writeCommand(0, Protocol::HelloResponse(QStringLiteral("Akonadi"),
+ QStringLiteral("Not-really IMAP server"),
+ Protocol::version()));
}
-NotificationSource::~NotificationSource()
+NotificationSubscriber::~NotificationSubscriber()
{
}
-QDBusObjectPath NotificationSource::dbusPath() const
+void NotificationSubscriber::socketReadyRead()
{
- return QDBusObjectPath(QLatin1String("/subscriber/") + mDBusIdentifier);
-}
+ while (mSocket->bytesAvailable() > (int) sizeof(qint64)) {
+ QDataStream stream(mSocket);
-void NotificationSource::emitNotification(const Protocol::ChangeNotification::List &notifications)
-{
- Q_FOREACH (const auto &notification, notifications) {
- Q_EMIT notify(notification);
- }
-}
+ // Ignored atm
+ qint64 tag = -1;
+ stream >> tag;
-QString NotificationSource::identifier() const
-{
- return mIdentifier;
+ Protocol::Command cmd;
+ try {
+ cmd = Protocol::deserialize(mSocket);
+ } catch (const Akonadi::ProtocolException &e) {
+ qDebug() << "ProtocolException:" << e.what();
+ disconnectSubscriber();
+ return;
+ } catch (const std::exception &e) {
+ qDebug() << "Unknown exception:" << e.what();
+ disconnectSubscriber();
+ return;
+ }
+ if (cmd.type() == Protocol::Command::Invalid) {
+ qDebug() << "Received an invalid command: resetting connection";
+ disconnectSubscriber();
+ return;
+ }
+
+ switch (cmd.type()) {
+ case Protocol::Command::CreateSubscription:
+ registerSubscriber(cmd);
+ writeCommand(tag, Protocol::CreateSubscriptionResponse());
+ break;
+ case Protocol::Command::ModifySubscription:
+ if (mSubscriber.isEmpty()) {
+ qDebug() << "Received ModifySubscription command before RegisterSubscriber";
+ disconnectSubscriber();
+ return;
+ }
+ modifySubscription(cmd);
+ writeCommand(tag, Protocol::ModifySubscriptionResponse());
+ break;
+ case Protocol::Command::Logout:
+ disconnectSubscriber();
+ break;
+ default:
+ qDebug() << "Invalid command" << cmd.type() << "received by NotificationSubscriber" << mSubscriber;
+ disconnectSubscriber();
+ break;
+ }
+ }
}
-void NotificationSource::unsubscribe()
+void NotificationSubscriber::socketDisconnected()
{
- mManager->unsubscribe(mIdentifier);
+ qDebug() << "Subscriber" << mSubscriber << "disconnected from us!";
+ disconnectSubscriber();
}
-bool NotificationSource::isExclusive() const
+void NotificationSubscriber::disconnectSubscriber()
{
- return mExclusive;
+ disconnect(mSocket, &QLocalSocket::readyRead,
+ this, &NotificationSubscriber::socketReadyRead);
+ disconnect(mSocket, &QLocalSocket::disconnected,
+ this, &NotificationSubscriber::socketDisconnected);
+ mSocket->close();
+ deleteLater();
}
-void NotificationSource::setExclusive(bool enabled)
+void NotificationSubscriber::registerSubscriber(const Protocol::CreateSubscriptionCommand &command)
{
- mExclusive = enabled;
+ qDebug() << "Subscriber identified:" << command.subscriberName();
+ mSubscriber = command.subscriberName();
}
-void NotificationSource::addClientServiceName(const QString &clientServiceName)
+void NotificationSubscriber::modifySubscription(const Protocol::ModifySubscriptionCommand &command)
{
- if (mClientWatcher->watchedServices().contains(clientServiceName)) {
- return;
- }
+ const auto modifiedParts = command.modifiedParts();
- mClientWatcher->addWatchedService(clientServiceName);
- qCDebug(AKONADISERVER_LOG) << Q_FUNC_INFO << "Notification source" << mIdentifier << "now serving:" << mClientWatcher->watchedServices();
-}
+ #define START_MONITORING(type) \
+ (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts( \
+ Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Add))
+ #define STOP_MONITORING(type) \
+ (modifiedParts & Protocol::ModifySubscriptionCommand::ModifiedParts( \
+ Protocol::ModifySubscriptionCommand::type | Protocol::ModifySubscriptionCommand::Remove))
-void NotificationSource::serviceUnregistered(const QString &serviceName)
-{
- mClientWatcher->removeWatchedService(serviceName);
- qCDebug(AKONADISERVER_LOG) << Q_FUNC_INFO << "Notification source" << mIdentifier << "now serving:" << mClientWatcher->watchedServices();
+ #define APPEND(set, newItems) \
+ Q_FOREACH (const auto &entity, newItems) { \
+ set.insert(entity); \
+ }
+ #define REMOVE(set, items) \
+ Q_FOREACH (const auto &entity, items) { \
+ set.remove(entity); \
+ }
- if (mClientWatcher->watchedServices().isEmpty()) {
- unsubscribe();
+ qCDebug(AKONADISERVER_LOG) << "Subscription for" << mSubscriber << "updated:";
+ if (START_MONITORING(Types)) {
+ APPEND(mMonitoredTypes, command.startMonitoringTypes())
+ qCDebug(AKONADISERVER_LOG) << "\tStart monitoring types:" << command.startMonitoringTypes();
}
-}
-
-void NotificationSource::setMonitoredCollection(Entity::Id id, bool monitored)
-{
- if (id < 0) {
- return;
+ if (STOP_MONITORING(Types)) {
+ REMOVE(mMonitoredTypes, command.stopMonitoringTypes());
+ qCDebug(AKONADISERVER_LOG) << "\tStop monitoring types:" << command.stopMonitoringTypes();
}
-
- if (monitored && !mMonitoredCollections.contains(id)) {
- mMonitoredCollections.insert(id);
- Q_EMIT monitoredCollectionsChanged();
- } else if (!monitored) {
- mMonitoredCollections.remove(id);
- Q_EMIT monitoredCollectionsChanged();
+ if (START_MONITORING(Collections)) {
+ APPEND(mMonitoredCollections, command.startMonitoringCollections())
+ qCDebug(AKONADISERVER_LOG) << "\tStart monitoring collections:" << command.startMonitoringCollections();
}
-}
-
-QVector<Entity::Id> NotificationSource::monitoredCollections() const
-{
- return setToVector<Entity::Id>(mMonitoredCollections);
-}
-
-void NotificationSource::setMonitoredItem(Entity::Id id, bool monitored)
-{
- if (id < 0) {
- return;
+ if (STOP_MONITORING(Collections)) {
+ REMOVE(mMonitoredCollections, command.stopMonitoringCollections())
+ qCDebug(AKONADISERVER_LOG) << "\tStop monitoring collections:" << command.stopMonitoringCollections();
}
-
- if (monitored && !mMonitoredItems.contains(id)) {
- mMonitoredItems.insert(id);
- Q_EMIT monitoredItemsChanged();
- } else if (!monitored) {
- mMonitoredItems.remove(id);
- Q_EMIT monitoredItemsChanged();
+ if (START_MONITORING(Items)) {
+ APPEND(mMonitoredItems, command.startMonitoringItems())
+ qCDebug(AKONADISERVER_LOG) << "\tStart monitoring items:" << command.startMonitoringItems();
}
-}
-
-QVector<Entity::Id> NotificationSource::monitoredItems() const
-{
- return setToVector<Entity::Id>(mMonitoredItems);
-}
-
-void NotificationSource::setMonitoredTag(Entity::Id id, bool monitored)
-{
- if (id < 0) {
- return;
+ if (STOP_MONITORING(Items)) {
+ REMOVE(mMonitoredItems, command.stopMonitoringItems())
+ qCDebug(AKONADISERVER_LOG) << "\tStop monitoring items:" << command.stopMonitoringItems();
}
-
- if (monitored && !mMonitoredTags.contains(id)) {
- mMonitoredTags.insert(id);
- Q_EMIT monitoredTagsChanged();
- } else if (!monitored) {
- mMonitoredTags.remove(id);
- Q_EMIT monitoredTagsChanged();
+ if (START_MONITORING(Tags)) {
+ APPEND(mMonitoredTags, command.startMonitoringTags())
+ qCDebug(AKONADISERVER_LOG) << "\tStart monitoring tags:" << command.startMonitoringTags();
}
-}
-
-QVector<Entity::Id> NotificationSource::monitoredTags() const
-{
- return setToVector<Entity::Id>(mMonitoredTags);
-}
-
-void NotificationSource::setMonitoredResource(const QByteArray &resource, bool monitored)
-{
- if (monitored && !mMonitoredResources.contains(resource)) {
- mMonitoredResources.insert(resource);
- Q_EMIT monitoredResourcesChanged();
- } else if (!monitored) {
- mMonitoredResources.remove(resource);
- Q_EMIT monitoredResourcesChanged();
+ if (STOP_MONITORING(Tags)) {
+ REMOVE(mMonitoredTags, command.stopMonitoringTags())
+ qCDebug(AKONADISERVER_LOG) << "\tStop monitoring tags:" << command.stopMonitoringTags();
}
-}
-
-QVector<QByteArray> NotificationSource::monitoredResources() const
-{
- return setToVector<QByteArray>(mMonitoredResources);
-}
-
-void NotificationSource::setMonitoredMimeType(const QString &mimeType, bool monitored)
-{
- if (mimeType.isEmpty()) {
- return;
+ if (START_MONITORING(Resources)) {
+ APPEND(mMonitoredResources, command.startMonitoringResources())
+ qCDebug(AKONADISERVER_LOG) << "\tStart monitoring resources:" << command.startMonitoringResources();
}
-
- if (monitored && !mMonitoredMimeTypes.contains(mimeType)) {
- mMonitoredMimeTypes.insert(mimeType);
- Q_EMIT monitoredMimeTypesChanged();
- } else if (!monitored) {
- mMonitoredMimeTypes.remove(mimeType);
- Q_EMIT monitoredMimeTypesChanged();
+ if (STOP_MONITORING(Resources)) {
+ REMOVE(mMonitoredResources, command.stopMonitoringResources())
+ qCDebug(AKONADISERVER_LOG) << "\tStop monitoring resourceS:" << command.stopMonitoringResources();
}
-}
-
-QStringList NotificationSource::monitoredMimeTypes() const
-{
- return mMonitoredMimeTypes.toList();
-}
-
-void NotificationSource::setAllMonitored(bool allMonitored)
-{
- if (allMonitored && !mAllMonitored) {
- mAllMonitored = true;
- Q_EMIT isAllMonitoredChanged();
- } else if (!allMonitored) {
- mAllMonitored = false;
- Q_EMIT isAllMonitoredChanged();
+ if (START_MONITORING(MimeTypes)) {
+ APPEND(mMonitoredMimeTypes, command.startMonitoringMimeTypes())
+ qCDebug(AKONADISERVER_LOG) << "\tStart monitoring mime types:" << command.startMonitoringMimeTypes();
}
-}
-
-bool NotificationSource::isAllMonitored() const
-{
- return mAllMonitored;
-}
-
-void NotificationSource::setSession(const QByteArray &sessionId)
-{
- mSession = sessionId;
-}
-
-void NotificationSource::setIgnoredSession(const QByteArray &sessionId, bool ignored)
-{
- if (ignored && !mIgnoredSessions.contains(sessionId)) {
- mIgnoredSessions.insert(sessionId);
- Q_EMIT ignoredSessionsChanged();
- } else if (!ignored) {
- mIgnoredSessions.remove(sessionId);
- Q_EMIT ignoredSessionsChanged();
+ if (STOP_MONITORING(MimeTypes)) {
+ REMOVE(mMonitoredMimeTypes, command.stopMonitoringMimeTypes())
+ qCDebug(AKONADISERVER_LOG) << "\tStop monitoring mime types:" << command.stopMonitoringCollections();
+ }
+ if (START_MONITORING(Sessions)) {
+ APPEND(mIgnoredSessions, command.startIgnoringSessions())
+ qCDebug(AKONADISERVER_LOG) << "\tStart ignoring sessions:" << command.startIgnoringSessions();
+ }
+ if (STOP_MONITORING(Sessions)) {
+ REMOVE(mIgnoredSessions, command.stopIgnoringSessions())
+ qCDebug(AKONADISERVER_LOG) << "\tStop ignoring sessions:" << command.stopIgnoringSessions();
+ }
+ if (modifiedParts & Protocol::ModifySubscriptionCommand::AllFlag) {
+ mAllMonitored = command.allMonitored();
+ qCDebug(AKONADISERVER_LOG) << "\tAll monitored:" << command.allMonitored();
+ }
+ if (modifiedParts & Protocol::ModifySubscriptionCommand::ExclusiveFlag) {
+ mExclusive = command.isExclusive();
+ qCDebug(AKONADISERVER_LOG) << "\tExclusive:" << command.isExclusive();
}
}
-QVector<QByteArray> NotificationSource::ignoredSessions() const
-{
- return setToVector<QByteArray>(mIgnoredSessions);
-}
-bool NotificationSource::isCollectionMonitored(Entity::Id id) const
+bool NotificationSubscriber::isCollectionMonitored(Entity::Id id) const
{
if (id < 0) {
return false;
@@ -267,14 +234,23 @@ bool NotificationSource::isCollectionMonitored(Entity::Id id) const
return false;
}
-bool NotificationSource::isMimeTypeMonitored(const QString &mimeType) const
+bool NotificationSubscriber::isMimeTypeMonitored(const QString &mimeType) const
{
- return mMonitoredMimeTypes.contains(mimeType);
+ const QMimeType mt = sMimeDatabase.mimeTypeForName(mimeType);
+ if (mMonitoredMimeTypes.contains(mimeType)) {
+ return true;
+ }
- // FIXME: Handle mimetype aliases
+ Q_FOREACH (const QString &alias, mt.aliases()) {
+ if (mMonitoredMimeTypes.contains(alias)) {
+ return true;
+ }
+ }
+
+ return false;
}
-bool NotificationSource::isMoveDestinationResourceMonitored(const Protocol::ChangeNotification &msg) const
+bool NotificationSubscriber::isMoveDestinationResourceMonitored(const Protocol::ChangeNotification &msg) const
{
if (msg.operation() != Protocol::ChangeNotification::Move) {
return false;
@@ -282,23 +258,7 @@ bool NotificationSource::isMoveDestinationResourceMonitored(const Protocol::Chan
return mMonitoredResources.contains(msg.destinationResource());
}
-void NotificationSource::setMonitoredType(Protocol::ChangeNotification::Type type, bool monitored)
-{
- if (monitored && !mMonitoredTypes.contains(type)) {
- mMonitoredTypes.insert(type);
- Q_EMIT monitoredTypesChanged();
- } else if (!monitored) {
- mMonitoredTypes.remove(type);
- Q_EMIT monitoredTypesChanged();
- }
-}
-
-QVector<Protocol::ChangeNotification::Type> NotificationSource::monitoredTypes() const
-{
- return setToVector<Protocol::ChangeNotification::Type>(mMonitoredTypes);
-}
-
-bool NotificationSource::acceptsNotification(const Protocol::ChangeNotification &notification)
+bool NotificationSubscriber::acceptsNotification(const Protocol::ChangeNotification &notification)
{
// session is ignored
if (mIgnoredSessions.contains(notification.sessionId())) {
@@ -481,3 +441,25 @@ bool NotificationSource::acceptsNotification(const Protocol::ChangeNotification
return false;
}
+
+void NotificationSubscriber::notify(const Protocol::ChangeNotification::List &notifications)
+{
+ Q_FOREACH (const auto &notification, notifications) {
+ if (acceptsNotification(notification)) {
+ writeNotification(notification);
+ }
+ }
+}
+
+void NotificationSubscriber::writeNotification(const Protocol::ChangeNotification &notification)
+{
+ // tag chosen by fair dice roll
+ writeCommand(4, notification);
+}
+
+void NotificationSubscriber::writeCommand(qint64 tag, const Protocol::Command& cmd)
+{
+ QDataStream stream(mSocket);
+ stream << tag; // chosen by fair dice roll
+ Protocol::serialize(mSocket, cmd);
+}
diff --git a/src/server/notificationsubscriber.h b/src/server/notificationsubscriber.h
new file mode 100644
index 0000000..559d256
--- /dev/null
+++ b/src/server/notificationsubscriber.h
@@ -0,0 +1,88 @@
+/*
+ Copyright (c) 2015 Daniel Vrátil <dvratil@kde.org>
+
+ This library is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Library General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or (at your
+ option) any later version.
+
+ This library is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public
+ License for more details.
+
+ You should have received a copy of the GNU Library General Public License
+ along with this library; see the file COPYING.LIB. If not, write to the
+ Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ 02110-1301, USA.
+*/
+
+#ifndef NOTIFICATIONSUBSCRIBER_H
+#define NOTIFICATIONSUBSCRIBER_H
+
+#include <QObject>
+#include <QByteArray>
+#include <QMimeDatabase>
+
+#include <private/protocol_p.h>
+#include "entities.h"
+
+class QLocalSocket;
+
+namespace Akonadi {
+namespace Server {
+
+
+class NotificationSubscriber : public QObject
+{
+ Q_OBJECT
+
+public:
+ explicit NotificationSubscriber(quintptr socketDescriptor);
+ ~NotificationSubscriber();
+
+ void notify(const Protocol::ChangeNotification::List &notifications);
+
+private Q_SLOTS:
+ void socketReadyRead();
+ void socketDisconnected();
+
+protected:
+ void registerSubscriber(const Protocol::CreateSubscriptionCommand &command);
+ void modifySubscription(const Protocol::ModifySubscriptionCommand &command);
+ void disconnectSubscriber();
+
+private:
+ bool acceptsNotification(const Protocol::ChangeNotification &notification);
+ bool isCollectionMonitored(Entity::Id id) const;
+ bool isMimeTypeMonitored(const QString &mimeType) const;
+ bool isMoveDestinationResourceMonitored(const Protocol::ChangeNotification &msg) const;
+
+protected:
+ explicit NotificationSubscriber();
+
+ virtual void writeNotification(const Protocol::ChangeNotification &notification);
+ void writeCommand(qint64 tag, const Protocol::Command &cmd);
+
+ QLocalSocket *mSocket;
+ QByteArray mSubscriber;
+ QSet<Entity::Id> mMonitoredCollections;
+ QSet<Entity::Id> mMonitoredItems;
+ QSet<Entity::Id> mMonitoredTags;
+ // TODO: Make this a bitflag
+ QSet<Protocol::ChangeNotification::Type> mMonitoredTypes;
+ QSet<QString> mMonitoredMimeTypes;
+ QSet<QByteArray> mMonitoredResources;
+ QSet<QByteArray> mIgnoredSessions;
+ QByteArray mSession;
+ bool mAllMonitored;
+ bool mExclusive;
+
+ static QMimeDatabase sMimeDatabase;
+};
+
+} // namespace Server
+} // namespace Akonadi
+
+
+#endif
diff --git a/src/server/storage/datastore.cpp b/src/server/storage/datastore.cpp
index e7a4770..80384d4 100644
--- a/src/server/storage/datastore.cpp
+++ b/src/server/storage/datastore.cpp
@@ -20,6 +20,7 @@
#include "datastore.h"
+#include "akonadi.h"
#include "dbconfig.h"
#include "dbinitializer.h"
#include "dbupdater.h"
@@ -204,7 +205,10 @@ NotificationCollector *DataStore::notificationCollector()
{
if (mNotificationCollector == 0) {
mNotificationCollector = new NotificationCollector(this);
- NotificationManager::self()->connectNotificationCollector(notificationCollector());
+ NotificationManager *notificationManager = AkonadiServer::instance()->notificationManager();
+ if (notificationManager) {
+ notificationManager->connectNotificationCollector(notificationCollector());
+ }
}
return mNotificationCollector;