When using HDFS, the read code looked like this:
final Configuration conf = new Configuration();
final FileSystem fs = FileSystem.get(URI.create(hdfsFile), conf);
final Path path = new Path(hdfsFile);
if (fs.exists(path)) {
    final FSDataInputStream is = fs.open(path);
    final FileStatus stat = fs.getFileStatus(path);
    // assumes the file fits into a single byte[] (length < 2 GB)
    final byte[] buffer = new byte[(int) stat.getLen()];
    is.readFully(0, buffer);
    is.close();
    fs.close();   // closes the (possibly shared) FileSystem instance
    return buffer;
} else {
    throw new Exception("the file is not found.");
}
Under high concurrency this fails with:
java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interrupted while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.16.3.2:52305 remote=/10.16.3.2:59000]. 60000 millis timeout left.; Host Details : local host is: "hadoop-test/10.16.3.2"; destination host is: "hadoop-alone-test":59000;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
    at org.apache.hadoop.ipc.Client.call(Client.java:1479)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy109.getListing(Unknown Source)
Root cause: this is a concurrent-access problem. Threads A and B both obtain the FileSystem and use it; when thread B is done it calls filesystem.close(), while thread A is still operating on that same FileSystem, which produces the exceptions above.
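The race is easy to sketch. A minimal, illustrative reproduction (the /tmp/test.txt path is hypothetical; the cluster address is taken from the log above): both threads call FileSystem.get with the same URI, which by default hands back the same cached instance, and one thread closes it while the other is still reading. The timing is not guaranteed, but when the close lands mid-read you see the errors above.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SharedFsRace {
    public static void main(String[] args) throws Exception {
        final Configuration conf = new Configuration();
        final URI uri = URI.create("hdfs://hadoop-alone-test:59000/");

        Thread reader = new Thread(() -> {
            try {
                FileSystem fs = FileSystem.get(uri, conf);       // returns the cached, shared instance
                fs.open(new Path("/tmp/test.txt")).readByte();   // hypothetical file
            } catch (Exception e) {
                e.printStackTrace();   // the InterruptedIOException / "Filesystem closed" surfaces here
            }
        });

        Thread closer = new Thread(() -> {
            try {
                FileSystem fs = FileSystem.get(uri, conf);       // same cached instance as the reader's
                fs.close();                                      // closes it underneath the reader
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

        reader.start();
        closer.start();
        reader.join();
        closer.join();
    }
}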
Disable the FileSystem cache:
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl.disable.cache", "true");
But we were clearly using two different clusters, so why did the cache come into play at all? Reading the FileSystem.get source makes the reason clear:
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();

  if (scheme == null && authority == null) {     // use default FS
    return get(conf);
  }

  if (scheme != null && authority == null) {     // no authority
    URI defaultUri = getDefaultUri(conf);
    if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
        && defaultUri.getAuthority() != null) {  // & default has authority
      return get(defaultUri, conf);              // return default
    }
  }

  String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
  if (conf.getBoolean(disableCacheName, false)) {
    LOGGER.debug("Bypassing cache to create filesystem {}", uri);
    return createFileSystem(uri, conf);
  }

  return CACHE.get(uri, conf);
}
When the application obtained its FileSystem, it supplied a full hdfs URI and did not set fs.hdfs.impl.disable.cache to true, so the FileSystem object for the slave cluster was fetched through CACHE.get(uri, conf). Internally, Cache keeps FileSystem objects in a HashMap, so whenever two lookups produce equal keys, the same FileSystem object is returned. What does the cache key look like? The code is as follows:
FileSystem get(URI uri, Configuration conf) throws IOException {
  Key key = new Key(uri, conf);
  return getInternal(uri, conf, key);
}

static class Key {
  final String scheme;
  final String authority;
  final UserGroupInformation ugi;
  final long unique;   // an artificial way to make a key unique

  Key(URI uri, Configuration conf) throws IOException {
    this(uri, conf, 0);
  }

  Key(URI uri, Configuration conf, long unique) throws IOException {
    scheme = uri.getScheme() == null
        ? "" : StringUtils.toLowerCase(uri.getScheme());
    authority = uri.getAuthority() == null
        ? "" : StringUtils.toLowerCase(uri.getAuthority());
    this.unique = unique;
    this.ugi = UserGroupInformation.getCurrentUser();
  }

  @Override
  public int hashCode() {
    return (scheme + authority).hashCode() + ugi.hashCode() + (int) unique;
  }

  static boolean isEqual(Object a, Object b) {
    return a == b || (a != null && a.equals(b));
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == this) {
      return true;
    }
    if (obj instanceof Key) {
      Key that = (Key) obj;
      return isEqual(this.scheme, that.scheme)
          && isEqual(this.authority, that.authority)
          && isEqual(this.ugi, that.ugi)
          && (this.unique == that.unique);
    }
    return false;
  }

  @Override
  public String toString() {
    return "(" + ugi.toString() + ")@" + scheme + "://" + authority;
  }
}
As you can see, the Key consists of four components. The first two come from the URI, and in our case both clusters were addressed as hdfs://nameservice1. The ugi is the authenticated user, which was also the same, and unique is 0. The keys were therefore equal, so the second lookup simply returned the FileSystem object created the first time. In the end, even though the application used different cluster configuration files, it got the same FileSystem object.
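The cache hit is easy to observe directly. A minimal sketch of the scenario just described (assuming both clusters are addressed by the same logical name hdfs://nameservice1): the scheme, authority, and current UGI are identical and unique is 0, so both lookups hit the same Key and return the identical object.

Configuration confA = new Configuration();   // loaded from cluster A's config files
Configuration confB = new Configuration();   // loaded from cluster B's config files

// Same scheme + authority in the URI and the same current user => equal Cache.Key
FileSystem fsA = FileSystem.get(URI.create("hdfs://nameservice1/"), confA);
FileSystem fsB = FileSystem.get(URI.create("hdfs://nameservice1/"), confB);

System.out.println(fsA == fsB);   // true: the second call returns the first instance
fsA.close();                      // fsB is now closed as well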
Changing the fs.hdfs.impl.disable.cache parameter itself is not recommended. Instead, change each cluster's fs.defaultFS so that different clusters have different fs.defaultFS values.
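A sketch of that fix (the resource paths and nameservice names below are hypothetical): each cluster's configuration carries its own fs.defaultFS, so the URIs differ, the cache keys differ, and each cluster gets its own cached FileSystem object.

// Cluster A's files declare fs.defaultFS = hdfs://nameservice-a
Configuration confA = new Configuration();
confA.addResource(new Path("/etc/hadoop/cluster-a/core-site.xml"));   // hypothetical path
confA.addResource(new Path("/etc/hadoop/cluster-a/hdfs-site.xml"));   // hypothetical path

// Cluster B's files declare fs.defaultFS = hdfs://nameservice-b
Configuration confB = new Configuration();
confB.addResource(new Path("/etc/hadoop/cluster-b/core-site.xml"));   // hypothetical path
confB.addResource(new Path("/etc/hadoop/cluster-b/hdfs-site.xml"));   // hypothetical path

FileSystem fsA = FileSystem.get(URI.create(confA.get("fs.defaultFS")), confA);
FileSystem fsB = FileSystem.get(URI.create(confB.get("fs.defaultFS")), confB);

System.out.println(fsA == fsB);   // false: different authorities => different cache keys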