API Reference
Queue API
The queue package provides distributed job queues.
Queue Interface
type Queue interface {
Enqueue(ctx context.Context, job *Job) error
Dequeue(ctx context.Context, timeout time.Duration) (*Job, error)
Peek(ctx context.Context) (*Job, error)
Len(ctx context.Context) (int64, error)
Close() error
}
func NewDragonflyQueue(cfg Config) (Queue, error)Job
type Job struct {
ID string
Type string
Payload json.RawMessage
Priority int
CreatedAt time.Time
Attempts int
MaxRetries int
Metadata map[string]string
}
func NewJob[T any](jobType string, payload T) (*Job, error)
func (j *Job) UnmarshalPayload(v any) error
func (j *Job) WithPriority(p int) *Job
func (j *Job) WithMaxRetries(n int) *Job
func (j *Job) WithMetadata(key, value string) *JobConfig
type Config struct {
Address string
Password string
Database int
QueueName string
MaxRetries int
}Worker
type Worker struct{}
type Handler func(ctx context.Context, job *Job) error
func NewWorker(queue Queue) *Worker
func (w *Worker) Handle(jobType string, handler Handler)
func (w *Worker) Start(ctx context.Context, concurrency int)
func (w *Worker) Stop()ShardedQueue
type ShardedQueue struct{}
type ShardedConfig struct {
Shards []Config
Strategy ShardStrategy
}
type ShardStrategy int
const (
HashShard ShardStrategy = iota
RoundRobinShard
LeastLoadedShard
)
func NewShardedQueue(cfg ShardedConfig) (*ShardedQueue, error)
func (sq *ShardedQueue) NumShards() int
func (sq *ShardedQueue) LenPerShard(ctx context.Context) ([]int64, error)DistributedLock
type DistributedLock struct{}
type Lock struct{}
func NewDistributedLock(client *redis.Client) *DistributedLock
func (dl *DistributedLock) Acquire(ctx context.Context, key string, ttl time.Duration) (*Lock, error)
func (dl *DistributedLock) TryAcquire(ctx context.Context, key string, ttl, maxWait time.Duration) (*Lock, error)
func (dl *DistributedLock) WithLock(ctx context.Context, key string, ttl time.Duration, fn func() error) error
func (l *Lock) Release(ctx context.Context) error
func (l *Lock) Extend(ctx context.Context, ttl time.Duration) error
func (l *Lock) IsHeld(ctx context.Context) (bool, error)EventStore
type EventStore struct{}
type Event struct {
ID string
Type EventType
JobID string
JobType string
Timestamp time.Time
Data map[string]any
WorkerID string
Error string
Duration time.Duration
}
type EventType string
const (
EventJobCreated EventType = "job.created"
EventJobQueued EventType = "job.queued"
EventJobStarted EventType = "job.started"
EventJobCompleted EventType = "job.completed"
EventJobFailed EventType = "job.failed"
EventJobRetried EventType = "job.retried"
EventJobDLQ EventType = "job.dlq"
)
func NewEventStore(client *redis.Client) *EventStore
func (es *EventStore) Append(ctx context.Context, event Event) error
func (es *EventStore) GetJobEvents(ctx context.Context, jobID string) ([]Event, error)
func (es *EventStore) GetRecentEvents(ctx context.Context, count int64) ([]Event, error)
func (es *EventStore) Subscribe(ctx context.Context, handler func(Event)) errorDLQ
type DLQ struct{}
type DLQEntry struct {
Job *Job
Error string
FailedAt time.Time
Attempts int
WorkerID string
StackTrace string
}
type Alerter interface {
Alert(ctx context.Context, entry DLQEntry) error
}
func NewDLQ(client *redis.Client, name string) *DLQ
func (d *DLQ) AddAlerter(alerter Alerter)
func (d *DLQ) Add(ctx context.Context, job *Job, err error, workerID string) error
func (d *DLQ) Get(ctx context.Context, start, stop int64) ([]DLQEntry, error)
func (d *DLQ) Len(ctx context.Context) (int64, error)
func (d *DLQ) Retry(ctx context.Context, queue Queue, index int64) error
func (d *DLQ) RetryAll(ctx context.Context, queue Queue) (int, error)
func (d *DLQ) Purge(ctx context.Context) error
func NewWebhookAlerter(url string) *WebhookAlerter
func NewSlackAlerter(webhookURL, channel string) *SlackAlerter
func NewLogAlerter(logger func(string, ...any)) *LogAlerterSemaphore
type Semaphore struct{}
func NewSemaphore(client *redis.Client, key string, limit int) *Semaphore
func (s *Semaphore) Acquire(ctx context.Context, ttl time.Duration) (string, error)
func (s *Semaphore) Release(ctx context.Context, id string) error
func (s *Semaphore) Available(ctx context.Context) (int, error)