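Background
Our project had a requirement: the front end uploads an Excel file, and the back end reads the data, processes it, and returns the rows that failed. The simplest approach is to process synchronously, with the client blocking after the upload until the response arrives. That makes for a poor user experience: processing can take a long time, and nobody wants to sit and wait. The project does not yet use a message-queue middleware such as ActiveMQ, and Redis's LPUSH and RPOP are well suited to serve as a lightweight message queue, so I used them to build this feature.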
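I. Topics covered in this article
- Reading and writing Excel files: Alibaba EasyExcel SDK
- File upload and download: Tencent Cloud Object Storage (COS)
- Remote service calls: RestTemplate
- Producer and consumer: RedisTemplate leftPush and rightPop operations
- Asynchronous data processing: an Executors thread pool
- Reading a file stream over the network: HttpClient
- User authentication with a custom annotation: JWT token authentication, with an interceptor that intercepts request entry points annotated with @LoginRequired
All implemented in Java, of course.
That is a lot of topics, and each could be studied as a subject of its own. This article presents the complete implementation; I plan to split it up later and share the pieces.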
II. Project directory structure
Note: the database DAO layer lives in a separate module and is not the focus of this article.
III. Main Maven dependencies
1. EasyExcel

    <!-- version property, declared under <properties> -->
    <easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>easyexcel</artifactId>
        <version>${easyexcel-latestVersion}</version>
    </dependency>

2. JWT

    <dependency>
        <groupId>io.jsonwebtoken</groupId>
        <artifactId>jjwt</artifactId>
        <version>0.7.0</version>
    </dependency>

3. Redis

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-redis</artifactId>
        <version>1.3.5.RELEASE</version>
    </dependency>

4. Tencent COS

    <dependency>
        <groupId>com.qcloud</groupId>
        <artifactId>cos_api</artifactId>
        <version>5.4.5</version>
    </dependency>
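IV. Flow
- The user uploads a file
- The file is stored in Tencent COS
- The uploaded file's id and an upload record are saved to the database
- Redis produces an import message, i.e. the file id is pushed to Redis
- The request ends and returns a "processing" status
- The Redis message is consumed
- The file is read from COS and the data is processed asynchronously
- The failed rows are uploaded to COS as an Excel file for the user to download, and the status is updated to "processing complete"
- The client polls the processing status and can download the error file
- End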
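The steps above can be sketched without Redis, COS, or a database. The block below is a minimal in-memory stand-in, not the project's code: `ImportFlowSketch`, its `Status` enum, and the map that plays the role of the import-record table are all hypothetical names introduced for illustration.

```java
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for the import flow; every name here is hypothetical. */
class ImportFlowSketch {
    enum Status { PROCESSING, DONE }

    // Stand-in for the import-record table: fileId -> status.
    private final Map<String, Status> records = new ConcurrentHashMap<>();
    // Stand-in for the Redis list used as a queue.
    private final BlockingDeque<String> queue = new LinkedBlockingDeque<>();
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Request thread: save the record, enqueue the file id, return at once. */
    Status upload(String fileId) {
        records.put(fileId, Status.PROCESSING);
        queue.addFirst(fileId);               // plays the role of LPUSH
        worker.execute(this::consumeOne);
        return Status.PROCESSING;
    }

    /** Worker thread: take one message, process it, mark the record done. */
    private void consumeOne() {
        String fileId = queue.pollLast();     // plays the role of RPOP
        if (fileId != null) {
            // ... read the file, validate rows, upload the error report ...
            records.put(fileId, Status.DONE);
        }
    }

    /** Polling endpoint: the client calls this until it sees DONE. */
    Status query(String fileId) {
        return records.get(fileId);
    }

    /** Stops the worker and waits for already queued work to finish. */
    boolean shutdown() {
        worker.shutdown();
        try {
            return worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The point of the design is that `upload` never waits for processing: the request thread only records the job and enqueues it, and the client learns the outcome through `query`.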
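V. Results
Uploading a file
Import record in the database
Imported data
Downloading the error file
Error hints in the data
Querying import records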
VI. Code implementation
1. Excel import controller

    @LoginRequired
    @RequestMapping(value = "doimport", method = RequestMethod.POST)
    public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
        PLUser user = getUser(request);
        return orderImportService.doImport(file, user.getId());
    }
2. Service layer

    @Override
    public JsonResponse doImport(MultipartFile file, Integer userId) {
        if (null == file || file.isEmpty()) {
            throw new ServiceException("The file must not be empty");
        }
        String filename = file.getOriginalFilename();
        if (!checkFileSuffix(filename)) {
            throw new ServiceException("Only Excel files in xlsx format are supported for now");
        }
        // Store the file
        String fileId = saveToOss(file);
        if (StringUtils.isBlank(fileId)) {
            throw new ServiceException("File upload failed, please try again later");
        }
        // Save the record to the database
        saveRecordToDB(userId, fileId, filename);
        // Produce one order-import message
        redisProducer.produce(RedisKey.orderImportKey, fileId);
        return JsonResponse.ok("Import accepted, processing...");
    }

    /**
     * Check the file format.
     * @param fileName original file name
     * @return true only for names ending in .xlsx
     */
    private static boolean checkFileSuffix(String fileName) {
        if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
            return false;
        }
        int pointIndex = fileName.lastIndexOf(".");
        String suffix = fileName.substring(pointIndex).toLowerCase();
        return ".xlsx".equals(suffix);
    }

    /**
     * Store the file in Tencent Cloud object storage through the upload service.
     * @param file uploaded file
     * @return the stored file id, or null if the upload failed
     */
    private String saveToOss(MultipartFile file) {
        File f = new File(file.getOriginalFilename());
        try {
            // Copy the upload to a local file so it can be re-posted as a multipart resource
            inputStreamToFile(file.getInputStream(), f);
            FileSystemResource resource = new FileSystemResource(f);
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);
            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            return (String) responseResult.getData();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            // Remove the local copy whether or not the upload succeeded
            if (f.exists()) {
                f.delete();
            }
        }
    }
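The service copies the upload to a local file named after the original file name before re-posting it. As a hedged alternative, the sketch below uses `java.nio.file.Files.createTempFile` so that the copy lands in the system temp directory under a unique name and is always removed afterwards. `TempFileSketch` and `withTempCopy` are hypothetical names, not part of the project.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.function.Function;

class TempFileSketch {
    /**
     * Copies the stream to a uniquely named temp file, hands the file to the
     * action, and deletes the file afterwards whether or not the action succeeds.
     */
    static <T> T withTempCopy(InputStream in, String suffix, Function<Path, T> action) {
        Path tmp = null;
        try (InputStream source = in) {
            tmp = Files.createTempFile("import-", suffix);
            // createTempFile already created the file, so the copy must replace it
            Files.copy(source, tmp, StandardCopyOption.REPLACE_EXISTING);
            return action.apply(tmp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            if (tmp != null) {
                try {
                    Files.deleteIfExists(tmp);
                } catch (IOException ignored) {
                    // best-effort cleanup
                }
            }
        }
    }
}
```

In the service, the action would wrap `tmp.toFile()` in a `FileSystemResource` and post it; two concurrent uploads of files with the same name would then no longer share one local path.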
3. Redis producer

    @Service
    public class RedisProducerImpl implements RedisProducer {

        @Autowired
        private RedisTemplate redisTemplate;

        @Override
        public JsonResponse produce(String key, String msg) {
            // The message is a small map so that more fields can be added later
            Map<String, String> map = Maps.newHashMap();
            map.put("fileid", msg);
            // LPUSH: new messages enter at the head of the list
            redisTemplate.opsForList().leftPush(key, map);
            return JsonResponse.ok();
        }
    }
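Pushing on the left and popping on the right is what turns a Redis list into a first-in, first-out queue. The sketch below mimics that with a `java.util.ArrayDeque`; `ListQueueSketch` is a hypothetical stand-in and does not talk to Redis.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Mimics a Redis list used as a queue; hypothetical, in-memory only. */
class ListQueueSketch {
    private final Deque<String> list = new ArrayDeque<>();

    /** Like LPUSH: insert at the head of the list. */
    void leftPush(String value) {
        list.addFirst(value);
    }

    /** Like RPOP: remove from the tail; returns null when the list is empty. */
    String rightPop() {
        return list.pollLast();
    }
}
```

Pushing `a`, `b`, `c` and then popping three times yields `a`, `b`, `c`, so messages are consumed in the order they were produced.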
4. Redis consumer

    @Service
    public class RedisConsumer {

        @Autowired
        public RedisTemplate redisTemplate;

        @Value("${txossfileurl}")
        private String txOssFileUrl;

        @Value("${txossuploadurl}")
        private String txOssUploadUrl;

        @PostConstruct
        public void init() {
            processOrderImport();
        }

        /**
         * Process order imports.
         */
        private void processOrderImport() {
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(() -> {
                while (true) {
                    try {
                        // Blocking pop with a 1-second timeout; returns null when the list is empty
                        Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
                        if (null == object) {
                            continue;
                        }
                        String msg = JSON.toJSONString(object);
                        executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
                    } catch (Exception e) {
                        // Keep polling even if one pop or dispatch fails
                        e.printStackTrace();
                    }
                }
            });
        }
    }
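The consumer above runs its polling loop and its tasks on one unbounded cached pool and has no way to stop. The following is a minimal sketch of one possible variation, assuming a bounded worker pool and a stop flag are wanted; it polls a `BlockingQueue` in place of Redis, and `PollingConsumerSketch` with its methods is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Polls a queue on one thread and hands messages to a bounded worker pool. */
class PollingConsumerSketch {
    private final BlockingQueue<String> queue;   // stand-in for the Redis list
    private final Consumer<String> handler;      // stand-in for OrderImportTask
    private final ExecutorService poller = Executors.newSingleThreadExecutor();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private volatile boolean running = true;

    PollingConsumerSketch(BlockingQueue<String> queue, Consumer<String> handler) {
        this.queue = queue;
        this.handler = handler;
    }

    void start() {
        poller.execute(() -> {
            while (running) {
                try {
                    // Timed poll, like rightPop(key, timeout, unit): null when nothing arrived
                    String msg = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        workers.execute(() -> handler.accept(msg));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // A failed dispatch must not end the polling loop
                    e.printStackTrace();
                }
            }
        });
    }

    /** Stops polling, then waits for tasks that were already dispatched. */
    boolean stop() {
        running = false;
        poller.shutdown();
        try {
            if (!poller.awaitTermination(2, TimeUnit.SECONDS)) {
                return false;
            }
            workers.shutdown();
            return workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Messages still in the queue when `stop()` is called stay there, which matches the Redis case: anything not yet popped remains in the list for the next start.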
5. Task class that does the processing

    public class OrderImportTask implements Runnable {

        private final String msg;
        private final String txOssFileUrl;
        private final String txOssUploadUrl;

        private RestTemplate restTemplate;
        private TransactionTemplate transactionTemplate;
        private OrderImportService orderImportService;

        public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
            this.msg = msg;
            this.txOssFileUrl = txOssFileUrl;
            this.txOssUploadUrl = txOssUploadUrl;
        }

        /**
         * Look up the beans by hand, since this class is not managed by Spring.
         */
        private void autowireBean() {
            this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
            this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
            this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
        }

        @Override
        public void run() {
            // Look up the beans
            autowireBean();

            JSONObject jsonObject = JSON.parseObject(msg);
            String fileId = jsonObject.getString("fileid");

            // Ask the file service for the download URL of this file id
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("id", fileId);
            ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
            String fileUrl = (String) responseResult.getData();
            if (StringUtils.isBlank(fileUrl)) {
                return;
            }

            InputStream inputStream = HttpClientUtil.readFileFromUrl(fileUrl);
            List<Object> list = ExcelUtil.read(inputStream);
            process(list, fileId);
        }

        /**
         * Upload a file to object storage.
         * @param file local file
         * @return the stored file id, or null if the upload failed
         */
        private String saveToOss(File file) {
            String fileId;
            try {
                FileSystemResource resource = new FileSystemResource(file);
                MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
                param.add("file", resource);
                ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
                fileId = (String) responseResult.getData();
            } catch (Exception e) {
                fileId = null;
            }
            return fileId;
        }

        // process(list, fileId) holds the business logic and is not shown
    }
Note: the business logic that processes the data is omitted here.
6. Uploading a file to COS

    // Controller
    @RequestMapping("/txossupload")
    @ResponseBody
    public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
        if (null == file || file.isEmpty()) {
            return ResponseResult.fail("The file must not be empty");
        }
        String originalFilename = file.getOriginalFilename();
        originalFilename = MimeUtility.decodeText(originalFilename); // fixes garbled Chinese file names
        String contentType = getContentType(originalFilename);
        String key;
        InputStream ins = null;
        File f = null;
        try {
            ins = file.getInputStream();
            f = new File(originalFilename);
            inputStreamToFile(ins, f);
            key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
        } catch (Exception e) {
            return ResponseResult.fail(e.getMessage());
        } finally {
            if (null != ins) {
                try {
                    ins.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // Delete the temporary file; f is still null if getInputStream() failed
            if (null != f && f.exists()) {
                f.delete();
            }
        }
        return ResponseResult.ok(key);
    }

    // Copies the stream to the file and closes both
    public static void inputStreamToFile(InputStream ins, File file) {
        try (InputStream in = ins; OutputStream os = new FileOutputStream(file)) {
            byte[] buffer = new byte[8192];
            int bytesRead;
            while ((bytesRead = in.read(buffer, 0, buffer.length)) != -1) {
                os.write(buffer, 0, bytesRead);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Storage client: prefixes the key with a unique id, uploads, then closes the stream
    public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
        key = UUID.getUUID() + "-" + key; // UUID here is a project helper, not java.util.UUID
        try (FileInputStream in = inputStream) {
            OssUtil.txOssUpload(in, key, contentType);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return key;
    }

    // OssUtil: the actual call to the COS SDK
    public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
        ObjectMetadata objectMetadata = new ObjectMetadata();
        try {
            // available() is only an estimate of the bytes readable without blocking
            int length = inputStream.available();
            objectMetadata.setContentLength(length);
        } catch (Exception e) {
            logger.info(e.getMessage());
        }
        objectMetadata.setContentType(contentType);
        cosClient.putObject(txBucketName, key, inputStream, objectMetadata);
    }
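The upload code takes the content length from `InputStream.available()`, which the JDK documents as an estimate of the bytes readable without blocking rather than the total length. A minimal sketch of one way to get an exact length for moderately sized files is to buffer the stream first; `ContentLengthSketch.readFully` is a hypothetical helper, and for large files `File.length()` on the temporary file would avoid holding everything in memory.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

class ContentLengthSketch {
    /** Reads the whole stream into memory and closes it; the array length is the exact size. */
    static byte[] readFully(InputStream in) {
        try (InputStream source = in; ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = source.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the bytes in hand, the metadata could be filled with `bytes.length` and the upload fed from a `ByteArrayInputStream` over the same array.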
7. Downloading a file

    /**
     * Download a file from Tencent Cloud.
     * @param response servlet response the file is written to
     * @param id file id (the object key)
     * @return always null; the body is written directly to the response
     */
    @RequestMapping("/txossdownload")
    public Object txOssDownload(HttpServletResponse response, String id) {
        COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
        String contentType = getContentType(id);
        FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
        return null;
    }

    // FileUtil: streams the object to the client
    public static void txOssDownload(HttpServletResponse response, String contentType,
                                     InputStream fileStream, String fileName) {
        response.reset();
        OutputStream os = null;
        try {
            response.setContentType(contentType + "; charset=utf-8");
            // Images are shown inline; everything else is sent as an attachment
            if (!contentType.equals(PLConstans.FileContentType.image)) {
                try {
                    response.setHeader("Content-Disposition", "attachment; filename="
                            + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
                } catch (UnsupportedEncodingException e) {
                    response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                    logger.error("encoding file name failed", e);
                }
            }
            os = response.getOutputStream();
            byte[] b = new byte[1024 * 1024];
            int len;
            while ((len = fileStream.read(b)) != -1) {
                os.write(b, 0, len);
                os.flush();
            }
        } catch (IOException e) {
            logger.error(e.getMessage());
        } finally {
            IOUtils.closeQuietly(os);
            IOUtils.closeQuietly(fileStream);
        }
    }
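The download code re-encodes the file name from UTF-8 bytes into ISO-8859-1 so that non-ASCII names survive the header, a trick whose result depends on the browser. As a hedged alternative, RFC 6266 together with RFC 5987 defines a `filename*` parameter that carries a percent-encoded UTF-8 name. The sketch below builds such a header value; `ContentDispositionSketch` is a hypothetical name.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ContentDispositionSketch {
    /** Builds an attachment header value with an RFC 5987 percent-encoded UTF-8 file name. */
    static String attachment(String fileName) {
        try {
            // URLEncoder does form encoding: spaces become '+' and '*' is left as-is,
            // so both are rewritten to the percent-encoded form the header expects.
            String encoded = URLEncoder.encode(fileName, "UTF-8")
                    .replace("+", "%20")
                    .replace("*", "%2A");
            return "attachment; filename*=UTF-8''" + encoded;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

The value would be passed to `response.setHeader("Content-Disposition", ...)`. Some setups also send a plain ASCII `filename=` alongside it as a fallback for very old clients.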
8. Reading a file stream over the network

    /**
     * Read a file stream over the network.
     * @param url file URL
     * @return the response body stream, or null if the URL is blank or the request fails
     */
    public static InputStream readFileFromUrl(String url) {
        if (StringUtils.isBlank(url)) {
            return null;
        }
        // DefaultHttpClient is deprecated since HttpClient 4.3 in favour of HttpClients.createDefault()
        HttpClient httpClient = new DefaultHttpClient();
        HttpGet methodGet = new HttpGet(url);
        try {
            HttpResponse response = httpClient.execute(methodGet);
            if (response.getStatusLine().getStatusCode() == 200) {
                HttpEntity entity = response.getEntity();
                // The caller is responsible for closing the returned stream
                return entity.getContent();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
9. ExcelUtil

    /**
     * Read an Excel file.
     * @param inputStream file input stream
     * @return the rows as a list
     */
    public static List<Object> read(InputStream inputStream) {
        // Sheet 1, with 1 header row
        return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
    }

    /**
     * Write an Excel file.
     * @param data rows to write
     * @param clazz row model class
     * @param saveFilePath path the file is saved to
     * @throws IOException if the file cannot be written
     */
    public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz,
                             String saveFilePath) throws IOException {
        File tempFile = new File(saveFilePath);
        // try-with-resources closes the stream even if writing fails
        try (OutputStream out = new FileOutputStream(tempFile)) {
            ExcelWriter writer = EasyExcelFactory.getWriter(out);
            Sheet sheet = new Sheet(1, 3, clazz, "sheet1", null);
            writer.write(data, sheet);
            writer.finish();
        }
    }
Note: that completes the whole flow. The code for the remaining topics is included below for reference.
VII. Miscellaneous
1. The @LoginRequired annotation

    /**
     * Put this annotation on controller methods that require a logged-in user.
     */
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface LoginRequired {
    }
2. MyControllerAdvice

    @ControllerAdvice
    public class MyControllerAdvice {

        // Invalid or missing token: tell the client to log in again
        @ResponseBody
        @ExceptionHandler(TokenValidationException.class)
        public JsonResponse tokenValidationExceptionHandler() {
            return JsonResponse.loginInvalid();
        }

        // Business errors: return the message to the client
        @ResponseBody
        @ExceptionHandler(ServiceException.class)
        public JsonResponse serviceExceptionHandler(ServiceException se) {
            return JsonResponse.fail(se.getMsg());
        }

        // Everything else
        @ResponseBody
        @ExceptionHandler(Exception.class)
        public JsonResponse exceptionHandler(Exception e) {
            e.printStackTrace();
            return JsonResponse.fail(e.getMessage());
        }
    }
3. AuthenticationInterceptor

    public class AuthenticationInterceptor implements HandlerInterceptor {

        private static final String CURRENT_USER = "user";

        @Autowired
        private UserService userService;

        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
            // Pass straight through if the handler is not a controller method
            if (!(handler instanceof HandlerMethod)) {
                return true;
            }
            HandlerMethod handlerMethod = (HandlerMethod) handler;
            Method method = handlerMethod.getMethod();

            // A method annotated with @LoginRequired needs a logged-in user
            LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
            if (methodAnnotation != null) {
                // Verify the token
                Integer userId = JwtUtil.verifyToken(request);
                PLUser plUser = userService.selectByPrimaryKey(userId);
                if (null == plUser) {
                    throw new RuntimeException("User does not exist, please log in again");
                }
                request.setAttribute(CURRENT_USER, plUser);
            }
            return true;
        }

        @Override
        public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,
                               Object o, ModelAndView modelAndView) throws Exception {
        }

        @Override
        public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse,
                                    Object o, Exception e) throws Exception {
        }
    }
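The interceptor's decision comes down to one reflection call: does the target method carry the annotation? The sketch below isolates that check with plain JDK reflection and no Spring. `DemoController` and `AnnotationCheckSketch` are hypothetical names, and the annotation is redeclared so the block stands on its own.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// RUNTIME retention is what makes the annotation visible to reflection
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface LoginRequired {
}

/** Hypothetical controller with one protected and one open method. */
class DemoController {
    @LoginRequired
    public String doImport() {
        return "ok";
    }

    public String health() {
        return "ok";
    }
}

class AnnotationCheckSketch {
    /** Returns true when the named public no-arg method is annotated with @LoginRequired. */
    static boolean requiresLogin(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            return method.isAnnotationPresent(LoginRequired.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }
}
```

With `RetentionPolicy.CLASS` or `SOURCE` the check would always return false, which is why the annotation in this article declares runtime retention.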
4. JwtUtil

    public static final long EXPIRATION_TIME = 2_592_000_000L; // valid for 30 days, in milliseconds
    public static final String SECRET = "pl_token_secret";
    public static final String HEADER = "token";
    public static final String USER_ID = "userid";

    /**
     * Generate a token from the user id.
     * @param userId user id
     * @return the signed token
     */
    public static String generateToken(String userId) {
        HashMap<String, Object> map = new HashMap<>();
        map.put(USER_ID, userId);
        String jwt = Jwts.builder()
                .setClaims(map)
                .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .signWith(SignatureAlgorithm.HS512, SECRET)
                .compact();
        return jwt;
    }

    /**
     * Verify the token.
     * @param request current request; the token is read from the HEADER request header
     * @return the user id if verification passes, or null if the token carries no user id
     */
    public static Integer verifyToken(HttpServletRequest request) {
        String token = request.getHeader(HEADER);
        if (token != null) {
            try {
                // parseClaimsJws checks the signature and the expiration time
                Map<String, Object> body = Jwts.parser()
                        .setSigningKey(SECRET)
                        .parseClaimsJws(token)
                        .getBody();
                Object value = body.get(USER_ID);
                return value == null ? null : Integer.valueOf(value.toString());
            } catch (Exception e) {
                logger.error(e.getMessage());
                throw new TokenValidationException("unauthorized");
            }
        } else {
            throw new TokenValidationException("missing token");
        }
    }
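To show what the HS512 signature in such a token protects, here is a minimal sketch that signs and checks a three-part token with the JDK's `HmacSHA512`. It is not jjwt and is not meant to replace it: `Hs512Sketch` is a hypothetical name, the header is reduced to the algorithm field, and expiration is not checked.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

class Hs512Sketch {
    private static final Base64.Encoder B64 = Base64.getUrlEncoder().withoutPadding();

    /** Returns base64url(header).base64url(payload).base64url(HMAC-SHA512 over the first two parts). */
    static String sign(String payloadJson, byte[] secret) {
        String header = B64.encodeToString("{\"alg\":\"HS512\"}".getBytes(StandardCharsets.UTF_8));
        String payload = B64.encodeToString(payloadJson.getBytes(StandardCharsets.UTF_8));
        String signingInput = header + "." + payload;
        return signingInput + "." + B64.encodeToString(hmac(signingInput, secret));
    }

    /** Recomputes the signature over header.payload and compares it in constant time. */
    static boolean verify(String token, byte[] secret) {
        int lastDot = token.lastIndexOf('.');
        if (lastDot <= 0) {
            return false;
        }
        byte[] actual;
        try {
            actual = Base64.getUrlDecoder().decode(token.substring(lastDot + 1));
        } catch (IllegalArgumentException e) {
            return false; // signature part is not valid base64url
        }
        byte[] expected = hmac(token.substring(0, lastDot), secret);
        return MessageDigest.isEqual(expected, actual);
    }

    private static byte[] hmac(String input, byte[] secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA512");
            mac.init(new SecretKeySpec(secret, "HmacSHA512"));
            return mac.doFinal(input.getBytes(StandardCharsets.US_ASCII));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Changing any character of the header or payload, or verifying with a different secret, makes `verify` return false. That is why the server can trust the user id inside the token without storing a session.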
Closing note: OK, that's done. Off to bed, I'm exhausted.
Original link: https://www.cnblogs.com/wangzaiplus/p/10660520.html