diff options
Diffstat (limited to 'plugins/CopyEngine/Ultracopier/WriteThread.cpp')
-rw-r--r-- | plugins/CopyEngine/Ultracopier/WriteThread.cpp | 968 |
1 files changed, 968 insertions, 0 deletions
diff --git a/plugins/CopyEngine/Ultracopier/WriteThread.cpp b/plugins/CopyEngine/Ultracopier/WriteThread.cpp new file mode 100644 index 0000000..9993961 --- /dev/null +++ b/plugins/CopyEngine/Ultracopier/WriteThread.cpp @@ -0,0 +1,968 @@ +#include "WriteThread.h" + +#include <QDir> + +QMultiHash<QString,WriteThread *> WriteThread::writeFileList; +QMutex WriteThread::writeFileListMutex; + +WriteThread::WriteThread() +{ + deletePartiallyTransferredFiles = true; + lastGoodPosition = 0; + stopIt = false; + isOpen.release(); + moveToThread(this); + setObjectName(QStringLiteral("write")); + //this->mkpathTransfer = mkpathTransfer; + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat = Idle; + #endif + numberOfBlock = ULTRACOPIER_PLUGIN_DEFAULT_PARALLEL_NUMBER_OF_BLOCK; + buffer = false; + putInPause = false; + needRemoveTheFile = false; + blockSize = ULTRACOPIER_PLUGIN_DEFAULT_BLOCK_SIZE*1024; + start(); +} + +WriteThread::~WriteThread() +{ + stopIt=true; + needRemoveTheFile=true; + pauseMutex.release(); + writeFull.release(); + #ifdef ULTRACOPIER_PLUGIN_SPEED_SUPPORT + waitNewClockForSpeed.release(); + waitNewClockForSpeed2.release(); + #endif + writeFull.release(); + pauseMutex.release(); + // useless because stopIt will close all thread, but if thread not runing run it + //endIsDetected(); + emit internalStartClose(); + isOpen.acquire(); + if(!file.fileName().isEmpty()) + resumeNotStarted(); + //disconnect(this);//-> do into ~TransferThread() + quit(); + wait(); +} + +void WriteThread::run() +{ + connect(this,&WriteThread::internalStartOpen, this,&WriteThread::internalOpen, Qt::QueuedConnection); + connect(this,&WriteThread::internalStartReopen, this,&WriteThread::internalReopen, Qt::QueuedConnection); + connect(this,&WriteThread::internalStartWrite, this,&WriteThread::internalWrite, Qt::QueuedConnection); + connect(this,&WriteThread::internalStartClose, this,&WriteThread::internalCloseSlot, Qt::QueuedConnection); + connect(this,&WriteThread::internalStartEndOfFile, this,&WriteThread::internalEndOfFile, Qt::QueuedConnection); + connect(this,&WriteThread::internalStartFlushAndSeekToZero, this,&WriteThread::internalFlushAndSeekToZero, Qt::QueuedConnection); + connect(this,&WriteThread::internalStartChecksum, this,&WriteThread::checkSum, Qt::QueuedConnection); + exec(); +} + +bool WriteThread::internalOpen() +{ + //do a bug + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] internalOpen destination: "+file.fileName().toStdString()); + if(stopIt) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] close because stopIt is at true"); + emit closed(); + return false; + } + if(file.isOpen()) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] already open! destination: "+file.fileName().toStdString()); + return false; + } + if(file.fileName().isEmpty()) + { + errorString_internal=tr("Path resolution error (Empty path)").toStdString(); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("Unable to open: %1, error: %2").arg(file.fileName()).arg(QString::fromStdString(errorString_internal)).toStdString()); + emit error(); + return false; + } + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] before the mutex"); + //set to LISTBLOCKSIZE + if(sequential) + { + while(writeFull.available()<1) + writeFull.release(); + if(writeFull.available()>1) + writeFull.acquire(writeFull.available()-1); + } + else + { + while(writeFull.available()<numberOfBlock) + writeFull.release(); + if(writeFull.available()>numberOfBlock) + writeFull.acquire(writeFull.available()-numberOfBlock); + } + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] after the mutex"); + stopIt=false; + endDetected=false; + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=InodeOperation; + #endif + //mkpath check if exists and return true if already exists + QFileInfo destinationInfo(file); + QDir destinationFolder; + { + mkpathTransfer->acquire(); + if(!destinationFolder.exists(destinationInfo.absolutePath())) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] Try create the path: "+ + destinationInfo.absolutePath().toStdString()); + if(!destinationFolder.mkpath(destinationInfo.absolutePath())) + { + if(!destinationFolder.exists(destinationInfo.absolutePath())) + { + /// \todo do real folder error here + errorString_internal="mkpath error on destination"; + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("Unable create the folder: %1, error: %2") + .arg(destinationInfo.absolutePath()) + .arg(QString::fromStdString(errorString_internal)) + .toStdString()); + emit error(); + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Idle; + #endif + mkpathTransfer->release(); + return false; + } + } + } + mkpathTransfer->release(); + } + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] after the mkpath"); + if(stopIt) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] close because stopIt is at true"); + emit closed(); + return false; + } + //try open it + QIODevice::OpenMode flags=QIODevice::ReadWrite; + if(!buffer) + flags|=QIODevice::Unbuffered; + { + QMutexLocker lock_mutex(&writeFileListMutex); + if(writeFileList.count(file.fileName(),this)==0) + { + writeFileList.insert(file.fileName(),this); + if(writeFileList.count(file.fileName())>1) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] in waiting because same file is found"); + return false; + } + } + } + bool fileWasExists=file.exists(); + if(file.open(flags)) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] after the open"); + { + QMutexLocker lock_mutex(&accessList); + if(!theBlockList.isEmpty()) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] General file corruption detected"); + stopIt=true; + file.close(); + resumeNotStarted(); + file.setFileName(QStringLiteral("")); + return false; + } + } + pauseMutex.tryAcquire(pauseMutex.available()); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] after the pause mutex"); + if(stopIt) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] close because stopIt is at true"); + file.close(); + resumeNotStarted(); + file.setFileName(QStringLiteral("")); + emit closed(); + return false; + } + if(!file.seek(0)) + { + file.close(); + resumeNotStarted(); + file.setFileName(QStringLiteral("")); + errorString_internal=file.errorString().toStdString(); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("Unable to seek after open: %1, error: %2").arg(file.fileName()).arg(QString::fromStdString(errorString_internal)).toStdString()); + emit error(); + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Idle; + #endif + return false; + } + if(stopIt) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] close because stopIt is at true"); + file.close(); + resumeNotStarted(); + file.setFileName(QStringLiteral("")); + emit closed(); + return false; + } + if(!file.resize(startSize)) + { + file.close(); + resumeNotStarted(); + file.setFileName(QStringLiteral("")); + errorString_internal=file.errorString().toStdString(); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("Unable to resize to %1 after open: %2, error: %3").arg(startSize).arg(file.fileName()).arg(QString::fromStdString(errorString_internal)).toStdString()); + emit error(); + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Idle; + #endif + return false; + } + if(stopIt) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] close because stopIt is at true"); + file.close(); + resumeNotStarted(); + file.setFileName(QStringLiteral("")); + emit closed(); + return false; + } + isOpen.acquire(); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] emit opened()"); + emit opened(); + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Idle; + #endif + needRemoveTheFile=false; + postOperationRequested=false; + return true; + } + else + { + if(!fileWasExists && file.exists()) + if(!file.remove()) + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] file created but can't be removed"); + if(stopIt) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] close because stopIt is at true"); + resumeNotStarted(); + file.setFileName(QStringLiteral("")); + emit closed(); + return false; + } + errorString_internal=file.errorString().toStdString(); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("Unable to open: %1, error: %2").arg(file.fileName()).arg(QString::fromStdString(errorString_internal)).toStdString()); + emit error(); + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Idle; + #endif + return false; + } +} + +void WriteThread::open(const QFileInfo &file,const uint64_t &startSize,const bool &buffer,const int &numberOfBlock,const bool &sequential) +{ + if(!isRunning()) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] the thread not running to open destination: "+file.absoluteFilePath().toStdString()+", numberOfBlock: "+std::to_string(numberOfBlock)); + errorString_internal=tr("Internal error, please report it!").toStdString(); + emit error(); + return; + } + if(this->file.isOpen()) + { + if(file.absoluteFilePath()==this->file.fileName()) + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] Try reopen already opened same file: "+file.absoluteFilePath().toStdString()); + else + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Critical,"["+std::to_string(id)+"] previous file is already open: "+file.absoluteFilePath().toStdString()); + emit internalStartClose(); + isOpen.acquire(); + isOpen.release(); + } + if(numberOfBlock<1 || (numberOfBlock>ULTRACOPIER_PLUGIN_MAX_PARALLEL_NUMBER_OF_BLOCK && numberOfBlock>ULTRACOPIER_PLUGIN_MAX_SEQUENTIAL_NUMBER_OF_BLOCK)) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] numberOfBlock wrong, set to default"); + this->numberOfBlock=ULTRACOPIER_PLUGIN_DEFAULT_PARALLEL_NUMBER_OF_BLOCK; + } + else + this->numberOfBlock=numberOfBlock; + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] "+QStringLiteral("open destination: %1, sequential: %2").arg(file.absoluteFilePath()).arg(sequential).toStdString()); + stopIt=false; + fakeMode=false; + lastGoodPosition=0; + this->file.setFileName(file.absoluteFilePath()); + this->startSize=startSize; + this->buffer=buffer; + this->sequential=sequential; + endDetected=false; + writeFullBlocked=false; + emit internalStartOpen(); + #ifdef ULTRACOPIER_PLUGIN_SPEED_SUPPORT + numberOfBlockCopied=0; + #endif +} + +void WriteThread::endIsDetected() +{ + if(endDetected) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] double event dropped"); + return; + } + endDetected=true; + pauseMutex.release(); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] start"); + emit internalStartEndOfFile(); +} + +std::string WriteThread::errorString() const +{ + return errorString_internal; +} + +void WriteThread::stop() +{ + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] stop()"); + needRemoveTheFile=true; + stopIt=true; + if(isOpen.available()>0) + return; + writeFull.release(); + pauseMutex.release(); + pauseMutex.release(); + #ifdef ULTRACOPIER_PLUGIN_SPEED_SUPPORT + waitNewClockForSpeed.release(); + waitNewClockForSpeed2.release(); + #endif + // useless because stopIt will close all thread, but if thread not runing run it + endIsDetected(); + //for the stop for skip: void TransferThread::skip() + emit internalStartClose(); +} + +void WriteThread::flushBuffer() +{ + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] start"); + writeFull.release(); + writeFull.acquire(); + pauseMutex.release(); + { + QMutexLocker lock_mutex(&accessList); + theBlockList.clear(); + } + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] stop"); +} + +/// \brief buffer is empty +bool WriteThread::bufferIsEmpty() +{ + bool returnVal; + { + QMutexLocker lock_mutex(&accessList); + returnVal=theBlockList.isEmpty(); + } + return returnVal; +} + +void WriteThread::internalEndOfFile() +{ + if(!bufferIsEmpty()) + { + if(sequential) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] start the write"); + emit internalStartWrite(); + } + else + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] buffer is not empty!"); + } + else + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] writeIsStopped"); + emit writeIsStopped(); + } +} + +#ifdef ULTRACOPIER_PLUGIN_SPEED_SUPPORT +/*! \brief Set the max speed +\param tempMaxSpeed Set the max speed in KB/s, 0 for no limit */ +void WriteThread::setMultiForBigSpeed(const int &multiForBigSpeed) +{ + this->multiForBigSpeed=multiForBigSpeed; + waitNewClockForSpeed.release(); + waitNewClockForSpeed2.release(); +} + +/// \brief For give timer every X ms +void WriteThread::timeOfTheBlockCopyFinished() +{ + /* this is the old way to limit the speed, it product blocking + *if(waitNewClockForSpeed.available()<ULTRACOPIER_PLUGIN_NUMSEMSPEEDMANAGEMENT) + waitNewClockForSpeed.release();*/ + + //try this new way: + /* only if speed limited, else will accumulate waitNewClockForSpeed + * Disabled because: Stop call of this method when no speed limit + if(this->maxSpeed>0)*/ + if(waitNewClockForSpeed.available()<=1) + waitNewClockForSpeed.release(); + if(waitNewClockForSpeed2.available()<=1) + waitNewClockForSpeed2.release(); +} +#endif + +void WriteThread::resumeNotStarted() +{ + QMutexLocker lock_mutex(&writeFileListMutex); + #ifdef ULTRACOPIER_PLUGIN_DEBUG + if(!writeFileList.contains(file.fileName())) + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Critical,"["+std::to_string(id)+"] file: \""+file.fileName().toStdString()+"\" for similar inode is not located into the list of "+std::to_string(writeFileList.size())+" items!"); + #endif + writeFileList.remove(file.fileName(),this); + if(writeFileList.contains(file.fileName())) + { + QList<WriteThread *> writeList=writeFileList.values(file.fileName()); + if(!writeList.isEmpty()) + writeList.first()->reemitStartOpen(); + return; + } +} + +void WriteThread::pause() +{ + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] try put read thread in pause"); + pauseMutex.tryAcquire(pauseMutex.available()); + putInPause=true; + return; +} + +void WriteThread::resume() +{ + if(putInPause) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] start"); + putInPause=false; + stopIt=false; + } + else + return; + if(!file.isOpen()) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] file is not open"); + return; + } + pauseMutex.release(); +} + +void WriteThread::reemitStartOpen() +{ + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] start"); + emit internalStartOpen(); +} + +void WriteThread::postOperation() +{ + if(postOperationRequested) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Critical,"["+std::to_string(id)+"] double event dropped"); + return; + } + postOperationRequested=true; + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] start"); + emit internalStartClose(); +} + +void WriteThread::internalCloseSlot() +{ + internalClose(); +} + +void WriteThread::internalClose(bool emitSignal) +{ + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] close for file: "+file.fileName().toStdString()); + /// \note never send signal here, because it's called by the destructor + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Close; + #endif + bool emit_closed=false; + if(!fakeMode) + { + if(file.isOpen()) + { + if(!needRemoveTheFile) + { + if(startSize!=lastGoodPosition) + if(!file.resize(lastGoodPosition)) + { + if(emitSignal) + { + errorString_internal=file.errorString().toStdString(); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("Unable to seek after open: %1, error: %2").arg(file.fileName()).arg(QString::fromStdString(errorString_internal)).toStdString()); + emit error(); + } + else + needRemoveTheFile=true; + } + } + file.close(); + if(needRemoveTheFile || stopIt) + { + if(deletePartiallyTransferredFiles) + { + if(!file.remove()) + if(emitSignal) + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] unable to remove the destination file"); + } + } + //here and not after, because the transferThread don't need try close if not open + if(emitSignal) + emit_closed=true; + } + } + else + { + //here and not after, because the transferThread don't need try close if not open + + if(emitSignal) + emit_closed=true; + } + needRemoveTheFile=false; + resumeNotStarted(); + //warning: file.setFileName(""); need be after resumeNotStarted() + file.setFileName(QStringLiteral("")); + if(emit_closed) + emit closed(); + + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Idle; + #endif + + /// \note always the last of this function + if(!fakeMode) + isOpen.release(); +} + +void WriteThread::internalReopen() +{ + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] start"); + QString tempFile=file.fileName(); + internalClose(false); + flushBuffer(); + stopIt=false; + lastGoodPosition=0; + file.setFileName(tempFile); + if(internalOpen()) + emit reopened(); +} + +void WriteThread::reopen() +{ + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] start"); + stopIt=true; + endDetected=false; + emit internalStartReopen(); +} + +#ifdef ULTRACOPIER_PLUGIN_DEBUG +//to set the id +void WriteThread::setId(int id) +{ + this->id=id; +} +#endif + +/// \brief do the fake open +void WriteThread::fakeOpen() +{ + fakeMode=true; + postOperationRequested=false; + emit opened(); +} + +/// \brief do the fake writeIsStarted +void WriteThread::fakeWriteIsStarted() +{ + emit writeIsStarted(); +} + +/// \brief do the fake writeIsStopped +void WriteThread::fakeWriteIsStopped() +{ + emit writeIsStopped(); +} + +/// do the checksum +void WriteThread::startCheckSum() +{ + emit internalStartChecksum(); +} + +/** \brief set block size +\param block the new block size in B +\return Return true if succes */ +bool WriteThread::setBlockSize(const int blockSize) +{ + //can be smaller than min block size to do correct speed limitation + if(blockSize>1 && blockSize<ULTRACOPIER_PLUGIN_MAX_BLOCK_SIZE*1024) + { + this->blockSize=blockSize; + return true; + } + else + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"block size out of range: "+std::to_string(blockSize)); + return false; + } +} + +/// \brief get the last good position +int64_t WriteThread::getLastGoodPosition() const +{ + return lastGoodPosition; +} + +void WriteThread::flushAndSeekToZero() +{ + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"flushAndSeekToZero: "+std::to_string(blockSize)); + stopIt=true; + emit internalStartFlushAndSeekToZero(); +} + + +void WriteThread::checkSum() +{ + //QByteArray blockArray; + QCryptographicHash hash(QCryptographicHash::Sha1); + endDetected=false; + lastGoodPosition=0; + #ifdef ULTRACOPIER_PLUGIN_SPEED_SUPPORT + numberOfBlockCopied=0; + #endif + if(!file.seek(0)) + { + errorString_internal=file.errorString().toStdString(); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("Unable to seek after open: %1, error: %2").arg(file.fileName()).arg(QString::fromStdString(errorString_internal)).toStdString()); + emit error(); + return; + } + int sizeReaden=0; + do + { + if(putInPause) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Information,"["+std::to_string(id)+"] write put in pause"); + if(stopIt) + return; + pauseMutex.acquire(); + if(stopIt) + return; + } + //read one block + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Read; + #endif + blockArray=file.read(blockSize); + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Idle; + #endif + + if(file.error()!=QFile::NoError) + { + errorString_internal=tr("Unable to read the source file: ").toStdString()+file.errorString().toStdString()+" ("+std::to_string(file.error())+")"; + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("file.error()!=QFile::NoError: %1, error: %2").arg(QString::number(file.error())).arg(QString::fromStdString(errorString_internal)).toStdString()); + emit error(); + return; + } + sizeReaden=blockArray.size(); + if(sizeReaden>0) + { + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Checksum; + #endif + hash.addData(blockArray); + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Idle; + #endif + + if(stopIt) + break; + + lastGoodPosition+=blockArray.size(); + } + } + while(sizeReaden>0 && !stopIt); + if(lastGoodPosition>(quint64)file.size()) + { + errorString_internal=tr("File truncated during read, possible data change").toStdString(); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("Source truncated during the read: %1 (%2)").arg(file.errorString()).arg(QString::number(file.error())).toStdString()); + emit error(); + return; + } + if(stopIt) + { +/* if(putInPause) + emit isInPause();*/ + stopIt=false; + return; + } + emit checksumFinish(hash.result()); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] stop the read"); +} + +void WriteThread::internalFlushAndSeekToZero() +{ + flushBuffer(); + if(!file.seek(0)) + { + errorString_internal=file.errorString().toStdString(); + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("Unable to seek after open: %1, error: %2").arg(file.fileName()).arg(QString::fromStdString(errorString_internal)).toStdString()); + emit error(); + return; + } + stopIt=false; + emit flushedAndSeekedToZero(); +} + +void WriteThread::setMkpathTransfer(QSemaphore *mkpathTransfer) +{ + this->mkpathTransfer=mkpathTransfer; +} + +void WriteThread::setDeletePartiallyTransferredFiles(const bool &deletePartiallyTransferredFiles) +{ + this->deletePartiallyTransferredFiles=deletePartiallyTransferredFiles; +} + +bool WriteThread::write(const QByteArray &data) +{ + if(stopIt) + return false; + bool atMax; + if(sequential) + { + if(stopIt) + return false; + { + QMutexLocker lock_mutex(&accessList); + theBlockList.append(data); + atMax=(theBlockList.size()>=numberOfBlock); + } + if(atMax) + emit internalStartWrite(); + } + else + { + if(stopIt) + return false; + { + QMutexLocker lock_mutex(&accessList); + theBlockList.append(data); + atMax=(theBlockList.size()>=numberOfBlock); + } + emit internalStartWrite(); + } + if(atMax) + { + writeFullBlocked=true; + writeFull.acquire(); + writeFullBlocked=false; + } + if(stopIt) + return false; + #ifdef ULTRACOPIER_PLUGIN_SPEED_SUPPORT + //wait for limitation speed if stop not query + if(multiForBigSpeed>0) + { + if(sequential) + { + numberOfBlockCopied++; + if(numberOfBlockCopied>=(multiForBigSpeed*2)) + { + numberOfBlockCopied=0; + waitNewClockForSpeed.acquire(); + } + } + else + { + numberOfBlockCopied2++; + if(numberOfBlockCopied2>=multiForBigSpeed) + { + numberOfBlockCopied2=0; + waitNewClockForSpeed2.acquire(); + } + } + } + #endif + if(stopIt) + return false; + return true; +} + +void WriteThread::internalWrite() +{ + #ifdef ULTRACOPIER_PLUGIN_SPEED_SUPPORT + if(sequential) + { + multiForBigSpeed=0; + QMutexLocker lock_mutex(&accessList); + if(theBlockList.size()<numberOfBlock && !endDetected) + return; + } + #endif + bool haveBlock; + do + { + if(putInPause) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Information,"["+std::to_string(id)+"] write put in pause"); + if(stopIt) + return; + pauseMutex.acquire(); + if(stopIt) + return; + } + if(stopIt) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] stopIt"); + return; + } + if(stopIt) + return; + //read one block + { + QMutexLocker lock_mutex(&accessList); + if(theBlockList.isEmpty()) + haveBlock=false; + else + { + blockArray=theBlockList.first(); + if(multiForBigSpeed>0) + { + if(blockArray.size()==blockSize) + { + theBlockList.removeFirst(); + //if remove one block + if(!sequential) + writeFull.release(); + } + else + { + blockArray.clear(); + while(blockArray.size()!=blockSize) + { + //if larger + if(theBlockList.first().size()>blockSize) + { + blockArray+=theBlockList.first().mid(0,blockSize); + theBlockList.first().remove(0,blockSize); + if(!sequential) + { + //do write in loop to finish the actual block + emit internalStartWrite(); + } + break; + } + //if smaller + else + { + blockArray+=theBlockList.first(); + theBlockList.removeFirst(); + //if remove one block + if(!sequential) + writeFull.release(); + if(theBlockList.isEmpty()) + break; + } + } + } + //haveBlock=!blockArray.isEmpty(); + } + else + { + theBlockList.removeFirst(); + //if remove one block + if(!sequential) + writeFull.release(); + } + haveBlock=true; + } + } + if(stopIt) + return; + if(!haveBlock) + { + if(sequential) + { + if(endDetected) + internalEndOfFile(); + else + writeFull.release(); + return; + } + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] End detected of the file"); + return; + } + #ifdef ULTRACOPIER_PLUGIN_SPEED_SUPPORT + //wait for limitation speed if stop not query + if(multiForBigSpeed>0) + { + numberOfBlockCopied++; + if(sequential || (!sequential && writeFullBlocked)) + { + if(numberOfBlockCopied>=(multiForBigSpeed*2)) + { + numberOfBlockCopied=0; + waitNewClockForSpeed.acquire(); + if(stopIt) + break; + } + } + else + { + if(numberOfBlockCopied>=multiForBigSpeed) + { + numberOfBlockCopied=0; + waitNewClockForSpeed.acquire(); + if(stopIt) + break; + } + } + } + #endif + if(stopIt) + return; + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Write; + #endif + bytesWriten=file.write(blockArray); + #ifdef ULTRACOPIER_PLUGIN_DEBUG + stat=Idle; + #endif + //mutex for stream this data + if(lastGoodPosition==0) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Notice,"["+std::to_string(id)+"] emit writeIsStarted()"); + emit writeIsStarted(); + } + if(stopIt) + return; + if(file.error()!=QFile::NoError) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("Error in writing: %1 (%2)").arg(file.errorString()).arg(file.error()).toStdString()); + errorString_internal=QStringLiteral("Error in writing: %1 (%2)").arg(file.errorString()).arg(file.error()).toStdString(); + stopIt=true; + emit error(); + return; + } + if(bytesWriten!=blockArray.size()) + { + ULTRACOPIER_DEBUGCONSOLE(Ultracopier::DebugLevel_Warning,"["+std::to_string(id)+"] "+QStringLiteral("Error in writing, bytesWriten: %1, blockArray.size(): %2").arg(bytesWriten).arg(blockArray.size()).toStdString()); + errorString_internal=QStringLiteral("Error in writing, bytesWriten: %1, blockArray.size(): %2").arg(bytesWriten).arg(blockArray.size()).toStdString(); + stopIt=true; + emit error(); + return; + } + lastGoodPosition+=bytesWriten; + } while(sequential); +} |