服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Netty 轻松实现文件上传功能

Netty 轻松实现文件上传功能

2021-09-28 10:14Java技术栈 Java教程

今天我们来完成一个使用netty进行文件传输的任务。在实际项目中,文件传输通常采用FTP或者HTTP附件的方式,对Netty 文件上传功能感兴趣的朋友一起看看吧

今天我们来完成一个使用netty进行文件传输的任务。在实际项目中,文件传输通常采用FTP或者HTTP附件的方式。事实上通过TCP Socket+File的方式进行文件传输也有一定的应用场景,尽管不是主流,但是掌握这种文件传输方式还是比较重要的,特别是针对两个跨主机的JVM进程之间进行持久化数据的相互交换。

而使用netty来进行文件传输也是利用netty天然的优势:零拷贝功能。很多同学都听说过netty的”零拷贝”功能,但是具体体现在哪里又不知道,下面我们就简要介绍下:

Netty的“零拷贝”主要体现在如下三个方面:

  • Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
  • Netty提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以像操作一个Buffer那样方便的对组合Buffer进行操作,避免了传统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer。
  • Netty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题。

具体的分析在此就不多做介绍,有兴趣的可以查阅相关文档。我们还是把重点放在文件传输上。Netty作为高性能的服务器端异步IO框架必然也离不开文件读写功能,我们可以使用netty模拟http的形式通过网页上传文件写入服务器,当然要使用http的形式那你也用不着netty!大材小用。

netty4中如果想使用http形式上传文件你还得借助第三方jar包:okhttp。使用该jar完成http请求的发送。但是在netty5 中已经为我们写好了,我们可以直接调用netty5的API就可以实现。所以netty4和5的差别还是挺大的,至于使用哪个,那就看你们公司选择哪一个了!本文目前使用netty4来实现文件上传功能。下面我们上代码:

pom文件:

?
1
2
3
4
5
<dependency>
      <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
      <version>4.1.5.Final</version>
</dependency>

server端:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
 
