relay
package
API reference for the relay package.
Imports (9)
Transport
Transport routes incoming go-relay jobs to registered Handler implementations,
injecting dependencies via the DI container and emitting on go-signal after execution.
type Transport struct
Methods
Provide registers a dependency in the DI container.
Parameters
// Provide registers instance under name by delegating directly to the
// transport's DI container (t.container.Provide). It performs no validation
// of its own.
func (*Transport) Provide(name string, instance any)
{
t.container.Provide(name, instance)
}
Register subscribes a handler prototype to its declared relay topic. The prototype must be a pointer to a struct containing a core.Pattern field with a relay:"topic" struct tag. Panics if the tag is missing.
Parameters
func (*Transport) Register(prototype core.Handler)
{
val := reflect.ValueOf(prototype)
if val.Kind() != reflect.Ptr || val.Elem().Kind() != reflect.Struct {
panic("Transport.Register: prototype must be a pointer to a struct")
}
elemType := val.Elem().Type()
var topic string
for i := 0; i < elemType.NumField(); i++ {
field := elemType.Field(i)
if field.Type == reflect.TypeOf(core.Pattern{}) {
topic = field.Tag.Get("relay")
break
}
}
if topic == "" {
panic(fmt.Sprintf("Transport.Register: struct %s missing Pattern with relay tag", elemType.Name()))
}
t.mu.Lock()
t.handlers[topic] = prototype
t.mu.Unlock()
relaymanager.Register[map[string]any](t.relay, topic, t.dispatch(topic, prototype))
t.Logger.Info("Registered relay handler", "topic", topic)
}
dispatch returns a go-relay job handler that creates a fresh handler instance, injects dependencies, applies the job payload, executes Handle, and emits on the bus.
Parameters
Returns
func (*Transport) dispatch(topic string, prototype core.Handler) func(ctx context.Context, payload map[string]any) error
{
return func(ctx context.Context, payload map[string]any) error {
val := reflect.ValueOf(prototype)
elemType := val.Elem().Type()
newVal := reflect.New(elemType).Elem()
newVal.Set(val.Elem())
instance := newVal.Addr().Interface()
t.container.Inject(instance)
if err := t.applyPayload(instance, payload); err != nil {
return fmt.Errorf("payload binding failed: %w", err)
}
handler := instance.(core.Handler)
_, err := handler.Handle(ctx)
busInstance := t.Bus
if busInstance != nil {
bus.EmitAsync(ctx, busInstance, instance)
}
return err
}
}
applyPayload maps fields from the job payload map to the target struct, matching by field name (case-insensitive) or relay struct tag.
Parameters
Returns
func (*Transport) applyPayload(target any, payload map[string]any) error
{
dstVal := reflect.ValueOf(target).Elem()
dstType := dstVal.Type()
for k, v := range payload {
t.setFieldByNameOrTag(dstVal, dstType, k, reflect.ValueOf(v))
}
return nil
}
Parameters
func (*Transport) setFieldByNameOrTag(dst reflect.Value, dstType reflect.Type, name string, val reflect.Value)
{
for i := 0; i < dst.NumField(); i++ {
fieldMeta := dstType.Field(i)
field := dst.Field(i)
if !field.CanSet() {
continue
}
tag := fieldMeta.Tag.Get("relay")
if strings.EqualFold(fieldMeta.Name, name) || (tag != "" && tag == name) {
if field.Type() == val.Type() {
field.Set(val)
} else if val.Kind() == reflect.Interface && reflect.TypeOf(val.Interface()) == field.Type() {
field.Set(reflect.ValueOf(val.Interface()))
}
break
}
}
}
Start begins consuming jobs from the relay broker. It blocks until ctx is cancelled.
Parameters
Returns
// Start delegates to t.relay.Start with the given ctx and returns its error
// unchanged. Any blocking or cancellation behaviour comes from the relay; this
// method adds none of its own.
func (*Transport) Start(ctx context.Context) error
{
return t.relay.Start(ctx)
}
Fields
| Name | Type | Description |
|---|---|---|
| relay | *relaymanager.Relay | |
| container | *core.Container | |
| Logger | logger.Logger | |
| Bus | *bus.Bus | |
| handlers | map[string]core.Handler | |
| mu | sync.RWMutex | |
Option
Option configures a Transport.
// Option is a functional option: a function that receives the *Transport
// being built and mutates it. New applies the supplied options in argument
// order after setting its defaults, so later options override earlier ones.
type Option func(*Transport)
WithBus
WithBus sets a custom event bus for the transport.
// WithBus returns an Option that assigns b to the Transport's Bus field,
// replacing the default that New installs. dispatch skips emission when Bus
// is nil, so passing a nil b turns bus events off.
func WithBus(b *bus.Bus) Option
{
// The assignment is deferred until the Option is applied to a Transport.
return func(t *Transport) { t.Bus = b }
}
Uses
WithLogger
WithLogger sets a custom logger for the transport.
Parameters
Returns
// WithLogger returns an Option that assigns l to the Transport's Logger
// field, replacing the logger.Nop default that New installs.
//
// NOTE(review): Register calls t.Logger.Info unconditionally, so a nil l
// presumably causes a panic there — confirm against logger.Logger's definition.
func WithLogger(l logger.Logger) Option
{
// The assignment is deferred until the Option is applied to a Transport.
return func(t *Transport) { t.Logger = l }
}
Uses
New
New creates a new relay transport wrapping the given go-relay instance.
Parameters
Returns
func New(r *relaymanager.Relay, opts ...Option) *Transport
{
t := &Transport{
relay: r,
container: core.NewContainer(),
Logger: logger.Nop,
handlers: make(map[string]core.Handler),
Bus: bus.Default(),
}
for _, opt := range opts {
opt(t)
}
return t
}