24 #include "dbusconnectionpool.h"
26 #include "imapparser_p.h"
28 #include "session_p.h"
33 #include <QtCore/QEventLoop>
34 #include <QtCore/QTimer>
35 #include <QtCore/QTextStream>
36 #include <QtNetwork/QHostAddress>
37 #include <QtNetwork/QTcpSocket>
38 #include <QtDBus/QDBusInterface>
39 #include <QtDBus/QDBusConnectionInterface>
41 using namespace Akonadi;
43 static QDBusAbstractInterface *s_jobtracker = 0;
// Processes one response received for this job.
//   tag  - tag accompanying the response
//   data - raw response bytes
// NOTE(review): this listing skips some original lines (see the embedded
// line numbers), so not every branch of the function is visible here.
46 void JobPrivate::handleResponse(
const QByteArray & tag,
const QByteArray & data )
// While a sub-job is current, the response is handed to that sub-job.
50 if ( mCurrentSubJob ) {
51 mCurrentSubJob->d_ptr->handleResponse( tag, data );
// Responses starting with "NO " or "BAD " are turned into an error text.
56 if ( data.startsWith(
"NO " ) || data.startsWith(
"BAD " ) ) {
57 QString msg = QString::fromUtf8( data );
// Drop the status prefix: 3 characters for "NO ", otherwise 4 ("BAD ").
59 msg.remove( 0, msg.startsWith( QLatin1String(
"NO " ) ) ? 3 : 4 );
// Checks for a trailing CRLF; the statement guarded by this check is not
// visible here (presumably it strips the line ending - TODO confirm).
61 if ( msg.endsWith( QLatin1String(
"\r\n" ) ) )
65 q->setErrorText( msg );
68 }
else if ( data.startsWith(
"OK" ) ) {
// delayedEmitResult() is scheduled through a zero-delay single-shot timer
// instead of being called directly.
75 QTimer::singleShot( 0, q, SLOT(delayedEmitResult()) );
// Passes the response on to the job's doHandleResponse().
80 q->doHandleResponse( tag, data );
// Sets up the private job data from the parent object and looks for the
// job tracker D-Bus interface.
//   parent - object the job was created with; it is probed as both a Job
//            and a Session.
// NOTE(review): this listing skips some original lines, so the conditions
// guarding several statements below are not visible.
83 void JobPrivate::init( QObject *parent )
// Both casts are attempted on the same parent pointer.
87 mParentJob =
dynamic_cast<Job*
>( parent );
88 mSession =
dynamic_cast<Session*
>( parent );
// Takes over the session of the parent job.
94 mSession = mParentJob->d_ptr->mSession;
// Registers this job with the session.
98 mSession->d->addJob( q );
// The tracker interface is only looked up while none has been created yet.
103 if ( !s_jobtracker ) {
// Function-local timer: the service check runs when the timer is still
// null or more than 3000 ms have elapsed on it.
106 static QTime s_lastTime;
107 if ( s_lastTime.isNull() || s_lastTime.elapsed() > 3000 ) {
108 if ( s_lastTime.isNull() )
// Create the interface only if "org.kde.akonadiconsole" is registered on
// this thread's D-Bus connection.
110 if ( DBusConnectionPool::threadConnection().interface()->isServiceRegistered(QLatin1String(
"org.kde.akonadiconsole" ) ) ) {
111 s_jobtracker =
new QDBusInterface( QLatin1String(
"org.kde.akonadiconsole" ),
112 QLatin1String(
"/jobtracker" ),
113 QLatin1String(
"org.freedesktop.Akonadi.JobTracker" ),
114 DBusConnectionPool::threadConnection(), 0 );
116 s_lastTime.restart();
// signalCreationToJobTracker() is invoked by name through a queued
// connection rather than called directly.
122 QMetaObject::invokeMethod( q,
"signalCreationToJobTracker", Qt::QueuedConnection );
// Sends a "jobCreated" call to the job tracker interface, if one exists.
125 void JobPrivate::signalCreationToJobTracker()
128 if ( s_jobtracker ) {
// Arguments, in order: the session id, the address of this job as a hex
// string, the address of the parent job as a hex string (or an empty
// string when there is no parent job), and the class name of this job.
132 QList<QVariant> argumentList;
133 argumentList << QLatin1String( mSession->
sessionId() )
134 << QString::number(reinterpret_cast<quintptr>( q ), 16)
135 << ( mParentJob ? QString::number( reinterpret_cast<quintptr>( mParentJob ), 16) : QString() )
136 << QString::fromLatin1( q->metaObject()->className() );
// QDBus::NoBlock is passed as the call mode.
137 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String(
"jobCreated" ), argumentList);
// NOTE(review): the body of this function is not visible in this listing.
// Presumably it emits the job result - TODO confirm against the full source.
141 void JobPrivate::delayedEmitResult()
// Starts this job: emits aboutToStart(), schedules startNext() and sends a
// "jobStarted" call to the job tracker interface.
// NOTE(review): this listing skips some original lines of the function.
147 void JobPrivate::startQueued()
152 emit q->aboutToStart( q );
// startNext() is scheduled through a zero-delay single-shot timer.
154 QTimer::singleShot( 0, q, SLOT(startNext()) );
// The tracker call happens only if a tracker interface exists; its only
// argument is the address of this job as a hex string.
157 if ( s_jobtracker ) {
158 QList<QVariant> argumentList;
159 argumentList << QString::number(reinterpret_cast<quintptr>( q ), 16);
160 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String(
"jobStarted" ), argumentList);
// Handles a lost connection for this job.
// NOTE(review): only the sub-job branch is visible in this listing.
164 void JobPrivate::lostConnection()
// While a sub-job is current, the call is forwarded to that sub-job.
168 if ( mCurrentSubJob ) {
169 mCurrentSubJob->d_ptr->lostConnection();
// Records the given job as the current sub-job.
//   job - the sub-job to record
176 void JobPrivate::slotSubJobAboutToStart(
Job * job )
// No other sub-job may be current at this point.
178 Q_ASSERT( mCurrentSubJob == 0 );
179 mCurrentSubJob = job;
// Starts a queued sub-job when the conditions below allow it.
// NOTE(review): the line that selects `job` is not visible in this listing.
182 void JobPrivate::startNext()
// Requires that this job has started, no sub-job is current, and sub-jobs
// exist.
186 if ( mStarted && !mCurrentSubJob && q->hasSubjobs() ) {
189 job->d_ptr->startQueued();
196 mTag = mParentJob->d_ptr->newTag();
198 mTag = QByteArray::number( mSession->d->nextTag() );
209 Q_ASSERT_X( !mWriteFinished,
"Job::writeData()",
"Calling writeData() after emitting writeFinished()" );
210 mSession->d->writeData( data );
215 mSession->d->itemRevisionChanged( itemId, oldRevision, newRevision );
221 foreach ( KJob *j, q->subjobs() ) {
224 job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
232 Q_UNUSED( oldRevision );
233 Q_UNUSED( newRevision );
// Returns the protocol version stored in the private data of this job's
// session.
236 int JobPrivate::protocolVersion()
const
238 return mSession->d->protocolVersion;
244 : KCompositeJob( parent ),
247 d_ptr->init( parent );
251 : KCompositeJob( parent ),
254 d_ptr->init( parent );
262 if ( s_jobtracker ) {
263 QList<QVariant> argumentList;
264 argumentList << QString::number(reinterpret_cast<quintptr>( this ), 16)
266 s_jobtracker->callWithArgumentList(QDBus::NoBlock, QLatin1String(
"jobEnded" ), argumentList);
279 d->mSession->d->forceReconnect();
292 str = i18n(
"Cannot connect to the Akonadi service." );
295 str = i18n(
"The protocol version of the Akonadi server is incompatible. Make sure you have a compatible version installed." );
298 str = i18n(
"User canceled operation." );
303 str = i18n(
"Unknown error." );
306 if ( !errorText().isEmpty() ) {
307 str += QString::fromLatin1(
" (%1)" ).arg( errorText() );
314 bool rv = KCompositeJob::addSubjob( job );
317 QTimer::singleShot( 0,
this, SLOT(startNext()) );
324 bool rv = KCompositeJob::removeSubjob( job );
325 if ( job == d_ptr->mCurrentSubJob ) {
326 d_ptr->mCurrentSubJob = 0;
327 QTimer::singleShot( 0,
this, SLOT(startNext()) );
334 kDebug() <<
"Unhandled response: " << tag << data;
// Handles the result of a sub-job.
//   job - the sub-job reporting its result
// NOTE(review): this listing skips some original lines of the function.
337 void Job::slotResult(KJob * job)
// For the current sub-job: clear the current marker, run the base-class
// result handling, then schedule startNext() through a zero-delay timer.
339 if ( d_ptr->mCurrentSubJob == job ) {
341 d_ptr->mCurrentSubJob = 0;
342 KCompositeJob::slotResult( job );
344 QTimer::singleShot( 0,
this, SLOT(startNext()) );
// Presumably the branch for a job that is not the current sub-job (the
// `else` is not visible - TODO confirm): the job is only removed.
349 KCompositeJob::removeSubjob( job );
355 d_ptr->mWriteFinished =
true;
359 #include "moc_job.cpp"
virtual void doUpdateItemRevision(Akonadi::Item::Id, int oldRevision, int newRevision)
Overwrite this if your job does operations with conflict detection and update the item revisions if y...
The server protocol version is too old or too new.
Base class for all actions in the Akonadi storage.
void updateItemRevision(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Propagate item revision changes to this job and its sub-jobs.
void itemRevisionChanged(Akonadi::Item::Id itemId, int oldRevision, int newRevision)
Notify following jobs about item revision changes.
The connection to the Akonadi server failed.
static Session * defaultSession()
Returns the default session for this thread.
void start()
Jobs are started automatically once entering the event loop again, no need to explicitly call this...
void writeFinished(Akonadi::Job *job)
This signal is emitted if the job has finished all write operations, i.e. it will not call writeData() again.
QByteArray sessionId() const
Returns the session identifier.
QByteArray tag() const
Return the tag used for the request.
virtual bool addSubjob(KJob *job)
Adds the given job as a subjob to this job.
virtual bool doKill()
Kills the execution of the job.
void emitWriteFinished()
Call this method to indicate that this job will not call writeData() again.
A communication session with the Akonadi storage.
QByteArray newTag()
Returns a new unique command tag for communication with the backend.
virtual void doHandleResponse(const QByteArray &tag, const QByteArray &data)
This method should be reimplemented in the concrete jobs in case you want to handle incoming data...
virtual ~Job()
Destroys the job.
void writeData(const QByteArray &data)
Sends raw data to the backend.
virtual bool removeSubjob(KJob *job)
Removes the given subjob of this job.
void aboutToStart(Akonadi::Job *job)
This signal is emitted directly before the job will be started.
The user canceled this job.
virtual QString errorString() const
Returns the error string, if there has been an error, an empty string otherwise.
Job(QObject *parent=0)
Creates a new job.