public class FileUploadServer {
    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<Channel>() {
 
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(null))); // 最大长度
                    ch.pipeline().addLast(new FileUploadServerHandler());
                }
            });
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
 
    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        try {
            new FileUploadServer().bind(port);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

server端handler:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
import java.io.File;
import java.io.RandomAccessFile;
 
public class FileUploadServerHandler extends ChannelInboundHandlerAdapter {
    private int byteRead;
    private volatile int start = 0;
    private String file_dir = "D:";
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FileUploadFile) {
            FileUploadFile ef = (FileUploadFile) msg;
            byte[] bytes = ef.getBytes();
            byteRead = ef.getEndPos();
            String md5 = ef.getFile_md5();//文件名
            String path = file_dir + File.separator + md5;
            File file = new File(path);
            RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
            randomAccessFile.seek(start);
            randomAccessFile.write(bytes);
            start = start + byteRead;
            if (byteRead > 0) {
                ctx.writeAndFlush(start);
            } else {
                randomAccessFile.close();
                ctx.close();
            }
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

client端:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.io.File;
 
public class FileUploadClient {
    public void connect(int port, String host, final FileUploadFile fileUploadFile) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<Channel>() {
 
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(null)));
                    ch.pipeline().addLast(new FileUploadClientHandler(fileUploadFile));
                }
            });
            ChannelFuture f = b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
 
    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                e.printStackTrace();
            }
        }
        try {
            FileUploadFile uploadFile = new FileUploadFile();
            File file = new File("c:/1.txt");
            String fileMd5 = file.getName();// 文件名
            uploadFile.setFile(file);
            uploadFile.setFile_md5(fileMd5);
            uploadFile.setStarPos(0);// 文件开始位置
            new FileUploadClient().connect(port, "127.0.0.1", uploadFile);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

client端handler:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
 
public class FileUploadClientHandler extends ChannelInboundHandlerAdapter {
    private int byteRead;
    private volatile int start = 0;
    private volatile int lastLength = 0;
    public RandomAccessFile randomAccessFile;
    private FileUploadFile fileUploadFile;
 
    public FileUploadClientHandler(FileUploadFile ef) {
        if (ef.getFile().exists()) {
            if (!ef.getFile().isFile()) {
                System.out.println("Not a file :" + ef.getFile());
                return;
            }
        }
        this.fileUploadFile = ef;
    }
 
    public void channelActive(ChannelHandlerContext ctx) {
        try {
            randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");
            randomAccessFile.seek(fileUploadFile.getStarPos());
            lastLength = (int) randomAccessFile.length() / 10;
            byte[] bytes = new byte[lastLength];
            if ((byteRead = randomAccessFile.read(bytes)) != -1) {
                fileUploadFile.setEndPos(byteRead);
                fileUploadFile.setBytes(bytes);
                ctx.writeAndFlush(fileUploadFile);
            } else {
                System.out.println("文件已经读完");
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException i) {
            i.printStackTrace();
        }
    }
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof Integer) {
            start = (Integer) msg;
            if (start != -1) {
                randomAccessFile = new RandomAccessFile(fileUploadFile.getFile(), "r");
                randomAccessFile.seek(start);
                System.out.println("块儿长度:" + (randomAccessFile.length() / 10));
                System.out.println("长度:" + (randomAccessFile.length() - start));
                int a = (int) (randomAccessFile.length() - start);
                int b = (int) (randomAccessFile.length() / 10);
                if (a < b) {
                    lastLength = a;
                }
                byte[] bytes = new byte[lastLength];
                System.out.println("-----------------------------" + bytes.length);
                if ((byteRead = randomAccessFile.read(bytes)) != -1 && (randomAccessFile.length() - start) > 0) {
                    System.out.println("byte 长度:" + bytes.length);
                    fileUploadFile.setEndPos(byteRead);
                    fileUploadFile.setBytes(bytes);
                    try {
                        ctx.writeAndFlush(fileUploadFile);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    randomAccessFile.close();
                    ctx.close();
                    System.out.println("文件已经读完--------" + byteRead);
                }
            }
        }
    }
 
    // @Override
    // public void channelRead(ChannelHandlerContext ctx, Object msg) throws
    // Exception {
    // System.out.println("Server is speek :"+msg.toString());
    // FileRegion filer = (FileRegion) msg;
    // String path = "E://Apk//APKMD5.txt";
    // File fl = new File(path);
    // fl.createNewFile();
    // RandomAccessFile rdafile = new RandomAccessFile(path, "rw");
    // FileRegion f = new DefaultFileRegion(rdafile.getChannel(), 0,
    // rdafile.length());
    //
    // System.out.println("This is" + ++counter + "times receive server:["
    // + msg + "]");
    // }
 
    // @Override
    // public void channelReadComplete(ChannelHandlerContext ctx) throws
    // Exception {
    // ctx.flush();
    // }
 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
    // @Override
    // protected void channelRead0(ChannelHandlerContext ctx, String msg)
    // throws Exception {
    // String a = msg;
    // System.out.println("This is"+
    // ++counter+"times receive server:["+msg+"]");
    // }
}

我们还自定义了一个对象,用于统计文件上传进度的:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import java.io.File;
import java.io.Serializable;
 
public class FileUploadFile implements Serializable {
 
 
    private static final long serialVersionUID = 1L;
    private File file;// 文件
    private String file_md5;// 文件名
    private int starPos;// 开始位置
    private byte[] bytes;// 文件字节数组
    private int endPos;// 结尾位置
 
    public int getStarPos() {
        return starPos;
    }
 
    public void setStarPos(int starPos) {
        this.starPos = starPos;
    }
 
    public int getEndPos() {
        return endPos;
    }
 
    public void setEndPos(int endPos) {
        this.endPos = endPos;
    }
 
    public byte[] getBytes() {
        return bytes;
    }
 
    public void setBytes(byte[] bytes) {
        this.bytes = bytes;
    }
 
    public File getFile() {
        return file;
    }
 
    public void setFile(File file) {
        this.file = file;
    }
 
    public String getFile_md5() {
        return file_md5;
    }
 
    public void setFile_md5(String file_md5) {
        this.file_md5 = file_md5;
    }
}

输出为:

块儿长度:894
长度:8052
-----------------------------894
byte 长度:894
块儿长度:894
长度:7158
-----------------------------894
byte 长度:894
块儿长度:894
长度:6264
-----------------------------894
byte 长度:894
块儿长度:894
长度:5370
-----------------------------894
byte 长度:894
块儿长度:894
长度:4476
-----------------------------894
byte 长度:894
块儿长度:894
长度:3582
-----------------------------894
byte 长度:894
块儿长度:894
长度:2688
-----------------------------894
byte 长度:894
块儿长度:894
长度:1794
-----------------------------894
byte 长度:894
块儿长度:894
长度:900
-----------------------------894
byte 长度:894
块儿长度:894
长度:6
-----------------------------6
byte 长度:6
块儿长度:894
长度:0
-----------------------------0
文件已经读完--------0

Process finished with exit code 0

这样就实现了服务器端文件的上传,当然我们也可以使用http的形式。

server端:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
 
 
public class HttpFileServer implements Runnable {
    private int port;
 
    public HttpFileServer(int port) {
        super();
        this.port = port;
    }
 
    @Override
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup);
        serverBootstrap.channel(NioServerSocketChannel.class);
        //serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
        serverBootstrap.childHandler(new HttpChannelInitlalizer());
        try {
            ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
 
    public static void main(String[] args) {
        HttpFileServer b = new HttpFileServer(9003);
        new Thread(b).start();
    }
}

Server端initializer:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
 
 
public class HttpChannelInitlalizer extends ChannelInitializer<SocketChannel> {
 
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpServerCodec());
        pipeline.addLast(new HttpObjectAggregator(65536));
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new HttpChannelHandler());
    }
 
}

