How to instantiate FSDataInputStream with raw InputStream?

I don't really understand how I can create such inputStream which is Seekable and PositionedReadable...

Resource resource = new ClassPathResource("somefile");
InputStream bla = resource.getInputStream();
FSDataInputStream inputStream = new FSDataInputStream (bla);

Throwing at FS line:

java.lang.IllegalArgumentException: In is not an instance of Seekable or PositionedReadable

I need to do mocks and this is a blocker for me.


FSDataInputStream constructor, as shown below, defined in FSDataInputStream.java expects InputStream parameter to be an instance of Seekable or PositionedReadable

public FSDataInputStream(InputStream in) throws IOException 
 {
    super(in);
    if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
      throw new IllegalArgumentException(
          "In is not an instance of Seekable or PositionedReadable");
    }
 }

Hope following solution helps you.

import java.io.*;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

public class SeekableTest {

    public static void main(String[] args) throws IOException
    {
        Resource resource = new ClassPathResource("somefile");
        InputStream in = resource.getInputStream();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte buf[] = new byte[1024];
        int read;

        while ((read = in.read(buf)) > 0)
          baos.write(buf, 0, read);

        byte data[] = baos.toByteArray();
        SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
        FSDataInputStream in2 = new FSDataInputStream(bais);
    }

    static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {

        public SeekableByteArrayInputStream(byte[] buf)
        {
            super(buf);
        }
        @Override
        public long getPos() throws IOException{
            return pos;
        }

        @Override
        public void seek(long pos) throws IOException {
          if (mark != 0)
            throw new IllegalStateException();

          reset();
          long skipped = skip(pos);

          if (skipped != pos)
            throw new IOException();
        }

        @Override
        public boolean seekToNewSource(long targetPos) throws IOException {
          return false;
        }

        @Override
        public int read(long position, byte[] buffer, int offset, int length) throws IOException {

          if (position >= buf.length)
            throw new IllegalArgumentException();
          if (position + length > buf.length)
            throw new IllegalArgumentException();
          if (length > buffer.length)
            throw new IllegalArgumentException();

          System.arraycopy(buf, (int) position, buffer, offset, length);
          return length;
        }

        @Override
        public void readFully(long position, byte[] buffer) throws IOException {
          read(position, buffer, 0, buffer.length);

        }

        @Override
        public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
          read(position, buffer, offset, length);
        }
    }
}

Reference: accumulo

链接地址: http://www.djcxy.com/p/21906.html

上一篇: 对象数组与嵌套对象?

下一篇: 如何用原始InputStream实例化FSDataInputStream?