RxSwift 高阶函数用法

distinctUntilChanged().

是ObservableType的扩展方法,当 ObservableType 的Element 必须是遵循Equatable 协议的类型。 该函数返回一个ObservableType,只有Element的值发生变化时,才会抛出onNext事件。 例如 Observable.of(1,2,1,3,5,4).distinctUntilChanged() , 会抛出 1,2,1,3,5,4 而 Observable.of(1,1,3,5,4).distinctUntilChanged() , 会抛出 1,3,5,4

element(at:)

只通知 指定index的Element 的onNext事件

ignoreElements()

忽略onNext事件,只抛出 onError 和 completed事件

take(count)

只抛出序列中指定count数的事件,满足count 数后,直接抛出 completed

takeLast(count)

只抛出序列中最后的指定count数的事件。满足count 数后,直接抛出 completed

PS: 注意take系列,如果是未指定个数的事件序列,则takeLast是不生效的。

skip(count):

跳过指定个数的事件。

delaySubscription()

在指定的时间内,无法获取到序列事件,到时间后,就能获取到事件了。

delay()

将所有信号事件 delay 指定时间后再发送。订阅者获取到的事件都是延迟后的。

PS:但对于指定个数序列事件的ObservableType,两者行为是一致的,都会延迟 指定时间后,订阅者才收到消息

debounce()

指定信号之间间隔未达到指定时差时,就会忽略前面的事件,只抛出最后一个事件。

groupBy()

可以将指定的Observable 分割成多个 字定义key的Observable。 样例代码(根据自定义的 keySelector,将 1-6的序列化的Observable 分组成 两个GroupedObservable,它们的key 就是自定义的 “奇数”和“偶数”):

Observable.of(1,2,3,4,5,6).groupBy(keySelector:  { (ele) -> String in
            return ele % 2 == 1 ? “奇数” : “偶数”
        }).subscribe(onNext: { group in
            os_log("groupBy key=\(group.key)")
group.asObservable()
                                    .subscribe({ event in
                                        switch event
                                        {
                                        case .next(let v):
                                            os_log("groupBy onNext=\(v)")
                                        default:
                                            break
                                        }
                                })
})

flatMap()

将操作源Observable的每一个元素转换成 对应的一个Observable,然后把这些新生成的Observables 数组拍扁,变成一个 Observable 序列。 当Observable的元素也是一个Observable时,flatMap 很有用。 样例代码:

//声明两个 Observable
	let subject1 = BehaviorSubject(value: "A")
        let subject2 = BehaviorSubject(value: "a")
//用两个Observable 新建一个Observable序列
        let variable = Observable.of(subject1, subject2)
             variable.asObservable()
            .flatMap {
                    $0 //将序列里的Observable直接返回,也就是 subject1 和 subject2
                }
                    .subscribe(onNext: {
                        print("hhhhhhh \($0)")
                        
                    })
                //再发送两个Observable的新值事件
        subject1.onNext("B")
        subject2.onNext("b")

PS:打印出来的应该是 AaBb

flatMapLatest

不同于 flatMap的是,它只响应最后转换的那个Observable的事件了。 同上的代码,使用 flatMapLatest的话,打印出来的应该是 Aab

concatMap()

功能同flatMap,但它在转换 元素为多个Observable的Observable时,必须等待前一个Observable 完成(onCompleted),后一个Observable才能发出数据。 样例代码

	let subject1 = BehaviorSubject(value: "A")
        let subject2 = BehaviorSubject(value: "a")
        let variable = Observable.of(subject1, subject2)
                variable.asObservable()
            .concatMap {
                    $0
                }
                    .subscribe(onNext: {
                        print("hhhhhhh \($0)")
                        
                    })
                
        subject1.onNext("B")
        subject1.onCompleted()   //在subject1 完成后,才开始接收到subject2的事件
	print(“Subject1 Completed”)
        subject2.onNext("b")

PS:打印结果应该是 A B “Subject1 Completed” a b

buffer()

返回一个 Observable<[Element]>, 在原Observable的元素达到指定count 或是 设定的timeSpan 到时了,都会触发一次事件。该事件会将一个 array(count是这段时间内接收到的原Observable的元素个数)。 如果是满足count个数的事件,那么array的元素个数就是指定的count;如果是到期了,则array的元素个数是该时间内接收到的原Observable的元素个数。 样例代码

