Skip to content

Commit

Permalink
feat: handle both team member deleted events (#2374)
Browse files Browse the repository at this point in the history
* Revert "feat: stop handling team member add/removed events (#2273)"

This reverts commit e2f290b

* feat: handle member leave team event

* delete unneeded stuff

* detekt
  • Loading branch information
MohamadJaara authored Jan 17, 2024
1 parent 0bd9d9b commit b16d0fd
Show file tree
Hide file tree
Showing 24 changed files with 337 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ interface ConversationRepository {
conversationID: ConversationId
): Either<CoreFailure, Unit>

suspend fun deleteMembersFromEvent(userIDList: List<UserId>, conversationID: ConversationId): Either<CoreFailure, Unit>
suspend fun observeOneToOneConversationWithOtherUser(otherUserId: UserId): Flow<Either<CoreFailure, Conversation>>

suspend fun getOneOnOneConversationsWithOtherUser(
Expand Down Expand Up @@ -653,17 +652,6 @@ internal class ConversationDataSource internal constructor(
memberDAO.updateMemberRole(member.id.toDao(), conversationID.toDao(), conversationRoleMapper.toDAO(member.role))
}

override suspend fun deleteMembersFromEvent(
userIDList: List<UserId>,
conversationID: ConversationId
): Either<CoreFailure, Unit> =
wrapStorageRequest {
memberDAO.deleteMembersByQualifiedID(
userIDList.map { it.toDao() },
conversationID.toDao()
)
}

override suspend fun getConversationsByGroupState(
groupState: GroupState
): Either<StorageFailure, List<Conversation>> =
Expand Down Expand Up @@ -1096,6 +1084,7 @@ internal class ConversationDataSource internal constructor(
}
}
}