server端hadler:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.SystemPropertyUtil;
 
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.regex.Pattern;
 
import javax.activation.MimetypesFileTypeMap;
 
public class HttpChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
    public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
    public static final int HTTP_CACHE_SECONDS = 60;
 
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // 监测解码情况
        if (!request.getDecoderResult().isSuccess()) {
            sendError(ctx, BAD_REQUEST);
            return;
        }
        final String uri = request.getUri();
        final String path = sanitizeUri(uri);
        System.out.println("get file:"+path);
        if (path == null) {
            sendError(ctx, FORBIDDEN);
            return;
        }
        //读取要下载的文件
        File file = new File(path);
        if (file.isHidden() || !file.exists()) {
            sendError(ctx, NOT_FOUND);
            return;
        }
        if (!file.isFile()) {
            sendError(ctx, FORBIDDEN);
            return;
        }
        RandomAccessFile raf;
        try {
            raf = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException ignore) {
            sendError(ctx, NOT_FOUND);
            return;
        }
        long fileLength = raf.length();
        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        HttpHeaders.setContentLength(response, fileLength);
        setContentTypeHeader(response, file);
        //setDateAndCacheHeaders(response, file);
        if (HttpHeaders.isKeepAlive(request)) {
            response.headers().set("CONNECTION", HttpHeaders.Values.KEEP_ALIVE);
        }
 
        // Write the initial line and the header.
        ctx.write(response);
 
        // Write the content.
        ChannelFuture sendFileFuture =
        ctx.write(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), ctx.newProgressivePromise());
        //sendFuture用于监视发送数据的状态
        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
            @Override
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
                if (total < 0) { // total unknown
                    System.err.println(future.channel() + " Transfer progress: " + progress);
                } else {
                    System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
                }
            }
 
            @Override
            public void operationComplete(ChannelProgressiveFuture future) {
                System.err.println(future.channel() + " Transfer complete.");
            }
        });
 
        // Write the end marker
        ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
 
        // Decide whether to close the connection or not.
        if (!HttpHeaders.isKeepAlive(request)) {
            // Close the connection when the whole content is written out.
            lastContentFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        if (ctx.channel().isActive()) {
            sendError(ctx, INTERNAL_SERVER_ERROR);
        }
        ctx.close();
    }
 
    private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
 
    private static String sanitizeUri(String uri) {
        // Decode the path.
        try {
            uri = URLDecoder.decode(uri, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new Error(e);
        }
 
        if (!uri.startsWith("/")) {
            return null;
        }
 
        // Convert file separators.
        uri = uri.replace('/', File.separatorChar);
 
        // Simplistic dumb security check.
        // You will have to do something serious in the production environment.
        if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || uri.startsWith(".") || uri.endsWith(".")
                || INSECURE_URI.matcher(uri).matches()) {
            return null;
        }
 
        // Convert to absolute path.
        return SystemPropertyUtil.get("user.dir") + File.separator + uri;
    }
 
 
    private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
        response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
 
        // Close the connection as soon as the error message is sent.
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
 
    /**
     * Sets the content type header for the HTTP Response
     *
     * @param response
     *            HTTP response
     * @param file
     *            file to extract content type
     */
    private static void setContentTypeHeader(HttpResponse response, File file) {
        MimetypesFileTypeMap m = new MimetypesFileTypeMap();
        String contentType = m.getContentType(file.getPath());
        if (!contentType.equals("application/octet-stream")) {
            contentType += "; charset=utf-8";
        }
        response.headers().set(CONTENT_TYPE, contentType);
    }
 
}