var subject = PublishSubject<String>()
        subject.buffer(timeSpan: RxTimeInterval.seconds(1), count: 2, scheduler: MainScheduler.instance)
            .subscribe(onNext: { (event) in
                os_log("\(flag) buffer=\(event)")
            })
            .disposed(by: bag)

        subject.onNext("111")
        subject.onNext("222")
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 1.5) {
            subject.onNext("444")
        }

PS:打印结果应该是 [111,222] (第一次,满足了指定count条件) 然后一秒后 [], (这一次,timeSpan到期了,但原Observable 没有发送事件) 再过1秒后 [444] (这一次,也是timeSpan到期,原Observable 发送了一个 444)

window()

返回一个 Observable<Observable>

timeSpan

指定该Observable发送的事件最大间隔。count指定 收到的作为Element的 Observable所能发送的最大事件个数。 一旦timespan到期,或是 作为Element的Observable 已发送了count个事件,原Observable都会重新发送一个Observable过来,而上一个则会 Completed。 样例代码:

subject = PublishSubject<String>()
        subject.window(timeSpan: RxTimeInterval.seconds(1), count: 2, scheduler: MainScheduler.instance)
            .subscribe(onNext: { (event) in
                os_log("get window event")
                event.subscribe(onNext: {
                    os_log("\(flag) window=\($0)")
                },onCompleted: {
                    os_log("\(flag) subwindow completed")
                }).disposed(by: bag)

            },onCompleted: {
                os_log("\(flag) window completed")
            })
            .disposed(by: bag)
//subject 会先发送第一个Observable,该Observable 会发送 111, 222 两个事件,然后completed
        subject.onNext("111")
        subject.onNext("222")
//接着发送第二个Observable,该Observable会发送 333 这个事件,由于指定的timeSpan 1秒内,原Observable(subject)没有其他事件,则该Observable会再1秒后complete
        subject.onNext(“333”)
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 1.5) {
//第三个Observable会在 1秒时发送,接着 0.5秒后收到subject的 444消息。 
            subject.onNext(“444”)
        }

Log输出如下: 2022-04-25 15:26:21.78 get window event 2022-04-25 15:26:21.78 RxSwiftGaojie window=111 2022-04-25 15:26:21.78 RxSwiftGaojie window=222 2022-04-25 15:26:21.78 RxSwiftGaojie subwindow completed //满足count个数,该Observable 会complete 2022-04-25 15:26:21.78 get window event 2022-04-25 15:26:21.78 RxSwiftGaojie window=333 2022-04-25 15:26:22.78 RxSwiftGaojie subwindow completed //第二个Observable 1秒内只收到1个事件,到期了会complete 2022-04-25 15:26:22.78 get window event 2022-04-25 15:26:23.42 RxSwiftGaojie window=444 //第三个Observable在 0.5秒后收到了444消息 2022-04-25 15:26:23.78 RxSwiftGaojie subwindow completed //第三个Observable 同第二个,到期了自动complete

combineLatest()

可以将多个Observable的事件整合为一个Observable 必须每个Observable都有onNext事件,才会触发整合Observable的 onNext。 completed事件同上,所有Observable都触发了complete才行。 Error 只要其中一个 Observable触发了,就能触发整合Observable的error 样例代码:

        let comSub = BehaviorSubject(value: 1)
        let comSub2 = PublishSubject<String>()
Observable.combineLatest(comSub, comSub2)
            .subscribe({ event in
                switch event {
                case .next(let i, let s):
                    print("\(s)")
                    break
                case .completed:
                    break
                case .error(let e):
                    break
                default:
                    break
                }
            })
comSub2.onNext(“A”)  //会触发整合Observable的next事件,元素值分别为两个subject的最后一次元素值
comSub.onCompleted()
comSub2.onCompleted() //触发整合Observable的complete事件

zip()

将多个Observable 整合为一个多参数的Observable。 打包的Observable 必须事件个数完全一样,才会触发新Observable的事件,会将所有Observable相同index的事件打包触发。 样例代码:

        let zip1 = BehaviorSubject(value: 1)
        let zip2 = BehaviorSubject(value: "a")
        Observable.zip(zip1, zip2)
            .subscribe(onNext: { (i, s) in
                print("zip 1=\(i) 2=\(s)")
            }, onCompleted: {
                print("")
            }).dispose()
        zip1.onNext(2)
        zip1.onNext(3) //zip1 有三个事件了,但zip2 只有一个,不会触发onNext
        zip2.onNext("b") //zip2 的第二个事件,与 zip1的第二个一起打包触发
        zip1.onCompleted()
        zip2.onCompleted()

那么打印出来的应该是 zip 1=1 2=a zip 1=2 2=b