Add tf_random_access_file
diff --git a/tensorflow/c/experimental/filesystem/plugins/hadoop/BUILD b/tensorflow/c/experimental/filesystem/plugins/hadoop/BUILD
index 97b3b96..51ffd70 100644
--- a/tensorflow/c/experimental/filesystem/plugins/hadoop/BUILD
+++ b/tensorflow/c/experimental/filesystem/plugins/hadoop/BUILD
@@ -29,5 +29,7 @@
         "//tensorflow/c:tf_status",
         "//tensorflow/c/experimental/filesystem:filesystem_interface",
         "//third_party/hadoop:hdfs",
+        "@com_google_absl//absl/base:core_headers",
+        "@com_google_absl//absl/synchronization",
     ],
 )
diff --git a/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc b/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc
index b63db80..517c920 100644
--- a/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc
+++ b/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc
@@ -22,6 +22,7 @@
 #include <sstream>
 #include <string>
 
+#include "absl/synchronization/mutex.h"
 #include "tensorflow/c/env.h"
 #include "tensorflow/c/experimental/filesystem/filesystem_interface.h"
 #include "tensorflow/c/tf_status.h"
@@ -206,8 +207,87 @@
 // SECTION 1. Implementation for `TF_RandomAccessFile`
 // ----------------------------------------------------------------------------
 namespace tf_random_access_file {
+typedef struct HDFSFile {
+  std::string path;
+  std::string hdfs_path;
+  hdfsFS fs;
+  LibHDFS* libhdfs;
+  absl::Mutex mu;
+  hdfsFile handle ABSL_GUARDED_BY(mu);
+  HDFSFile(std::string path, std::string hdfs_path, hdfsFS fs, LibHDFS* libhdfs,
+           hdfsFile handle)
+      : path(std::move(path)),
+        hdfs_path(std::move(hdfs_path)),
+        fs(fs),
+        libhdfs(libhdfs),
+        mu(),
+        handle(handle) {}
+} HDFSFile;
 
-// TODO(vnvo2409): Implement later
+void Cleanup(TF_RandomAccessFile* file) {
+  auto hdfs_file = static_cast<HDFSFile*>(file->plugin_file);
+  {
+    absl::MutexLock l(&hdfs_file->mu);
+    if (hdfs_file->handle != nullptr) {
+      hdfs_file->libhdfs->hdfsCloseFile(hdfs_file->fs, hdfs_file->handle);
+    }
+  }
+  delete hdfs_file;
+}
+
+int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
+             char* buffer, TF_Status* status) {
+  auto hdfs_file = static_cast<HDFSFile*>(file->plugin_file);
+  auto libhdfs = hdfs_file->libhdfs;
+  auto fs = hdfs_file->fs;
+  auto hdfs_path = hdfs_file->hdfs_path.c_str();
+  auto path = hdfs_file->path.c_str();
+
+  char* dst = buffer;
+  bool eof_retried = false;
+  int64_t r = 0;
+  while (TF_GetCode(status) == TF_OK && !eof_retried) {
+    // We lock inside the loop rather than outside so we don't block other
+    // concurrent readers.
+    absl::MutexLock l(&hdfs_file->mu);
+    auto handle = hdfs_file->handle;
+    // Max read length is INT_MAX-2, for hdfsPread function take a parameter
+    // of int32. -2 offset can avoid JVM OutOfMemoryError.
+    size_t read_n =
+        (std::min)(n, static_cast<size_t>(std::numeric_limits<int>::max() - 2));
+    r = libhdfs->hdfsPread(fs, handle, static_cast<tOffset>(offset), dst,
+                           static_cast<tSize>(read_n));
+    if (r > 0) {
+      dst += r;
+      n -= r;
+      offset += r;
+    } else if (!eof_retried && r == 0) {
+      // Always reopen the file upon reaching EOF to see if there's more data.
+      // If writers are streaming contents while others are concurrently
+      // reading, HDFS requires that we reopen the file to see updated
+      // contents.
+      //
+      // Fixes #5438
+      if (handle != nullptr && libhdfs->hdfsCloseFile(fs, handle) != 0) {
+        TF_SetStatusFromIOError(status, errno, path);
+        return -1;
+      }
+      handle = libhdfs->hdfsOpenFile(fs, hdfs_path, O_RDONLY, 0, 0, 0);
+      if (handle == nullptr) {
+        TF_SetStatusFromIOError(status, errno, path);
+        return -1;
+      }
+      eof_retried = true;
+    } else if (eof_retried && r == 0) {
+      TF_SetStatus(status, TF_OUT_OF_RANGE, "Read less bytes than requested");
+    } else if (errno == EINTR || errno == EAGAIN) {
+      // hdfsPread may return EINTR too. Just retry.
+    } else {
+      TF_SetStatusFromIOError(status, errno, path);
+    }
+  }
+  return r;
+}
 
 }  // namespace tf_random_access_file