How to instantiate FSDataInputStream with a raw InputStream?

I can't figure out how to create an InputStream that is both Seekable and PositionedReadable...

Resource resource = new ClassPathResource("somefile");
InputStream bla = resource.getInputStream();
FSDataInputStream inputStream = new FSDataInputStream (bla);

The FSDataInputStream line throws:

java.lang.IllegalArgumentException: In is not an instance of Seekable or PositionedReadable

I need this for mocking, and it is blocking me.


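The FSDataInputStream constructor, defined in FSDataInputStream.java as shown below, requires its InputStream argument to be an instance of both Seekable and PositionedReadable. A plain InputStream from ClassPathResource implements neither, so the check fails.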

public FSDataInputStream(InputStream in) throws IOException 
 {
    super(in);
    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
      throw new IllegalArgumentException(
          "In is not an instance of Seekable or PositionedReadable");
    }
 }

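Hopefully the following solution helps: read the resource into a byte array, then wrap it in a ByteArrayInputStream subclass that implements both interfaces.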

import java.io.*;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

public class SeekableTest {

    public static void main(String[] args) throws IOException
    {
        Resource resource = new ClassPathResource("somefile");
        InputStream in = resource.getInputStream();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte buf[] = new byte[1024];
        int read;

        // read() returns -1 at end of stream
        while ((read = in.read(buf)) != -1)
          baos.write(buf, 0, read);

        byte data[] = baos.toByteArray();
        SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
        FSDataInputStream in2 = new FSDataInputStream(bais);
    }

    static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {

        public SeekableByteArrayInputStream(byte[] buf)
        {
            super(buf);
        }
        @Override
        public long getPos() throws IOException{
            return pos;
        }

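        @Override
        public void seek(long newPos) throws IOException {
          // ByteArrayInputStream keeps its cursor in the protected field `pos`,
          // so seeking is a bounds check followed by an assignment.
          if (newPos < 0 || newPos > count)
            throw new IOException("Cannot seek to " + newPos + " (length " + count + ")");

          this.pos = (int) newPos;
        }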

        @Override
        public boolean seekToNewSource(long targetPos) throws IOException {
          return false;
        }

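        @Override
        public int read(long position, byte[] buffer, int offset, int length) throws IOException {
          // Positioned read: copies from an absolute offset without moving the cursor.
          if (position < 0 || offset < 0 || length < 0 || length > buffer.length - offset)
            throw new IndexOutOfBoundsException();
          if (length == 0)
            return 0;
          if (position >= count)
            return -1; // end of data

          // Return a short count instead of failing when fewer bytes remain.
          int n = (int) Math.min(length, count - position);
          System.arraycopy(buf, (int) position, buffer, offset, n);
          return n;
        }

        @Override
        public void readFully(long position, byte[] buffer) throws IOException {
          readFully(position, buffer, 0, buffer.length);
        }

        @Override
        public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
          // Unlike read(), readFully must deliver every requested byte or fail.
          if (read(position, buffer, offset, length) != length)
            throw new EOFException("Reached end of data before reading " + length + " bytes");
        }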
    }
}
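
To check the cursor behaviour without Hadoop on the classpath, a JDK-only sketch along these lines may be useful. SeekableBytes and SeekableBytesDemo are hypothetical names; the methods mirror the shapes of Seekable and PositionedReadable, but the class deliberately does not implement those interfaces, so it cannot be passed to FSDataInputStream as-is.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical JDK-only stand-in: same method shapes as Hadoop's Seekable and
// PositionedReadable, but it does not implement those interfaces.
class SeekableBytes extends ByteArrayInputStream {

    SeekableBytes(byte[] data) {
        super(data);
    }

    // Current cursor, taken from ByteArrayInputStream's protected field.
    long getPos() {
        return pos;
    }

    // Move the cursor; positions outside [0, count] are rejected.
    void seek(long newPos) throws IOException {
        if (newPos < 0 || newPos > count) {
            throw new IOException("Cannot seek to " + newPos);
        }
        pos = (int) newPos;
    }

    // Positioned read: copies from an absolute offset and leaves the cursor alone.
    int read(long position, byte[] buffer, int offset, int length) {
        if (position < 0 || offset < 0 || length < 0 || length > buffer.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        if (length == 0) {
            return 0;
        }
        if (position >= count) {
            return -1; // end of data
        }
        int n = (int) Math.min(length, count - position);
        System.arraycopy(buf, (int) position, buffer, offset, n);
        return n;
    }
}

class SeekableBytesDemo {
    public static void main(String[] args) throws IOException {
        SeekableBytes in = new SeekableBytes("hello world".getBytes(StandardCharsets.US_ASCII));

        in.seek(6);
        System.out.println((char) in.read()); // sequential read at offset 6
        System.out.println(in.getPos());      // cursor advanced by one

        byte[] head = new byte[5];
        int n = in.read(0L, head, 0, head.length); // positioned read from offset 0
        System.out.println(new String(head, 0, n, StandardCharsets.US_ASCII));
        System.out.println(in.getPos());      // unchanged by the positioned read
    }
}
```

The point to verify when mocking is that the positioned read leaves getPos() untouched while seek and the sequential read move it.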

Reference: accumulo
