| // Copyright 2013 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "chrome/browser/chromeos/drive/drive_file_stream_reader.h" |
| |
| #include <algorithm> |
| #include <cstring> |
| |
| #include "base/callback_helpers.h" |
| #include "base/logging.h" |
| #include "base/sequenced_task_runner.h" |
| #include "chrome/browser/chromeos/drive/drive.pb.h" |
| #include "chrome/browser/chromeos/drive/file_system_interface.h" |
| #include "chrome/browser/chromeos/drive/local_file_reader.h" |
| #include "chrome/browser/google_apis/task_util.h" |
| #include "content/public/browser/browser_thread.h" |
| #include "net/base/io_buffer.h" |
| #include "net/base/net_errors.h" |
| #include "net/http/http_byte_range.h" |
| |
| using content::BrowserThread; |
| |
| namespace drive { |
| namespace { |
| |
| // Converts FileError code to net::Error code. |
| int FileErrorToNetError(FileError error) { |
| return net::PlatformFileErrorToNetError(FileErrorToPlatformError(error)); |
| } |
| |
| // Runs task on UI thread. |
| void RunTaskOnUIThread(const base::Closure& task) { |
| google_apis::RunTaskOnThread( |
| BrowserThread::GetMessageLoopProxyForThread(BrowserThread::UI), task); |
| } |
| |
| } // namespace |
| |
| namespace internal { |
| namespace { |
| |
| // Copies the content in |pending_data| into |buffer| at most |
| // |buffer_length| bytes, and erases the copied data from |
| // |pending_data|. Returns the number of copied bytes. |
| int ReadInternal(ScopedVector<std::string>* pending_data, |
| net::IOBuffer* buffer, int buffer_length) { |
| size_t index = 0; |
| int offset = 0; |
| for (; index < pending_data->size() && offset < buffer_length; ++index) { |
| const std::string& chunk = *(*pending_data)[index]; |
| DCHECK(!chunk.empty()); |
| |
| size_t bytes_to_read = std::min( |
| chunk.size(), static_cast<size_t>(buffer_length - offset)); |
| std::memmove(buffer->data() + offset, chunk.data(), bytes_to_read); |
| offset += bytes_to_read; |
| if (bytes_to_read < chunk.size()) { |
| // The chunk still has some remaining data. |
| // So remove leading (copied) bytes, and quit the loop so that |
| // the remaining data won't be deleted in the following erase(). |
| (*pending_data)[index]->erase(0, bytes_to_read); |
| break; |
| } |
| } |
| |
| // Consume the copied data. |
| pending_data->erase(pending_data->begin(), pending_data->begin() + index); |
| |
| return offset; |
| } |
| |
| } // namespace |
| |
| LocalReaderProxy::LocalReaderProxy( |
| scoped_ptr<util::LocalFileReader> file_reader, int64 length) |
| : file_reader_(file_reader.Pass()), |
| remaining_length_(length), |
| weak_ptr_factory_(this) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| DCHECK(file_reader_); |
| } |
| |
| LocalReaderProxy::~LocalReaderProxy() { |
| } |
| |
| int LocalReaderProxy::Read(net::IOBuffer* buffer, int buffer_length, |
| const net::CompletionCallback& callback) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| DCHECK(file_reader_); |
| |
| if (buffer_length > remaining_length_) { |
| // Here, narrowing is safe. |
| buffer_length = static_cast<int>(remaining_length_); |
| } |
| |
| file_reader_->Read(buffer, buffer_length, |
| base::Bind(&LocalReaderProxy::OnReadCompleted, |
| weak_ptr_factory_.GetWeakPtr(), callback)); |
| return net::ERR_IO_PENDING; |
| } |
| |
| void LocalReaderProxy::OnGetContent(scoped_ptr<std::string> data) { |
| // This method should never be called, because no data should be received |
| // from the network during the reading of local-cache file. |
| NOTREACHED(); |
| } |
| |
| void LocalReaderProxy::OnCompleted(FileError error) { |
| // If this method is called, no network error should be happened. |
| DCHECK_EQ(FILE_ERROR_OK, error); |
| } |
| |
| void LocalReaderProxy::OnReadCompleted(const net::CompletionCallback& callback, |
| int read_result) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| DCHECK(file_reader_); |
| |
| if (read_result >= 0) { |
| // |read_result| bytes data is read. |
| DCHECK_LE(read_result, remaining_length_); |
| remaining_length_ -= read_result; |
| } else { |
| // An error occurs. Close the |file_reader_|. |
| file_reader_.reset(); |
| } |
| callback.Run(read_result); |
| } |
| |
| NetworkReaderProxy::NetworkReaderProxy( |
| int64 offset, |
| int64 content_length, |
| const base::Closure& job_canceller) |
| : remaining_offset_(offset), |
| remaining_content_length_(content_length), |
| error_code_(net::OK), |
| buffer_length_(0), |
| job_canceller_(job_canceller) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| } |
| |
| NetworkReaderProxy::~NetworkReaderProxy() { |
| if (!job_canceller_.is_null()) { |
| job_canceller_.Run(); |
| } |
| } |
| |
| int NetworkReaderProxy::Read(net::IOBuffer* buffer, int buffer_length, |
| const net::CompletionCallback& callback) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| // Check if there is no pending Read operation. |
| DCHECK(!buffer_.get()); |
| DCHECK_EQ(buffer_length_, 0); |
| DCHECK(callback_.is_null()); |
| // Validate the arguments. |
| DCHECK(buffer); |
| DCHECK_GT(buffer_length, 0); |
| DCHECK(!callback.is_null()); |
| |
| if (error_code_ != net::OK) { |
| // An error is already found. Return it immediately. |
| return error_code_; |
| } |
| |
| if (remaining_content_length_ == 0) { |
| // If no more data, return immediately. |
| return 0; |
| } |
| |
| if (buffer_length > remaining_content_length_) { |
| // Here, narrowing cast should be safe. |
| buffer_length = static_cast<int>(remaining_content_length_); |
| } |
| |
| if (pending_data_.empty()) { |
| // No data is available. Keep the arguments, and return pending status. |
| buffer_ = buffer; |
| buffer_length_ = buffer_length; |
| callback_ = callback; |
| return net::ERR_IO_PENDING; |
| } |
| |
| int result = ReadInternal(&pending_data_, buffer, buffer_length); |
| remaining_content_length_ -= result; |
| DCHECK_GE(remaining_content_length_, 0); |
| return result; |
| } |
| |
| void NetworkReaderProxy::OnGetContent(scoped_ptr<std::string> data) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| DCHECK(data && !data->empty()); |
| |
| if (remaining_offset_ >= static_cast<int64>(data->length())) { |
| // Skip unneeded leading data. |
| remaining_offset_ -= data->length(); |
| return; |
| } |
| |
| if (remaining_offset_ > 0) { |
| // Erase unnecessary leading bytes. |
| data->erase(0, static_cast<size_t>(remaining_offset_)); |
| remaining_offset_ = 0; |
| } |
| |
| pending_data_.push_back(data.release()); |
| if (!buffer_.get()) { |
| // No pending Read operation. |
| return; |
| } |
| |
| int result = ReadInternal(&pending_data_, buffer_.get(), buffer_length_); |
| remaining_content_length_ -= result; |
| DCHECK_GE(remaining_content_length_, 0); |
| |
| buffer_ = NULL; |
| buffer_length_ = 0; |
| DCHECK(!callback_.is_null()); |
| base::ResetAndReturn(&callback_).Run(result); |
| } |
| |
| void NetworkReaderProxy::OnCompleted(FileError error) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| // The downloading is completed, so we do not need to cancel the job |
| // in the destructor. |
| job_canceller_.Reset(); |
| |
| if (error == FILE_ERROR_OK) { |
| return; |
| } |
| |
| error_code_ = FileErrorToNetError(error); |
| pending_data_.clear(); |
| |
| if (callback_.is_null()) { |
| // No pending Read operation. |
| return; |
| } |
| |
| buffer_ = NULL; |
| buffer_length_ = 0; |
| base::ResetAndReturn(&callback_).Run(error_code_); |
| } |
| |
| } // namespace internal |
| |
| namespace { |
| |
| // Calls FileSystemInterface::GetFileContent if the file system |
| // is available. If not, the |completion_callback| is invoked with |
| // FILE_ERROR_FAILED. |
| void GetFileContentOnUIThread( |
| const DriveFileStreamReader::FileSystemGetter& file_system_getter, |
| const base::FilePath& drive_file_path, |
| const GetFileContentInitializedCallback& initialized_callback, |
| const google_apis::GetContentCallback& get_content_callback, |
| const FileOperationCallback& completion_callback) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); |
| |
| FileSystemInterface* file_system = file_system_getter.Run(); |
| if (!file_system) { |
| completion_callback.Run(FILE_ERROR_FAILED); |
| return; |
| } |
| |
| file_system->GetFileContent(drive_file_path, |
| initialized_callback, |
| get_content_callback, |
| completion_callback); |
| } |
| |
| // Helper to run FileSystemInterface::GetFileContent on UI thread. |
| void GetFileContent( |
| const DriveFileStreamReader::FileSystemGetter& file_system_getter, |
| const base::FilePath& drive_file_path, |
| const GetFileContentInitializedCallback& initialized_callback, |
| const google_apis::GetContentCallback& get_content_callback, |
| const FileOperationCallback& completion_callback) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| |
| BrowserThread::PostTask( |
| BrowserThread::UI, |
| FROM_HERE, |
| base::Bind(&GetFileContentOnUIThread, |
| file_system_getter, |
| drive_file_path, |
| google_apis::CreateRelayCallback(initialized_callback), |
| google_apis::CreateRelayCallback(get_content_callback), |
| google_apis::CreateRelayCallback(completion_callback))); |
| } |
| |
| } // namespace |
| |
| DriveFileStreamReader::DriveFileStreamReader( |
| const FileSystemGetter& file_system_getter, |
| base::SequencedTaskRunner* file_task_runner) |
| : file_system_getter_(file_system_getter), |
| file_task_runner_(file_task_runner), |
| weak_ptr_factory_(this) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| } |
| |
| DriveFileStreamReader::~DriveFileStreamReader() { |
| } |
| |
| bool DriveFileStreamReader::IsInitialized() const { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| return reader_proxy_.get() != NULL; |
| } |
| |
| void DriveFileStreamReader::Initialize( |
| const base::FilePath& drive_file_path, |
| const net::HttpByteRange& byte_range, |
| const InitializeCompletionCallback& callback) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| DCHECK(!callback.is_null()); |
| |
| GetFileContent( |
| file_system_getter_, |
| drive_file_path, |
| base::Bind(&DriveFileStreamReader |
| ::InitializeAfterGetFileContentInitialized, |
| weak_ptr_factory_.GetWeakPtr(), |
| byte_range, |
| callback), |
| base::Bind(&DriveFileStreamReader::OnGetContent, |
| weak_ptr_factory_.GetWeakPtr()), |
| base::Bind(&DriveFileStreamReader::OnGetFileContentCompletion, |
| weak_ptr_factory_.GetWeakPtr(), |
| callback)); |
| } |
| |
| int DriveFileStreamReader::Read(net::IOBuffer* buffer, int buffer_length, |
| const net::CompletionCallback& callback) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| DCHECK(reader_proxy_); |
| DCHECK(buffer); |
| DCHECK(!callback.is_null()); |
| return reader_proxy_->Read(buffer, buffer_length, callback); |
| } |
| |
| void DriveFileStreamReader::InitializeAfterGetFileContentInitialized( |
| const net::HttpByteRange& in_byte_range, |
| const InitializeCompletionCallback& callback, |
| FileError error, |
| scoped_ptr<ResourceEntry> entry, |
| const base::FilePath& local_cache_file_path, |
| const base::Closure& ui_cancel_download_closure) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| |
| if (error != FILE_ERROR_OK) { |
| callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>()); |
| return; |
| } |
| DCHECK(entry); |
| |
| net::HttpByteRange byte_range = in_byte_range; |
| if (!byte_range.ComputeBounds(entry->file_info().size())) { |
| // If |byte_range| is invalid (e.g. out of bounds), return with an error. |
| // At the same time, we cancel the in-flight downloading operation if |
| // needed and and invalidate weak pointers so that we won't |
| // receive unwanted callbacks. |
| if (!ui_cancel_download_closure.is_null()) |
| ui_cancel_download_closure.Run(); |
| weak_ptr_factory_.InvalidateWeakPtrs(); |
| callback.Run( |
| net::ERR_REQUEST_RANGE_NOT_SATISFIABLE, scoped_ptr<ResourceEntry>()); |
| return; |
| } |
| |
| // Note: both boundary of |byte_range| are inclusive. |
| int64 range_length = |
| byte_range.last_byte_position() - byte_range.first_byte_position() + 1; |
| DCHECK_GE(range_length, 0); |
| |
| if (local_cache_file_path.empty()) { |
| // The file is not cached, and being downloaded. |
| DCHECK(!ui_cancel_download_closure.is_null()); |
| reader_proxy_.reset( |
| new internal::NetworkReaderProxy( |
| byte_range.first_byte_position(), range_length, |
| base::Bind(&RunTaskOnUIThread, ui_cancel_download_closure))); |
| callback.Run(net::OK, entry.Pass()); |
| return; |
| } |
| |
| // Otherwise, open the stream for file. |
| scoped_ptr<util::LocalFileReader> file_reader( |
| new util::LocalFileReader(file_task_runner_.get())); |
| util::LocalFileReader* file_reader_ptr = file_reader.get(); |
| file_reader_ptr->Open( |
| local_cache_file_path, |
| byte_range.first_byte_position(), |
| base::Bind( |
| &DriveFileStreamReader::InitializeAfterLocalFileOpen, |
| weak_ptr_factory_.GetWeakPtr(), |
| range_length, |
| callback, |
| base::Passed(&entry), |
| base::Passed(&file_reader))); |
| } |
| |
| void DriveFileStreamReader::InitializeAfterLocalFileOpen( |
| int64 length, |
| const InitializeCompletionCallback& callback, |
| scoped_ptr<ResourceEntry> entry, |
| scoped_ptr<util::LocalFileReader> file_reader, |
| int open_result) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| |
| if (open_result != net::OK) { |
| callback.Run(net::ERR_FAILED, scoped_ptr<ResourceEntry>()); |
| return; |
| } |
| |
| reader_proxy_.reset( |
| new internal::LocalReaderProxy(file_reader.Pass(), length)); |
| callback.Run(net::OK, entry.Pass()); |
| } |
| |
| void DriveFileStreamReader::OnGetContent(google_apis::GDataErrorCode error_code, |
| scoped_ptr<std::string> data) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| DCHECK(reader_proxy_); |
| reader_proxy_->OnGetContent(data.Pass()); |
| } |
| |
| void DriveFileStreamReader::OnGetFileContentCompletion( |
| const InitializeCompletionCallback& callback, |
| FileError error) { |
| DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO)); |
| |
| if (reader_proxy_) { |
| // If the proxy object available, send the error to it. |
| reader_proxy_->OnCompleted(error); |
| } else { |
| // Here the proxy object is not yet available. |
| // There are two cases. 1) Some error happens during the initialization. |
| // 2) the cache file is found, but the proxy object is not *yet* |
| // initialized because the file is being opened. |
| // We are interested in 1) only. The callback for 2) will be called |
| // after opening the file is completed. |
| // Note: due to the same reason, LocalReaderProxy::OnCompleted may |
| // or may not be called. This is timing issue, and it is difficult to avoid |
| // unfortunately. |
| if (error != FILE_ERROR_OK) { |
| callback.Run(FileErrorToNetError(error), scoped_ptr<ResourceEntry>()); |
| } |
| } |
| } |
| |
| } // namespace drive |