client端:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
 
import java.net.URI;
 
 
public class HttpDownloadClient {
    /**
     * 下载http资源 向服务器下载直接填写要下载的文件的相对路径
     *        (↑↑↑建议只使用字母和数字对特殊字符对字符进行部分过滤可能导致异常↑↑↑)
     *        向互联网下载输入完整路径
     * @param host 目的主机ip或域名
     * @param port 目标主机端口
     * @param url 文件路径
     * @param local 本地存储路径
     * @throws Exception
     */
    public void connect(String host, int port, String url, final String local) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChildChannelHandler(local));
 
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();
 
            URI uri = new URI(url);
            DefaultFullHttpRequest request = new DefaultFullHttpRequest(
                    HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString());
 
            // 构建http请求
            request.headers().set(HttpHeaders.Names.HOST, host);
            request.headers().set(HttpHeaders.Names.CONNECTION,
                    HttpHeaders.Values.KEEP_ALIVE);
            request.headers().set(HttpHeaders.Names.CONTENT_LENGTH,
                    request.content().readableBytes());
            // 发送http请求
            f.channel().write(request);
            f.channel().flush();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
 
    }
 
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        String local;
        public ChildChannelHandler(String local) {
            this.local = local;
        }
 
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
            ch.pipeline().addLast(new HttpResponseDecoder());
            // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
            ch.pipeline().addLast(new HttpRequestEncoder());
            ch.pipeline().addLast(new ChunkedWriteHandler());
            ch.pipeline().addLast(new HttpDownloadHandler(local));
        }
 
    }
    public static void main(String[] args) throws Exception {
        HttpDownloadClient client = new HttpDownloadClient();
        //client.connect("127.0.0.1", 9003,"/file/pppp/1.doc","1.doc");
//        client.connect("zlysix.gree.com", 80, "http://zlysix.gree.com/HelloWeb/download/20m.apk", "20m.apk");
        client.connect("www.ghost64.com", 80, "http://www.ghost64.com/qqtupian/zixunImg/local/2017/05/27/1495855297602.jpg", "1495855297602.jpg");
 
    }
}

client端handler:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import java.io.File;
import java.io.FileOutputStream;
 
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
//import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.internal.SystemPropertyUtil;
/**
 * @Author:yangyue
 * @Description:
 * @Date: Created in 9:15 on 2017/5/28.
 */
 
public class HttpDownloadHandler extends ChannelInboundHandlerAdapter {
    private boolean readingChunks = false; // 分块读取开关
    private FileOutputStream fOutputStream = null;// 文件输出流
    private File localfile = null;// 下载文件的本地对象
    private String local = null;// 待下载文件名
    private int succCode;// 状态码
 
