Add tf_random_access_file for hadoop filesystem
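
Implements Cleanup and Read for TF_RandomAccessFile in the modular hadoop
filesystem plugin, replacing the previous TODO stub. Read issues chunked
hdfsPread calls under a per-file mutex and reopens the file once upon
hitting EOF, so readers can observe data appended by writers that are
still streaming (see the "Fixes #5438" comment inherited from the kernel
implementation).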
diff --git a/tensorflow/c/experimental/filesystem/plugins/hadoop/BUILD b/tensorflow/c/experimental/filesystem/plugins/hadoop/BUILD
index 97b3b96..51ffd70 100644
--- a/tensorflow/c/experimental/filesystem/plugins/hadoop/BUILD
+++ b/tensorflow/c/experimental/filesystem/plugins/hadoop/BUILD
@@ -29,5 +29,7 @@
"//tensorflow/c:tf_status",
"//tensorflow/c/experimental/filesystem:filesystem_interface",
"//third_party/hadoop:hdfs",
+ "@com_google_absl//absl/base:core_headers",
+ "@com_google_absl//absl/synchronization",
],
)
diff --git a/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc b/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc
index b63db80..517c920 100644
--- a/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc
+++ b/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc
@@ -22,6 +22,10 @@
+#include <algorithm>
+#include <limits>
#include <sstream>
#include <string>
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
#include "tensorflow/c/env.h"
#include "tensorflow/c/experimental/filesystem/filesystem_interface.h"
#include "tensorflow/c/tf_status.h"
@@ -206,8 +207,102 @@
// SECTION 1. Implementation for `TF_RandomAccessFile`
// ----------------------------------------------------------------------------
namespace tf_random_access_file {
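+// Plugin-side state for a TF_RandomAccessFile on HDFS. The open handle is
+// guarded by a mutex because Read may close and reopen it after hitting
+// EOF (see below).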
+typedef struct HDFSFile {
+ std::string path;
+ std::string hdfs_path;
+ hdfsFS fs;
+ LibHDFS* libhdfs;
+ absl::Mutex mu;
+ hdfsFile handle ABSL_GUARDED_BY(mu);
+ HDFSFile(std::string path, std::string hdfs_path, hdfsFS fs, LibHDFS* libhdfs,
+ hdfsFile handle)
+ : path(std::move(path)),
+ hdfs_path(std::move(hdfs_path)),
+ fs(fs),
+ libhdfs(libhdfs),
+ mu(),
+ handle(handle) {}
+} HDFSFile;
-// TODO(vnvo2409): Implement later
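+
+// Closes the HDFS handle (if still open) under the lock, then frees the
+// plugin-side state.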
+void Cleanup(TF_RandomAccessFile* file) {
+ auto hdfs_file = static_cast<HDFSFile*>(file->plugin_file);
+ {
+ absl::MutexLock l(&hdfs_file->mu);
+ if (hdfs_file->handle != nullptr) {
+ hdfs_file->libhdfs->hdfsCloseFile(hdfs_file->fs, hdfs_file->handle);
+ }
+ }
+ delete hdfs_file;
+}
+
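+// Reads up to n bytes from offset into buffer. Returns the number of bytes
+// actually read; sets TF_OUT_OF_RANGE if EOF is reached before n bytes
+// could be delivered.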
+int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
+ char* buffer, TF_Status* status) {
+ auto hdfs_file = static_cast<HDFSFile*>(file->plugin_file);
+ auto libhdfs = hdfs_file->libhdfs;
+ auto fs = hdfs_file->fs;
+ auto hdfs_path = hdfs_file->hdfs_path.c_str();
+ auto path = hdfs_file->path.c_str();
+
+ char* dst = buffer;
+ bool eof_retried = false;
+ int64_t read = 0;
+ // Loop until the request is satisfied, an error occurs, or a retried
+ // read after reopening at EOF confirms there is no more data.
+ while (TF_GetCode(status) == TF_OK && n > 0) {
+ // We lock inside the loop rather than outside so we don't block other
+ // concurrent readers.
+ absl::MutexLock l(&hdfs_file->mu);
+ auto handle = hdfs_file->handle;
+ // Cap each chunk at INT_MAX - 2: hdfsPread takes an int32 length, and
+ // staying slightly under INT_MAX avoids a JVM OutOfMemoryError.
+ size_t read_n =
+ (std::min)(n, static_cast<size_t>(std::numeric_limits<int>::max() - 2));
+ tSize r = libhdfs->hdfsPread(fs, handle, static_cast<tOffset>(offset), dst,
+ static_cast<tSize>(read_n));
+ if (r > 0) {
+ dst += r;
+ n -= r;
+ offset += r;
+ read += r;
+ } else if (!eof_retried && r == 0) {
+ // Always reopen the file upon reaching EOF to see if there's more data.
+ // If writers are streaming contents while others are concurrently
+ // reading, HDFS requires that we reopen the file to see updated
+ // contents.
+ //
+ // Fixes #5438
+ if (handle != nullptr && libhdfs->hdfsCloseFile(fs, handle) != 0) {
+ TF_SetStatusFromIOError(status, errno, path);
+ return -1;
+ }
+ // Store the reopened handle back so later iterations (and Cleanup) see
+ // the new handle instead of the closed one.
+ hdfs_file->handle = handle =
+ libhdfs->hdfsOpenFile(fs, hdfs_path, O_RDONLY, 0, 0, 0);
+ if (handle == nullptr) {
+ TF_SetStatusFromIOError(status, errno, path);
+ return -1;
+ }
+ eof_retried = true;
+ } else if (eof_retried && r == 0) {
+ TF_SetStatus(status, TF_OUT_OF_RANGE, "Read fewer bytes than requested");
+ } else if (errno == EINTR || errno == EAGAIN) {
+ // hdfsPread may return EINTR too. Just retry.
+ } else {
+ TF_SetStatusFromIOError(status, errno, path);
+ }
+ }
+ return read;
+}
} // namespace tf_random_access_file
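
Note (beyond this patch): the new functions take effect once they are
installed in the plugin's TF_RandomAccessFileOps table during plugin
initialization. A hypothetical sketch, following the pattern of the other
modular-filesystem plugins (plugin_memory_allocate is assumed to be the
plugin's own allocator wrapper):

  ops->random_access_file_ops = static_cast<TF_RandomAccessFileOps*>(
      plugin_memory_allocate(TF_RANDOM_ACCESS_FILE_OPS_SIZE));
  ops->random_access_file_ops->cleanup = tf_random_access_file::Cleanup;
  ops->random_access_file_ops->read = tf_random_access_file::Read;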