前言
最近在学习Retrofit,虽然Retrofit没有提供文件下载进度的回调,但是Retrofit底层依赖的是OkHttp,实际上所需要的实现OkHttp对下载进度的监听,在OkHttp的官方Demo中,有一个Progress.java的文件,顾名思义。点我查看。
准备工作
本文采用Dagger2,Retrofit,RxJava。
1
2
3
4
5
6
7
8
9
10
|
compile 'com.squareup.retrofit2:retrofit:2.0.2' compile 'com.squareup.retrofit2:converter-gson:2.0.2' compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2' //dagger2 compile 'com.google.dagger:dagger:2.6' apt 'com.google.dagger:dagger-compiler:2.6' //RxJava compile 'io.reactivex:rxandroid:1.2.0' compile 'io.reactivex:rxjava:1.1.5' compile 'com.jakewharton.rxbinding:rxbinding:0.4.0' |
改造ResponseBody
okHttp3默认的ResponseBody因为不知道进度的相关信息,所以需要对其进行改造。可以使用接口监听进度信息。这里采用的是RxBus发送FileLoadEvent对象实现对下载进度的实时更新。这里先讲改造的ProgressResponseBody。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
public class ProgressResponseBody extends ResponseBody { private ResponseBody responseBody; private BufferedSource bufferedSource; public ProgressResponseBody(ResponseBody responseBody) { this .responseBody = responseBody; } @Override public MediaType contentType() { return responseBody.contentType(); } @Override public long contentLength() { return responseBody.contentLength(); } @Override public BufferedSource source() { if (bufferedSource == null ) { bufferedSource = Okio.buffer(source(responseBody.source())); } return bufferedSource; } private Source source(Source source) { return new ForwardingSource(source) { long bytesReaded = 0 ; @Override public long read(Buffer sink, long byteCount) throws IOException { long bytesRead = super .read(sink, byteCount); bytesReaded += bytesRead == - 1 ? 0 : bytesRead; //实时发送当前已读取的字节和总字节 RxBus.getInstance().post( new FileLoadEvent(contentLength(), bytesReaded)); return bytesRead; } }; } } |
呃,OKIO相关知识我也正在学,这个是从官方Demo中copy的代码,只不过中间使用了RxBus实时发送FileLoadEvent对象。
FileLoadEvent
FileLoadEvent很简单,包含了当前已加载进度和文件总大小。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public class FileLoadEvent { long total; long bytesLoaded; public long getBytesLoaded() { return bytesLoaded; } public long getTotal() { return total; } public FileLoadEvent( long total, long bytesLoaded) { this .total = total; this .bytesLoaded = bytesLoaded; } } |
RxBus
RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用OTTO或者 EventBus。点我查看详情。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
public class RxBus { private static volatile RxBus mInstance; private SerializedSubject<Object, Object> mSubject; private HashMap<String, CompositeSubscription> mSubscriptionMap; /** * PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者 * Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题, * 需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。 */ private RxBus() { mSubject = new SerializedSubject<>(PublishSubject.create()); } /** * 单例 双重锁 * @return */ public static RxBus getInstance() { if (mInstance == null ) { synchronized (RxBus. class ) { if (mInstance == null ) { mInstance = new RxBus(); } } } return mInstance; } /** * 发送一个新的事件 * @param o */ public void post(Object o) { mSubject.onNext(o); } /** * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者 * @param type * @param <T> * @return */ public <T> Observable<T> tObservable( final Class<T> type) { //ofType操作符只发射指定类型的数据,其内部就是filter+cast return mSubject.ofType(type); } public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) { return tObservable(type) .onBackpressureBuffer() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(next, error); } public void addSubscription(Object o, Subscription subscription) { if (mSubscriptionMap == null ) { mSubscriptionMap = new HashMap<>(); } String key = o.getClass().getName(); if (mSubscriptionMap.get(key) != null ) { mSubscriptionMap.get(key).add(subscription); } else { CompositeSubscription compositeSubscription = new CompositeSubscription(); compositeSubscription.add(subscription); mSubscriptionMap.put(key, compositeSubscription); // Log.e("air", "addSubscription:订阅成功 " ); } } public void unSubscribe(Object o) { if (mSubscriptionMap == null ) { return ; } String key = o.getClass().getName(); if (!mSubscriptionMap.containsKey(key)) { return ; } if (mSubscriptionMap.get(key) != null ) { mSubscriptionMap.get(key).unsubscribe(); } mSubscriptionMap.remove(key); //Log.e("air", "unSubscribe: 取消订阅" ); } } |
FileCallBack
那么,重点来了。代码其实有5个方法需要重写,好吧,其实这些方法可以精简一下。其中progress()方法有两个参数,progress和total,分别表示文件已下载的大小和总大小,我们将这两个参数不断更新到UI上就行了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
public abstract class FileCallBack<T> { private String destFileDir; private String destFileName; public FileCallBack(String destFileDir, String destFileName) { this .destFileDir = destFileDir; this .destFileName = destFileName; subscribeLoadProgress(); } public abstract void onSuccess(T t); public abstract void progress( long progress, long total); public abstract void onStart(); public abstract void onCompleted(); public abstract void onError(Throwable e); public void saveFile(ResponseBody body) { InputStream is = null ; byte [] buf = new byte [ 2048 ]; int len; FileOutputStream fos = null ; try { is = body.byteStream(); File dir = new File(destFileDir); if (!dir.exists()) { dir.mkdirs(); } File file = new File(dir, destFileName); fos = new FileOutputStream(file); while ((len = is.read(buf)) != - 1 ) { fos.write(buf, 0 , len); } fos.flush(); unsubscribe(); //onCompleted(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { try { if (is != null ) is.close(); if (fos != null ) fos.close(); } catch (IOException e) { Log.e( "saveFile" , e.getMessage()); } } } /** * 订阅加载的进度条 */ public void subscribeLoadProgress() { Subscription subscription = RxBus.getInstance().doSubscribe(FileLoadEvent. class , new Action1<FileLoadEvent>() { @Override public void call(FileLoadEvent fileLoadEvent) { progress(fileLoadEvent.getBytesLoaded(),fileLoadEvent.getTotal()); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { //TODO 对异常的处理 } }); RxBus.getInstance().addSubscription( this , subscription); } /** * 取消订阅,防止内存泄漏 */ public void unsubscribe() { RxBus.getInstance().unSubscribe( this ); } } |
开始下载
使用自己的ProgressResponseBody
通过OkHttpClient的拦截器去拦截Response,并将我们的ProgressReponseBody设置进去监听进度。
1
2
3
4
5
6
7
8
9
|
public class ProgressInterceptor implements Interceptor { @Override public Response intercept(Chain chain) throws IOException { Response originalResponse = chain.proceed(chain.request()); return originalResponse.newBuilder() .body( new ProgressResponseBody(originalResponse.body())) .build(); } } |
构建Retrofit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
@Module public class ApiModule { @Provides @Singleton public OkHttpClient provideClient() { OkHttpClient client = new OkHttpClient.Builder() .addInterceptor( new ProgressInterceptor()) .build(); return client; } @Provides @Singleton public Retrofit provideRetrofit(OkHttpClient client){ Retrofit retrofit = new Retrofit.Builder() .client(client) .baseUrl(Constant.HOST) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .addConverterFactory(GsonConverterFactory.create()) .build(); return retrofit; } @Provides @Singleton public ApiInfo provideApiInfo(Retrofit retrofit){ return retrofit.create(ApiInfo. class ); } @Provides @Singleton public ApiManager provideApiManager(Application application, ApiInfo apiInfo){ return new ApiManager(application,apiInfo); } } |
请求接口
1
2
3
4
5
|
public interface ApiInfo { @Streaming @GET Observable<ResponseBody> download( @Url String url); } |
执行请求
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public void load(String url, final FileCallBack<ResponseBody> callBack){ apiInfo.download(url) .subscribeOn(Schedulers.io()) //请求网络 在调度者的io线程 .observeOn(Schedulers.io()) //指定线程保存文件 .doOnNext( new Action1<ResponseBody>() { @Override public void call(ResponseBody body) { callBack.saveFile(body); } }) .observeOn(AndroidSchedulers.mainThread()) //在主线程中更新ui .subscribe( new FileSubscriber<ResponseBody>(application,callBack)); } |
在presenter层中执行网络请求。
通过V层依赖注入的presenter对象调用请求网络,请求网络后调用V层更新UI的操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public void load(String url){ String fileName = "app.apk" ; String fileStoreDir = Environment.getExternalStorageDirectory().getAbsolutePath(); Log.e(TAG, "load: " +fileStoreDir.toString() ); FileCallBack<ResponseBody> callBack = new FileCallBack<ResponseBody>(fileStoreDir,fileName) { @Override public void onSuccess( final ResponseBody responseBody) { } @Override public void progress( long progress, long total) { iHomeView.update(total,progress); } @Override public void onStart() { iHomeView.showLoading(); } @Override public void onCompleted() { iHomeView.hideLoading(); } @Override public void onError(Throwable e) { //TODO: 对异常的一些处理 e.printStackTrace(); } }; apiManager.load(url, callBack); } |
踩到的坑。
依赖的Retrofit版本一定要保持一致!!!说多了都是泪啊。
保存文件时要使用RxJava的doOnNext操作符,后续更新UI的操作切换到UI线程。
总结
看似代码很多,其实过程并不复杂:
在保存文件时,调用ForwardingSource的read方法,通过RxBus发送实时的FileLoadEvent对象。
FileCallBack订阅RxBus发送的FileLoadEvent。通过接收到FileLoadEvent中的下载进度和文件总大小对UI进行更新。
在下载保存文件完成后,取消订阅,防止内存泄漏。
Demo地址:https://github.com/AirMiya/DownloadDemo
原文链接:http://www.jianshu.com/p/060d55fc1c82