relay API

relay

package

API reference for the relay package.

S
struct

Transport

Transport routes incoming go-relay jobs to registered Handler implementations,
injecting dependencies via the DI container and emitting on go-signal after execution.

pkg/transport/relay/transport.go:18-25
type Transport struct

Methods

Provide
Method

Provide registers a dependency in the DI container.

Parameters

name string
instance any
func (*Transport) Provide(name string, instance any)
{
	t.container.Provide(name, instance)
}
Register
Method

Register subscribes a handler prototype to its declared relay topic. The prototype must be a pointer to a struct containing a core.Pattern field with a relay:"topic" struct tag. Panics if the tag is missing.

Parameters

prototype core.Handler
func (*Transport) Register(prototype core.Handler)
{
	val := reflect.ValueOf(prototype)
	if val.Kind() != reflect.Ptr || val.Elem().Kind() != reflect.Struct {
		panic("Transport.Register: prototype must be a pointer to a struct")
	}

	elemType := val.Elem().Type()

	var topic string
	for i := 0; i < elemType.NumField(); i++ {
		field := elemType.Field(i)
		if field.Type == reflect.TypeOf(core.Pattern{}) {
			topic = field.Tag.Get("relay")
			break
		}
	}

	if topic == "" {
		panic(fmt.Sprintf("Transport.Register: struct %s missing Pattern with relay tag", elemType.Name()))
	}

	t.mu.Lock()
	t.handlers[topic] = prototype
	t.mu.Unlock()

	relaymanager.Register[map[string]any](t.relay, topic, t.dispatch(topic, prototype))

	t.Logger.Info("Registered relay handler", "topic", topic)
}
dispatch
Method

dispatch returns a go-relay job handler that creates a fresh handler instance, injects dependencies, applies the job payload, executes Handle, and emits on the bus.

Parameters

topic string
prototype core.Handler
func (*Transport) dispatch(topic string, prototype core.Handler) func(ctx context.Context, payload map[string]any) error
{
	return func(ctx context.Context, payload map[string]any) error {
		val := reflect.ValueOf(prototype)
		elemType := val.Elem().Type()
		newVal := reflect.New(elemType).Elem()
		newVal.Set(val.Elem())

		instance := newVal.Addr().Interface()
		t.container.Inject(instance)

		if err := t.applyPayload(instance, payload); err != nil {
			return fmt.Errorf("payload binding failed: %w", err)
		}

		handler := instance.(core.Handler)
		_, err := handler.Handle(ctx)

		busInstance := t.Bus
		if busInstance != nil {
			bus.EmitAsync(ctx, busInstance, instance)
		}

		return err
	}
}
applyPayload
Method

applyPayload maps fields from the job payload map to the target struct, matching by field name (case-insensitive) or relay struct tag.

Parameters

target any
payload map[string]any

Returns

error
func (*Transport) applyPayload(target any, payload map[string]any) error
{
	dstVal := reflect.ValueOf(target).Elem()
	dstType := dstVal.Type()

	for k, v := range payload {
		t.setFieldByNameOrTag(dstVal, dstType, k, reflect.ValueOf(v))
	}

	return nil
}

Parameters

dstType reflect.Type
name string
func (*Transport) setFieldByNameOrTag(dst reflect.Value, dstType reflect.Type, name string, val reflect.Value)
{
	for i := 0; i < dst.NumField(); i++ {
		fieldMeta := dstType.Field(i)
		field := dst.Field(i)

		if !field.CanSet() {
			continue
		}

		tag := fieldMeta.Tag.Get("relay")
		if strings.EqualFold(fieldMeta.Name, name) || (tag != "" && tag == name) {
			if field.Type() == val.Type() {
				field.Set(val)
			} else if val.Kind() == reflect.Interface && reflect.TypeOf(val.Interface()) == field.Type() {
				field.Set(reflect.ValueOf(val.Interface()))
			}
			break
		}
	}
}
Start
Method

Start begins consuming jobs from the relay broker. It blocks until ctx is cancelled.

Parameters

Returns

error
func (*Transport) Start(ctx context.Context) error
{
	return t.relay.Start(ctx)
}

Fields

Name Type Description
relay *relaymanager.Relay
container *core.Container
Logger logger.Logger
Bus *bus.Bus
handlers map[string]core.Handler
mu sync.RWMutex
T
type

Option

Option configures a Transport.

pkg/transport/relay/transport.go:28-28
type Option func(*Transport)
F
function

WithBus

WithBus sets a custom event bus for the transport.

Parameters

b

Returns

pkg/transport/relay/transport.go:31-33
func WithBus(b *bus.Bus) Option

{
	return func(t *Transport) { t.Bus = b }
}
F
function

WithLogger

WithLogger sets a custom logger for the transport.

Parameters

Returns

pkg/transport/relay/transport.go:36-38
func WithLogger(l logger.Logger) Option

{
	return func(t *Transport) { t.Logger = l }
}
F
function

New

New creates a new relay transport wrapping the given go-relay instance.

Parameters

opts
...Option

Returns

pkg/transport/relay/transport.go:41-53
func New(r *relaymanager.Relay, opts ...Option) *Transport

{
	t := &Transport{
		relay:     r,
		container: core.NewContainer(),
		Logger:    logger.Nop,
		handlers:  make(map[string]core.Handler),
		Bus:       bus.Default(),
	}
	for _, opt := range opts {
		opt(t)
	}
	return t
}