Skip to content

Commit

Permalink
Revert "[trello.com/c/CdfJ7GTd] ChatProvider crash fix"
Browse files Browse the repository at this point in the history
This reverts commit 9662f93.
  • Loading branch information
just-software-dev committed Oct 16, 2024
1 parent b7bf742 commit aeeb543
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 113 deletions.
29 changes: 20 additions & 9 deletions Adamant/Modules/Chat/ViewModel/ChatViewModel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// Copyright © 2022 Adamant. All rights reserved.
//

import Combine
@preconcurrency import Combine
import CoreData
import MarkdownKit
import UIKit
Expand Down Expand Up @@ -1122,8 +1122,8 @@ private extension ChatViewModel {

Task {
await chatsProvider.stateObserver
.makeSequence()
.sink { @MainActor [weak self] state in
.receive(on: DispatchQueue.main)
.sink { [weak self] state in
self?.isHeaderLoading = state == .updating ? true : false
}
.store(in: &subscriptions)
Expand Down Expand Up @@ -1198,7 +1198,7 @@ private extension ChatViewModel {
updateMessages(
resetLoadingProperty: performFetch,
completion: isNewReaction
? { @MainActor [commitVibro] in commitVibro.send() }
? { @Sendable [commitVibro] in commitVibro.send() }
: {}
)
}
Expand Down Expand Up @@ -1468,11 +1468,22 @@ private extension ChatViewModel {
}

func waitForChatLoading(with address: String) async {
_ = await chatsProvider.chatLoadingStatus.makeSequence()
.filter { $0.contains { $0.key == address && $0.value == .loaded } }
.first

tempCancellables.removeAll()
await withUnsafeContinuation { continuation in
Task {
let publisher = await chatsProvider.chatLoadingStatusPublisher
publisher
.filter { dict in
dict.contains {
$0.key == address && $0.value == .loaded
}
}
.receive(on: DispatchQueue.main)
.sink { [weak self] _ in
self?.tempCancellables.removeAll()
continuation.resume()
}.store(in: &tempCancellables)
}
}
}

// TODO: Post process
Expand Down
2 changes: 1 addition & 1 deletion Adamant/ServiceProtocols/DataProviders/ChatsProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ protocol ChatsProvider: DataProvider, Actor {
var roomsMaxCount: Int? { get }
var roomsLoadedCount: Int? { get }

var chatLoadingStatus: AnyAsyncStreamable<[String: ChatRoomLoadingStatus]> {
var chatLoadingStatusPublisher: AnyObservable<[String: ChatRoomLoadingStatus]> {
get
}

Expand Down
2 changes: 1 addition & 1 deletion Adamant/ServiceProtocols/DataProviders/DataProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ enum State {

protocol DataProvider: AnyObject, Actor {
var state: State { get }
var stateObserver: AnyAsyncStreamable<State> { get }
var stateObserver: AnyObservable<State> { get }
var isInitiallySynced: Bool { get }

func reload() async
Expand Down
50 changes: 15 additions & 35 deletions Adamant/Services/DataProviders/AdamantChatsProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ actor AdamantChatsProvider: ChatsProvider {
let stack: CoreDataStack

// MARK: Properties
private let stateNotifier = SendablePublisher(ObservableValue(State.empty))
var stateObserver: AnyAsyncStreamable<State> { stateNotifier.eraseToAnyAsyncStreamable() }
@ObservableValue private var stateNotifier: State = .empty
var stateObserver: AnyObservable<State> { $stateNotifier.eraseToAnyPublisher() }

private(set) var state: State = .empty
private(set) var receivedLastHeight: Int64?
Expand All @@ -43,12 +43,9 @@ actor AdamantChatsProvider: ChatsProvider {
private(set) var blockList: [String] = []
private(set) var removedMessages: [String] = []

private let chatLoadingStatusDictionary = SendablePublisher(ObservableValue(
[String: ChatRoomLoadingStatus]()
))

var chatLoadingStatus: AnyAsyncStreamable<[String: ChatRoomLoadingStatus]> {
chatLoadingStatusDictionary.eraseToAnyAsyncStreamable()
@ObservableValue private var chatLoadingStatusDictionary: [String: ChatRoomLoadingStatus] = [:]
var chatLoadingStatusPublisher: AnyObservable<[String: ChatRoomLoadingStatus]> {
$chatLoadingStatusDictionary.eraseToAnyPublisher()
}

var chatMaxMessages: [String : Int] = [:]
Expand Down Expand Up @@ -246,10 +243,7 @@ actor AdamantChatsProvider: ChatsProvider {
]
)

// TODO: Remove Task.sync
Task.sync { [weak self] in
await self?.stateNotifier.isolated { $0.publisher.value = state }
}
stateNotifier = state
return
}

Expand All @@ -264,10 +258,7 @@ actor AdamantChatsProvider: ChatsProvider {
]
)

// TODO: Remove Task.sync
Task.sync { [weak self] in
await self?.stateNotifier.isolated { $0.publisher.value = state }
}
stateNotifier = state
}

private func setupSecuredStore() {
Expand Down Expand Up @@ -306,13 +297,10 @@ extension AdamantChatsProvider {
readedLastHeight = nil
roomsMaxCount = nil
roomsLoadedCount = nil
chatLoadingStatusDictionary.removeAll()
chatMaxMessages.removeAll()
chatLoadedMessages.removeAll()

Task.sync { [self] in
await chatLoadingStatusDictionary.isolated { $0.publisher.value.removeAll() }
}

// Drop store
securedStore.remove(StoreKey.chatProvider.address)
securedStore.remove(StoreKey.chatProvider.receivedLastHeight)
Expand Down Expand Up @@ -522,7 +510,7 @@ extension AdamantChatsProvider {
limit: limit
).get()
return chatrooms
} catch let error {
} catch let error as ApiServiceError {
guard case .networkError = error else {
return nil
}
Expand Down Expand Up @@ -668,27 +656,19 @@ extension AdamantChatsProvider {
}

func isChatLoading(with addressRecipient: String) -> Bool {
Task.sync { [self] in
await chatLoadingStatusDictionary.isolated { $0.publisher.value[addressRecipient] == .loading }
}
chatLoadingStatusDictionary[addressRecipient] == .loading
}

func isChatLoaded(with addressRecipient: String) -> Bool {
Task.sync { [self] in
await chatLoadingStatusDictionary.isolated { $0.publisher.value[addressRecipient] == .loaded }
}
chatLoadingStatusDictionary[addressRecipient] == .loaded
}

func getChatStatus(for recipient: String) -> ChatRoomLoadingStatus {
Task.sync { [self] in
await chatLoadingStatusDictionary.isolated { $0.publisher.value[recipient] ?? .none }
}
chatLoadingStatusDictionary[recipient] ?? .none
}

func setChatStatus(for recipient: String, status: ChatRoomLoadingStatus) {
Task.sync { [self] in
await chatLoadingStatusDictionary.isolated { $0.publisher.value[recipient] = status }
}
chatLoadingStatusDictionary[recipient] = status
}
}

Expand Down Expand Up @@ -839,7 +819,7 @@ extension AdamantChatsProvider {
)
}

_ = try await sendMessageToServer(
let transaction = try await sendMessageToServer(
senderId: loggedAccount.address,
recipientId: recipientId,
transaction: transactionLocaly,
Expand Down Expand Up @@ -1332,7 +1312,7 @@ extension AdamantChatsProvider {

return transaction
} catch {
guard case let(apiError) = (error as ApiServiceError),
guard case let(apiError) = (error as? ApiServiceError),
case let(.serverError(text)) = apiError,
text.contains("Transaction is already confirmed")
|| text.contains("Transaction is already processed")
Expand Down
21 changes: 4 additions & 17 deletions Adamant/Services/DataProviders/AdamantTransfersProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,9 @@ actor AdamantTransfersProvider: TransfersProvider {
let securedStore: SecuredStore
private let transactionService: ChatTransactionService
weak var chatsProvider: ChatsProvider?
private let statePublisher = SendablePublisher(ObservableValue(State.empty))

var state: State {
Task.sync { [self] in
await statePublisher.isolated { $0.publisher.value }
}
}

var stateObserver: AnyAsyncStreamable<State> {
statePublisher.eraseToAnyAsyncStreamable()
}

@ObservableValue private(set) var state: State = .empty
var stateObserver: AnyObservable<State> { $state.eraseToAnyPublisher() }
private(set) var isInitiallySynced: Bool = false
private(set) var receivedLastHeight: Int64?
private(set) var readedLastHeight: Int64?
Expand All @@ -51,9 +42,7 @@ actor AdamantTransfersProvider: TransfersProvider {

/// Free stateSemaphore before calling this method, or you will deadlock.
private func setState(_ state: State, previous prevState: State, notify: Bool = false) {
Task.sync { [weak self] in
await self?.statePublisher.isolated { $0.publisher.value = state }
}
self.state = state

if notify {
switch prevState {
Expand Down Expand Up @@ -199,9 +188,7 @@ extension AdamantTransfersProvider {
}

let prevState = state
Task.sync { [weak self] in
await self?.statePublisher.isolated { $0.publisher.value = .updating }
}
state = .updating

guard let address = accountService.account?.address else {
self.setState(.failedToUpdate(TransfersProviderError.notLogged), previous: prevState)
Expand Down
15 changes: 0 additions & 15 deletions CommonKit/Sources/CommonKit/Helpers/Actor+Extension.swift

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ import Combine

/// `Published` changes its `wrappedValue` after calling `sink` or `assign`.
/// But `ObservableValue` does it before.
@propertyWrapper public final class ObservableValue<Output> {
@propertyWrapper public final class ObservableValue<Output>: Publisher {
public typealias Output = Output
public typealias Failure = Never

private let subject: CurrentValueSubject<Output, Failure>

public var wrappedValue: Output {
get { value }
set { value = newValue }
}

public var value: Output {
get { subject.value }
set { subject.value = newValue }
}
Expand All @@ -27,35 +25,15 @@ import Combine
subject
}

public init(_ value: Output) {
subject = .init(value)
}

public convenience init(wrappedValue: Output) {
self.init(wrappedValue)
}
}

extension ObservableValue: Subject {
public typealias Failure = Never

public func send(_ value: Output) {
subject.send(value)
}

public func send(completion: Subscribers.Completion<Never>) {
subject.send(completion: completion)
}

public func send(subscription: any Subscription) {
subject.send(subscription: subscription)
}

public func receive<S>(
subscriber: S
) where S: Subscriber, Never == S.Failure, Output == S.Input {
subject.receive(subscriber: subscriber)
}

public init(wrappedValue: Output) {
subject = .init(wrappedValue)
}
}

public extension Publisher where Failure == Never {
Expand Down
8 changes: 4 additions & 4 deletions CommonKit/Sources/CommonKit/Helpers/Task+Extension.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@ public extension Task where Success == Never, Failure == Never {

/// Avoid using it. It lowers performance due to changing threads.
@discardableResult
static func sync<T: Sendable>(_ action: @Sendable @escaping () async throws -> T) rethrows -> T {
try _sync(action)
static func sync<T: Sendable>(_ action: @Sendable @escaping () async -> T) -> T {
_sync(action)
}
}

@discardableResult
private func _sync<T: Sendable>(_ action: @Sendable @escaping () async throws -> T) rethrows -> T {
private func _sync<T: Sendable>(_ action: @Sendable @escaping () async -> T) -> T {
var result: T?
let semaphore = DispatchSemaphore(value: .zero)

Task {
result = try await action()
result = await action()
semaphore.signal()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import Foundation

public enum ChatRoomLoadingStatus: Sendable {
public enum ChatRoomLoadingStatus {
case loaded
case loading
case none
Expand Down

0 comments on commit aeeb543

Please sign in to comment.