diff options
author | Emile <git@emile.space> | 2024-10-25 15:55:50 +0200 |
---|---|---|
committer | Emile <git@emile.space> | 2024-10-25 15:55:50 +0200 |
commit | c90f36e3dd179d2de96f4f5fe38d8dc9a9de6dfe (patch) | |
tree | 89e9afb41c5bf76f48cfb09305a2d3db8d302b06 /vendor/maunium.net/go/mautrix/sync.go | |
parent | 98bbb0f559a8883bc47bae80607dbe326a448e61 (diff) |
Diffstat (limited to 'vendor/maunium.net/go/mautrix/sync.go')
-rw-r--r-- | vendor/maunium.net/go/mautrix/sync.go | 284 |
1 files changed, 284 insertions, 0 deletions
diff --git a/vendor/maunium.net/go/mautrix/sync.go b/vendor/maunium.net/go/mautrix/sync.go new file mode 100644 index 0000000..d420840 --- /dev/null +++ b/vendor/maunium.net/go/mautrix/sync.go @@ -0,0 +1,284 @@ +// Copyright (c) 2024 Tulir Asokan +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package mautrix + +import ( + "context" + "errors" + "fmt" + "runtime/debug" + "time" + + "maunium.net/go/mautrix/event" + "maunium.net/go/mautrix/id" +) + +// EventHandler handles a single event from a sync response. +type EventHandler func(ctx context.Context, evt *event.Event) + +// SyncHandler handles a whole sync response. If the return value is false, handling will be stopped completely. +type SyncHandler func(ctx context.Context, resp *RespSync, since string) bool + +// Syncer is an interface that must be satisfied in order to do /sync requests on a client. +type Syncer interface { + // ProcessResponse processes the /sync response. The since parameter is the since= value that was used to produce the response. + // This is useful for detecting the very first sync (since=""). If an error is return, Syncing will be stopped permanently. + ProcessResponse(ctx context.Context, resp *RespSync, since string) error + // OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently. + OnFailedSync(res *RespSync, err error) (time.Duration, error) + // GetFilterJSON for the given user ID. NOT the filter ID. + GetFilterJSON(userID id.UserID) *Filter +} + +type ExtensibleSyncer interface { + OnSync(callback SyncHandler) + OnEvent(callback EventHandler) + OnEventType(eventType event.Type, callback EventHandler) +} + +type DispatchableSyncer interface { + Dispatch(ctx context.Context, evt *event.Event) +} + +// DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively +// replace parts of this default syncer (e.g. the ProcessResponse method). The default syncer uses the observer +// pattern to notify callers about incoming events. See DefaultSyncer.OnEventType for more information. +type DefaultSyncer struct { + // syncListeners want the whole sync response, e.g. the crypto machine + syncListeners []SyncHandler + // globalListeners want all events + globalListeners []EventHandler + // listeners want a specific event type + listeners map[event.Type][]EventHandler + // ParseEventContent determines whether or not event content should be parsed before passing to handlers. + ParseEventContent bool + // ParseErrorHandler is called when event.Content.ParseRaw returns an error. + // If it returns false, the event will not be forwarded to listeners. + ParseErrorHandler func(evt *event.Event, err error) bool + // FilterJSON is used when the client starts syncing and doesn't get an existing filter ID from SyncStore's LoadFilterID. + FilterJSON *Filter +} + +var _ Syncer = (*DefaultSyncer)(nil) +var _ ExtensibleSyncer = (*DefaultSyncer)(nil) + +// NewDefaultSyncer returns an instantiated DefaultSyncer +func NewDefaultSyncer() *DefaultSyncer { + return &DefaultSyncer{ + listeners: make(map[event.Type][]EventHandler), + syncListeners: []SyncHandler{}, + globalListeners: []EventHandler{}, + ParseEventContent: true, + ParseErrorHandler: func(evt *event.Event, err error) bool { + // By default, drop known events that can't be parsed, but let unknown events through + return errors.Is(err, event.ErrUnsupportedContentType) || + // Also allow events that had their content already parsed by some other function + errors.Is(err, event.ErrContentAlreadyParsed) + }, + } +} + +// ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of +// unrepeating events. Returns a fatal error if a listener panics. +func (s *DefaultSyncer) ProcessResponse(ctx context.Context, res *RespSync, since string) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("ProcessResponse panicked! since=%s panic=%s\n%s", since, r, debug.Stack()) + } + }() + + for _, listener := range s.syncListeners { + if !listener(ctx, res, since) { + return + } + } + + s.processSyncEvents(ctx, "", res.ToDevice.Events, event.SourceToDevice) + s.processSyncEvents(ctx, "", res.Presence.Events, event.SourcePresence) + s.processSyncEvents(ctx, "", res.AccountData.Events, event.SourceAccountData) + + for roomID, roomData := range res.Rooms.Join { + s.processSyncEvents(ctx, roomID, roomData.State.Events, event.SourceJoin|event.SourceState) + s.processSyncEvents(ctx, roomID, roomData.Timeline.Events, event.SourceJoin|event.SourceTimeline) + s.processSyncEvents(ctx, roomID, roomData.Ephemeral.Events, event.SourceJoin|event.SourceEphemeral) + s.processSyncEvents(ctx, roomID, roomData.AccountData.Events, event.SourceJoin|event.SourceAccountData) + } + for roomID, roomData := range res.Rooms.Invite { + s.processSyncEvents(ctx, roomID, roomData.State.Events, event.SourceInvite|event.SourceState) + } + for roomID, roomData := range res.Rooms.Leave { + s.processSyncEvents(ctx, roomID, roomData.State.Events, event.SourceLeave|event.SourceState) + s.processSyncEvents(ctx, roomID, roomData.Timeline.Events, event.SourceLeave|event.SourceTimeline) + } + return +} + +func (s *DefaultSyncer) processSyncEvents(ctx context.Context, roomID id.RoomID, events []*event.Event, source event.Source) { + for _, evt := range events { + s.processSyncEvent(ctx, roomID, evt, source) + } +} + +func (s *DefaultSyncer) processSyncEvent(ctx context.Context, roomID id.RoomID, evt *event.Event, source event.Source) { + evt.RoomID = roomID + + // Ensure the type class is correct. It's safe to mutate the class since the event type is not a pointer. + // Listeners are keyed by type structs, which means only the correct class will pass. + switch { + case evt.StateKey != nil: + evt.Type.Class = event.StateEventType + case source == event.SourcePresence, source&event.SourceEphemeral != 0: + evt.Type.Class = event.EphemeralEventType + case source&event.SourceAccountData != 0: + evt.Type.Class = event.AccountDataEventType + case source == event.SourceToDevice: + evt.Type.Class = event.ToDeviceEventType + default: + evt.Type.Class = event.MessageEventType + } + + if s.ParseEventContent { + err := evt.Content.ParseRaw(evt.Type) + if err != nil && !s.ParseErrorHandler(evt, err) { + return + } + } + + evt.Mautrix.EventSource = source + s.Dispatch(ctx, evt) +} + +func (s *DefaultSyncer) Dispatch(ctx context.Context, evt *event.Event) { + for _, fn := range s.globalListeners { + fn(ctx, evt) + } + listeners, exists := s.listeners[evt.Type] + if exists { + for _, fn := range listeners { + fn(ctx, evt) + } + } +} + +// OnEventType allows callers to be notified when there are new events for the given event type. +// There are no duplicate checks. +func (s *DefaultSyncer) OnEventType(eventType event.Type, callback EventHandler) { + _, exists := s.listeners[eventType] + if !exists { + s.listeners[eventType] = []EventHandler{} + } + s.listeners[eventType] = append(s.listeners[eventType], callback) +} + +func (s *DefaultSyncer) OnSync(callback SyncHandler) { + s.syncListeners = append(s.syncListeners, callback) +} + +func (s *DefaultSyncer) OnEvent(callback EventHandler) { + s.globalListeners = append(s.globalListeners, callback) +} + +// OnFailedSync always returns a 10 second wait period between failed /syncs, never a fatal error. +func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) { + if errors.Is(err, MUnknownToken) { + return 0, err + } + return 10 * time.Second, nil +} + +var defaultFilter = Filter{ + Room: RoomFilter{ + Timeline: FilterPart{ + Limit: 50, + }, + }, +} + +// GetFilterJSON returns a filter with a timeline limit of 50. +func (s *DefaultSyncer) GetFilterJSON(userID id.UserID) *Filter { + if s.FilterJSON == nil { + defaultFilterCopy := defaultFilter + s.FilterJSON = &defaultFilterCopy + } + return s.FilterJSON +} + +// DontProcessOldEvents is a sync handler that removes rooms that the user just joined. +// It's meant for bots to ignore events from before the bot joined the room. +// +// To use it, register it with your Syncer, e.g.: +// +// cli.Syncer.(mautrix.ExtensibleSyncer).OnSync(cli.DontProcessOldEvents) +func (cli *Client) DontProcessOldEvents(_ context.Context, resp *RespSync, since string) bool { + return dontProcessOldEvents(cli.UserID, resp, since) +} + +var _ SyncHandler = (*Client)(nil).DontProcessOldEvents + +func dontProcessOldEvents(userID id.UserID, resp *RespSync, since string) bool { + if since == "" { + return false + } + // This is a horrible hack because /sync will return the most recent messages for a room + // as soon as you /join it. We do NOT want to process those events in that particular room + // because they may have already been processed (if you toggle the bot in/out of the room). + // + // Work around this by inspecting each room's timeline and seeing if an m.room.member event for us + // exists and is "join" and then discard processing that room entirely if so. + // TODO: We probably want to process messages from after the last join event in the timeline. + for roomID, roomData := range resp.Rooms.Join { + for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- { + evt := roomData.Timeline.Events[i] + if evt.Type == event.StateMember && evt.GetStateKey() == string(userID) { + membership, _ := evt.Content.Raw["membership"].(string) + if membership == "join" { + _, ok := resp.Rooms.Join[roomID] + if !ok { + continue + } + delete(resp.Rooms.Join, roomID) // don't re-process messages + delete(resp.Rooms.Invite, roomID) // don't re-process invites + break + } + } + } + } + return true +} + +// MoveInviteState is a sync handler that moves events from the state event list to the InviteRoomState in the invite event. +// +// To use it, register it with your Syncer, e.g.: +// +// cli.Syncer.(mautrix.ExtensibleSyncer).OnSync(cli.MoveInviteState) +func (cli *Client) MoveInviteState(ctx context.Context, resp *RespSync, _ string) bool { + for _, meta := range resp.Rooms.Invite { + var inviteState []event.StrippedState + var inviteEvt *event.Event + for _, evt := range meta.State.Events { + if evt.Type == event.StateMember && evt.GetStateKey() == cli.UserID.String() { + inviteEvt = evt + } else { + evt.Type.Class = event.StateEventType + _ = evt.Content.ParseRaw(evt.Type) + inviteState = append(inviteState, event.StrippedState{ + Content: evt.Content, + Type: evt.Type, + StateKey: evt.GetStateKey(), + Sender: evt.Sender, + }) + } + } + if inviteEvt != nil { + inviteEvt.Unsigned.InviteRoomState = inviteState + meta.State.Events = []*event.Event{inviteEvt} + } + } + return true +} + +var _ SyncHandler = (*Client)(nil).MoveInviteState |