summaryrefslogtreecommitdiff
path: root/plugins-alternative/CopyEngine/Rsync/WriteThread.cpp
diff options
context:
space:
mode:
authorThomas Preud'homme <robotux@celest.fr>2013-03-21 11:01:59 +0100
committerThomas Preud'homme <robotux@celest.fr>2013-03-21 11:01:59 +0100
commite297dbd8052ef4e66f069e2dd1865ae7fa8af28e (patch)
tree342fea0a2f6f33b8b62dad2d1729f8209da1a1ba /plugins-alternative/CopyEngine/Rsync/WriteThread.cpp
parent8f9f382e1c97cab2e72e97495650c73ac4b97314 (diff)
Imported Upstream version 0.3.1.0
Diffstat (limited to 'plugins-alternative/CopyEngine/Rsync/WriteThread.cpp')
-rw-r--r--plugins-alternative/CopyEngine/Rsync/WriteThread.cpp508
1 files changed, 508 insertions, 0 deletions
diff --git a/plugins-alternative/CopyEngine/Rsync/WriteThread.cpp b/plugins-alternative/CopyEngine/Rsync/WriteThread.cpp
new file mode 100644
index 0000000..448e8dc
--- /dev/null
+++ b/plugins-alternative/CopyEngine/Rsync/WriteThread.cpp
@@ -0,0 +1,508 @@
+#include "WriteThread.h"
+
+#include <QDir>
+
+WriteThread::WriteThread()
+{
+ stopIt=false;
+ isOpen.release();
+ start();
+ moveToThread(this);
+ setObjectName("write");
+ this->mkpathTransfer = mkpathTransfer;
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ CurentCopiedSize=0;
+ buffer=false;
+ putInPause=false;
+ needRemoveTheFile=false;
+ blockSize=1024*1024;
+}
+
+WriteThread::~WriteThread()
+{
+ stopIt=true;
+ needRemoveTheFile=true;
+ freeBlock.release();
+ // useless because stopIt will close all thread, but if thread not runing run it
+ //endIsDetected();
+ emit internalStartClose();
+ isOpen.acquire();
+ disconnect(this);
+ quit();
+ wait();
+}
+
+void WriteThread::run()
+{
+ connect(this,SIGNAL(internalStartOpen()), this,SLOT(internalOpen()), Qt::QueuedConnection);
+ connect(this,SIGNAL(internalStartReopen()), this,SLOT(internalReopen()), Qt::QueuedConnection);
+ connect(this,SIGNAL(internalStartWrite()), this,SLOT(internalWrite()), Qt::QueuedConnection);
+ connect(this,SIGNAL(internalStartClose()), this,SLOT(internalClose()), Qt::QueuedConnection);
+ connect(this,SIGNAL(internalStartEndOfFile()), this,SLOT(internalEndOfFile()), Qt::QueuedConnection);
+ connect(this,SIGNAL(internalStartFlushAndSeekToZero()), this,SLOT(internalFlushAndSeekToZero()),Qt::QueuedConnection);
+ connect(this,SIGNAL(internalStartChecksum()), this,SLOT(checkSum()), Qt::QueuedConnection);
+ exec();
+}
+
+bool WriteThread::internalOpen()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] internalOpen destination: "+name);
+ if(stopIt)
+ return false;
+ if(file.isOpen())
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] already open! destination: "+file.fileName());
+ return false;
+ }
+ //set to LISTBLOCKSIZE
+ while(freeBlock.available()<ULTRACOPIER_PLUGIN_MAXBUFFERBLOCK)
+ freeBlock.release();
+ if(freeBlock.available()>ULTRACOPIER_PLUGIN_MAXBUFFERBLOCK)
+ freeBlock.acquire(freeBlock.available()-ULTRACOPIER_PLUGIN_MAXBUFFERBLOCK);
+ stopIt=false;
+ CurentCopiedSize=0;
+ endDetected=false;
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=InodeOperation;
+ #endif
+ file.setFileName(name);
+ //mkpath check if exists and return true if already exists
+ QFileInfo destinationInfo(file);
+ QDir destinationFolder;
+ {
+ mkpathTransfer->acquire();
+ if(!destinationFolder.exists(destinationInfo.absolutePath()))
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] "+QString("Try create the path: %1")
+ .arg(destinationInfo.absolutePath()));
+ if(!destinationFolder.mkpath(destinationInfo.absolutePath()))
+ {
+ if(!destinationFolder.exists(destinationInfo.absolutePath()))
+ {
+ /// \todo do real folder error here
+ errorString_internal="mkpath error on destination";
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("Unable create the folder: %1, error: %2")
+ .arg(destinationInfo.absolutePath())
+ .arg(errorString_internal));
+ emit error();
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ return false;
+ }
+ }
+ }
+ mkpathTransfer->release();
+ }
+ if(stopIt)
+ return false;
+ //try open it
+ QIODevice::OpenMode flags=QIODevice::ReadWrite;
+ if(!buffer)
+ flags|=QIODevice::Unbuffered;
+ if(file.open(flags))
+ {
+ if(stopIt)
+ return false;
+ file.seek(0);
+ if(stopIt)
+ return false;
+ file.resize(startSize);
+ if(stopIt)
+ return false;
+ emit opened();
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ isOpen.acquire();
+ needRemoveTheFile=false;
+ return true;
+ }
+ else
+ {
+ if(stopIt)
+ return false;
+ errorString_internal=file.errorString();
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("Unable to open: %1, error: %2").arg(name).arg(errorString_internal));
+ emit error();
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ return false;
+ }
+}
+
+void WriteThread::open(const QString &name,const quint64 &startSize,const bool &buffer)
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] open destination: "+name);
+ if(stopIt)
+ return;
+ fakeMode=false;
+ this->name=name;
+ this->startSize=startSize;
+ this->buffer=buffer;
+ endDetected=false;
+ emit internalStartOpen();
+}
+
+void WriteThread::endIsDetected()
+{
+ if(endDetected)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] double event dropped");
+ return;
+ }
+ endDetected=true;
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ emit internalStartEndOfFile();
+}
+
+QString WriteThread::errorString()
+{
+ return errorString_internal;
+}
+
+bool WriteThread::write(const QByteArray &data)
+{
+ if(stopIt)
+ return false;
+ freeBlock.acquire();
+ if(stopIt)
+ return false;
+ {
+ QMutexLocker lock_mutex(&accessList);
+ theBlockList.append(data);
+ }
+ emit internalStartWrite();
+ return true;
+}
+
+void WriteThread::stop()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] stop()");
+ needRemoveTheFile=true;
+ stopIt=true;
+ if(isOpen.available()>0)
+ return;
+ freeBlock.release();
+ // useless because stopIt will close all thread, but if thread not runing run it
+ endIsDetected();
+ //for the stop for skip: void TransferThread::skip()
+ emit internalStartClose();
+}
+
+void WriteThread::flushBuffer()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ freeBlock.release();
+ freeBlock.acquire();
+ {
+ QMutexLocker lock_mutex(&accessList);
+ theBlockList.clear();
+ }
+}
+
+void WriteThread::internalEndOfFile()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] writeIsStopped");
+ emit writeIsStopped();
+}
+
+void WriteThread::internalWrite()
+{
+ if(stopIt)
+ return;
+ //read one block
+ if(theBlockList.size()<=0)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] End detected of the file");
+ return;
+ }
+ else
+ {
+ QMutexLocker lock_mutex(&accessList);
+ blockArray=theBlockList.first();
+ theBlockList.removeFirst();
+ }
+ //write one block
+ freeBlock.release();
+
+ if(stopIt)
+ return;
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Write;
+ #endif
+ bytesWriten=file.write(blockArray);
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ //mutex for stream this data
+ if(CurentCopiedSize==0)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] emit writeIsStarted()");
+ emit writeIsStarted();
+ }
+ CurentCopiedSize+=bytesWriten;
+ if(stopIt)
+ return;
+ if(file.error()!=QFile::NoError)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("Error in writing: %1 (%2)").arg(file.errorString()).arg(file.error()));
+ errorString_internal=QString("Error in writing: %1 (%2)").arg(file.errorString()).arg(file.error());
+ stopIt=true;
+ emit error();
+ return;
+ }
+ if(bytesWriten!=blockArray.size())
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("Error in writing, bytesWriten: %1, blockArray.size(): %2").arg(bytesWriten).arg(blockArray.size()));
+ errorString_internal=QString("Error in writing, bytesWriten: %1, blockArray.size(): %2").arg(bytesWriten).arg(blockArray.size());
+ stopIt=true;
+ emit error();
+ return;
+ }
+ lastGoodPosition+=bytesWriten;
+}
+
+void WriteThread::postOperation()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ emit internalStartClose();
+}
+
+void WriteThread::internalClose(bool emitSignal)
+{
+ /// \note never send signal here, because it's called by the destructor
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Close;
+ #endif
+ if(!fakeMode && file.isOpen())
+ {
+ if(!needRemoveTheFile)
+ {
+ if(startSize!=CurentCopiedSize)
+ file.resize(CurentCopiedSize);
+ }
+ file.close();
+ if(needRemoveTheFile)
+ {
+ if(file.remove())
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] unable to remove the destination file");
+ }
+ needRemoveTheFile=false;
+ }
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ if(emitSignal)
+ emit closed();
+
+ /// \note always the last of this function
+ if(!fakeMode)
+ isOpen.release();
+}
+
+void WriteThread::internalReopen()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ internalClose(false);
+ flushBuffer();
+ stopIt=false;
+ CurentCopiedSize=0;
+ if(internalOpen())
+ emit reopened();
+}
+
+void WriteThread::reopen()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ stopIt=true;
+ endDetected=false;
+ emit internalStartReopen();
+}
+
+#ifdef ULTRACOPIER_PLUGIN_DEBUG
+//to set the id
+void WriteThread::setId(int id)
+{
+ this->id=id;
+}
+#endif
+
+/// \brief do the fake open
+void WriteThread::fakeOpen()
+{
+ fakeMode=true;
+ emit opened();
+}
+
+/// \brief do the fake writeIsStarted
+void WriteThread::fakeWriteIsStarted()
+{
+ emit writeIsStarted();
+}
+
+/// \brief do the fake writeIsStopped
+void WriteThread::fakeWriteIsStopped()
+{
+ emit writeIsStopped();
+}
+
+/// do the checksum
+void WriteThread::startCheckSum()
+{
+ emit internalStartChecksum();
+}
+
+/** \brief set block size
+\param block the new block size in KB
+\return Return true if succes */
+bool WriteThread::setBlockSize(const int blockSize)
+{
+ if(blockSize<1 || blockSize>16384)
+ {
+ this->blockSize=blockSize*1024;
+ //set the new max speed because the timer have changed
+ setMaxSpeed(maxSpeed);
+ return true;
+ }
+ else
+ return false;
+}
+
+/*! \brief Set the max speed
+\param tempMaxSpeed Set the max speed in KB/s, 0 for no limit */
+int WriteThread::setMaxSpeed(const int maxSpeed)
+{
+ if(this->maxSpeed==0 && maxSpeed==0 && waitNewClockForSpeed.available()>0)
+ waitNewClockForSpeed.tryAcquire(waitNewClockForSpeed.available());
+ this->maxSpeed=maxSpeed;
+ if(this->maxSpeed>0)
+ {
+ int NewInterval,newMultiForBigSpeed=0;
+ do
+ {
+ newMultiForBigSpeed++;
+ NewInterval=(blockSize*newMultiForBigSpeed)/(this->maxSpeed);
+ }
+ while (NewInterval<ULTRACOPIER_PLUGIN_MINTIMERINTERVAL);
+ if(NewInterval>ULTRACOPIER_PLUGIN_MAXTIMERINTERVAL)
+ {
+ NewInterval=ULTRACOPIER_PLUGIN_MAXTIMERINTERVAL;
+ newMultiForBigSpeed=1;
+ blockSize=this->maxSpeed*NewInterval;
+ }
+ MultiForBigSpeed=newMultiForBigSpeed;
+ return NewInterval;
+ }
+ else
+ {
+ waitNewClockForSpeed.release();
+ return 0;
+ }
+}
+
+/// \brief For give timer every X ms
+void WriteThread::timeOfTheBlockCopyFinished()
+{
+ if(waitNewClockForSpeed.available()<ULTRACOPIER_PLUGIN_NUMSEMSPEEDMANAGEMENT)
+ waitNewClockForSpeed.release();
+ //why not just use waitNewClockForSpeed.release() ?
+}
+
+void WriteThread::flushAndSeekToZero()
+{
+ stopIt=true;
+ emit internalStartFlushAndSeekToZero();
+}
+
+
+void WriteThread::checkSum()
+{
+ //QByteArray blockArray;
+ QCryptographicHash hash(QCryptographicHash::Sha1);
+ endDetected=false;
+ lastGoodPosition=0;
+ file.seek(0);
+ int sizeReaden=0;
+ do
+ {
+ //read one block
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Read;
+ #endif
+ blockArray=file.read(blockSize);
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+
+ if(file.error()!=QFile::NoError)
+ {
+ errorString_internal=tr("Unable to read the source file: ")+file.errorString()+" ("+QString::number(file.error())+")";
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("file.error()!=QFile::NoError: %1, error: %2").arg(QString::number(file.error())).arg(errorString_internal));
+ emit error();
+ return;
+ }
+ sizeReaden=blockArray.size();
+ if(sizeReaden>0)
+ {
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Checksum;
+ #endif
+ hash.addData(blockArray);
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+
+ if(stopIt)
+ break;
+
+ lastGoodPosition+=blockArray.size();
+
+ //wait for limitation speed if stop not query
+ if(maxSpeed>0)
+ {
+ numberOfBlockCopied++;
+ if(numberOfBlockCopied>=MultiForBigSpeed)
+ {
+ numberOfBlockCopied=0;
+ waitNewClockForSpeed.acquire();
+ if(stopIt)
+ break;
+ }
+ }
+ }
+ }
+ while(sizeReaden>0 && !stopIt);
+ if(lastGoodPosition>file.size())
+ {
+ errorString_internal=tr("File truncated during the read, possible data change");
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("Source truncated during the read: %1 (%2)").arg(file.errorString()).arg(QString::number(file.error())));
+ emit error();
+ return;
+ }
+ if(stopIt)
+ {
+/* if(putInPause)
+ emit isInPause();*/
+ stopIt=false;
+ return;
+ }
+ emit checksumFinish(hash.result());
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] stop the read");
+}
+
+void WriteThread::internalFlushAndSeekToZero()
+{
+ flushBuffer();
+ file.seek(0);
+ stopIt=false;
+ emit flushedAndSeekedToZero();
+}
+
+void WriteThread::setMkpathTransfer(QSemaphore *mkpathTransfer)
+{
+ this->mkpathTransfer=mkpathTransfer;
+}