summaryrefslogtreecommitdiff
path: root/plugins-alternative/CopyEngine/Rsync/ReadThread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'plugins-alternative/CopyEngine/Rsync/ReadThread.cpp')
-rw-r--r--plugins-alternative/CopyEngine/Rsync/ReadThread.cpp597
1 files changed, 597 insertions, 0 deletions
diff --git a/plugins-alternative/CopyEngine/Rsync/ReadThread.cpp b/plugins-alternative/CopyEngine/Rsync/ReadThread.cpp
new file mode 100644
index 0000000..3c7bfc1
--- /dev/null
+++ b/plugins-alternative/CopyEngine/Rsync/ReadThread.cpp
@@ -0,0 +1,597 @@
+#include "ReadThread.h"
+
+ReadThread::ReadThread()
+{
+ start();
+ moveToThread(this);
+ stopIt=false;
+ putInPause=false;
+ blockSize=1024*1024;
+ setObjectName("read");
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ isInReadLoop=false;
+ tryStartRead=false;
+ isOpen.release();
+}
+
+ReadThread::~ReadThread()
+{
+ stopIt=true;
+ disconnect(this);
+ waitNewClockForSpeed.release();
+ isOpen.acquire();
+ exit();
+ wait();
+}
+
+void ReadThread::run()
+{
+ connect(this,SIGNAL(internalStartOpen()), this,SLOT(internalOpen()), Qt::QueuedConnection);
+ connect(this,SIGNAL(internalStartReopen()), this,SLOT(internalReopen()), Qt::QueuedConnection);
+ connect(this,SIGNAL(internalStartRead()), this,SLOT(internalRead()), Qt::QueuedConnection);
+ connect(this,SIGNAL(internalStartClose()), this,SLOT(internalClose()), Qt::QueuedConnection);
+ connect(this,SIGNAL(checkIfIsWait()), this,SLOT(isInWait()), Qt::QueuedConnection);
+ connect(this,SIGNAL(internalStartChecksum()), this,SLOT(checkSum()), Qt::QueuedConnection);
+ exec();
+}
+
+void ReadThread::open(const QString &name,const CopyMode &mode)
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] open source: "+name);
+ if(file.isOpen())
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Critical,"["+QString::number(id)+"] previous file is already open: "+file.fileName()+", try open: "+this->name);
+ return;
+ }
+ if(isInReadLoop)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Critical,"["+QString::number(id)+"] previous file is already readding: "+file.fileName()+", try open: "+this->name);
+ return;
+ }
+ if(tryStartRead)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Critical,"["+QString::number(id)+"] previous file is already try read: "+file.fileName()+", try open: "+this->name);
+ return;
+ }
+ fakeMode=false;
+ this->name=name;
+ this->mode=mode;
+ emit internalStartOpen();
+}
+
+QString ReadThread::errorString()
+{
+ return errorString_internal;
+}
+
+void ReadThread::stop()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] stop()");
+ stopIt=true;
+ if(isOpen.available()>0)
+ return;
+ emit internalStartClose();
+}
+
+bool ReadThread::pause()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] try put read thread in pause");
+ putInPause=true;
+ stopIt=true;
+ return isInReadLoop;
+}
+
+void ReadThread::resume()
+{
+ if(putInPause)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ putInPause=false;
+ stopIt=false;
+ }
+ else
+ return;
+ if(tryStartRead)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] already in try start");
+ return;
+ }
+ tryStartRead=true;
+ if(isInReadLoop)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] is in read loop");
+ return;
+ }
+ if(!file.isOpen())
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] file is not open");
+ return;
+ }
+ emit internalStartRead();
+}
+
+bool ReadThread::seek(qint64 position)
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start with: "+QString::number(position));
+ if(position>file.size())
+ return false;
+ return file.seek(position);
+}
+
+qint64 ReadThread::size()
+{
+ return file.size();
+}
+
+void ReadThread::postOperation()
+{
+ emit internalStartClose();
+}
+
+void ReadThread::checkSum()
+{
+ QByteArray blockArray;
+ QCryptographicHash hash(QCryptographicHash::Sha1);
+ isInReadLoop=true;
+ lastGoodPosition=0;
+ seek(0);
+ int sizeReaden=0;
+ do
+ {
+ //read one block
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Read;
+ #endif
+ blockArray=file.read(blockSize);
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+
+ if(file.error()!=QFile::NoError)
+ {
+ errorString_internal=tr("Unable to read the source file: ")+file.errorString()+" ("+QString::number(file.error())+")";
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("file.error()!=QFile::NoError: %1, error: %2").arg(QString::number(file.error())).arg(errorString_internal));
+ emit error();
+ isInReadLoop=false;
+ return;
+ }
+ sizeReaden=blockArray.size();
+ if(sizeReaden>0)
+ {
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Checksum;
+ #endif
+ hash.addData(blockArray);
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+
+ if(stopIt)
+ break;
+
+ lastGoodPosition+=blockArray.size();
+
+ //wait for limitation speed if stop not query
+ if(maxSpeed>0)
+ {
+ numberOfBlockCopied++;
+ if(numberOfBlockCopied>=MultiForBigSpeed)
+ {
+ numberOfBlockCopied=0;
+ waitNewClockForSpeed.acquire();
+ if(stopIt)
+ break;
+ }
+ }
+ }
+ }
+ while(sizeReaden>0 && !stopIt);
+ if(lastGoodPosition>file.size())
+ {
+ errorString_internal=tr("File truncated during the read, possible data change");
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("Source truncated during the read: %1 (%2)").arg(file.errorString()).arg(QString::number(file.error())));
+ emit error();
+ isInReadLoop=false;
+ return;
+ }
+ isInReadLoop=false;
+ if(stopIt)
+ {
+ if(putInPause)
+ emit isInPause();
+ stopIt=false;
+ return;
+ }
+ emit checksumFinish(hash.result());
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] stop the read");
+}
+
+bool ReadThread::internalOpen(bool resetLastGoodPosition)
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] internalOpen source: "+name);
+ stopIt=false;
+ putInPause=false;
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=InodeOperation;
+ #endif
+ if(file.isOpen())
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] already open! source: "+name);
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ return false;
+ }
+ file.setFileName(name);
+ QIODevice::OpenMode openMode=QIODevice::ReadOnly;
+ if(mode==Move)
+ openMode=QIODevice::ReadWrite;
+ seekToZero=false;
+ if(file.open(openMode))
+ {
+ size_at_open=file.size();
+ mtime_at_open=QFileInfo(file).lastModified();
+ putInPause=false;
+ if(resetLastGoodPosition)
+ {
+ lastGoodPosition=0;
+ seek(0);
+ emit opened();
+ }
+ else if(!seek(lastGoodPosition))
+ {
+ errorString_internal=file.errorString();
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("Unable to seek after open: %1, error: %2").arg(name).arg(errorString_internal));
+ emit error();
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ return false;
+ }
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ isOpen.acquire();
+ return true;
+ }
+ else
+ {
+ errorString_internal=file.errorString();
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("Unable to open: %1, error: %2").arg(name).arg(errorString_internal));
+ emit error();
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ return false;
+ }
+}
+
+void ReadThread::internalRead()
+{
+ isInReadLoop=true;
+ tryStartRead=false;
+ if(stopIt)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] stopIt == true, then quit");
+ internalClose();
+ return;
+ }
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=InodeOperation;
+ #endif
+ int sizeReaden=0;
+ if(!file.isOpen())
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] is not open!");
+ return;
+ }
+ QByteArray blockArray;
+ //numberOfBlockCopied = 0;
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start the copy");
+ emit readIsStarted();
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+ if(stopIt)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] stopIt == true, then quit");
+ internalClose();
+ return;
+ }
+ do
+ {
+ //read one block
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Read;
+ #endif
+ blockArray=file.read(blockSize);
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+
+ if(file.error()!=QFile::NoError)
+ {
+ errorString_internal=tr("Unable to read the source file: ")+file.errorString()+" ("+QString::number(file.error())+")";
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("file.error()!=QFile::NoError: %1, error: %2").arg(QString::number(file.error())).arg(errorString_internal));
+ emit error();
+ isInReadLoop=false;
+ return;
+ }
+ sizeReaden=blockArray.size();
+ if(sizeReaden>0)
+ {
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=WaitWritePipe;
+ #endif
+ if(!writeThread->write(blockArray))
+ {
+ if(!stopIt)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] stopped because the write is stopped: "+QString::number(lastGoodPosition));
+ stopIt=true;
+ }
+ }
+
+ #ifdef ULTRACOPIER_PLUGIN_DEBUG
+ stat=Idle;
+ #endif
+
+ if(stopIt)
+ break;
+
+ lastGoodPosition+=blockArray.size();
+
+ //wait for limitation speed if stop not query
+ if(maxSpeed>0)
+ {
+ numberOfBlockCopied++;
+ if(numberOfBlockCopied>=MultiForBigSpeed)
+ {
+ numberOfBlockCopied=0;
+ waitNewClockForSpeed.acquire();
+ if(stopIt)
+ break;
+ }
+ }
+ }
+ /*
+ if(lastGoodPosition>16*1024)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("Test error in reading: %1 (%2)").arg(file.errorString()).arg(file.error()));
+ errorString_internal=QString("Test error in reading: %1 (%2)").arg(file.errorString()).arg(file.error());
+ emit error();
+ isInReadLoop=false;
+ return;
+ }
+ */
+ }
+ while(sizeReaden>0 && !stopIt);
+ if(lastGoodPosition>file.size())
+ {
+ errorString_internal=tr("File truncated during the read, possible data change");
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] "+QString("Source truncated during the read: %1 (%2)").arg(file.errorString()).arg(QString::number(file.error())));
+ emit error();
+ isInReadLoop=false;
+ return;
+ }
+ isInReadLoop=false;
+ if(stopIt)
+ {
+ if(putInPause)
+ emit isInPause();
+ stopIt=false;
+ return;
+ }
+ emit readIsStopped();//will product by signal connection writeThread->endIsDetected();
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] stop the read");
+}
+
+void ReadThread::startRead()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ if(tryStartRead)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] already in try start");
+ return;
+ }
+ if(isInReadLoop)
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] double event dropped");
+ else
+ {
+ tryStartRead=true;
+ emit internalStartRead();
+ }
+}
+
+void ReadThread::internalClose(bool callByTheDestructor)
+{
+ /// \note never send signal here, because it's called by the destructor
+ //ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ if(!fakeMode)
+ file.close();
+ if(!callByTheDestructor)
+ emit closed();
+
+ /// \note always the last of this function
+ if(!fakeMode)
+ isOpen.release();
+}
+
+/** \brief set block size
+\param block the new block size in KB
+\return Return true if succes */
+bool ReadThread::setBlockSize(const int blockSize)
+{
+ if(blockSize<1 || blockSize>16384)
+ {
+ this->blockSize=blockSize*1024;
+ //set the new max speed because the timer have changed
+ setMaxSpeed(maxSpeed);
+ return true;
+ }
+ else
+ return false;
+}
+
+/*! \brief Set the max speed
+\param tempMaxSpeed Set the max speed in KB/s, 0 for no limit */
+int ReadThread::setMaxSpeed(const int maxSpeed)
+{
+ if(this->maxSpeed==0 && maxSpeed==0 && waitNewClockForSpeed.available()>0)
+ waitNewClockForSpeed.tryAcquire(waitNewClockForSpeed.available());
+ this->maxSpeed=maxSpeed;
+ if(this->maxSpeed>0)
+ {
+ int NewInterval,newMultiForBigSpeed=0;
+ do
+ {
+ newMultiForBigSpeed++;
+ NewInterval=(blockSize*newMultiForBigSpeed)/(this->maxSpeed);
+ }
+ while (NewInterval<ULTRACOPIER_PLUGIN_MINTIMERINTERVAL);
+ if(NewInterval>ULTRACOPIER_PLUGIN_MAXTIMERINTERVAL)
+ {
+ NewInterval=ULTRACOPIER_PLUGIN_MAXTIMERINTERVAL;
+ newMultiForBigSpeed=1;
+ blockSize=this->maxSpeed*NewInterval;
+ }
+ MultiForBigSpeed=newMultiForBigSpeed;
+ return NewInterval;
+ }
+ else
+ {
+ waitNewClockForSpeed.release();
+ return 0;
+ }
+}
+
+/// \brief For give timer every X ms
+void ReadThread::timeOfTheBlockCopyFinished()
+{
+ if(waitNewClockForSpeed.available()<ULTRACOPIER_PLUGIN_NUMSEMSPEEDMANAGEMENT)
+ waitNewClockForSpeed.release();
+ //why not just use waitNewClockForSpeed.release() ?
+}
+
+/// \brief do the fake open
+void ReadThread::fakeOpen()
+{
+ fakeMode=true;
+ emit opened();
+}
+
+/// \brief do the fake writeIsStarted
+void ReadThread::fakeReadIsStarted()
+{
+ emit readIsStarted();
+}
+
+/// \brief do the fake writeIsStopped
+void ReadThread::fakeReadIsStopped()
+{
+ emit readIsStopped();
+}
+
+/// do the checksum
+void ReadThread::startCheckSum()
+{
+ emit internalStartChecksum();
+}
+
+qint64 ReadThread::getLastGoodPosition()
+{
+ /*if(lastGoodPosition>file.size())
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Critical,"["+QString::number(id)+"] Bug, the lastGoodPosition is greater than the file size!");
+ return file.size();
+ }
+ else*/
+ return lastGoodPosition;
+}
+
+//reopen after an error
+void ReadThread::reopen()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ if(isInReadLoop)
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] try reopen where read is not finish");
+ return;
+ }
+ stopIt=true;
+ emit internalStartReopen();
+}
+
+bool ReadThread::internalReopen()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ stopIt=false;
+ file.close();
+ if(size_at_open!=file.size() && mtime_at_open!=QFileInfo(file).lastModified())
+ {
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Warning,"["+QString::number(id)+"] source file have changed since the last open, restart all");
+ //fix this function like the close function
+ if(internalOpen(true))
+ {
+ emit resumeAfterErrorByRestartAll();
+ return true;
+ }
+ else
+ return false;
+ }
+ else
+ {
+ //fix this function like the close function
+ if(internalOpen(false))
+ {
+ emit resumeAfterErrorByRestartAtTheLastPosition();
+ return true;
+ }
+ else
+ return false;
+ }
+}
+
+//set the write thread
+void ReadThread::setWriteThread(WriteThread * writeThread)
+{
+ this->writeThread=writeThread;
+}
+
+#ifdef ULTRACOPIER_PLUGIN_DEBUG
+//to set the id
+void ReadThread::setId(int id)
+{
+ this->id=id;
+}
+#endif
+
+void ReadThread::seekToZeroAndWait()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ stopIt=true;
+ seekToZero=true;
+ emit checkIfIsWait();
+}
+
+void ReadThread::isInWait()
+{
+ ULTRACOPIER_DEBUGCONSOLE(DebugLevel_Notice,"["+QString::number(id)+"] start");
+ if(seekToZero)
+ {
+ seekToZero=false;
+ if(file.isOpen())
+ seek(0);
+ else
+ internalOpen(true);
+ emit isSeekToZeroAndWait();
+ }
+}
+
+bool ReadThread::isReading()
+{
+ return isInReadLoop;
+}
+