    public HttpDownloadHandler(String local) {
        this.local = local;
    }
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        if (msg instanceof HttpResponse) {// response头信息
            HttpResponse response = (HttpResponse) msg;
            succCode = response.getStatus().code();
            if (succCode == 200) {
                setDownLoadFile();// 设置下载文件
                readingChunks = true;
            }
            // System.out.println("CONTENT_TYPE:"
            // + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
        }
        if (msg instanceof HttpContent) {// response体信息
            HttpContent chunk = (HttpContent) msg;
            if (chunk instanceof LastHttpContent) {
                readingChunks = false;
            }
 
            ByteBuf buffer = chunk.content();
            byte[] dst = new byte[buffer.readableBytes()];
            if (succCode == 200) {
                while (buffer.isReadable()) {
                    buffer.readBytes(dst);
                    fOutputStream.write(dst);
                    buffer.release();
                }
                if (null != fOutputStream) {
                    fOutputStream.flush();
                }
            }
 
        }
        if (!readingChunks) {
            if (null != fOutputStream) {
                System.out.println("Download done->"+ localfile.getAbsolutePath());
                fOutputStream.flush();
                fOutputStream.close();
                localfile = null;
                fOutputStream = null;
            }
            ctx.channel().close();
        }
    }
 
    /**
     * 配置本地参数,准备下载
     */
    private void setDownLoadFile() throws Exception {
        if (null == fOutputStream) {
            local = SystemPropertyUtil.get("user.dir") + File.separator +local;
            //System.out.println(local);
            localfile = new File(local);
            if (!localfile.exists()) {
                localfile.createNewFile();
            }
            fOutputStream = new FileOutputStream(localfile);
        }
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.out.println("管道异常:" + cause.getMessage());
        cause.printStackTrace();
        ctx.channel().close();
    }
}

这里客户端我放的是网络连接,下载的是一副图片,启动服务端和客户端就可以看到这个图片被下载到了工程的根目录下。

Netty 轻松实现文件上传功能

到此这篇关于Netty 轻松实现文件上传的文章就介绍到这了,更多相关Netty 文件上传内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/javastack/archive/2021/07/06/14976487.html

延伸 · 阅读

精彩推荐
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    这篇文章主要介绍了Java使用SAX解析xml的示例,帮助大家更好的理解和学习使用Java,感兴趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程小米推送Java代码

    小米推送Java代码

    今天小编就为大家分享一篇关于小米推送Java代码,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    富贵稳中求8032021-07-12
  • Java教程Java8中Stream使用的一个注意事项

    Java8中Stream使用的一个注意事项

    最近在工作中发现了对于集合操作转换的神器,java8新特性 stream,但在使用中遇到了一个非常重要的注意点,所以这篇文章主要给大家介绍了关于Java8中S...

    阿杜7472021-02-04
  • Java教程xml与Java对象的转换详解

    xml与Java对象的转换详解

    这篇文章主要介绍了xml与Java对象的转换详解的相关资料,需要的朋友可以参考下...

    Java教程网2942020-09-17
  • Java教程Java BufferWriter写文件写不进去或缺失数据的解决

    Java BufferWriter写文件写不进去或缺失数据的解决

    这篇文章主要介绍了Java BufferWriter写文件写不进去或缺失数据的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望...

    spcoder14552021-10-18
  • Java教程20个非常实用的Java程序代码片段

    20个非常实用的Java程序代码片段

    这篇文章主要为大家分享了20个非常实用的Java程序片段,对java开发项目有所帮助,感兴趣的小伙伴们可以参考一下 ...

    lijiao5352020-04-06
  • Java教程升级IDEA后Lombok不能使用的解决方法

    升级IDEA后Lombok不能使用的解决方法

    最近看到提示IDEA提示升级,寻思已经有好久没有升过级了。升级完毕重启之后,突然发现好多错误,本文就来介绍一下如何解决,感兴趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程Java实现抢红包功能

    Java实现抢红包功能

    这篇文章主要为大家详细介绍了Java实现抢红包功能,采用多线程模拟多人同时抢红包,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙...

    littleschemer13532021-05-16