override suspend fun setLegalHoldStatusChangeNotified(conversationId: ConversationId): Either<CoreFailure, Boolean> =
wrapStorageRequest {
conversationDAO.updateLegalHoldStatusChangeNotified(conversationId = conversationId.toDao(), notified = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import com.wire.kalium.logger.obfuscateDomain
import com.wire.kalium.logger.obfuscateId
import com.wire.kalium.logic.data.client.Client
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.conversation.Conversation.Member
import com.wire.kalium.logic.data.conversation.Conversation.Protocol
import com.wire.kalium.logic.data.conversation.Conversation.Member
import com.wire.kalium.logic.data.conversation.Conversation.ReceiptMode
import com.wire.kalium.logic.data.conversation.Conversation.TypingIndicatorMode
import com.wire.kalium.logic.data.conversation.MutedConversationStatus
Expand Down Expand Up @@ -447,6 +447,23 @@ sealed class Event(open val id: String, open val transient: Boolean, open val li
)
}

data class MemberLeave(
override val id: String,
override val transient: Boolean,
override val live: Boolean,
override val teamId: String,
val memberId: String,
val timestampIso: String,
) : Team(id, teamId, transient, live) {
override fun toLogMap(): Map<String, Any?> = mapOf(
typeKey to "Team.MemberLeave",
idKey to id.obfuscateId(),
teamIdKey to teamId.obfuscateId(),
timestampIsoKey to timestampIso,
memberIdKey to memberId.obfuscateId(),
)
}

data class MemberUpdate(
override val id: String,
override val teamId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class EventMapper(
is EventContentDTO.Conversation.AccessUpdate -> unknown(id, transient, live, eventContentDTO)
is EventContentDTO.Conversation.DeletedConversationDTO -> conversationDeleted(id, eventContentDTO, transient, live)
is EventContentDTO.Conversation.ConversationRenameDTO -> conversationRenamed(id, eventContentDTO, transient, live)
is EventContentDTO.Team.MemberLeave -> teamMemberLeft(id, eventContentDTO, transient, live)
is EventContentDTO.Team.MemberUpdate -> teamMemberUpdate(id, eventContentDTO, transient, live)
is EventContentDTO.Team.Update -> teamUpdate(id, eventContentDTO, transient, live)
is EventContentDTO.User.UpdateDTO -> userUpdate(id, eventContentDTO, transient, live)
Expand Down Expand Up @@ -655,6 +656,20 @@ class EventMapper(
timestampIso = event.time,
)

private fun teamMemberLeft(
id: String,
event: EventContentDTO.Team.MemberLeave,
transient: Boolean,
live: Boolean
) = Event.Team.MemberLeave(
id = id,
teamId = event.teamId,
memberId = event.teamMember.nonQualifiedUserId,
transient = transient,
live = live,
timestampIso = event.time
)

private fun teamMemberUpdate(
id: String,
event: EventContentDTO.Team.MemberUpdate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import com.wire.kalium.network.api.base.model.SelfUserDTO
import com.wire.kalium.network.api.base.model.UserProfileDTO
import com.wire.kalium.network.api.base.model.isTeamMember
import com.wire.kalium.persistence.dao.ConnectionEntity
import com.wire.kalium.persistence.dao.ConversationIDEntity
import com.wire.kalium.persistence.dao.MetadataDAO
import com.wire.kalium.persistence.dao.QualifiedIDEntity
import com.wire.kalium.persistence.dao.UserDAO
Expand Down Expand Up @@ -111,9 +112,9 @@ interface UserRepository {
*/
suspend fun getAllRecipients(): Either<CoreFailure, Pair<List<Recipient>, List<Recipient>>>
suspend fun updateUserFromEvent(event: Event.User.Update): Either<CoreFailure, Unit>
suspend fun markUserAsDeletedAndRemoveFromGroupConversations(userId: UserId): Either<CoreFailure, Unit>
suspend fun markUserAsDeletedAndRemoveFromGroupConversations(userId: UserId): Either<CoreFailure, List<ConversationId>>

suspend fun markUserAsDeletedAndRemoveFromGroupConversations(userId: List<UserId>): Either<CoreFailure, Unit>
suspend fun markAsDeleted(userId: List<UserId>): Either<StorageFailure, Unit>

/**
* Marks federated user as defederated in order to hold conversation history
Expand Down Expand Up @@ -511,14 +512,16 @@ internal class UserDataSource internal constructor(
}
}

override suspend fun markUserAsDeletedAndRemoveFromGroupConversations(userId: UserId): Either<CoreFailure, Unit> = wrapStorageRequest {
userDAO.markUserAsDeletedAndRemoveFromGroupConv(userId.toDao())
}

override suspend fun markUserAsDeletedAndRemoveFromGroupConversations(userId: List<UserId>): Either<CoreFailure, Unit> =
override suspend fun markUserAsDeletedAndRemoveFromGroupConversations(
userId: UserId
): Either<CoreFailure, List<ConversationId>> =
wrapStorageRequest {
userDAO.markUserAsDeletedAndRemoveFromGroupConv(userId.map { it.toDao() })
}
userDAO.markUserAsDeletedAndRemoveFromGroupConv(userId.toDao())
}.map { it.map(ConversationIDEntity::toModel) }

override suspend fun markAsDeleted(userId: List<UserId>): Either<StorageFailure, Unit> = wrapStorageRequest {
userDAO.markAsDeleted(userId.map { it.toDao() })
}

override suspend fun defederateUser(userId: UserId): Either<CoreFailure, Unit> {
return wrapStorageRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,6 @@ class UserSessionScope internal constructor(
get() = UserEventReceiverImpl(
clientRepository,
connectionRepository,
conversationRepository,
userRepository,
logout,
oneOnOneResolver,
Expand All @@ -1443,7 +1442,7 @@ class UserSessionScope internal constructor(
)

private val teamEventReceiver: TeamEventReceiver
get() = TeamEventReceiverImpl(teamRepository)
get() = TeamEventReceiverImpl(teamRepository, conversationRepository, userRepository, persistMessage, userId)

private val guestRoomConfigHandler
get() = GuestRoomConfigHandler(userConfigRepository, kaliumConfigs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class FederationEventReceiverImpl internal constructor(
userIDList.map { it.toDao() },
conversationID.toDao()
)
}
}.map { }

private suspend fun handleMemberRemovedEvent(conversationID: ConversationId, userIDList: List<UserId>) {
val message = Message.System(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@

package com.wire.kalium.logic.sync.receiver

import com.benasher44.uuid.uuid4
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.event.Event
import com.wire.kalium.logic.data.event.EventLoggingStatus
import com.wire.kalium.logic.data.event.logEventProcessing
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.message.PersistMessageUseCase
import com.wire.kalium.logic.data.team.Team
import com.wire.kalium.logic.data.team.TeamRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
Expand All @@ -32,12 +39,16 @@ import com.wire.kalium.logic.kaliumLogger
internal interface TeamEventReceiver : EventReceiver<Event.Team>

internal class TeamEventReceiverImpl(
private val teamRepository: TeamRepository

private val teamRepository: TeamRepository,
private val conversationRepository: ConversationRepository,
private val userRepository: UserRepository,
private val persistMessage: PersistMessageUseCase,
private val selfUserId: UserId,
) : TeamEventReceiver {

override suspend fun onEvent(event: Event.Team): Either<CoreFailure, Unit> {
when (event) {
is Event.Team.MemberLeave -> handleMemberLeave(event)
is Event.Team.MemberUpdate -> handleMemberUpdate(event)
is Event.Team.Update -> handleUpdate(event)
}
Expand All @@ -48,6 +59,27 @@ internal class TeamEventReceiverImpl(
return Either.Right(Unit)
}

@Suppress("LongMethod")
private suspend fun handleMemberLeave(event: Event.Team.MemberLeave) {
val removedUser = UserId(event.memberId, selfUserId.domain)
userRepository.markUserAsDeletedAndRemoveFromGroupConversations(removedUser)
.onSuccess {
it.forEach { conversationId ->
val message = Message.System(
id = uuid4().toString(), // We generate a random uuid for this new system message
content = MessageContent.MemberChange.RemovedFromTeam(listOf(removedUser)),
conversationId = conversationId,
date = event.timestampIso,
senderUserId = removedUser,
status = Message.Status.Sent,
visibility = Message.Visibility.VISIBLE,
expirationData = null
)
persistMessage(message)
}
}
}

private suspend fun handleMemberUpdate(event: Event.Team.MemberUpdate) =
teamRepository.updateMemberRole(
teamId = event.teamId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.client.ClientRepository
import com.wire.kalium.logic.data.connection.ConnectionRepository
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.conversation.NewGroupConversationSystemMessagesCreator
import com.wire.kalium.logic.data.event.Event
import com.wire.kalium.logic.data.event.EventLoggingStatus
Expand Down Expand Up @@ -52,7 +51,6 @@ internal interface UserEventReceiver : EventReceiver<Event.User>
internal class UserEventReceiverImpl internal constructor(
private val clientRepository: ClientRepository,
private val connectionRepository: ConnectionRepository,
private val conversationRepository: ConversationRepository,
private val userRepository: UserRepository,
private val logout: LogoutUseCase,
private val oneOnOneResolver: OneOnOneResolver,
Expand Down Expand Up @@ -181,6 +179,7 @@ internal class UserEventReceiverImpl internal constructor(
Either.Right(Unit)
} else {
userRepository.markUserAsDeletedAndRemoveFromGroupConversations(event.userId)
.map { Unit }
.onFailure {
kaliumLogger
.logEventProcessing(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import com.wire.kalium.logic.feature.call.usecase.UpdateConversationClientsForCu
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.getOrElse
import com.wire.kalium.logic.functional.getOrNull
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.kaliumLogger
Expand All @@ -53,67 +54,68 @@ internal class MemberLeaveEventHandlerImpl(
private val selfTeamIdProvider: SelfTeamIdProvider
) : MemberLeaveEventHandler {

override suspend fun handle(event: Event.Conversation.MemberLeave) =
override suspend fun handle(event: Event.Conversation.MemberLeave): Either<CoreFailure, Unit> =
let {
when (event.reason) {
MemberLeaveReason.Removed,
MemberLeaveReason.Left -> {
deleteMembers(event.removedList, event.conversationId)
}

MemberLeaveReason.UserDeleted -> {
userRepository.markUserAsDeletedAndRemoveFromGroupConversations(event.removedList)
}
if (event.reason == MemberLeaveReason.UserDeleted) {
userRepository.markAsDeleted(event.removedList)
}
}.onSuccess {
updateConversationClientsForCurrentCall.value(event.conversationId)
}.onSuccess {
// fetch required unknown users that haven't been persisted during slow sync, e.g. from another team
// and keep them to properly show this member-leave message
userRepository.fetchUsersIfUnknownByIds(event.removedList.toSet())
}.onSuccess {
val content: MessageContent.System? = resolveMessageContent(event)
deleteMembers(event.removedList, event.conversationId)
}
.onSuccess {
updateConversationClientsForCurrentCall.value(event.conversationId)
}.onSuccess {
// fetch required unknown users that haven't been persisted during slow sync, e.g. from another team
// and keep them to properly show this member-leave message
userRepository.fetchUsersIfUnknownByIds(event.removedList.toSet())
}.onSuccess { numberOfUsersDeleted ->
val content: MessageContent.System? = resolveMessageContent(event, numberOfUsersDeleted)

content?.let {
Message.System(
id = event.id,
content = it,
conversationId = event.conversationId,
date = event.timestampIso,
senderUserId = event.removedBy,
status = Message.Status.Sent,
visibility = Message.Visibility.VISIBLE,
expirationData = null
).also {
persistMessage(it)
content?.let {
Message.System(
id = event.id,
content = it,
conversationId = event.conversationId,
date = event.timestampIso,
senderUserId = event.removedBy,
status = Message.Status.Sent,
visibility = Message.Visibility.VISIBLE,
expirationData = null
).also {
persistMessage(it)
}
}
}
}.onSuccess {
kaliumLogger
.logEventProcessing(
EventLoggingStatus.SUCCESS,
event
)
}.onFailure {
kaliumLogger
.logEventProcessing(
EventLoggingStatus.FAILURE,
event,
Pair("errorInfo", "$it")
)
}
}.onSuccess {
kaliumLogger
.logEventProcessing(
EventLoggingStatus.SUCCESS,
event
)
}.onFailure {
kaliumLogger
.logEventProcessing(
EventLoggingStatus.FAILURE,
event,
Pair("errorInfo", "$it")
)
}.map { }

private suspend fun resolveMessageContent(event: Event.Conversation.MemberLeave): MessageContent.System? {
private suspend fun resolveMessageContent(event: Event.Conversation.MemberLeave, numberOfUsersDeleted: Long): MessageContent.System? {
return when (event.reason) {
MemberLeaveReason.Left,
MemberLeaveReason.Removed -> MessageContent.MemberChange.Removed(members = event.removedList)
MemberLeaveReason.UserDeleted -> handleUserDeleted(event)

MemberLeaveReason.UserDeleted -> handleUserDeleted(event, numberOfUsersDeleted)
}
}
private suspend fun handleUserDeleted(event: Event.Conversation.MemberLeave): MessageContent.System? {

private suspend fun handleUserDeleted(event: Event.Conversation.MemberLeave, numberOfUsersDeleted: Long): MessageContent.System? {
val teamId = selfTeamIdProvider().getOrNull() ?: return null
val isMemberRemoved = userRepository.isAtLeastOneUserATeamMember(event.removedList, teamId).getOrElse(false)
return if (isMemberRemoved) {

val isMemberRemoved = userRepository.isAtLeastOneUserATeamMember(
event.removedList,
teamId
).getOrElse(false)
return if (isMemberRemoved && numberOfUsersDeleted > 0) {
MessageContent.MemberChange.RemovedFromTeam(members = event.removedList)
} else {
null
Expand All @@ -123,7 +125,7 @@ internal class MemberLeaveEventHandlerImpl(
private suspend fun deleteMembers(
userIDList: List<UserId>,
conversationID: ConversationId
): Either<CoreFailure, Unit> =
): Either<CoreFailure, Long> =
wrapStorageRequest {
memberDAO.deleteMembersByQualifiedID(
userIDList.map { it.toDao() },
Expand Down
Loading

0 comments on commit b16d0fd

Please sign in to comment.