summary refs log tree commit diff
path: root/vendor/maunium.net/go/mautrix/sync.go
diff options
context:
space:
mode:
authorEmile <git@emile.space>2024-10-25 15:55:50 +0200
committerEmile <git@emile.space>2024-10-25 15:55:50 +0200
commitc90f36e3dd179d2de96f4f5fe38d8dc9a9de6dfe (patch)
tree89e9afb41c5bf76f48cfb09305a2d3db8d302b06 /vendor/maunium.net/go/mautrix/sync.go
parent98bbb0f559a8883bc47bae80607dbe326a448e61 (diff)
vendor HEAD main
Diffstat (limited to 'vendor/maunium.net/go/mautrix/sync.go')
-rw-r--r--vendor/maunium.net/go/mautrix/sync.go284
1 files changed, 284 insertions, 0 deletions
diff --git a/vendor/maunium.net/go/mautrix/sync.go b/vendor/maunium.net/go/mautrix/sync.go
new file mode 100644
index 0000000..d420840
--- /dev/null
+++ b/vendor/maunium.net/go/mautrix/sync.go
@@ -0,0 +1,284 @@
+// Copyright (c) 2024 Tulir Asokan
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+package mautrix
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"runtime/debug"
+	"time"
+
+	"maunium.net/go/mautrix/event"
+	"maunium.net/go/mautrix/id"
+)
+
+// EventHandler handles a single event from a sync response.
+type EventHandler func(ctx context.Context, evt *event.Event)
+
+// SyncHandler handles a whole sync response. If the return value is false, handling will be stopped completely.
+type SyncHandler func(ctx context.Context, resp *RespSync, since string) bool
+
+// Syncer is an interface that must be satisfied in order to do /sync requests on a client.
+type Syncer interface {
+	// ProcessResponse processes the /sync response. The since parameter is the since= value that was used to produce the response.
+	// This is useful for detecting the very first sync (since=""). If an error is return, Syncing will be stopped permanently.
+	ProcessResponse(ctx context.Context, resp *RespSync, since string) error
+	// OnFailedSync returns either the time to wait before retrying or an error to stop syncing permanently.
+	OnFailedSync(res *RespSync, err error) (time.Duration, error)
+	// GetFilterJSON for the given user ID. NOT the filter ID.
+	GetFilterJSON(userID id.UserID) *Filter
+}
+
+type ExtensibleSyncer interface {
+	OnSync(callback SyncHandler)
+	OnEvent(callback EventHandler)
+	OnEventType(eventType event.Type, callback EventHandler)
+}
+
+type DispatchableSyncer interface {
+	Dispatch(ctx context.Context, evt *event.Event)
+}
+
+// DefaultSyncer is the default syncing implementation. You can either write your own syncer, or selectively
+// replace parts of this default syncer (e.g. the ProcessResponse method). The default syncer uses the observer
+// pattern to notify callers about incoming events. See DefaultSyncer.OnEventType for more information.
+type DefaultSyncer struct {
+	// syncListeners want the whole sync response, e.g. the crypto machine
+	syncListeners []SyncHandler
+	// globalListeners want all events
+	globalListeners []EventHandler
+	// listeners want a specific event type
+	listeners map[event.Type][]EventHandler
+	// ParseEventContent determines whether or not event content should be parsed before passing to handlers.
+	ParseEventContent bool
+	// ParseErrorHandler is called when event.Content.ParseRaw returns an error.
+	// If it returns false, the event will not be forwarded to listeners.
+	ParseErrorHandler func(evt *event.Event, err error) bool
+	// FilterJSON is used when the client starts syncing and doesn't get an existing filter ID from SyncStore's LoadFilterID.
+	FilterJSON *Filter
+}
+
+var _ Syncer = (*DefaultSyncer)(nil)
+var _ ExtensibleSyncer = (*DefaultSyncer)(nil)
+
+// NewDefaultSyncer returns an instantiated DefaultSyncer
+func NewDefaultSyncer() *DefaultSyncer {
+	return &DefaultSyncer{
+		listeners:         make(map[event.Type][]EventHandler),
+		syncListeners:     []SyncHandler{},
+		globalListeners:   []EventHandler{},
+		ParseEventContent: true,
+		ParseErrorHandler: func(evt *event.Event, err error) bool {
+			// By default, drop known events that can't be parsed, but let unknown events through
+			return errors.Is(err, event.ErrUnsupportedContentType) ||
+				// Also allow events that had their content already parsed by some other function
+				errors.Is(err, event.ErrContentAlreadyParsed)
+		},
+	}
+}
+
+// ProcessResponse processes the /sync response in a way suitable for bots. "Suitable for bots" means a stream of
+// unrepeating events. Returns a fatal error if a listener panics.
+func (s *DefaultSyncer) ProcessResponse(ctx context.Context, res *RespSync, since string) (err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			err = fmt.Errorf("ProcessResponse panicked! since=%s panic=%s\n%s", since, r, debug.Stack())
+		}
+	}()
+
+	for _, listener := range s.syncListeners {
+		if !listener(ctx, res, since) {
+			return
+		}
+	}
+
+	s.processSyncEvents(ctx, "", res.ToDevice.Events, event.SourceToDevice)
+	s.processSyncEvents(ctx, "", res.Presence.Events, event.SourcePresence)
+	s.processSyncEvents(ctx, "", res.AccountData.Events, event.SourceAccountData)
+
+	for roomID, roomData := range res.Rooms.Join {
+		s.processSyncEvents(ctx, roomID, roomData.State.Events, event.SourceJoin|event.SourceState)
+		s.processSyncEvents(ctx, roomID, roomData.Timeline.Events, event.SourceJoin|event.SourceTimeline)
+		s.processSyncEvents(ctx, roomID, roomData.Ephemeral.Events, event.SourceJoin|event.SourceEphemeral)
+		s.processSyncEvents(ctx, roomID, roomData.AccountData.Events, event.SourceJoin|event.SourceAccountData)
+	}
+	for roomID, roomData := range res.Rooms.Invite {
+		s.processSyncEvents(ctx, roomID, roomData.State.Events, event.SourceInvite|event.SourceState)
+	}
+	for roomID, roomData := range res.Rooms.Leave {
+		s.processSyncEvents(ctx, roomID, roomData.State.Events, event.SourceLeave|event.SourceState)
+		s.processSyncEvents(ctx, roomID, roomData.Timeline.Events, event.SourceLeave|event.SourceTimeline)
+	}
+	return
+}
+
+func (s *DefaultSyncer) processSyncEvents(ctx context.Context, roomID id.RoomID, events []*event.Event, source event.Source) {
+	for _, evt := range events {
+		s.processSyncEvent(ctx, roomID, evt, source)
+	}
+}
+
+func (s *DefaultSyncer) processSyncEvent(ctx context.Context, roomID id.RoomID, evt *event.Event, source event.Source) {
+	evt.RoomID = roomID
+
+	// Ensure the type class is correct. It's safe to mutate the class since the event type is not a pointer.
+	// Listeners are keyed by type structs, which means only the correct class will pass.
+	switch {
+	case evt.StateKey != nil:
+		evt.Type.Class = event.StateEventType
+	case source == event.SourcePresence, source&event.SourceEphemeral != 0:
+		evt.Type.Class = event.EphemeralEventType
+	case source&event.SourceAccountData != 0:
+		evt.Type.Class = event.AccountDataEventType
+	case source == event.SourceToDevice:
+		evt.Type.Class = event.ToDeviceEventType
+	default:
+		evt.Type.Class = event.MessageEventType
+	}
+
+	if s.ParseEventContent {
+		err := evt.Content.ParseRaw(evt.Type)
+		if err != nil && !s.ParseErrorHandler(evt, err) {
+			return
+		}
+	}
+
+	evt.Mautrix.EventSource = source
+	s.Dispatch(ctx, evt)
+}
+
+func (s *DefaultSyncer) Dispatch(ctx context.Context, evt *event.Event) {
+	for _, fn := range s.globalListeners {
+		fn(ctx, evt)
+	}
+	listeners, exists := s.listeners[evt.Type]
+	if exists {
+		for _, fn := range listeners {
+			fn(ctx, evt)
+		}
+	}
+}
+
+// OnEventType allows callers to be notified when there are new events for the given event type.
+// There are no duplicate checks.
+func (s *DefaultSyncer) OnEventType(eventType event.Type, callback EventHandler) {
+	_, exists := s.listeners[eventType]
+	if !exists {
+		s.listeners[eventType] = []EventHandler{}
+	}
+	s.listeners[eventType] = append(s.listeners[eventType], callback)
+}
+
+func (s *DefaultSyncer) OnSync(callback SyncHandler) {
+	s.syncListeners = append(s.syncListeners, callback)
+}
+
+func (s *DefaultSyncer) OnEvent(callback EventHandler) {
+	s.globalListeners = append(s.globalListeners, callback)
+}
+
+// OnFailedSync always returns a 10 second wait period between failed /syncs, never a fatal error.
+func (s *DefaultSyncer) OnFailedSync(res *RespSync, err error) (time.Duration, error) {
+	if errors.Is(err, MUnknownToken) {
+		return 0, err
+	}
+	return 10 * time.Second, nil
+}
+
+var defaultFilter = Filter{
+	Room: RoomFilter{
+		Timeline: FilterPart{
+			Limit: 50,
+		},
+	},
+}
+
+// GetFilterJSON returns a filter with a timeline limit of 50.
+func (s *DefaultSyncer) GetFilterJSON(userID id.UserID) *Filter {
+	if s.FilterJSON == nil {
+		defaultFilterCopy := defaultFilter
+		s.FilterJSON = &defaultFilterCopy
+	}
+	return s.FilterJSON
+}
+
+// DontProcessOldEvents is a sync handler that removes rooms that the user just joined.
+// It's meant for bots to ignore events from before the bot joined the room.
+//
+// To use it, register it with your Syncer, e.g.:
+//
+//	cli.Syncer.(mautrix.ExtensibleSyncer).OnSync(cli.DontProcessOldEvents)
+func (cli *Client) DontProcessOldEvents(_ context.Context, resp *RespSync, since string) bool {
+	return dontProcessOldEvents(cli.UserID, resp, since)
+}
+
+var _ SyncHandler = (*Client)(nil).DontProcessOldEvents
+
+func dontProcessOldEvents(userID id.UserID, resp *RespSync, since string) bool {
+	if since == "" {
+		return false
+	}
+	// This is a horrible hack because /sync will return the most recent messages for a room
+	// as soon as you /join it. We do NOT want to process those events in that particular room
+	// because they may have already been processed (if you toggle the bot in/out of the room).
+	//
+	// Work around this by inspecting each room's timeline and seeing if an m.room.member event for us
+	// exists and is "join" and then discard processing that room entirely if so.
+	// TODO: We probably want to process messages from after the last join event in the timeline.
+	for roomID, roomData := range resp.Rooms.Join {
+		for i := len(roomData.Timeline.Events) - 1; i >= 0; i-- {
+			evt := roomData.Timeline.Events[i]
+			if evt.Type == event.StateMember && evt.GetStateKey() == string(userID) {
+				membership, _ := evt.Content.Raw["membership"].(string)
+				if membership == "join" {
+					_, ok := resp.Rooms.Join[roomID]
+					if !ok {
+						continue
+					}
+					delete(resp.Rooms.Join, roomID)   // don't re-process messages
+					delete(resp.Rooms.Invite, roomID) // don't re-process invites
+					break
+				}
+			}
+		}
+	}
+	return true
+}
+
+// MoveInviteState is a sync handler that moves events from the state event list to the InviteRoomState in the invite event.
+//
+// To use it, register it with your Syncer, e.g.:
+//
+//	cli.Syncer.(mautrix.ExtensibleSyncer).OnSync(cli.MoveInviteState)
+func (cli *Client) MoveInviteState(ctx context.Context, resp *RespSync, _ string) bool {
+	for _, meta := range resp.Rooms.Invite {
+		var inviteState []event.StrippedState
+		var inviteEvt *event.Event
+		for _, evt := range meta.State.Events {
+			if evt.Type == event.StateMember && evt.GetStateKey() == cli.UserID.String() {
+				inviteEvt = evt
+			} else {
+				evt.Type.Class = event.StateEventType
+				_ = evt.Content.ParseRaw(evt.Type)
+				inviteState = append(inviteState, event.StrippedState{
+					Content:  evt.Content,
+					Type:     evt.Type,
+					StateKey: evt.GetStateKey(),
+					Sender:   evt.Sender,
+				})
+			}
+		}
+		if inviteEvt != nil {
+			inviteEvt.Unsigned.InviteRoomState = inviteState
+			meta.State.Events = []*event.Event{inviteEvt}
+		}
+	}
+	return true
+}
+
+var _ SyncHandler = (*Client)(nil).MoveInviteState