mirror of
https://github.com/SigNoz/signoz.git
synced 2025-12-28 13:34:18 +00:00
Compare commits
1 Commits
update-PR-
...
feat/sso-a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9e3078383e |
@@ -2,6 +2,7 @@ package oidccallbackauthn
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/authn"
|
||||
@@ -20,7 +21,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
scopes []string = []string{"email", oidc.ScopeOpenID}
|
||||
scopes []string = []string{"email", "profile", oidc.ScopeOpenID}
|
||||
)
|
||||
|
||||
var _ authn.CallbackAuthN = (*AuthN)(nil)
|
||||
@@ -126,7 +127,39 @@ func (a *AuthN) HandleCallback(ctx context.Context, query url.Values) (*authtype
|
||||
}
|
||||
}
|
||||
|
||||
return authtypes.NewCallbackIdentity("", email, authDomain.StorableAuthDomain().OrgID, state), nil
|
||||
// DEBUG: Print all assertion values to see what's being received
|
||||
fmt.Printf("\n=== DEBUG: All assertion values ===\n")
|
||||
for key, attr := range claims {
|
||||
fmt.Printf(" Key: %q, Value: %v\n", key, attr)
|
||||
}
|
||||
fmt.Printf("=================================\n\n")
|
||||
|
||||
name := ""
|
||||
if nameClaim := authDomain.AuthDomainConfig().OIDC.ClaimMapping.Name; nameClaim != "" {
|
||||
if n, ok := claims[nameClaim].(string); ok {
|
||||
name = n
|
||||
}
|
||||
}
|
||||
|
||||
var groups []string
|
||||
if groupsClaim := authDomain.AuthDomainConfig().OIDC.ClaimMapping.Groups; groupsClaim != "" {
|
||||
if g, ok := claims[groupsClaim].([]interface{}); ok {
|
||||
for _, group := range g {
|
||||
if gs, ok := group.(string); ok {
|
||||
groups = append(groups, gs)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
role := ""
|
||||
if roleClaim := authDomain.AuthDomainConfig().OIDC.ClaimMapping.Role; roleClaim != "" {
|
||||
if r, ok := claims[roleClaim].(string); ok {
|
||||
role = r
|
||||
}
|
||||
}
|
||||
|
||||
return authtypes.NewCallbackIdentity(name, email, authDomain.StorableAuthDomain().OrgID, state, groups, role), nil
|
||||
}
|
||||
|
||||
func (a *AuthN) ProviderInfo(ctx context.Context, authDomain *authtypes.AuthDomain) *authtypes.AuthNProviderInfo {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"crypto/x509"
|
||||
"encoding/base64"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
@@ -87,6 +88,14 @@ func (a *AuthN) HandleCallback(ctx context.Context, formValues url.Values) (*aut
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// DEBUG: Print all assertion values to see what's being received
|
||||
fmt.Printf("\n=== DEBUG: All assertion values ===\n")
|
||||
for key, attr := range assertionInfo.Values {
|
||||
fmt.Printf(" Key: %q, Name: %q, FriendlyName: %q, Values: %v\n",
|
||||
key, attr.Name, attr.FriendlyName, attr.Values)
|
||||
}
|
||||
fmt.Printf("=================================\n\n")
|
||||
|
||||
if assertionInfo.WarningInfo.InvalidTime {
|
||||
return nil, errors.New(errors.TypeForbidden, errors.CodeForbidden, "saml: expired saml response")
|
||||
}
|
||||
@@ -96,7 +105,30 @@ func (a *AuthN) HandleCallback(ctx context.Context, formValues url.Values) (*aut
|
||||
return nil, errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "saml: invalid email").WithAdditional("The nameID assertion is used to retrieve the email address, please check your IDP configuration and try again.")
|
||||
}
|
||||
|
||||
return authtypes.NewCallbackIdentity("", email, authDomain.StorableAuthDomain().OrgID, state), nil
|
||||
name := ""
|
||||
var groups []string
|
||||
role := ""
|
||||
|
||||
attributeMapping := authDomain.AuthDomainConfig().SAML.AttributeMapping
|
||||
if attributeMapping != nil {
|
||||
if attributeMapping.Name != "" {
|
||||
if val := assertionInfo.Values.Get(attributeMapping.Name); val != "" {
|
||||
name = val
|
||||
}
|
||||
}
|
||||
|
||||
if attributeMapping.Groups != "" {
|
||||
groups = assertionInfo.Values.GetAll(attributeMapping.Groups)
|
||||
}
|
||||
|
||||
if attributeMapping.Role != "" {
|
||||
if val := assertionInfo.Values.Get(attributeMapping.Role); val != "" {
|
||||
role = val
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return authtypes.NewCallbackIdentity(name, email, authDomain.StorableAuthDomain().OrgID, state, groups, role), nil
|
||||
}
|
||||
|
||||
func (a *AuthN) ProviderInfo(ctx context.Context, authDomain *authtypes.AuthDomain) *authtypes.AuthNProviderInfo {
|
||||
|
||||
2
go.mod
2
go.mod
@@ -341,7 +341,7 @@ require (
|
||||
golang.org/x/time v0.11.0 // indirect
|
||||
golang.org/x/tools v0.36.0 // indirect
|
||||
gonum.org/v1/gonum v0.16.0 // indirect
|
||||
google.golang.org/api v0.236.0 // indirect
|
||||
google.golang.org/api v0.236.0
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect
|
||||
google.golang.org/grpc v1.75.1 // indirect
|
||||
|
||||
2
go.sum
2
go.sum
@@ -1713,6 +1713,8 @@ google.golang.org/genproto v0.0.0-20220421151946-72621c1f0bd3/go.mod h1:8w6bsBMX
|
||||
google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
|
||||
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
|
||||
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
|
||||
google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2 h1:1tXaIXCracvtsRxSBsYDiSBN0cuJvM7QYW+MrpIRY78=
|
||||
google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:49MsLSx0oWMOZqcpB3uL8ZOkAh1+TndpJ8ONoCBWiZk=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE=
|
||||
|
||||
@@ -3,6 +3,7 @@ package googlecallbackauthn
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/authn"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
@@ -10,6 +11,9 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
"github.com/coreos/go-oidc/v3/oidc"
|
||||
"golang.org/x/oauth2"
|
||||
"golang.org/x/oauth2/google"
|
||||
admin "google.golang.org/api/admin/directory/v1"
|
||||
"google.golang.org/api/option"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -18,7 +22,7 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
scopes []string = []string{"email"}
|
||||
scopes []string = []string{"email", "profile"}
|
||||
)
|
||||
|
||||
var _ authn.CallbackAuthN = (*AuthN)(nil)
|
||||
@@ -113,8 +117,22 @@ func (a *AuthN) HandleCallback(ctx context.Context, query url.Values) (*authtype
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "google: failed to parse email").WithAdditional(err.Error())
|
||||
}
|
||||
|
||||
return authtypes.NewCallbackIdentity(claims.Name, email, authDomain.StorableAuthDomain().OrgID, state), nil
|
||||
var groups []string
|
||||
if authDomain.AuthDomainConfig().Google.FetchGroups {
|
||||
groups, err = a.fetchGoogleWorkspaceGroups(ctx, claims.Email, authDomain.AuthDomainConfig().Google)
|
||||
if err != nil {
|
||||
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "google: could not fetch groups").WithAdditional(err.Error())
|
||||
}
|
||||
|
||||
if len(authDomain.AuthDomainConfig().Google.AllowedGroups) > 0 {
|
||||
groups = filterGroups(groups, authDomain.AuthDomainConfig().Google.AllowedGroups)
|
||||
if len(groups) == 0 {
|
||||
return nil, errors.Newf(errors.TypeForbidden, errors.CodeForbidden, "google: user %q is not in any allowed groups", claims.Email)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return authtypes.NewCallbackIdentity(claims.Name, email, authDomain.StorableAuthDomain().OrgID, state, groups, ""), nil
|
||||
}
|
||||
|
||||
func (a *AuthN) ProviderInfo(ctx context.Context, authDomain *authtypes.AuthDomain) *authtypes.AuthNProviderInfo {
|
||||
@@ -136,3 +154,84 @@ func (a *AuthN) oauth2Config(siteURL *url.URL, authDomain *authtypes.AuthDomain,
|
||||
}).String(),
|
||||
}
|
||||
}
|
||||
|
||||
func (a *AuthN) fetchGoogleWorkspaceGroups(ctx context.Context, userEmail string, config *authtypes.GoogleConfig) ([]string, error) {
|
||||
adminEmail := config.GetAdminEmailForDomain(userEmail)
|
||||
if adminEmail == "" {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "no admin email configured for domain of %s", userEmail)
|
||||
}
|
||||
|
||||
jwtConfig, err := google.JWTConfigFromJSON([]byte(config.ServiceAccountJSON), admin.AdminDirectoryGroupReadonlyScope)
|
||||
if err != nil {
|
||||
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid service account credentials").WithAdditional(err.Error())
|
||||
}
|
||||
|
||||
jwtConfig.Subject = adminEmail
|
||||
|
||||
adminService, err := admin.NewService(ctx, option.WithHTTPClient(jwtConfig.Client(ctx)))
|
||||
if err != nil {
|
||||
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "unable to create directory service").WithAdditional(err.Error())
|
||||
}
|
||||
|
||||
checkedGroups := make(map[string]struct{})
|
||||
|
||||
return a.getGroups(adminService, userEmail, config.FetchTransitiveGroupMembership, checkedGroups)
|
||||
}
|
||||
|
||||
// Recursive method
|
||||
func (a *AuthN) getGroups(adminService *admin.Service, userEmail string, fetchTransitive bool, checkedGroups map[string]struct{}) ([]string, error) {
|
||||
var userGroups []string
|
||||
var pageToken string
|
||||
|
||||
for {
|
||||
call := adminService.Groups.List().UserKey(userEmail)
|
||||
if pageToken != "" {
|
||||
call = call.PageToken(pageToken)
|
||||
}
|
||||
|
||||
groupList, err := call.Do()
|
||||
if err != nil {
|
||||
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "unable to list groups").WithAdditional(err.Error())
|
||||
}
|
||||
|
||||
for _, group := range groupList.Groups {
|
||||
if _, exists := checkedGroups[group.Email]; exists {
|
||||
continue
|
||||
}
|
||||
|
||||
checkedGroups[group.Email] = struct{}{}
|
||||
userGroups = append(userGroups, group.Email)
|
||||
|
||||
if fetchTransitive {
|
||||
transitiveGroups, err := a.getGroups(adminService, group.Email, fetchTransitive, checkedGroups)
|
||||
if err != nil {
|
||||
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "unable to list transitive groups").WithAdditional(err.Error())
|
||||
}
|
||||
userGroups = append(userGroups, transitiveGroups...)
|
||||
}
|
||||
}
|
||||
|
||||
pageToken = groupList.NextPageToken
|
||||
if pageToken == "" {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return userGroups, nil
|
||||
}
|
||||
|
||||
func filterGroups(userGroups, allowedGroups []string) []string {
|
||||
allowed := make(map[string]struct{}, len(allowedGroups))
|
||||
for _, g := range allowedGroups {
|
||||
allowed[strings.ToLower(g)] = struct{}{} // just to make o(1) searches
|
||||
}
|
||||
|
||||
var filtered []string
|
||||
for _, g := range userGroups {
|
||||
if _, ok := allowed[strings.ToLower(g)]; ok {
|
||||
filtered = append(filtered, g)
|
||||
}
|
||||
}
|
||||
|
||||
return filtered
|
||||
}
|
||||
|
||||
@@ -112,7 +112,7 @@ func (b *base) WithUrl(u string) *base {
|
||||
}
|
||||
}
|
||||
|
||||
// WithUrl adds additional messages to the base error and returns a new base error.
|
||||
// WithAdditional adds additional messages to the base error and returns a new base error.
|
||||
func (b *base) WithAdditional(a ...string) *base {
|
||||
return &base{
|
||||
t: b.t,
|
||||
|
||||
@@ -2,6 +2,7 @@ package implsession
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"slices"
|
||||
"strings"
|
||||
@@ -123,7 +124,7 @@ func (module *module) DeprecatedCreateSessionByEmailPassword(ctx context.Context
|
||||
}
|
||||
|
||||
if !factorPassword.Equals(password) {
|
||||
return nil, errors.New(errors.TypeUnauthenticated, types.ErrCodeIncorrectPassword, "invalid email orpassword")
|
||||
return nil, errors.New(errors.TypeUnauthenticated, types.ErrCodeIncorrectPassword, "invalid email or password")
|
||||
}
|
||||
|
||||
identity := authtypes.NewIdentity(users[0].ID, users[0].OrgID, users[0].Email, users[0].Role)
|
||||
@@ -151,13 +152,33 @@ func (module *module) CreateCallbackAuthNSession(ctx context.Context, authNProvi
|
||||
return "", err
|
||||
}
|
||||
|
||||
module.settings.Logger().InfoContext(ctx, "$$$$$$$$$$$$$$ callback values %%%%%%%%%%%%%%", "values", values)
|
||||
|
||||
callbackIdentity, err := callbackAuthN.HandleCallback(ctx, values)
|
||||
if err != nil {
|
||||
module.settings.Logger().ErrorContext(ctx, "failed to handle callback", "error", err, "authn_provider", authNProvider)
|
||||
return "", err
|
||||
}
|
||||
|
||||
user, err := types.NewUser(callbackIdentity.Name, callbackIdentity.Email, types.RoleViewer, callbackIdentity.OrgID)
|
||||
authDomain, err := module.authDomain.GetByOrgIDAndID(ctx, callbackIdentity.OrgID, callbackIdentity.State.DomainID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
roleMapping := authDomain.AuthDomainConfig().RoleMapping
|
||||
|
||||
// DEBUG: Add this line to print the callback identity
|
||||
module.settings.Logger().InfoContext(ctx, "$$$$$$$$$$$$$$$$ DEBUG callback identity ###############",
|
||||
"email", callbackIdentity.Email,
|
||||
"name", callbackIdentity.Name,
|
||||
"groups", callbackIdentity.Groups,
|
||||
"role", callbackIdentity.Role,
|
||||
"orgID", callbackIdentity.OrgID,
|
||||
)
|
||||
|
||||
role := resolveRole(callbackIdentity, roleMapping)
|
||||
|
||||
user, err := types.NewUser(callbackIdentity.Name, callbackIdentity.Email, role, callbackIdentity.OrgID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -231,3 +252,59 @@ func getProvider[T authn.AuthN](authNProvider authtypes.AuthNProvider, authNs ma
|
||||
|
||||
return provider, nil
|
||||
}
|
||||
|
||||
func resolveRole(callbackIdentity *authtypes.CallbackIdentity, roleMapping *authtypes.RoleMapping) types.Role {
|
||||
|
||||
// DEBUG: Print all assertion values to see what's being received
|
||||
fmt.Printf("\n=== DEBUG: callback identity values ===\n")
|
||||
fmt.Printf("callbackIdentity.Name - %s", callbackIdentity.Name)
|
||||
fmt.Printf("callbackIdentity.Role - %s", callbackIdentity.Role)
|
||||
fmt.Printf("=================================\n\n")
|
||||
|
||||
if roleMapping == nil {
|
||||
return types.RoleViewer
|
||||
}
|
||||
|
||||
if roleMapping.UseRoleAttribute && callbackIdentity.Role != "" {
|
||||
if role, err := types.NewRole(strings.ToUpper(callbackIdentity.Role)); err == nil {
|
||||
return role
|
||||
}
|
||||
}
|
||||
|
||||
if len(roleMapping.GroupMappings) > 0 && len(callbackIdentity.Groups) > 0 {
|
||||
highestRole := types.RoleViewer
|
||||
found := false
|
||||
|
||||
for _, group := range callbackIdentity.Groups {
|
||||
if mappedRole, exists := roleMapping.GroupMappings[group]; exists {
|
||||
found = true
|
||||
if role, err := types.NewRole(strings.ToUpper(mappedRole)); err == nil {
|
||||
if compareRoles(role, highestRole) > 0 {
|
||||
highestRole = role
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
return highestRole
|
||||
}
|
||||
}
|
||||
|
||||
if roleMapping.DefaultRole != "" {
|
||||
if role, err := types.NewRole(strings.ToUpper(roleMapping.DefaultRole)); err == nil {
|
||||
return role
|
||||
}
|
||||
}
|
||||
|
||||
return types.RoleViewer
|
||||
}
|
||||
|
||||
func compareRoles(a, b types.Role) int {
|
||||
order := map[types.Role]int{
|
||||
types.RoleViewer: 0,
|
||||
types.RoleEditor: 1,
|
||||
types.RoleAdmin: 2,
|
||||
}
|
||||
return order[a] - order[b]
|
||||
}
|
||||
|
||||
@@ -32,10 +32,12 @@ type Identity struct {
|
||||
}
|
||||
|
||||
type CallbackIdentity struct {
|
||||
Name string `json:"name"`
|
||||
Email valuer.Email `json:"email"`
|
||||
OrgID valuer.UUID `json:"orgId"`
|
||||
State State `json:"state"`
|
||||
Name string `json:"name"`
|
||||
Email valuer.Email `json:"email"`
|
||||
OrgID valuer.UUID `json:"orgId"`
|
||||
State State `json:"state"`
|
||||
Groups []string `json:"groups,omitempty"`
|
||||
Role string `json:"role,omitempty"`
|
||||
}
|
||||
|
||||
type State struct {
|
||||
@@ -85,12 +87,14 @@ func NewIdentity(userID valuer.UUID, orgID valuer.UUID, email valuer.Email, role
|
||||
}
|
||||
}
|
||||
|
||||
func NewCallbackIdentity(name string, email valuer.Email, orgID valuer.UUID, state State) *CallbackIdentity {
|
||||
func NewCallbackIdentity(name string, email valuer.Email, orgID valuer.UUID, state State, groups []string, role string) *CallbackIdentity {
|
||||
return &CallbackIdentity{
|
||||
Name: name,
|
||||
Email: email,
|
||||
OrgID: orgID,
|
||||
State: state,
|
||||
Name: name,
|
||||
Email: email,
|
||||
OrgID: orgID,
|
||||
State: state,
|
||||
Groups: groups,
|
||||
Role: role,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -63,6 +63,7 @@ type AuthDomainConfig struct {
|
||||
SAML *SamlConfig `json:"samlConfig"`
|
||||
Google *GoogleConfig `json:"googleAuthConfig"`
|
||||
OIDC *OIDCConfig `json:"oidcConfig"`
|
||||
RoleMapping *RoleMapping `json:"roleMapping"`
|
||||
}
|
||||
|
||||
type AuthDomain struct {
|
||||
|
||||
@@ -2,10 +2,13 @@ package authtypes
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
)
|
||||
|
||||
const wildCardDomain = "*"
|
||||
|
||||
type GoogleConfig struct {
|
||||
// ClientID is the application's ID. For example, 292085223830.apps.googleusercontent.com.
|
||||
ClientID string `json:"clientId"`
|
||||
@@ -15,6 +18,27 @@ type GoogleConfig struct {
|
||||
|
||||
// What is the meaning of this? Should we remove this?
|
||||
RedirectURI string `json:"redirectURI"`
|
||||
|
||||
// Whether to fetch the Google workspace groups (required additional API scopes)
|
||||
FetchGroups bool `json:"fetchGroups"`
|
||||
|
||||
// Service Account creds JSON stored for Google Admin SDK access
|
||||
// This is content of the JSON file stored directly into db as string
|
||||
// Required if FetchGroups is true (unless running on GCE with default credentials)
|
||||
ServiceAccountJSON string `json:"serviceAccountJson,omitempty"`
|
||||
|
||||
// Map of workspace domain to admin email for service account impersonation
|
||||
// The service account will impersonate this admin to call the directory API
|
||||
// Use "*" as key for wildcard/default that matches any domain
|
||||
// Example: {"example.com": "admin@exmaple.com", "*": "fallbackadmin@company.com"}
|
||||
DomainToAdminEmail map[string]string `json:"adminEmail,omitempty"`
|
||||
|
||||
// If true, fetch transitive group membership (recursive - groups that contains other groups)
|
||||
FetchTransitiveGroupMembership bool `json:"fetchTransitiveGroupMembership,omitempty"`
|
||||
|
||||
// Optional list of allowed groups
|
||||
// If this is present, only users belonging to one of these groups will be allowed to login
|
||||
AllowedGroups []string `json:"allowedGroups,omitempty"`
|
||||
}
|
||||
|
||||
func (config *GoogleConfig) UnmarshalJSON(data []byte) error {
|
||||
@@ -33,6 +57,33 @@ func (config *GoogleConfig) UnmarshalJSON(data []byte) error {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "clientSecret is required")
|
||||
}
|
||||
|
||||
if temp.FetchGroups {
|
||||
if len(temp.DomainToAdminEmail) == 0 {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "domainToAdminEmail is required if fetchGroups is true")
|
||||
}
|
||||
|
||||
if temp.ServiceAccountJSON == "" {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "serviceAccountJSON is required if fetchGroups is true")
|
||||
}
|
||||
}
|
||||
|
||||
*config = GoogleConfig(temp)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (config *GoogleConfig) GetAdminEmailForDomain(userEmail string) string {
|
||||
domain := extractDomainFromEmail(userEmail)
|
||||
|
||||
if adminEmail, ok := config.DomainToAdminEmail[domain]; ok {
|
||||
return adminEmail
|
||||
}
|
||||
|
||||
return config.DomainToAdminEmail[wildCardDomain]
|
||||
}
|
||||
|
||||
func extractDomainFromEmail(email string) string {
|
||||
if at := strings.LastIndex(email, "@"); at >= 0 {
|
||||
return email[at+1:]
|
||||
}
|
||||
return wildCardDomain
|
||||
}
|
||||
|
||||
@@ -34,6 +34,12 @@ type OIDCConfig struct {
|
||||
type ClaimMapping struct {
|
||||
// Configurable key which contains the email claims. Defaults to "email"
|
||||
Email string `json:"email"`
|
||||
// Configuration key which contains the name. Defaults to "name"
|
||||
Name string `json:"name"`
|
||||
// Configuration key which contains the name. Defaults to "groups" (Optional)
|
||||
Groups string `json:"groups"`
|
||||
// Configuration key which contains the name. Defaults to "role" (Optional)
|
||||
Role string `json:"role"`
|
||||
}
|
||||
|
||||
func (config *OIDCConfig) UnmarshalJSON(data []byte) error {
|
||||
|
||||
10
pkg/types/authtypes/rolemapping.go
Normal file
10
pkg/types/authtypes/rolemapping.go
Normal file
@@ -0,0 +1,10 @@
|
||||
package authtypes
|
||||
|
||||
type RoleMapping struct {
|
||||
// Default role any new SSO users. Defaults to "VIEWER"
|
||||
DefaultRole string `json:"defaultRole"`
|
||||
// Map of IDP group names to SigNoz roles. Key is group name, value is SigNoz role
|
||||
GroupMappings map[string]string `json:"groupMappings"`
|
||||
// If true, use the role claim directly from IDP instead of group mappings
|
||||
UseRoleAttribute bool `json:"useRoleAttribute"`
|
||||
}
|
||||
@@ -20,6 +20,18 @@ type SamlConfig struct {
|
||||
// For providers like jumpcloud, this should be set to true.
|
||||
// Note: This is the reverse of WantAuthnRequestsSigned. If WantAuthnRequestsSigned is false, then InsecureSkipAuthNRequestsSigned should be true.
|
||||
InsecureSkipAuthNRequestsSigned bool `json:"insecureSkipAuthNRequestsSigned"`
|
||||
|
||||
// Mapping of SAML assertion attributes
|
||||
AttributeMapping *SamlAttributeMapping `json:"samlAttributeMapping"`
|
||||
}
|
||||
|
||||
type SamlAttributeMapping struct {
|
||||
// SAML attribute name for display name
|
||||
Name string `json:"name"`
|
||||
// SAML attribute name for groups
|
||||
Groups string `json:"groups"`
|
||||
// SAML attribute name for direct role
|
||||
Role string `json:"role"`
|
||||
}
|
||||
|
||||
func (config *SamlConfig) UnmarshalJSON(data []byte) error {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import Any, Callable, Dict
|
||||
from typing import Any, Callable, Dict, List
|
||||
from urllib.parse import urljoin
|
||||
from xml.etree import ElementTree
|
||||
|
||||
@@ -26,6 +26,15 @@ def create_saml_client(
|
||||
realm_name="master",
|
||||
)
|
||||
|
||||
# DELETE existing client if it exists (to ensure mappers are updated)
|
||||
saml_client_id = f"{signoz.self.host_configs['8080'].address}:{signoz.self.host_configs['8080'].port}"
|
||||
try:
|
||||
existing_client_id = client.get_client_id(client_id=saml_client_id)
|
||||
if existing_client_id:
|
||||
client.delete_client(existing_client_id)
|
||||
except Exception:
|
||||
pass # Client doesn't exist, that's fine
|
||||
|
||||
client.create_client(
|
||||
skip_exists=True,
|
||||
payload={
|
||||
@@ -114,6 +123,31 @@ def create_saml_client(
|
||||
"attribute.name": "Role",
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "groups",
|
||||
"protocol": "saml",
|
||||
"protocolMapper": "saml-group-membership-mapper",
|
||||
"consentRequired": False,
|
||||
"config": {
|
||||
"full.path": "false",
|
||||
"attribute.nameformat": "Basic",
|
||||
"single": "true", # ! this was changed to true as we need the groups in the single attribute section
|
||||
"friendly.name": "groups",
|
||||
"attribute.name": "groups",
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "role attribute",
|
||||
"protocol": "saml",
|
||||
"protocolMapper": "saml-user-attribute-mapper",
|
||||
"consentRequired": False,
|
||||
"config": {
|
||||
"attribute.nameformat": "Basic",
|
||||
"user.attribute": "signoz_role",
|
||||
"friendly.name": "signoz_role",
|
||||
"attribute.name": "signoz_role",
|
||||
},
|
||||
},
|
||||
],
|
||||
"defaultClientScopes": ["saml_organization", "role_list"],
|
||||
"optionalClientScopes": [],
|
||||
@@ -163,6 +197,16 @@ def create_oidc_client(
|
||||
realm_name="master",
|
||||
)
|
||||
|
||||
_ensure_groups_client_scope(client)
|
||||
|
||||
# DELETE existing client if it exists (to ensure redirect URIs are updated)
|
||||
try:
|
||||
existing_client_id = client.get_client_id(client_id=client_id)
|
||||
if existing_client_id:
|
||||
client.delete_client(existing_client_id)
|
||||
except Exception:
|
||||
pass # Client doesn't exist, that's fine
|
||||
|
||||
client.create_client(
|
||||
skip_exists=True,
|
||||
payload={
|
||||
@@ -208,6 +252,7 @@ def create_oidc_client(
|
||||
"profile",
|
||||
"basic",
|
||||
"email",
|
||||
"groups",
|
||||
],
|
||||
"optionalClientScopes": [
|
||||
"address",
|
||||
@@ -333,3 +378,248 @@ def idp_login(driver: webdriver.Chrome) -> Callable[[str, str], None]:
|
||||
wait.until(EC.invisibility_of_element((By.ID, "kc-login")))
|
||||
|
||||
return _idp_login
|
||||
|
||||
|
||||
@pytest.fixture(name="create_group_idp", scope="function")
|
||||
def create_group_idp(idp: types.TestContainerIDP) -> Callable[[str], str]:
|
||||
"""Creates a group in Keycloak IDP."""
|
||||
client = KeycloakAdmin(
|
||||
server_url=idp.container.host_configs["6060"].base(),
|
||||
username=IDP_ROOT_USERNAME,
|
||||
password=IDP_ROOT_PASSWORD,
|
||||
realm_name="master",
|
||||
)
|
||||
|
||||
created_groups = []
|
||||
|
||||
def _create_group_idp(group_name: str) -> str:
|
||||
group_id = client.create_group({"name": group_name}, skip_exists=True)
|
||||
created_groups.append(group_id)
|
||||
return group_id
|
||||
|
||||
yield _create_group_idp
|
||||
|
||||
for group_id in created_groups:
|
||||
try:
|
||||
client.delete_group(group_id)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(name="create_user_idp_with_groups", scope="function")
|
||||
def create_user_idp_with_groups(
|
||||
idp: types.TestContainerIDP,
|
||||
create_group_idp: Callable[[str], str],
|
||||
) -> Callable[[str, str, bool, List[str]], None]:
|
||||
"""Creates a user in Keycloak IDP with specified groups."""
|
||||
client = KeycloakAdmin(
|
||||
server_url=idp.container.host_configs["6060"].base(),
|
||||
username=IDP_ROOT_USERNAME,
|
||||
password=IDP_ROOT_PASSWORD,
|
||||
realm_name="master",
|
||||
)
|
||||
|
||||
created_users = []
|
||||
|
||||
def _create_user_idp_with_groups(
|
||||
email: str, password: str, verified: bool, groups: List[str]
|
||||
) -> None:
|
||||
# Create groups first
|
||||
group_ids = []
|
||||
for group_name in groups:
|
||||
group_id = create_group_idp(group_name)
|
||||
group_ids.append(group_id)
|
||||
|
||||
# Create user
|
||||
user_id = client.create_user(
|
||||
exist_ok=False,
|
||||
payload={
|
||||
"username": email,
|
||||
"email": email,
|
||||
"enabled": True,
|
||||
"emailVerified": verified,
|
||||
},
|
||||
)
|
||||
client.set_user_password(user_id, password, temporary=False)
|
||||
created_users.append(user_id)
|
||||
|
||||
# Add user to groups
|
||||
for group_id in group_ids:
|
||||
client.group_user_add(user_id, group_id)
|
||||
|
||||
yield _create_user_idp_with_groups
|
||||
|
||||
for user_id in created_users:
|
||||
try:
|
||||
break
|
||||
client.delete_user(user_id)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(name="add_user_to_group", scope="function")
|
||||
def add_user_to_group(
|
||||
idp: types.TestContainerIDP,
|
||||
create_group_idp: Callable[[str], str],
|
||||
) -> Callable[[str, str], None]:
|
||||
"""Adds an existing user to a group."""
|
||||
client = KeycloakAdmin(
|
||||
server_url=idp.container.host_configs["6060"].base(),
|
||||
username=IDP_ROOT_USERNAME,
|
||||
password=IDP_ROOT_PASSWORD,
|
||||
realm_name="master",
|
||||
)
|
||||
|
||||
def _add_user_to_group(email: str, group_name: str) -> None:
|
||||
user_id = client.get_user_id(email)
|
||||
group_id = create_group_idp(group_name)
|
||||
client.group_user_add(user_id, group_id)
|
||||
|
||||
return _add_user_to_group
|
||||
|
||||
|
||||
@pytest.fixture(name="create_user_idp_with_role", scope="function")
|
||||
def create_user_idp_with_role(
|
||||
idp: types.TestContainerIDP,
|
||||
create_group_idp: Callable[[str], str],
|
||||
) -> Callable[[str, str, bool, str, List[str]], None]:
|
||||
"""Creates a user in Keycloak IDP with a custom role attribute and optional groups."""
|
||||
client = KeycloakAdmin(
|
||||
server_url=idp.container.host_configs["6060"].base(),
|
||||
username=IDP_ROOT_USERNAME,
|
||||
password=IDP_ROOT_PASSWORD,
|
||||
realm_name="master",
|
||||
)
|
||||
|
||||
created_users = []
|
||||
|
||||
def _create_user_idp_with_role(
|
||||
email: str, password: str, verified: bool, role: str, groups: List[str]
|
||||
) -> None:
|
||||
# Create groups first
|
||||
group_ids = []
|
||||
for group_name in groups:
|
||||
group_id = create_group_idp(group_name)
|
||||
group_ids.append(group_id)
|
||||
|
||||
# Create user with role attribute
|
||||
user_id = client.create_user(
|
||||
exist_ok=False,
|
||||
payload={
|
||||
"username": email,
|
||||
"email": email,
|
||||
"enabled": True,
|
||||
"emailVerified": verified,
|
||||
"attributes": {
|
||||
"signoz_role": role,
|
||||
},
|
||||
},
|
||||
)
|
||||
client.set_user_password(user_id, password, temporary=False)
|
||||
created_users.append(user_id)
|
||||
|
||||
# Add user to groups
|
||||
for group_id in group_ids:
|
||||
client.group_user_add(user_id, group_id)
|
||||
|
||||
yield _create_user_idp_with_role
|
||||
|
||||
for user_id in created_users:
|
||||
try:
|
||||
break
|
||||
client.delete_user(user_id)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(name="setup_user_profile", scope="package")
|
||||
def setup_user_profile(idp: types.TestContainerIDP) -> Callable[[], None]:
|
||||
"""Setup Keycloak User Profile with signoz_role attribute."""
|
||||
def _setup_user_profile() -> None:
|
||||
client = KeycloakAdmin(
|
||||
server_url=idp.container.host_configs["6060"].base(),
|
||||
username=IDP_ROOT_USERNAME,
|
||||
password=IDP_ROOT_PASSWORD,
|
||||
realm_name="master",
|
||||
)
|
||||
|
||||
# Get current user profile config
|
||||
profile = client.get_realm_users_profile()
|
||||
|
||||
# Check if signoz_role attribute already exists
|
||||
attributes = profile.get("attributes", [])
|
||||
signoz_role_exists = any(attr.get("name") == "signoz_role" for attr in attributes)
|
||||
|
||||
if not signoz_role_exists:
|
||||
# Add signoz_role attribute to user profile
|
||||
attributes.append({
|
||||
"name": "signoz_role",
|
||||
"displayName": "SigNoz Role",
|
||||
"validations": {},
|
||||
"annotations": {},
|
||||
# "required": {
|
||||
# "roles": [] # Not required
|
||||
# },
|
||||
"permissions": {
|
||||
"view": ["admin", "user"],
|
||||
"edit": ["admin"]
|
||||
},
|
||||
"multivalued": False
|
||||
})
|
||||
profile["attributes"] = attributes
|
||||
|
||||
# Update the realm user profile
|
||||
client.update_realm_users_profile(payload=profile)
|
||||
|
||||
return _setup_user_profile
|
||||
|
||||
|
||||
def _ensure_groups_client_scope(client: KeycloakAdmin) -> None:
|
||||
"""Create 'groups' client scope if it doesn't exist."""
|
||||
# Check if groups scope exists
|
||||
scopes = client.get_client_scopes()
|
||||
groups_scope_exists = any(s.get("name") == "groups" for s in scopes)
|
||||
|
||||
if not groups_scope_exists:
|
||||
# Create the groups client scope
|
||||
client.create_client_scope(
|
||||
payload={
|
||||
"name": "groups",
|
||||
"description": "Group membership",
|
||||
"protocol": "openid-connect",
|
||||
"attributes": {
|
||||
"include.in.token.scope": "true",
|
||||
"display.on.consent.screen": "true",
|
||||
},
|
||||
"protocolMappers": [
|
||||
{
|
||||
"name": "groups",
|
||||
"protocol": "openid-connect",
|
||||
"protocolMapper": "oidc-group-membership-mapper",
|
||||
"consentRequired": False,
|
||||
"config": {
|
||||
"full.path": "false",
|
||||
"id.token.claim": "true",
|
||||
"access.token.claim": "true",
|
||||
"claim.name": "groups",
|
||||
"userinfo.token.claim": "true",
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "signoz_role",
|
||||
"protocol": "openid-connect",
|
||||
"protocolMapper": "oidc-usermodel-attribute-mapper",
|
||||
"consentRequired": False,
|
||||
"config": {
|
||||
"user.attribute": "signoz_role",
|
||||
"id.token.claim": "true",
|
||||
"access.token.claim": "true",
|
||||
"claim.name": "signoz_role",
|
||||
"userinfo.token.claim": "true",
|
||||
"jsonType.label": "String",
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
skip_exists=True,
|
||||
)
|
||||
|
||||
@@ -4,6 +4,7 @@ from typing import Any, Callable, Dict, List
|
||||
import requests
|
||||
from selenium import webdriver
|
||||
from wiremock.resources.mappings import Mapping
|
||||
import uuid
|
||||
|
||||
from fixtures.auth import (
|
||||
USER_ADMIN_EMAIL,
|
||||
@@ -195,3 +196,437 @@ def test_idp_initiated_saml_authn(
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
|
||||
|
||||
def _get_saml_domain(signoz: SigNoz, admin_token: str) -> dict:
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/domains"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
return next(
|
||||
(
|
||||
domain
|
||||
for domain in response.json()["data"]
|
||||
if domain["name"] == "saml.integration.test"
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
|
||||
def _get_user_by_email(signoz: SigNoz, admin_token: str, email: str) -> dict:
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
return next(
|
||||
(user for user in response.json()["data"] if user["email"] == email),
|
||||
None,
|
||||
)
|
||||
|
||||
|
||||
def _perform_saml_login(
|
||||
signoz: SigNoz,
|
||||
driver: webdriver.Chrome,
|
||||
get_session_context: Callable[[str], str],
|
||||
idp_login: Callable[[str, str], None],
|
||||
email: str,
|
||||
password: str,
|
||||
) -> None:
|
||||
session_context = get_session_context(email)
|
||||
url = session_context["orgs"][0]["authNSupport"]["callback"][0]["url"]
|
||||
driver.get(url)
|
||||
idp_login(email, password)
|
||||
|
||||
|
||||
def test_saml_update_domain_with_group_mappings(
|
||||
signoz: SigNoz,
|
||||
get_token: Callable[[str, str], str],
|
||||
get_saml_settings: Callable[[], dict],
|
||||
) -> None:
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
domain = _get_saml_domain(signoz, admin_token)
|
||||
settings = get_saml_settings()
|
||||
|
||||
# update the existing saml domain to have role mappings also
|
||||
response = requests.put(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
|
||||
json={
|
||||
"config": {
|
||||
"ssoEnabled": True,
|
||||
"ssoType": "saml",
|
||||
"samlConfig": {
|
||||
"samlEntity": settings["entityID"],
|
||||
"samlIdp": settings["singleSignOnServiceLocation"],
|
||||
"samlCert": settings["certificate"],
|
||||
"samlAttributeMapping": {
|
||||
"name": "displayName",
|
||||
"groups": "groups",
|
||||
"role": "role",
|
||||
},
|
||||
},
|
||||
"roleMapping": {
|
||||
"defaultRole": "VIEWER",
|
||||
"groupMappings": {
|
||||
"signoz-admins": "ADMIN",
|
||||
"signoz-editors": "EDITOR",
|
||||
"signoz-viewers": "VIEWER",
|
||||
},
|
||||
"useRoleAttribute": False,
|
||||
},
|
||||
},
|
||||
},
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
|
||||
def test_saml_role_mapping_single_group_admin(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP, # pylint: disable=unused-argument
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
) -> None:
|
||||
"""
|
||||
Test: User in 'signoz-admins' group gets ADMIN role.
|
||||
"""
|
||||
email = "admin-group-user@saml.integration.test"
|
||||
create_user_idp_with_groups(email, "password", True, ["signoz-admins"])
|
||||
|
||||
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
|
||||
|
||||
def test_saml_role_mapping_single_group_editor(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP, # pylint: disable=unused-argument
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
) -> None:
|
||||
"""
|
||||
Test: User in 'signoz-editors' group gets EDITOR role.
|
||||
"""
|
||||
email = "editor-group-user@saml.integration.test"
|
||||
create_user_idp_with_groups(email, "password", True, ["signoz-editors"])
|
||||
|
||||
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
|
||||
|
||||
def test_saml_role_mapping_multiple_groups_highest_wins(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP, # pylint: disable=unused-argument
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
) -> None:
|
||||
"""
|
||||
Test: User in multiple groups gets highest role.
|
||||
User is in both 'signoz-viewers' and 'signoz-editors'.
|
||||
Expected: User gets EDITOR (highest of VIEWER and EDITOR).
|
||||
"""
|
||||
email = f"multi-group-user-{uuid.uuid4().hex[:8]}@saml.integration.test"
|
||||
create_user_idp_with_groups(email, "password", True, ["signoz-viewers", "signoz-editors"])
|
||||
|
||||
# DEBUG: Verify user has both groups in Keycloak
|
||||
from keycloak import KeycloakAdmin
|
||||
kc = KeycloakAdmin(
|
||||
server_url=idp.container.host_configs["6060"].base(),
|
||||
username="admin",
|
||||
password="password",
|
||||
realm_name="master",
|
||||
)
|
||||
user_id = kc.get_user_id(email)
|
||||
groups = kc.get_user_groups(user_id)
|
||||
print(f"\n=== DEBUG: User groups in Keycloak: {[g['name'] for g in groups]} ===\n")
|
||||
|
||||
# DEBUG: Check if domain has role mappings configured
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
domain = _get_saml_domain(signoz, admin_token)
|
||||
print(f"\n=== DEBUG: Domain role mapping config: {domain.get('roleMapping')} ===\n")
|
||||
print(f"domain: {domain}")
|
||||
|
||||
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
|
||||
|
||||
def test_saml_role_mapping_explicit_viewer_group(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP, # pylint: disable=unused-argument
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
) -> None:
|
||||
"""
|
||||
Test: User explicitly mapped to VIEWER via groups should get VIEWER.
|
||||
This tests the bug where VIEWER group mappings were incorrectly ignored.
|
||||
"""
|
||||
email = "viewer-group-user@saml.integration.test"
|
||||
create_user_idp_with_groups(email, "password", True, ["signoz-viewers"])
|
||||
|
||||
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
|
||||
|
||||
def test_saml_role_mapping_unmapped_group_uses_default(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP, # pylint: disable=unused-argument
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
) -> None:
|
||||
"""
|
||||
Test: User in unmapped group falls back to default role (VIEWER).
|
||||
"""
|
||||
email = "unmapped-group-user@saml.integration.test"
|
||||
create_user_idp_with_groups(email, "password", True, ["some-other-group"])
|
||||
|
||||
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
|
||||
|
||||
def test_saml_update_domain_with_use_role_claim(
|
||||
signoz: SigNoz,
|
||||
get_token: Callable[[str, str], str],
|
||||
get_saml_settings: Callable[[], dict],
|
||||
) -> None:
|
||||
"""
|
||||
Updates SAML domain to enable useRoleAttribute (direct role attribute).
|
||||
"""
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
domain = _get_saml_domain(signoz, admin_token)
|
||||
settings = get_saml_settings()
|
||||
|
||||
response = requests.put(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
|
||||
json={
|
||||
"config": {
|
||||
"ssoEnabled": True,
|
||||
"ssoType": "saml",
|
||||
"samlConfig": {
|
||||
"samlEntity": settings["entityID"],
|
||||
"samlIdp": settings["singleSignOnServiceLocation"],
|
||||
"samlCert": settings["certificate"],
|
||||
"samlAttributeMapping": {
|
||||
"name": "displayName",
|
||||
"groups": "groups",
|
||||
"role": "signoz_role",
|
||||
},
|
||||
},
|
||||
"roleMapping": {
|
||||
"defaultRole": "VIEWER",
|
||||
"groupMappings": {
|
||||
"signoz-admins": "ADMIN",
|
||||
"signoz-editors": "EDITOR",
|
||||
},
|
||||
"useRoleAttribute": True,
|
||||
},
|
||||
},
|
||||
},
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
|
||||
def test_saml_role_mapping_role_claim_takes_precedence(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP, # pylint: disable=unused-argument
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
setup_user_profile: Callable[[], None],
|
||||
) -> None:
|
||||
"""
|
||||
Test: useRoleAttribute takes precedence over group mappings.
|
||||
User is in 'signoz-editors' group but has role attribute 'ADMIN'.
|
||||
Expected: User gets ADMIN (from role attribute).
|
||||
"""
|
||||
|
||||
setup_user_profile()
|
||||
|
||||
email = "role-claim-precedence@saml.integration.test"
|
||||
create_user_idp_with_role(email, "password", True, "ADMIN", ["signoz-editors"])
|
||||
|
||||
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
|
||||
|
||||
def test_saml_role_mapping_invalid_role_claim_fallback(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP, # pylint: disable=unused-argument
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
setup_user_profile: Callable[[], None],
|
||||
) -> None:
|
||||
"""
|
||||
Test: Invalid role claim falls back to group mappings.
|
||||
User has invalid role 'SUPERADMIN' and is in 'signoz-editors'.
|
||||
Expected: User gets EDITOR (from group mapping).
|
||||
"""
|
||||
setup_user_profile()
|
||||
email = "invalid-role-user@saml.integration.test"
|
||||
create_user_idp_with_role(email, "password", True, "SUPERADMIN", ["signoz-editors"])
|
||||
|
||||
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
|
||||
|
||||
def test_saml_role_mapping_case_insensitive(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP, # pylint: disable=unused-argument
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
setup_user_profile: Callable[[], None],
|
||||
) -> None:
|
||||
"""
|
||||
Test: Role attribute matching is case-insensitive.
|
||||
User has role 'admin' (lowercase).
|
||||
Expected: User gets ADMIN role.
|
||||
"""
|
||||
setup_user_profile()
|
||||
email = "lowercase-role-user@saml.integration.test"
|
||||
create_user_idp_with_role(email, "password", True, "admin", [])
|
||||
|
||||
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
|
||||
|
||||
# def test_saml_role_mapping_update_on_subsequent_login(
|
||||
# signoz: SigNoz,
|
||||
# idp: TestContainerIDP, # pylint: disable=unused-argument
|
||||
# driver: webdriver.Chrome,
|
||||
# create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
# add_user_to_group: Callable[[str, str], None],
|
||||
# idp_login: Callable[[str, str], None],
|
||||
# get_token: Callable[[str, str], str],
|
||||
# get_session_context: Callable[[str], str],
|
||||
# ) -> None:
|
||||
# """
|
||||
# Test: User's role should update on subsequent logins when IDP groups change.
|
||||
|
||||
# This tests the critical bug where GetOrCreateUser doesn't update roles.
|
||||
|
||||
# Steps:
|
||||
# 1. User logs in with 'signoz-editors' group -> gets EDITOR role
|
||||
# 2. User's group changes in IDP to 'signoz-admins'
|
||||
# 3. User logs in again -> should get ADMIN role
|
||||
# """
|
||||
# email = "role-update-user@saml.integration.test"
|
||||
|
||||
# # First login with EDITOR group
|
||||
# create_user_idp_with_groups(email, "password", True, ["signoz-editors"])
|
||||
# _perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
|
||||
|
||||
# admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
# found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
# assert found_user is not None
|
||||
# assert found_user["role"] == "EDITOR"
|
||||
|
||||
# # Add user to admin group in IDP
|
||||
# add_user_to_group(email, "signoz-admins")
|
||||
|
||||
# # Clear browser session and login again
|
||||
# driver.delete_all_cookies()
|
||||
# _perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
|
||||
|
||||
# # Check if role was updated
|
||||
# found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
# assert found_user is not None
|
||||
# # After fix, this should be ADMIN. Currently stays EDITOR (bug).
|
||||
# assert found_user["role"] == "ADMIN"
|
||||
|
||||
|
||||
#!########################################################################
|
||||
#!############## KEEP THIS IN THE END ALWAYS #############################
|
||||
#!########################################################################
|
||||
def test_cleanup_saml_domain(
|
||||
signoz: SigNoz,
|
||||
get_token: Callable[[str, str], str],
|
||||
) -> None:
|
||||
"""Cleanup: Remove the SAML domain after tests complete."""
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
domain = _get_saml_domain(signoz, admin_token)
|
||||
|
||||
if domain:
|
||||
response = requests.delete(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
# 204 No Content or 200 OK are both valid
|
||||
assert response.status_code in [HTTPStatus.OK, HTTPStatus.NO_CONTENT, HTTPStatus.NOT_FOUND]
|
||||
|
||||
# also remove the saml client from the idp
|
||||
response = requests.delete(
|
||||
idp.container.host_configs["6060"].get(f"/realms/master/clients/{domain['id']}"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
assert response.status_code in [HTTPStatus.OK, HTTPStatus.NO_CONTENT, HTTPStatus.NOT_FOUND]
|
||||
|
||||
@@ -127,3 +127,429 @@ def test_oidc_authn(
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
|
||||
|
||||
def _get_oidc_domain(signoz: SigNoz, admin_token: str) -> dict:
|
||||
"""Helper to get the OIDC domain."""
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/domains"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
return next(
|
||||
(
|
||||
domain
|
||||
for domain in response.json()["data"]
|
||||
if domain["name"] == "oidc.integration.test"
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
|
||||
def _get_user_by_email(signoz: SigNoz, admin_token: str, email: str) -> dict:
|
||||
"""Helper to get a user by email."""
|
||||
response = requests.get(
|
||||
signoz.self.host_configs["8080"].get("/api/v1/user"),
|
||||
timeout=2,
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
)
|
||||
return next(
|
||||
(user for user in response.json()["data"] if user["email"] == email),
|
||||
None,
|
||||
)
|
||||
|
||||
|
||||
def _perform_oidc_login(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP,
|
||||
driver: webdriver.Chrome,
|
||||
get_session_context: Callable[[str], str],
|
||||
idp_login: Callable[[str, str], None],
|
||||
email: str,
|
||||
password: str,
|
||||
) -> None:
|
||||
"""Helper to perform OIDC login flow."""
|
||||
session_context = get_session_context(email)
|
||||
url = session_context["orgs"][0]["authNSupport"]["callback"][0]["url"]
|
||||
parsed_url = urlparse(url)
|
||||
actual_url = (
|
||||
f"{idp.container.host_configs['6060'].get(parsed_url.path)}?{parsed_url.query}"
|
||||
)
|
||||
driver.get(actual_url)
|
||||
idp_login(email, password)
|
||||
|
||||
|
||||
def test_oidc_update_domain_with_group_mappings(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP,
|
||||
get_token: Callable[[str, str], str],
|
||||
get_oidc_settings: Callable[[str], dict],
|
||||
) -> None:
|
||||
"""
|
||||
Updates OIDC domain to add role mapping with group mappings and claim mapping.
|
||||
"""
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
domain = _get_oidc_domain(signoz, admin_token)
|
||||
client_id = f"oidc.integration.test.{signoz.self.host_configs['8080'].address}:{signoz.self.host_configs['8080'].port}"
|
||||
settings = get_oidc_settings(client_id)
|
||||
|
||||
response = requests.put(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
|
||||
json={
|
||||
"config": {
|
||||
"ssoEnabled": True,
|
||||
"ssoType": "oidc",
|
||||
"oidcConfig": {
|
||||
"clientId": settings["client_id"],
|
||||
"clientSecret": settings["client_secret"],
|
||||
"issuer": f"{idp.container.container_configs['6060'].get(urlparse(settings['issuer']).path)}",
|
||||
"issuerAlias": settings["issuer"],
|
||||
"getUserInfo": True,
|
||||
"claimMapping": {
|
||||
"email": "email",
|
||||
"name": "name",
|
||||
"groups": "groups",
|
||||
"role": "signoz_role",
|
||||
},
|
||||
},
|
||||
"roleMapping": {
|
||||
"defaultRole": "VIEWER",
|
||||
"groupMappings": {
|
||||
"signoz-admins": "ADMIN",
|
||||
"signoz-editors": "EDITOR",
|
||||
"signoz-viewers": "VIEWER",
|
||||
},
|
||||
"useRoleAttribute": False,
|
||||
},
|
||||
},
|
||||
},
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
|
||||
def test_oidc_role_mapping_single_group_admin(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP,
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
) -> None:
|
||||
"""
|
||||
Test: OIDC user in 'signoz-admins' group gets ADMIN role.
|
||||
"""
|
||||
email = "admin-group-user@oidc.integration.test"
|
||||
create_user_idp_with_groups(email, "password123", True, ["signoz-admins"])
|
||||
|
||||
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
|
||||
|
||||
def test_oidc_role_mapping_single_group_editor(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP,
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
) -> None:
|
||||
"""
|
||||
Test: OIDC user in 'signoz-editors' group gets EDITOR role.
|
||||
"""
|
||||
email = "editor-group-user@oidc.integration.test"
|
||||
create_user_idp_with_groups(email, "password123", True, ["signoz-editors"])
|
||||
|
||||
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
|
||||
|
||||
def test_oidc_role_mapping_multiple_groups_highest_wins(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP,
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
) -> None:
|
||||
"""
|
||||
Test: OIDC user in multiple groups gets highest role.
|
||||
User is in 'signoz-viewers' and 'signoz-admins'.
|
||||
Expected: User gets ADMIN (highest of the two).
|
||||
"""
|
||||
email = "multi-group-user@oidc.integration.test"
|
||||
create_user_idp_with_groups(email, "password123", True, ["signoz-viewers", "signoz-admins"])
|
||||
|
||||
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
|
||||
|
||||
def test_oidc_role_mapping_explicit_viewer_group(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP,
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
) -> None:
|
||||
"""
|
||||
Test: OIDC user explicitly mapped to VIEWER via groups gets VIEWER.
|
||||
Tests the bug where VIEWER mappings were ignored.
|
||||
"""
|
||||
email = "viewer-group-user@oidc.integration.test"
|
||||
create_user_idp_with_groups(email, "password123", True, ["signoz-viewers"])
|
||||
|
||||
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
|
||||
|
||||
def test_oidc_role_mapping_unmapped_group_uses_default(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP,
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
) -> None:
|
||||
"""
|
||||
Test: OIDC user in unmapped group falls back to default role.
|
||||
"""
|
||||
email = "unmapped-group-user@oidc.integration.test"
|
||||
create_user_idp_with_groups(email, "password123", True, ["some-other-group"])
|
||||
|
||||
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "VIEWER"
|
||||
|
||||
|
||||
def test_oidc_update_domain_with_use_role_claim(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP,
|
||||
get_token: Callable[[str, str], str],
|
||||
get_oidc_settings: Callable[[str], dict],
|
||||
) -> None:
|
||||
"""
|
||||
Updates OIDC domain to enable useRoleClaim.
|
||||
"""
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
domain = _get_oidc_domain(signoz, admin_token)
|
||||
client_id = f"oidc.integration.test.{signoz.self.host_configs['8080'].address}:{signoz.self.host_configs['8080'].port}"
|
||||
settings = get_oidc_settings(client_id)
|
||||
|
||||
response = requests.put(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
|
||||
json={
|
||||
"config": {
|
||||
"ssoEnabled": True,
|
||||
"ssoType": "oidc",
|
||||
"oidcConfig": {
|
||||
"clientId": settings["client_id"],
|
||||
"clientSecret": settings["client_secret"],
|
||||
"issuer": f"{idp.container.container_configs['6060'].get(urlparse(settings['issuer']).path)}",
|
||||
"issuerAlias": settings["issuer"],
|
||||
"getUserInfo": True,
|
||||
"claimMapping": {
|
||||
"email": "email",
|
||||
"name": "name",
|
||||
"groups": "groups",
|
||||
"role": "signoz_role",
|
||||
},
|
||||
},
|
||||
"roleMapping": {
|
||||
"defaultRole": "VIEWER",
|
||||
"groupMappings": {
|
||||
"signoz-admins": "ADMIN",
|
||||
"signoz-editors": "EDITOR",
|
||||
},
|
||||
"useRoleAttribute": True,
|
||||
},
|
||||
},
|
||||
},
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.NO_CONTENT
|
||||
|
||||
|
||||
def test_oidc_role_mapping_role_claim_takes_precedence(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP,
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
setup_user_profile: Callable[[], None],
|
||||
) -> None:
|
||||
"""
|
||||
Test: useRoleAttribute takes precedence over group mappings.
|
||||
User is in 'signoz-editors' group but has role claim 'ADMIN'.
|
||||
Expected: User gets ADMIN (from role claim).
|
||||
"""
|
||||
setup_user_profile()
|
||||
email = "role-claim-precedence@oidc.integration.test"
|
||||
create_user_idp_with_role(email, "password123", True, "ADMIN", ["signoz-editors"])
|
||||
|
||||
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "ADMIN"
|
||||
|
||||
|
||||
def test_oidc_role_mapping_invalid_role_claim_fallback(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP,
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
setup_user_profile: Callable[[], None],
|
||||
) -> None:
|
||||
"""
|
||||
Test: Invalid role claim falls back to group mappings.
|
||||
User has invalid role 'SUPERADMIN' and is in 'signoz-editors'.
|
||||
Expected: User gets EDITOR (from group mapping).
|
||||
"""
|
||||
setup_user_profile()
|
||||
email = "invalid-role-user@oidc.integration.test"
|
||||
create_user_idp_with_role(email, "password123", True, "SUPERADMIN", ["signoz-editors"])
|
||||
|
||||
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
|
||||
|
||||
def test_oidc_role_mapping_case_insensitive(
|
||||
signoz: SigNoz,
|
||||
idp: TestContainerIDP,
|
||||
driver: webdriver.Chrome,
|
||||
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
|
||||
idp_login: Callable[[str, str], None],
|
||||
get_token: Callable[[str, str], str],
|
||||
get_session_context: Callable[[str], str],
|
||||
setup_user_profile: Callable[[], None],
|
||||
) -> None:
|
||||
"""
|
||||
Test: Role claim matching is case-insensitive.
|
||||
User has role 'editor' (lowercase).
|
||||
Expected: User gets EDITOR role.
|
||||
"""
|
||||
setup_user_profile()
|
||||
email = "lowercase-role-user@oidc.integration.test"
|
||||
create_user_idp_with_role(email, "password123", True, "editor", [])
|
||||
|
||||
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
|
||||
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
assert found_user is not None
|
||||
assert found_user["role"] == "EDITOR"
|
||||
|
||||
|
||||
# def test_oidc_role_mapping_update_on_subsequent_login(
|
||||
# signoz: SigNoz,
|
||||
# idp: TestContainerIDP,
|
||||
# driver: webdriver.Chrome,
|
||||
# create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
|
||||
# add_user_to_group: Callable[[str, str], None],
|
||||
# idp_login: Callable[[str, str], None],
|
||||
# get_token: Callable[[str, str], str],
|
||||
# get_session_context: Callable[[str], str],
|
||||
# ) -> None:
|
||||
# """
|
||||
# Test: User's role should update on subsequent logins when IDP groups change.
|
||||
|
||||
# This tests the critical bug where GetOrCreateUser doesn't update roles.
|
||||
|
||||
# Steps:
|
||||
# 1. User logs in with 'signoz-editors' group -> gets EDITOR role
|
||||
# 2. User's group changes in IDP to 'signoz-admins'
|
||||
# 3. User logs in again -> should get ADMIN role
|
||||
# """
|
||||
# email = "role-update-user@oidc.integration.test"
|
||||
|
||||
# # First login with EDITOR group
|
||||
# create_user_idp_with_groups(email, "password123", True, ["signoz-editors"])
|
||||
# _perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
|
||||
|
||||
# admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
# found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
# assert found_user is not None
|
||||
# assert found_user["role"] == "EDITOR"
|
||||
|
||||
# # Add user to admin group in IDP
|
||||
# add_user_to_group(email, "signoz-admins")
|
||||
|
||||
# # Clear browser session and login again
|
||||
# driver.delete_all_cookies()
|
||||
# _perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
|
||||
|
||||
# # Check if role was updated
|
||||
# found_user = _get_user_by_email(signoz, admin_token, email)
|
||||
|
||||
# assert found_user is not None
|
||||
# # After fix, this should be ADMIN. Currently stays EDITOR (bug).
|
||||
# assert found_user["role"] == "ADMIN"
|
||||
|
||||
|
||||
#!########################################################################
|
||||
#!############## KEEP THIS IN THE END ALWAYS #############################
|
||||
#!########################################################################
|
||||
def test_cleanup_oidc_domain(
|
||||
signoz: SigNoz,
|
||||
get_token: Callable[[str, str], str],
|
||||
) -> None:
|
||||
"""Cleanup: Remove the OIDC domain after tests complete."""
|
||||
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
|
||||
domain = _get_oidc_domain(signoz, admin_token)
|
||||
|
||||
if domain:
|
||||
response = requests.delete(
|
||||
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
|
||||
headers={"Authorization": f"Bearer {admin_token}"},
|
||||
timeout=2,
|
||||
)
|
||||
# 204 No Content or 200 OK are both valid
|
||||
assert response.status_code in [HTTPStatus.OK, HTTPStatus.NO_CONTENT, HTTPStatus.NOT_FOUND]
|
||||
|
||||
Reference in New Issue
Block a user