336 lines
8.4 KiB
Go
336 lines
8.4 KiB
Go
package otel
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
"os"
|
|
"runtime"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"go.opentelemetry.io/contrib/bridges/otelslog"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
|
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
|
|
"go.opentelemetry.io/otel/exporters/stdout/stdoutlog"
|
|
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
|
|
"go.opentelemetry.io/otel/log/global"
|
|
"go.opentelemetry.io/otel/metric"
|
|
"go.opentelemetry.io/otel/propagation"
|
|
"go.opentelemetry.io/otel/sdk/log"
|
|
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
|
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
|
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
var (
|
|
tracer trace.Tracer
|
|
meter metric.Meter
|
|
logger *slog.Logger
|
|
)
|
|
|
|
func Init(ctx context.Context, name string) (shutdown func(context.Context) error, err error) {
|
|
tracer = otel.Tracer(name)
|
|
meter = otel.Meter(name)
|
|
logger = otelslog.NewLogger(name)
|
|
|
|
return setupOTelSDK(ctx, name)
|
|
}
|
|
|
|
// Meter returns the package-level meter assigned by Init. It is nil if Init
// has not been called.
func Meter() metric.Meter {
	return meter
}
|
|
|
|
// func Error(err error, v ...any) {
|
|
// if err == nil {
|
|
// return
|
|
// }
|
|
// fmt.Println("ERR:", append([]any{err}, v...))
|
|
// logger.Error(err.Error(), v...)
|
|
// }
|
|
// func Info(msg string, v ...any) { fmt.Println(append([]any{msg}, v...)); logger.Info(msg, v...) }
|
|
|
|
type spanny struct {
|
|
trace.Span
|
|
}
|
|
|
|
func (s *spanny) RecordError(err error, options ...trace.EventOption) {
|
|
if err == nil {
|
|
return
|
|
}
|
|
ec := trace.NewEventConfig(options...)
|
|
|
|
attrs := make([]any, len(ec.Attributes()))
|
|
for i, v := range ec.Attributes() {
|
|
attrs[i] = v
|
|
}
|
|
|
|
fmt.Println(append([]any{"ERR:", err}, attrs...)...)
|
|
logger.Error(err.Error(), attrs...)
|
|
s.Span.RecordError(err, options...)
|
|
}
|
|
func (s *spanny) AddEvent(name string, options ...trace.EventOption) {
|
|
ec := trace.NewEventConfig(options...)
|
|
|
|
attrs := make([]any, 2*len(ec.Attributes()))
|
|
for i, v := range ec.Attributes() {
|
|
attrs[2*i] = v.Key
|
|
attrs[2*i+1] = v.Value.Emit()
|
|
}
|
|
fmt.Println(append([]any{name}, attrs...)...)
|
|
logger.Info(name, attrs...)
|
|
|
|
s.Span.AddEvent(name, options...)
|
|
}
|
|
|
|
func Span(ctx context.Context, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
|
|
name, attrs := Attrs()
|
|
ctx, span := tracer.Start(ctx, name, opts...)
|
|
span.SetAttributes(attrs...)
|
|
|
|
return ctx, &spanny{span}
|
|
}
|
|
func Attrs() (string, []attribute.KeyValue) {
|
|
var attrs []attribute.KeyValue
|
|
var name string
|
|
if pc, file, line, ok := runtime.Caller(2); ok {
|
|
if fn := runtime.FuncForPC(pc); fn != nil {
|
|
name = fn.Name()
|
|
}
|
|
attrs = append(attrs,
|
|
attribute.String("pc", fmt.Sprintf("%v", pc)),
|
|
attribute.String("file", file),
|
|
attribute.Int("line", line),
|
|
)
|
|
}
|
|
return name, attrs
|
|
}
|
|
|
|
// setupOTelSDK bootstraps the OpenTelemetry pipeline.
|
|
// If it does not return an error, make sure to call shutdown for proper cleanup.
|
|
func setupOTelSDK(ctx context.Context, name string) (shutdown func(context.Context) error, err error) {
|
|
var shutdownFuncs []func(context.Context) error
|
|
|
|
// shutdown calls cleanup functions registered via shutdownFuncs.
|
|
// The errors from the calls are joined.
|
|
// Each registered cleanup will be invoked once.
|
|
shutdown = func(ctx context.Context) error {
|
|
fmt.Println("shutdown")
|
|
var err error
|
|
for _, fn := range shutdownFuncs {
|
|
err = errors.Join(err, fn(ctx))
|
|
}
|
|
shutdownFuncs = nil
|
|
return err
|
|
}
|
|
|
|
// playShutdown := otelplay.ConfigureOpentelemetry(ctx)
|
|
// shutdownFuncs = append(shutdownFuncs, func(ctx context.Context) error { playShutdown(); return nil })
|
|
|
|
// handleErr calls shutdown for cleanup and makes sure that all errors are returned.
|
|
handleErr := func(inErr error) {
|
|
err = errors.Join(inErr, shutdown(ctx))
|
|
}
|
|
|
|
// Set up propagator.
|
|
prop := newPropagator()
|
|
otel.SetTextMapPropagator(prop)
|
|
|
|
// Set up trace provider.
|
|
tracerShutdown, err := newTraceProvider(ctx, name)
|
|
if err != nil {
|
|
handleErr(err)
|
|
return
|
|
}
|
|
shutdownFuncs = append(shutdownFuncs, tracerShutdown)
|
|
|
|
// Set up meter provider.
|
|
meterShutdown, err := newMeterProvider(ctx, name)
|
|
if err != nil {
|
|
handleErr(err)
|
|
return
|
|
}
|
|
shutdownFuncs = append(shutdownFuncs, meterShutdown)
|
|
|
|
// Set up logger provider.
|
|
loggerShutdown, err := newLoggerProvider(ctx, name)
|
|
if err != nil {
|
|
handleErr(err)
|
|
return
|
|
}
|
|
shutdownFuncs = append(shutdownFuncs, loggerShutdown)
|
|
|
|
return
|
|
}
|
|
|
|
func newPropagator() propagation.TextMapPropagator {
|
|
return propagation.NewCompositeTextMapPropagator(
|
|
propagation.TraceContext{},
|
|
propagation.Baggage{},
|
|
)
|
|
}
|
|
|
|
func newTraceProvider(ctx context.Context, name string) (func(context.Context) error, error) {
|
|
r, err := resource.Merge(
|
|
resource.Default(),
|
|
resource.NewWithAttributes(
|
|
semconv.SchemaURL,
|
|
semconv.ServiceName(name),
|
|
),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if v := env("XT_TRACER", ""); v == "stdout" {
|
|
traceExporter, err := stdouttrace.New(
|
|
stdouttrace.WithWriter(os.Stderr),
|
|
stdouttrace.WithPrettyPrint(),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tracerProvider := sdktrace.NewTracerProvider(
|
|
sdktrace.WithResource(r),
|
|
sdktrace.WithBatcher(traceExporter,
|
|
// Default is 5s. Set to 1s for demonstrative purposes.
|
|
sdktrace.WithBatchTimeout(time.Second)),
|
|
)
|
|
otel.SetTracerProvider(tracerProvider)
|
|
|
|
return tracerProvider.Shutdown, nil
|
|
} else if v != "" {
|
|
fmt.Println("use tracer", v)
|
|
exp, err := otlptracegrpc.New(
|
|
ctx,
|
|
otlptracegrpc.WithEndpoint(v),
|
|
otlptracegrpc.WithInsecure(),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tracerProvider := sdktrace.NewTracerProvider(
|
|
sdktrace.WithBatcher(exp),
|
|
sdktrace.WithResource(r),
|
|
)
|
|
otel.SetTracerProvider(tracerProvider)
|
|
return func(ctx context.Context) error {
|
|
return tracerProvider.Shutdown(ctx)
|
|
}, nil
|
|
}
|
|
|
|
return func(ctx context.Context) error { return nil }, nil
|
|
}
|
|
|
|
func newMeterProvider(ctx context.Context, name string) (func(context.Context) error, error) {
|
|
_, _ = ctx, name
|
|
// metricExporter, err := stdoutmetric.New()
|
|
// if err != nil {
|
|
// return nil, err
|
|
// }
|
|
|
|
// meterProvider := sdkmetric.NewMeterProvider(
|
|
// sdkmetric.WithReader(sdkmetric.NewPeriodicReader(metricExporter,
|
|
// // Default is 1m. Set to 3s for demonstrative purposes.
|
|
// sdkmetric.WithInterval(3*time.Second))),
|
|
// )
|
|
// otel.SetMeterProvider(meterProvider)
|
|
|
|
metricExporter, err := otelprom.New(
|
|
otelprom.WithRegisterer(prometheus.DefaultRegisterer),
|
|
|
|
// OTEL default buckets assume you're using milliseconds. Substitute defaults
|
|
// appropriate for units of seconds.
|
|
otelprom.WithAggregationSelector(func(ik sdkmetric.InstrumentKind) sdkmetric.Aggregation {
|
|
switch ik {
|
|
case sdkmetric.InstrumentKindHistogram:
|
|
return sdkmetric.AggregationExplicitBucketHistogram{
|
|
Boundaries: prometheus.DefBuckets,
|
|
NoMinMax: false,
|
|
}
|
|
default:
|
|
return sdkmetric.DefaultAggregationSelector(ik)
|
|
}
|
|
}),
|
|
)
|
|
|
|
p := sdkmetric.NewMeterProvider(
|
|
sdkmetric.WithReader(metricExporter),
|
|
)
|
|
|
|
otel.SetMeterProvider(p)
|
|
http.Handle("/metrics", promhttp.Handler())
|
|
return func(ctx context.Context) error { return nil }, err
|
|
}
|
|
|
|
func newLoggerProvider(ctx context.Context, name string) (func(context.Context) error, error) {
|
|
r, err := resource.Merge(
|
|
resource.Default(),
|
|
resource.NewWithAttributes(
|
|
semconv.SchemaURL,
|
|
semconv.ServiceName(name),
|
|
),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if v := env("XT_LOGGER", ""); v == "stdout" {
|
|
logExporter, err := stdoutlog.New(
|
|
stdoutlog.WithPrettyPrint(),
|
|
stdoutlog.WithWriter(os.Stderr),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
loggerProvider := log.NewLoggerProvider(
|
|
log.WithProcessor(
|
|
log.NewBatchProcessor(logExporter),
|
|
),
|
|
log.WithResource(r),
|
|
)
|
|
global.SetLoggerProvider(loggerProvider)
|
|
|
|
return loggerProvider.Shutdown, nil
|
|
} else if v != "" {
|
|
fmt.Println("use logger", v)
|
|
|
|
exp, err := otlploghttp.New(
|
|
ctx,
|
|
otlploghttp.WithInsecure(),
|
|
otlploghttp.WithEndpointURL(v),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
processor := log.NewBatchProcessor(exp)
|
|
provider := log.NewLoggerProvider(
|
|
log.WithProcessor(processor),
|
|
log.WithResource(r),
|
|
)
|
|
global.SetLoggerProvider(provider)
|
|
|
|
return processor.Shutdown, nil
|
|
}
|
|
|
|
return func(ctx context.Context) error { return nil }, nil
|
|
}
|
|
|
|
// env returns the value of the environment variable key, or def when the
// variable is not set. A variable that is set to the empty string counts as
// set, so "" is returned for it rather than def.
func env(key, def string) string {
	value, found := os.LookupEnv(key)
	if !found {
		return def
	}
	return value
}
|