Store LibHDFS* in filesystem to avoid memory leak
diff --git a/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc b/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc
index ab847ec..b63db80 100644
--- a/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc
+++ b/tensorflow/c/experimental/filesystem/plugins/hadoop/hadoop_filesystem.cc
@@ -160,27 +160,19 @@
   void* handle_;
 };
 
-static const LibHDFS* libhdfs(TF_Status* status) {
-  static const LibHDFS* libhdfs = new LibHDFS(status);
-  return libhdfs;
-}
-
 // We rely on HDFS connection caching here. The HDFS client calls
 // org.apache.hadoop.fs.FileSystem.get(), which caches the connection
 // internally.
-hdfsFS Connect(const std::string& path, TF_Status* status) {
-  auto hdfs_file = libhdfs(status);
-  if (TF_GetCode(status) != TF_OK) return nullptr;
+hdfsFS Connect(LibHDFS* libhdfs, const std::string& path, TF_Status* status) {
+  std::string scheme, namenode, hdfs_path;
+  ParseHadoopPath(path, &scheme, &namenode, &hdfs_path);
 
-  std::string scheme, namenode, nodepath;
-  ParseHadoopPath(path, &scheme, &namenode, &nodepath);
-
-  hdfsBuilder* builder = hdfs_file->hdfsNewBuilder();
+  hdfsBuilder* builder = libhdfs->hdfsNewBuilder();
   if (scheme == "file") {
-    hdfs_file->hdfsBuilderSetNameNode(builder, nullptr);
+    libhdfs->hdfsBuilderSetNameNode(builder, nullptr);
   } else if (scheme == "viewfs") {
     char* defaultFS = nullptr;
-    hdfs_file->hdfsConfGetStr("fs.defaultFS", &defaultFS);
+    libhdfs->hdfsConfGetStr("fs.defaultFS", &defaultFS);
     std::string defaultScheme, defaultCluster, defaultPath;
     ParseHadoopPath(defaultFS, &defaultScheme, &defaultCluster, &defaultPath);
 
@@ -193,17 +185,17 @@
     // The default NameNode configuration will be used (from the XML
     // configuration files). See:
     // https://github.com/tensorflow/tensorflow/blob/v1.0.0/third_party/hadoop/hdfs.h#L259
-    hdfs_file->hdfsBuilderSetNameNode(builder, "default");
+    libhdfs->hdfsBuilderSetNameNode(builder, "default");
   } else if (scheme == "har") {
     std::string path_har = path;
     SplitArchiveNameAndPath(&path_har, &namenode, status);
     if (TF_GetCode(status) != TF_OK) return nullptr;
-    hdfs_file->hdfsBuilderSetNameNode(builder, namenode.c_str());
+    libhdfs->hdfsBuilderSetNameNode(builder, namenode.c_str());
   } else {
-    hdfs_file->hdfsBuilderSetNameNode(
+    libhdfs->hdfsBuilderSetNameNode(
         builder, namenode.empty() ? "default" : namenode.c_str());
   }
-  auto fs = hdfs_file->hdfsBuilderConnect(builder);
+  auto fs = libhdfs->hdfsBuilderConnect(builder);
   if (fs == nullptr)
     TF_SetStatusFromIOError(status, TF_NOT_FOUND, strerror(errno));
   else
@@ -240,10 +232,15 @@
 namespace tf_hadoop_filesystem {
 
 void Init(TF_Filesystem* filesystem, TF_Status* status) {
+  filesystem->plugin_filesystem = new LibHDFS(status);
+  if (TF_GetCode(status) != TF_OK) return;
   TF_SetStatus(status, TF_OK, "");
 }
 
-void Cleanup(TF_Filesystem* filesystem) {}
+void Cleanup(TF_Filesystem* filesystem) {
+  auto libhdfs = static_cast<LibHDFS*>(filesystem->plugin_filesystem);
+  delete libhdfs;
+}
 
 // TODO(vnvo2409): Implement later