是ObservableType的扩展方法,当 ObservableType 的Element 必须是遵循Equatable 协议的类型。 该函数返回一个ObservableType,只有Element的值发生变化时,才会抛出onNext事件。 例如 Observable.of(1,2,1,3,5,4).distinctUntilChanged() , 会抛出 1,2,1,3,5,4 而 Observable.of(1,1,3,5,4).distinctUntilChanged() , 会抛出 1,3,5,4
只通知 指定index的Element 的onNext事件
忽略onNext事件,只抛出 onError 和 completed事件
只抛出序列中指定count数的事件,满足count 数后,直接抛出 completed
只抛出序列中最后的指定count数的事件。满足count 数后,直接抛出 completed
PS: 注意take系列,如果是未指定个数的事件序列,则takeLast是不生效的。
跳过指定个数的事件。
在指定的时间内,无法获取到序列事件,到时间后,就能获取到事件了。
将所有信号事件 delay 指定时间后再发送。订阅者获取到的事件都是延迟后的。
PS:但对于指定个数序列事件的ObservableType,两者行为是一致的,都会延迟 指定时间后,订阅者才收到消息
指定信号之间间隔未达到指定时差时,就会忽略前面的事件,只抛出最后一个事件。
可以将指定的Observable 分割成多个 字定义key的Observable。 样例代码(根据自定义的 keySelector,将 1-6的序列化的Observable 分组成 两个GroupedObservable,它们的key 就是自定义的 “奇数”和“偶数”):
Observable.of(1,2,3,4,5,6).groupBy(keySelector: { (ele) -> String in
return ele % 2 == 1 ? “奇数” : “偶数”
}).subscribe(onNext: { group in
os_log("groupBy key=\(group.key)")
group.asObservable()
.subscribe({ event in
switch event
{
case .next(let v):
os_log("groupBy onNext=\(v)")
default:
break
}
})
})
将操作源Observable的每一个元素转换成 对应的一个Observable,然后把这些新生成的Observables 数组拍扁,变成一个 Observable 序列。 当Observable的元素也是一个Observable时,flatMap 很有用。 样例代码:
//声明两个 Observable
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "a")
//用两个Observable 新建一个Observable序列
let variable = Observable.of(subject1, subject2)
variable.asObservable()
.flatMap {
$0 //将序列里的Observable直接返回,也就是 subject1 和 subject2
}
.subscribe(onNext: {
print("hhhhhhh \($0)")
})
//再发送两个Observable的新值事件
subject1.onNext("B")
subject2.onNext("b")
PS:打印出来的应该是 AaBb
不同于 flatMap的是,它只响应最后转换的那个Observable的事件了。 同上的代码,使用 flatMapLatest的话,打印出来的应该是 Aab
功能同flatMap,但它在转换 元素为多个Observable的Observable时,必须等待前一个Observable 完成(onCompleted),后一个Observable才能发出数据。 样例代码
let subject1 = BehaviorSubject(value: "A")
let subject2 = BehaviorSubject(value: "a")
let variable = Observable.of(subject1, subject2)
variable.asObservable()
.concatMap {
$0
}
.subscribe(onNext: {
print("hhhhhhh \($0)")
})
subject1.onNext("B")
subject1.onCompleted() //在subject1 完成后,才开始接收到subject2的事件
print(“Subject1 Completed”)
subject2.onNext("b")
PS:打印结果应该是 A B “Subject1 Completed” a b
返回一个 Observable<[Element]>, 在原Observable的元素达到指定count 或是 设定的timeSpan 到时了,都会触发一次事件。该事件会将一个 array(count是这段时间内接收到的原Observable的元素个数)。 如果是满足count个数的事件,那么array的元素个数就是指定的count;如果是到期了,则array的元素个数是该时间内接收到的原Observable的元素个数。 样例代码
var subject = PublishSubject<String>()
subject.buffer(timeSpan: RxTimeInterval.seconds(1), count: 2, scheduler: MainScheduler.instance)
.subscribe(onNext: { (event) in
os_log("\(flag) buffer=\(event)")
})
.disposed(by: bag)
subject.onNext("111")
subject.onNext("222")
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 1.5) {
subject.onNext("444")
}
PS:打印结果应该是 [111,222] (第一次,满足了指定count条件) 然后一秒后 [], (这一次,timeSpan到期了,但原Observable 没有发送事件) 再过1秒后 [444] (这一次,也是timeSpan到期,原Observable 发送了一个 444)
返回一个 Observable<Observable>
指定该Observable发送的事件最大间隔。count指定 收到的作为Element的 Observable所能发送的最大事件个数。 一旦timespan到期,或是 作为Element的Observable 已发送了count个事件,原Observable都会重新发送一个Observable过来,而上一个则会 Completed。 样例代码:
subject = PublishSubject<String>()
subject.window(timeSpan: RxTimeInterval.seconds(1), count: 2, scheduler: MainScheduler.instance)
.subscribe(onNext: { (event) in
os_log("get window event")
event.subscribe(onNext: {
os_log("\(flag) window=\($0)")
},onCompleted: {
os_log("\(flag) subwindow completed")
}).disposed(by: bag)
},onCompleted: {
os_log("\(flag) window completed")
})
.disposed(by: bag)
//subject 会先发送第一个Observable,该Observable 会发送 111, 222 两个事件,然后completed
subject.onNext("111")
subject.onNext("222")
//接着发送第二个Observable,该Observable会发送 333 这个事件,由于指定的timeSpan 1秒内,原Observable(subject)没有其他事件,则该Observable会再1秒后complete
subject.onNext(“333”)
DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 1.5) {
//第三个Observable会在 1秒时发送,接着 0.5秒后收到subject的 444消息。
subject.onNext(“444”)
}
Log输出如下: 2022-04-25 15:26:21.78 get window event 2022-04-25 15:26:21.78 RxSwiftGaojie window=111 2022-04-25 15:26:21.78 RxSwiftGaojie window=222 2022-04-25 15:26:21.78 RxSwiftGaojie subwindow completed //满足count个数,该Observable 会complete 2022-04-25 15:26:21.78 get window event 2022-04-25 15:26:21.78 RxSwiftGaojie window=333 2022-04-25 15:26:22.78 RxSwiftGaojie subwindow completed //第二个Observable 1秒内只收到1个事件,到期了会complete 2022-04-25 15:26:22.78 get window event 2022-04-25 15:26:23.42 RxSwiftGaojie window=444 //第三个Observable在 0.5秒后收到了444消息 2022-04-25 15:26:23.78 RxSwiftGaojie subwindow completed //第三个Observable 同第二个,到期了自动complete
可以将多个Observable的事件整合为一个Observable 必须每个Observable都有onNext事件,才会触发整合Observable的 onNext。 completed事件同上,所有Observable都触发了complete才行。 Error 只要其中一个 Observable触发了,就能触发整合Observable的error 样例代码:
let comSub = BehaviorSubject(value: 1)
let comSub2 = PublishSubject<String>()
Observable.combineLatest(comSub, comSub2)
.subscribe({ event in
switch event {
case .next(let i, let s):
print("\(s)")
break
case .completed:
break
case .error(let e):
break
default:
break
}
})
comSub2.onNext(“A”) //会触发整合Observable的next事件,元素值分别为两个subject的最后一次元素值
comSub.onCompleted()
comSub2.onCompleted() //触发整合Observable的complete事件
将多个Observable 整合为一个多参数的Observable。 打包的Observable 必须事件个数完全一样,才会触发新Observable的事件,会将所有Observable相同index的事件打包触发。 样例代码:
let zip1 = BehaviorSubject(value: 1)
let zip2 = BehaviorSubject(value: "a")
Observable.zip(zip1, zip2)
.subscribe(onNext: { (i, s) in
print("zip 1=\(i) 2=\(s)")
}, onCompleted: {
print("")
}).dispose()
zip1.onNext(2)
zip1.onNext(3) //zip1 有三个事件了,但zip2 只有一个,不会触发onNext
zip2.onNext("b") //zip2 的第二个事件,与 zip1的第二个一起打包触发
zip1.onCompleted()
zip2.onCompleted()
那么打印出来的应该是 zip 1=1 2=a zip 1=2 2=b