Compare commits

...

1 Commits

Author SHA1 Message Date
Karan Balani
9e3078383e feat: idp attributes mapping 2025-12-21 00:08:21 +05:30
16 changed files with 1497 additions and 19 deletions

View File

@@ -2,6 +2,7 @@ package oidccallbackauthn
import (
"context"
"fmt"
"net/url"
"github.com/SigNoz/signoz/pkg/authn"
@@ -20,7 +21,7 @@ const (
)
var (
scopes []string = []string{"email", oidc.ScopeOpenID}
scopes []string = []string{"email", "profile", oidc.ScopeOpenID}
)
var _ authn.CallbackAuthN = (*AuthN)(nil)
@@ -126,7 +127,39 @@ func (a *AuthN) HandleCallback(ctx context.Context, query url.Values) (*authtype
}
}
return authtypes.NewCallbackIdentity("", email, authDomain.StorableAuthDomain().OrgID, state), nil
// DEBUG: Print all assertion values to see what's being received
fmt.Printf("\n=== DEBUG: All assertion values ===\n")
for key, attr := range claims {
fmt.Printf(" Key: %q, Value: %v\n", key, attr)
}
fmt.Printf("=================================\n\n")
name := ""
if nameClaim := authDomain.AuthDomainConfig().OIDC.ClaimMapping.Name; nameClaim != "" {
if n, ok := claims[nameClaim].(string); ok {
name = n
}
}
var groups []string
if groupsClaim := authDomain.AuthDomainConfig().OIDC.ClaimMapping.Groups; groupsClaim != "" {
if g, ok := claims[groupsClaim].([]interface{}); ok {
for _, group := range g {
if gs, ok := group.(string); ok {
groups = append(groups, gs)
}
}
}
}
role := ""
if roleClaim := authDomain.AuthDomainConfig().OIDC.ClaimMapping.Role; roleClaim != "" {
if r, ok := claims[roleClaim].(string); ok {
role = r
}
}
return authtypes.NewCallbackIdentity(name, email, authDomain.StorableAuthDomain().OrgID, state, groups, role), nil
}
func (a *AuthN) ProviderInfo(ctx context.Context, authDomain *authtypes.AuthDomain) *authtypes.AuthNProviderInfo {

View File

@@ -5,6 +5,7 @@ import (
"crypto/x509"
"encoding/base64"
"encoding/pem"
"fmt"
"net/url"
"strings"
@@ -87,6 +88,14 @@ func (a *AuthN) HandleCallback(ctx context.Context, formValues url.Values) (*aut
return nil, err
}
// DEBUG: Print all assertion values to see what's being received
fmt.Printf("\n=== DEBUG: All assertion values ===\n")
for key, attr := range assertionInfo.Values {
fmt.Printf(" Key: %q, Name: %q, FriendlyName: %q, Values: %v\n",
key, attr.Name, attr.FriendlyName, attr.Values)
}
fmt.Printf("=================================\n\n")
if assertionInfo.WarningInfo.InvalidTime {
return nil, errors.New(errors.TypeForbidden, errors.CodeForbidden, "saml: expired saml response")
}
@@ -96,7 +105,30 @@ func (a *AuthN) HandleCallback(ctx context.Context, formValues url.Values) (*aut
return nil, errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "saml: invalid email").WithAdditional("The nameID assertion is used to retrieve the email address, please check your IDP configuration and try again.")
}
return authtypes.NewCallbackIdentity("", email, authDomain.StorableAuthDomain().OrgID, state), nil
name := ""
var groups []string
role := ""
attributeMapping := authDomain.AuthDomainConfig().SAML.AttributeMapping
if attributeMapping != nil {
if attributeMapping.Name != "" {
if val := assertionInfo.Values.Get(attributeMapping.Name); val != "" {
name = val
}
}
if attributeMapping.Groups != "" {
groups = assertionInfo.Values.GetAll(attributeMapping.Groups)
}
if attributeMapping.Role != "" {
if val := assertionInfo.Values.Get(attributeMapping.Role); val != "" {
role = val
}
}
}
return authtypes.NewCallbackIdentity(name, email, authDomain.StorableAuthDomain().OrgID, state, groups, role), nil
}
func (a *AuthN) ProviderInfo(ctx context.Context, authDomain *authtypes.AuthDomain) *authtypes.AuthNProviderInfo {

2
go.mod
View File

@@ -341,7 +341,7 @@ require (
golang.org/x/time v0.11.0 // indirect
golang.org/x/tools v0.36.0 // indirect
gonum.org/v1/gonum v0.16.0 // indirect
google.golang.org/api v0.236.0 // indirect
google.golang.org/api v0.236.0
google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect
google.golang.org/grpc v1.75.1 // indirect

2
go.sum
View File

@@ -1713,6 +1713,8 @@ google.golang.org/genproto v0.0.0-20220421151946-72621c1f0bd3/go.mod h1:8w6bsBMX
google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2 h1:1tXaIXCracvtsRxSBsYDiSBN0cuJvM7QYW+MrpIRY78=
google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:49MsLSx0oWMOZqcpB3uL8ZOkAh1+TndpJ8ONoCBWiZk=
google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY=
google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE=

View File

@@ -3,6 +3,7 @@ package googlecallbackauthn
import (
"context"
"net/url"
"strings"
"github.com/SigNoz/signoz/pkg/authn"
"github.com/SigNoz/signoz/pkg/errors"
@@ -10,6 +11,9 @@ import (
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/coreos/go-oidc/v3/oidc"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
admin "google.golang.org/api/admin/directory/v1"
"google.golang.org/api/option"
)
const (
@@ -18,7 +22,7 @@ const (
)
var (
scopes []string = []string{"email"}
scopes []string = []string{"email", "profile"}
)
var _ authn.CallbackAuthN = (*AuthN)(nil)
@@ -113,8 +117,22 @@ func (a *AuthN) HandleCallback(ctx context.Context, query url.Values) (*authtype
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "google: failed to parse email").WithAdditional(err.Error())
}
return authtypes.NewCallbackIdentity(claims.Name, email, authDomain.StorableAuthDomain().OrgID, state), nil
var groups []string
if authDomain.AuthDomainConfig().Google.FetchGroups {
groups, err = a.fetchGoogleWorkspaceGroups(ctx, claims.Email, authDomain.AuthDomainConfig().Google)
if err != nil {
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "google: could not fetch groups").WithAdditional(err.Error())
}
if len(authDomain.AuthDomainConfig().Google.AllowedGroups) > 0 {
groups = filterGroups(groups, authDomain.AuthDomainConfig().Google.AllowedGroups)
if len(groups) == 0 {
return nil, errors.Newf(errors.TypeForbidden, errors.CodeForbidden, "google: user %q is not in any allowed groups", claims.Email)
}
}
}
return authtypes.NewCallbackIdentity(claims.Name, email, authDomain.StorableAuthDomain().OrgID, state, groups, ""), nil
}
func (a *AuthN) ProviderInfo(ctx context.Context, authDomain *authtypes.AuthDomain) *authtypes.AuthNProviderInfo {
@@ -136,3 +154,84 @@ func (a *AuthN) oauth2Config(siteURL *url.URL, authDomain *authtypes.AuthDomain,
}).String(),
}
}
func (a *AuthN) fetchGoogleWorkspaceGroups(ctx context.Context, userEmail string, config *authtypes.GoogleConfig) ([]string, error) {
adminEmail := config.GetAdminEmailForDomain(userEmail)
if adminEmail == "" {
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "no admin email configured for domain of %s", userEmail)
}
jwtConfig, err := google.JWTConfigFromJSON([]byte(config.ServiceAccountJSON), admin.AdminDirectoryGroupReadonlyScope)
if err != nil {
return nil, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "invalid service account credentials").WithAdditional(err.Error())
}
jwtConfig.Subject = adminEmail
adminService, err := admin.NewService(ctx, option.WithHTTPClient(jwtConfig.Client(ctx)))
if err != nil {
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "unable to create directory service").WithAdditional(err.Error())
}
checkedGroups := make(map[string]struct{})
return a.getGroups(adminService, userEmail, config.FetchTransitiveGroupMembership, checkedGroups)
}
// Recursive method
func (a *AuthN) getGroups(adminService *admin.Service, userEmail string, fetchTransitive bool, checkedGroups map[string]struct{}) ([]string, error) {
var userGroups []string
var pageToken string
for {
call := adminService.Groups.List().UserKey(userEmail)
if pageToken != "" {
call = call.PageToken(pageToken)
}
groupList, err := call.Do()
if err != nil {
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "unable to list groups").WithAdditional(err.Error())
}
for _, group := range groupList.Groups {
if _, exists := checkedGroups[group.Email]; exists {
continue
}
checkedGroups[group.Email] = struct{}{}
userGroups = append(userGroups, group.Email)
if fetchTransitive {
transitiveGroups, err := a.getGroups(adminService, group.Email, fetchTransitive, checkedGroups)
if err != nil {
return nil, errors.Newf(errors.TypeInternal, errors.CodeInternal, "unable to list transitive groups").WithAdditional(err.Error())
}
userGroups = append(userGroups, transitiveGroups...)
}
}
pageToken = groupList.NextPageToken
if pageToken == "" {
break
}
}
return userGroups, nil
}
func filterGroups(userGroups, allowedGroups []string) []string {
allowed := make(map[string]struct{}, len(allowedGroups))
for _, g := range allowedGroups {
allowed[strings.ToLower(g)] = struct{}{} // just to make o(1) searches
}
var filtered []string
for _, g := range userGroups {
if _, ok := allowed[strings.ToLower(g)]; ok {
filtered = append(filtered, g)
}
}
return filtered
}

View File

@@ -112,7 +112,7 @@ func (b *base) WithUrl(u string) *base {
}
}
// WithUrl adds additional messages to the base error and returns a new base error.
// WithAdditional adds additional messages to the base error and returns a new base error.
func (b *base) WithAdditional(a ...string) *base {
return &base{
t: b.t,

View File

@@ -2,6 +2,7 @@ package implsession
import (
"context"
"fmt"
"net/url"
"slices"
"strings"
@@ -123,7 +124,7 @@ func (module *module) DeprecatedCreateSessionByEmailPassword(ctx context.Context
}
if !factorPassword.Equals(password) {
return nil, errors.New(errors.TypeUnauthenticated, types.ErrCodeIncorrectPassword, "invalid email orpassword")
return nil, errors.New(errors.TypeUnauthenticated, types.ErrCodeIncorrectPassword, "invalid email or password")
}
identity := authtypes.NewIdentity(users[0].ID, users[0].OrgID, users[0].Email, users[0].Role)
@@ -151,13 +152,33 @@ func (module *module) CreateCallbackAuthNSession(ctx context.Context, authNProvi
return "", err
}
module.settings.Logger().InfoContext(ctx, "$$$$$$$$$$$$$$ callback values %%%%%%%%%%%%%%", "values", values)
callbackIdentity, err := callbackAuthN.HandleCallback(ctx, values)
if err != nil {
module.settings.Logger().ErrorContext(ctx, "failed to handle callback", "error", err, "authn_provider", authNProvider)
return "", err
}
user, err := types.NewUser(callbackIdentity.Name, callbackIdentity.Email, types.RoleViewer, callbackIdentity.OrgID)
authDomain, err := module.authDomain.GetByOrgIDAndID(ctx, callbackIdentity.OrgID, callbackIdentity.State.DomainID)
if err != nil {
return "", err
}
roleMapping := authDomain.AuthDomainConfig().RoleMapping
// DEBUG: Add this line to print the callback identity
module.settings.Logger().InfoContext(ctx, "$$$$$$$$$$$$$$$$ DEBUG callback identity ###############",
"email", callbackIdentity.Email,
"name", callbackIdentity.Name,
"groups", callbackIdentity.Groups,
"role", callbackIdentity.Role,
"orgID", callbackIdentity.OrgID,
)
role := resolveRole(callbackIdentity, roleMapping)
user, err := types.NewUser(callbackIdentity.Name, callbackIdentity.Email, role, callbackIdentity.OrgID)
if err != nil {
return "", err
}
@@ -231,3 +252,59 @@ func getProvider[T authn.AuthN](authNProvider authtypes.AuthNProvider, authNs ma
return provider, nil
}
func resolveRole(callbackIdentity *authtypes.CallbackIdentity, roleMapping *authtypes.RoleMapping) types.Role {
// DEBUG: Print all assertion values to see what's being received
fmt.Printf("\n=== DEBUG: callback identity values ===\n")
fmt.Printf("callbackIdentity.Name - %s", callbackIdentity.Name)
fmt.Printf("callbackIdentity.Role - %s", callbackIdentity.Role)
fmt.Printf("=================================\n\n")
if roleMapping == nil {
return types.RoleViewer
}
if roleMapping.UseRoleAttribute && callbackIdentity.Role != "" {
if role, err := types.NewRole(strings.ToUpper(callbackIdentity.Role)); err == nil {
return role
}
}
if len(roleMapping.GroupMappings) > 0 && len(callbackIdentity.Groups) > 0 {
highestRole := types.RoleViewer
found := false
for _, group := range callbackIdentity.Groups {
if mappedRole, exists := roleMapping.GroupMappings[group]; exists {
found = true
if role, err := types.NewRole(strings.ToUpper(mappedRole)); err == nil {
if compareRoles(role, highestRole) > 0 {
highestRole = role
}
}
}
}
if found {
return highestRole
}
}
if roleMapping.DefaultRole != "" {
if role, err := types.NewRole(strings.ToUpper(roleMapping.DefaultRole)); err == nil {
return role
}
}
return types.RoleViewer
}
func compareRoles(a, b types.Role) int {
order := map[types.Role]int{
types.RoleViewer: 0,
types.RoleEditor: 1,
types.RoleAdmin: 2,
}
return order[a] - order[b]
}

View File

@@ -32,10 +32,12 @@ type Identity struct {
}
type CallbackIdentity struct {
Name string `json:"name"`
Email valuer.Email `json:"email"`
OrgID valuer.UUID `json:"orgId"`
State State `json:"state"`
Name string `json:"name"`
Email valuer.Email `json:"email"`
OrgID valuer.UUID `json:"orgId"`
State State `json:"state"`
Groups []string `json:"groups,omitempty"`
Role string `json:"role,omitempty"`
}
type State struct {
@@ -85,12 +87,14 @@ func NewIdentity(userID valuer.UUID, orgID valuer.UUID, email valuer.Email, role
}
}
func NewCallbackIdentity(name string, email valuer.Email, orgID valuer.UUID, state State) *CallbackIdentity {
func NewCallbackIdentity(name string, email valuer.Email, orgID valuer.UUID, state State, groups []string, role string) *CallbackIdentity {
return &CallbackIdentity{
Name: name,
Email: email,
OrgID: orgID,
State: state,
Name: name,
Email: email,
OrgID: orgID,
State: state,
Groups: groups,
Role: role,
}
}

View File

@@ -63,6 +63,7 @@ type AuthDomainConfig struct {
SAML *SamlConfig `json:"samlConfig"`
Google *GoogleConfig `json:"googleAuthConfig"`
OIDC *OIDCConfig `json:"oidcConfig"`
RoleMapping *RoleMapping `json:"roleMapping"`
}
type AuthDomain struct {

View File

@@ -2,10 +2,13 @@ package authtypes
import (
"encoding/json"
"strings"
"github.com/SigNoz/signoz/pkg/errors"
)
const wildCardDomain = "*"
type GoogleConfig struct {
// ClientID is the application's ID. For example, 292085223830.apps.googleusercontent.com.
ClientID string `json:"clientId"`
@@ -15,6 +18,27 @@ type GoogleConfig struct {
// What is the meaning of this? Should we remove this?
RedirectURI string `json:"redirectURI"`
// Whether to fetch the Google workspace groups (required additional API scopes)
FetchGroups bool `json:"fetchGroups"`
// Service Account creds JSON stored for Google Admin SDK access
// This is content of the JSON file stored directly into db as string
// Required if FetchGroups is true (unless running on GCE with default credentials)
ServiceAccountJSON string `json:"serviceAccountJson,omitempty"`
// Map of workspace domain to admin email for service account impersonation
// The service account will impersonate this admin to call the directory API
// Use "*" as key for wildcard/default that matches any domain
// Example: {"example.com": "admin@exmaple.com", "*": "fallbackadmin@company.com"}
DomainToAdminEmail map[string]string `json:"adminEmail,omitempty"`
// If true, fetch transitive group membership (recursive - groups that contains other groups)
FetchTransitiveGroupMembership bool `json:"fetchTransitiveGroupMembership,omitempty"`
// Optional list of allowed groups
// If this is present, only users belonging to one of these groups will be allowed to login
AllowedGroups []string `json:"allowedGroups,omitempty"`
}
func (config *GoogleConfig) UnmarshalJSON(data []byte) error {
@@ -33,6 +57,33 @@ func (config *GoogleConfig) UnmarshalJSON(data []byte) error {
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "clientSecret is required")
}
if temp.FetchGroups {
if len(temp.DomainToAdminEmail) == 0 {
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "domainToAdminEmail is required if fetchGroups is true")
}
if temp.ServiceAccountJSON == "" {
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "serviceAccountJSON is required if fetchGroups is true")
}
}
*config = GoogleConfig(temp)
return nil
}
func (config *GoogleConfig) GetAdminEmailForDomain(userEmail string) string {
domain := extractDomainFromEmail(userEmail)
if adminEmail, ok := config.DomainToAdminEmail[domain]; ok {
return adminEmail
}
return config.DomainToAdminEmail[wildCardDomain]
}
func extractDomainFromEmail(email string) string {
if at := strings.LastIndex(email, "@"); at >= 0 {
return email[at+1:]
}
return wildCardDomain
}

View File

@@ -34,6 +34,12 @@ type OIDCConfig struct {
type ClaimMapping struct {
// Configurable key which contains the email claims. Defaults to "email"
Email string `json:"email"`
// Configuration key which contains the name. Defaults to "name"
Name string `json:"name"`
// Configuration key which contains the name. Defaults to "groups" (Optional)
Groups string `json:"groups"`
// Configuration key which contains the name. Defaults to "role" (Optional)
Role string `json:"role"`
}
func (config *OIDCConfig) UnmarshalJSON(data []byte) error {

View File

@@ -0,0 +1,10 @@
package authtypes
type RoleMapping struct {
// Default role any new SSO users. Defaults to "VIEWER"
DefaultRole string `json:"defaultRole"`
// Map of IDP group names to SigNoz roles. Key is group name, value is SigNoz role
GroupMappings map[string]string `json:"groupMappings"`
// If true, use the role claim directly from IDP instead of group mappings
UseRoleAttribute bool `json:"useRoleAttribute"`
}

View File

@@ -20,6 +20,18 @@ type SamlConfig struct {
// For providers like jumpcloud, this should be set to true.
// Note: This is the reverse of WantAuthnRequestsSigned. If WantAuthnRequestsSigned is false, then InsecureSkipAuthNRequestsSigned should be true.
InsecureSkipAuthNRequestsSigned bool `json:"insecureSkipAuthNRequestsSigned"`
// Mapping of SAML assertion attributes
AttributeMapping *SamlAttributeMapping `json:"samlAttributeMapping"`
}
type SamlAttributeMapping struct {
// SAML attribute name for display name
Name string `json:"name"`
// SAML attribute name for groups
Groups string `json:"groups"`
// SAML attribute name for direct role
Role string `json:"role"`
}
func (config *SamlConfig) UnmarshalJSON(data []byte) error {

View File

@@ -1,4 +1,4 @@
from typing import Any, Callable, Dict
from typing import Any, Callable, Dict, List
from urllib.parse import urljoin
from xml.etree import ElementTree
@@ -26,6 +26,15 @@ def create_saml_client(
realm_name="master",
)
# DELETE existing client if it exists (to ensure mappers are updated)
saml_client_id = f"{signoz.self.host_configs['8080'].address}:{signoz.self.host_configs['8080'].port}"
try:
existing_client_id = client.get_client_id(client_id=saml_client_id)
if existing_client_id:
client.delete_client(existing_client_id)
except Exception:
pass # Client doesn't exist, that's fine
client.create_client(
skip_exists=True,
payload={
@@ -114,6 +123,31 @@ def create_saml_client(
"attribute.name": "Role",
},
},
{
"name": "groups",
"protocol": "saml",
"protocolMapper": "saml-group-membership-mapper",
"consentRequired": False,
"config": {
"full.path": "false",
"attribute.nameformat": "Basic",
"single": "true", # ! this was changed to true as we need the groups in the single attribute section
"friendly.name": "groups",
"attribute.name": "groups",
},
},
{
"name": "role attribute",
"protocol": "saml",
"protocolMapper": "saml-user-attribute-mapper",
"consentRequired": False,
"config": {
"attribute.nameformat": "Basic",
"user.attribute": "signoz_role",
"friendly.name": "signoz_role",
"attribute.name": "signoz_role",
},
},
],
"defaultClientScopes": ["saml_organization", "role_list"],
"optionalClientScopes": [],
@@ -163,6 +197,16 @@ def create_oidc_client(
realm_name="master",
)
_ensure_groups_client_scope(client)
# DELETE existing client if it exists (to ensure redirect URIs are updated)
try:
existing_client_id = client.get_client_id(client_id=client_id)
if existing_client_id:
client.delete_client(existing_client_id)
except Exception:
pass # Client doesn't exist, that's fine
client.create_client(
skip_exists=True,
payload={
@@ -208,6 +252,7 @@ def create_oidc_client(
"profile",
"basic",
"email",
"groups",
],
"optionalClientScopes": [
"address",
@@ -333,3 +378,248 @@ def idp_login(driver: webdriver.Chrome) -> Callable[[str, str], None]:
wait.until(EC.invisibility_of_element((By.ID, "kc-login")))
return _idp_login
@pytest.fixture(name="create_group_idp", scope="function")
def create_group_idp(idp: types.TestContainerIDP) -> Callable[[str], str]:
"""Creates a group in Keycloak IDP."""
client = KeycloakAdmin(
server_url=idp.container.host_configs["6060"].base(),
username=IDP_ROOT_USERNAME,
password=IDP_ROOT_PASSWORD,
realm_name="master",
)
created_groups = []
def _create_group_idp(group_name: str) -> str:
group_id = client.create_group({"name": group_name}, skip_exists=True)
created_groups.append(group_id)
return group_id
yield _create_group_idp
for group_id in created_groups:
try:
client.delete_group(group_id)
except Exception:
pass
@pytest.fixture(name="create_user_idp_with_groups", scope="function")
def create_user_idp_with_groups(
idp: types.TestContainerIDP,
create_group_idp: Callable[[str], str],
) -> Callable[[str, str, bool, List[str]], None]:
"""Creates a user in Keycloak IDP with specified groups."""
client = KeycloakAdmin(
server_url=idp.container.host_configs["6060"].base(),
username=IDP_ROOT_USERNAME,
password=IDP_ROOT_PASSWORD,
realm_name="master",
)
created_users = []
def _create_user_idp_with_groups(
email: str, password: str, verified: bool, groups: List[str]
) -> None:
# Create groups first
group_ids = []
for group_name in groups:
group_id = create_group_idp(group_name)
group_ids.append(group_id)
# Create user
user_id = client.create_user(
exist_ok=False,
payload={
"username": email,
"email": email,
"enabled": True,
"emailVerified": verified,
},
)
client.set_user_password(user_id, password, temporary=False)
created_users.append(user_id)
# Add user to groups
for group_id in group_ids:
client.group_user_add(user_id, group_id)
yield _create_user_idp_with_groups
for user_id in created_users:
try:
break
client.delete_user(user_id)
except Exception:
pass
@pytest.fixture(name="add_user_to_group", scope="function")
def add_user_to_group(
idp: types.TestContainerIDP,
create_group_idp: Callable[[str], str],
) -> Callable[[str, str], None]:
"""Adds an existing user to a group."""
client = KeycloakAdmin(
server_url=idp.container.host_configs["6060"].base(),
username=IDP_ROOT_USERNAME,
password=IDP_ROOT_PASSWORD,
realm_name="master",
)
def _add_user_to_group(email: str, group_name: str) -> None:
user_id = client.get_user_id(email)
group_id = create_group_idp(group_name)
client.group_user_add(user_id, group_id)
return _add_user_to_group
@pytest.fixture(name="create_user_idp_with_role", scope="function")
def create_user_idp_with_role(
idp: types.TestContainerIDP,
create_group_idp: Callable[[str], str],
) -> Callable[[str, str, bool, str, List[str]], None]:
"""Creates a user in Keycloak IDP with a custom role attribute and optional groups."""
client = KeycloakAdmin(
server_url=idp.container.host_configs["6060"].base(),
username=IDP_ROOT_USERNAME,
password=IDP_ROOT_PASSWORD,
realm_name="master",
)
created_users = []
def _create_user_idp_with_role(
email: str, password: str, verified: bool, role: str, groups: List[str]
) -> None:
# Create groups first
group_ids = []
for group_name in groups:
group_id = create_group_idp(group_name)
group_ids.append(group_id)
# Create user with role attribute
user_id = client.create_user(
exist_ok=False,
payload={
"username": email,
"email": email,
"enabled": True,
"emailVerified": verified,
"attributes": {
"signoz_role": role,
},
},
)
client.set_user_password(user_id, password, temporary=False)
created_users.append(user_id)
# Add user to groups
for group_id in group_ids:
client.group_user_add(user_id, group_id)
yield _create_user_idp_with_role
for user_id in created_users:
try:
break
client.delete_user(user_id)
except Exception:
pass
@pytest.fixture(name="setup_user_profile", scope="package")
def setup_user_profile(idp: types.TestContainerIDP) -> Callable[[], None]:
"""Setup Keycloak User Profile with signoz_role attribute."""
def _setup_user_profile() -> None:
client = KeycloakAdmin(
server_url=idp.container.host_configs["6060"].base(),
username=IDP_ROOT_USERNAME,
password=IDP_ROOT_PASSWORD,
realm_name="master",
)
# Get current user profile config
profile = client.get_realm_users_profile()
# Check if signoz_role attribute already exists
attributes = profile.get("attributes", [])
signoz_role_exists = any(attr.get("name") == "signoz_role" for attr in attributes)
if not signoz_role_exists:
# Add signoz_role attribute to user profile
attributes.append({
"name": "signoz_role",
"displayName": "SigNoz Role",
"validations": {},
"annotations": {},
# "required": {
# "roles": [] # Not required
# },
"permissions": {
"view": ["admin", "user"],
"edit": ["admin"]
},
"multivalued": False
})
profile["attributes"] = attributes
# Update the realm user profile
client.update_realm_users_profile(payload=profile)
return _setup_user_profile
def _ensure_groups_client_scope(client: KeycloakAdmin) -> None:
"""Create 'groups' client scope if it doesn't exist."""
# Check if groups scope exists
scopes = client.get_client_scopes()
groups_scope_exists = any(s.get("name") == "groups" for s in scopes)
if not groups_scope_exists:
# Create the groups client scope
client.create_client_scope(
payload={
"name": "groups",
"description": "Group membership",
"protocol": "openid-connect",
"attributes": {
"include.in.token.scope": "true",
"display.on.consent.screen": "true",
},
"protocolMappers": [
{
"name": "groups",
"protocol": "openid-connect",
"protocolMapper": "oidc-group-membership-mapper",
"consentRequired": False,
"config": {
"full.path": "false",
"id.token.claim": "true",
"access.token.claim": "true",
"claim.name": "groups",
"userinfo.token.claim": "true",
},
},
{
"name": "signoz_role",
"protocol": "openid-connect",
"protocolMapper": "oidc-usermodel-attribute-mapper",
"consentRequired": False,
"config": {
"user.attribute": "signoz_role",
"id.token.claim": "true",
"access.token.claim": "true",
"claim.name": "signoz_role",
"userinfo.token.claim": "true",
"jsonType.label": "String",
},
},
],
},
skip_exists=True,
)

View File

@@ -4,6 +4,7 @@ from typing import Any, Callable, Dict, List
import requests
from selenium import webdriver
from wiremock.resources.mappings import Mapping
import uuid
from fixtures.auth import (
USER_ADMIN_EMAIL,
@@ -195,3 +196,437 @@ def test_idp_initiated_saml_authn(
assert found_user is not None
assert found_user["role"] == "VIEWER"
def _get_saml_domain(signoz: SigNoz, admin_token: str) -> dict:
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/domains"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
return next(
(
domain
for domain in response.json()["data"]
if domain["name"] == "saml.integration.test"
),
None,
)
def _get_user_by_email(signoz: SigNoz, admin_token: str, email: str) -> dict:
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
return next(
(user for user in response.json()["data"] if user["email"] == email),
None,
)
def _perform_saml_login(
signoz: SigNoz,
driver: webdriver.Chrome,
get_session_context: Callable[[str], str],
idp_login: Callable[[str, str], None],
email: str,
password: str,
) -> None:
session_context = get_session_context(email)
url = session_context["orgs"][0]["authNSupport"]["callback"][0]["url"]
driver.get(url)
idp_login(email, password)
def test_saml_update_domain_with_group_mappings(
signoz: SigNoz,
get_token: Callable[[str, str], str],
get_saml_settings: Callable[[], dict],
) -> None:
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
domain = _get_saml_domain(signoz, admin_token)
settings = get_saml_settings()
# update the existing saml domain to have role mappings also
response = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
json={
"config": {
"ssoEnabled": True,
"ssoType": "saml",
"samlConfig": {
"samlEntity": settings["entityID"],
"samlIdp": settings["singleSignOnServiceLocation"],
"samlCert": settings["certificate"],
"samlAttributeMapping": {
"name": "displayName",
"groups": "groups",
"role": "role",
},
},
"roleMapping": {
"defaultRole": "VIEWER",
"groupMappings": {
"signoz-admins": "ADMIN",
"signoz-editors": "EDITOR",
"signoz-viewers": "VIEWER",
},
"useRoleAttribute": False,
},
},
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.NO_CONTENT
def test_saml_role_mapping_single_group_admin(
signoz: SigNoz,
idp: TestContainerIDP, # pylint: disable=unused-argument
driver: webdriver.Chrome,
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
) -> None:
"""
Test: User in 'signoz-admins' group gets ADMIN role.
"""
email = "admin-group-user@saml.integration.test"
create_user_idp_with_groups(email, "password", True, ["signoz-admins"])
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
def test_saml_role_mapping_single_group_editor(
signoz: SigNoz,
idp: TestContainerIDP, # pylint: disable=unused-argument
driver: webdriver.Chrome,
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
) -> None:
"""
Test: User in 'signoz-editors' group gets EDITOR role.
"""
email = "editor-group-user@saml.integration.test"
create_user_idp_with_groups(email, "password", True, ["signoz-editors"])
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
def test_saml_role_mapping_multiple_groups_highest_wins(
signoz: SigNoz,
idp: TestContainerIDP, # pylint: disable=unused-argument
driver: webdriver.Chrome,
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
) -> None:
"""
Test: User in multiple groups gets highest role.
User is in both 'signoz-viewers' and 'signoz-editors'.
Expected: User gets EDITOR (highest of VIEWER and EDITOR).
"""
email = f"multi-group-user-{uuid.uuid4().hex[:8]}@saml.integration.test"
create_user_idp_with_groups(email, "password", True, ["signoz-viewers", "signoz-editors"])
# DEBUG: Verify user has both groups in Keycloak
from keycloak import KeycloakAdmin
kc = KeycloakAdmin(
server_url=idp.container.host_configs["6060"].base(),
username="admin",
password="password",
realm_name="master",
)
user_id = kc.get_user_id(email)
groups = kc.get_user_groups(user_id)
print(f"\n=== DEBUG: User groups in Keycloak: {[g['name'] for g in groups]} ===\n")
# DEBUG: Check if domain has role mappings configured
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
domain = _get_saml_domain(signoz, admin_token)
print(f"\n=== DEBUG: Domain role mapping config: {domain.get('roleMapping')} ===\n")
print(f"domain: {domain}")
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
def test_saml_role_mapping_explicit_viewer_group(
signoz: SigNoz,
idp: TestContainerIDP, # pylint: disable=unused-argument
driver: webdriver.Chrome,
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
) -> None:
"""
Test: User explicitly mapped to VIEWER via groups should get VIEWER.
This tests the bug where VIEWER group mappings were incorrectly ignored.
"""
email = "viewer-group-user@saml.integration.test"
create_user_idp_with_groups(email, "password", True, ["signoz-viewers"])
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "VIEWER"
def test_saml_role_mapping_unmapped_group_uses_default(
signoz: SigNoz,
idp: TestContainerIDP, # pylint: disable=unused-argument
driver: webdriver.Chrome,
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
) -> None:
"""
Test: User in unmapped group falls back to default role (VIEWER).
"""
email = "unmapped-group-user@saml.integration.test"
create_user_idp_with_groups(email, "password", True, ["some-other-group"])
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "VIEWER"
def test_saml_update_domain_with_use_role_claim(
signoz: SigNoz,
get_token: Callable[[str, str], str],
get_saml_settings: Callable[[], dict],
) -> None:
"""
Updates SAML domain to enable useRoleAttribute (direct role attribute).
"""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
domain = _get_saml_domain(signoz, admin_token)
settings = get_saml_settings()
response = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
json={
"config": {
"ssoEnabled": True,
"ssoType": "saml",
"samlConfig": {
"samlEntity": settings["entityID"],
"samlIdp": settings["singleSignOnServiceLocation"],
"samlCert": settings["certificate"],
"samlAttributeMapping": {
"name": "displayName",
"groups": "groups",
"role": "signoz_role",
},
},
"roleMapping": {
"defaultRole": "VIEWER",
"groupMappings": {
"signoz-admins": "ADMIN",
"signoz-editors": "EDITOR",
},
"useRoleAttribute": True,
},
},
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.NO_CONTENT
def test_saml_role_mapping_role_claim_takes_precedence(
signoz: SigNoz,
idp: TestContainerIDP, # pylint: disable=unused-argument
driver: webdriver.Chrome,
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
setup_user_profile: Callable[[], None],
) -> None:
"""
Test: useRoleAttribute takes precedence over group mappings.
User is in 'signoz-editors' group but has role attribute 'ADMIN'.
Expected: User gets ADMIN (from role attribute).
"""
setup_user_profile()
email = "role-claim-precedence@saml.integration.test"
create_user_idp_with_role(email, "password", True, "ADMIN", ["signoz-editors"])
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
def test_saml_role_mapping_invalid_role_claim_fallback(
signoz: SigNoz,
idp: TestContainerIDP, # pylint: disable=unused-argument
driver: webdriver.Chrome,
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
setup_user_profile: Callable[[], None],
) -> None:
"""
Test: Invalid role claim falls back to group mappings.
User has invalid role 'SUPERADMIN' and is in 'signoz-editors'.
Expected: User gets EDITOR (from group mapping).
"""
setup_user_profile()
email = "invalid-role-user@saml.integration.test"
create_user_idp_with_role(email, "password", True, "SUPERADMIN", ["signoz-editors"])
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
def test_saml_role_mapping_case_insensitive(
signoz: SigNoz,
idp: TestContainerIDP, # pylint: disable=unused-argument
driver: webdriver.Chrome,
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
setup_user_profile: Callable[[], None],
) -> None:
"""
Test: Role attribute matching is case-insensitive.
User has role 'admin' (lowercase).
Expected: User gets ADMIN role.
"""
setup_user_profile()
email = "lowercase-role-user@saml.integration.test"
create_user_idp_with_role(email, "password", True, "admin", [])
_perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
# def test_saml_role_mapping_update_on_subsequent_login(
# signoz: SigNoz,
# idp: TestContainerIDP, # pylint: disable=unused-argument
# driver: webdriver.Chrome,
# create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
# add_user_to_group: Callable[[str, str], None],
# idp_login: Callable[[str, str], None],
# get_token: Callable[[str, str], str],
# get_session_context: Callable[[str], str],
# ) -> None:
# """
# Test: User's role should update on subsequent logins when IDP groups change.
# This tests the critical bug where GetOrCreateUser doesn't update roles.
# Steps:
# 1. User logs in with 'signoz-editors' group -> gets EDITOR role
# 2. User's group changes in IDP to 'signoz-admins'
# 3. User logs in again -> should get ADMIN role
# """
# email = "role-update-user@saml.integration.test"
# # First login with EDITOR group
# create_user_idp_with_groups(email, "password", True, ["signoz-editors"])
# _perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
# admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# found_user = _get_user_by_email(signoz, admin_token, email)
# assert found_user is not None
# assert found_user["role"] == "EDITOR"
# # Add user to admin group in IDP
# add_user_to_group(email, "signoz-admins")
# # Clear browser session and login again
# driver.delete_all_cookies()
# _perform_saml_login(signoz, driver, get_session_context, idp_login, email, "password")
# # Check if role was updated
# found_user = _get_user_by_email(signoz, admin_token, email)
# assert found_user is not None
# # After fix, this should be ADMIN. Currently stays EDITOR (bug).
# assert found_user["role"] == "ADMIN"
#!########################################################################
#!############## KEEP THIS IN THE END ALWAYS #############################
#!########################################################################
def test_cleanup_saml_domain(
signoz: SigNoz,
get_token: Callable[[str, str], str],
) -> None:
"""Cleanup: Remove the SAML domain after tests complete."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
domain = _get_saml_domain(signoz, admin_token)
if domain:
response = requests.delete(
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
# 204 No Content or 200 OK are both valid
assert response.status_code in [HTTPStatus.OK, HTTPStatus.NO_CONTENT, HTTPStatus.NOT_FOUND]
# also remove the saml client from the idp
response = requests.delete(
idp.container.host_configs["6060"].get(f"/realms/master/clients/{domain['id']}"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code in [HTTPStatus.OK, HTTPStatus.NO_CONTENT, HTTPStatus.NOT_FOUND]

View File

@@ -127,3 +127,429 @@ def test_oidc_authn(
assert found_user is not None
assert found_user["role"] == "VIEWER"
def _get_oidc_domain(signoz: SigNoz, admin_token: str) -> dict:
"""Helper to get the OIDC domain."""
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/domains"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
return next(
(
domain
for domain in response.json()["data"]
if domain["name"] == "oidc.integration.test"
),
None,
)
def _get_user_by_email(signoz: SigNoz, admin_token: str, email: str) -> dict:
"""Helper to get a user by email."""
response = requests.get(
signoz.self.host_configs["8080"].get("/api/v1/user"),
timeout=2,
headers={"Authorization": f"Bearer {admin_token}"},
)
return next(
(user for user in response.json()["data"] if user["email"] == email),
None,
)
def _perform_oidc_login(
signoz: SigNoz,
idp: TestContainerIDP,
driver: webdriver.Chrome,
get_session_context: Callable[[str], str],
idp_login: Callable[[str, str], None],
email: str,
password: str,
) -> None:
"""Helper to perform OIDC login flow."""
session_context = get_session_context(email)
url = session_context["orgs"][0]["authNSupport"]["callback"][0]["url"]
parsed_url = urlparse(url)
actual_url = (
f"{idp.container.host_configs['6060'].get(parsed_url.path)}?{parsed_url.query}"
)
driver.get(actual_url)
idp_login(email, password)
def test_oidc_update_domain_with_group_mappings(
signoz: SigNoz,
idp: TestContainerIDP,
get_token: Callable[[str, str], str],
get_oidc_settings: Callable[[str], dict],
) -> None:
"""
Updates OIDC domain to add role mapping with group mappings and claim mapping.
"""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
domain = _get_oidc_domain(signoz, admin_token)
client_id = f"oidc.integration.test.{signoz.self.host_configs['8080'].address}:{signoz.self.host_configs['8080'].port}"
settings = get_oidc_settings(client_id)
response = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
json={
"config": {
"ssoEnabled": True,
"ssoType": "oidc",
"oidcConfig": {
"clientId": settings["client_id"],
"clientSecret": settings["client_secret"],
"issuer": f"{idp.container.container_configs['6060'].get(urlparse(settings['issuer']).path)}",
"issuerAlias": settings["issuer"],
"getUserInfo": True,
"claimMapping": {
"email": "email",
"name": "name",
"groups": "groups",
"role": "signoz_role",
},
},
"roleMapping": {
"defaultRole": "VIEWER",
"groupMappings": {
"signoz-admins": "ADMIN",
"signoz-editors": "EDITOR",
"signoz-viewers": "VIEWER",
},
"useRoleAttribute": False,
},
},
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.NO_CONTENT
def test_oidc_role_mapping_single_group_admin(
signoz: SigNoz,
idp: TestContainerIDP,
driver: webdriver.Chrome,
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
) -> None:
"""
Test: OIDC user in 'signoz-admins' group gets ADMIN role.
"""
email = "admin-group-user@oidc.integration.test"
create_user_idp_with_groups(email, "password123", True, ["signoz-admins"])
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
def test_oidc_role_mapping_single_group_editor(
signoz: SigNoz,
idp: TestContainerIDP,
driver: webdriver.Chrome,
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
) -> None:
"""
Test: OIDC user in 'signoz-editors' group gets EDITOR role.
"""
email = "editor-group-user@oidc.integration.test"
create_user_idp_with_groups(email, "password123", True, ["signoz-editors"])
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
def test_oidc_role_mapping_multiple_groups_highest_wins(
signoz: SigNoz,
idp: TestContainerIDP,
driver: webdriver.Chrome,
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
) -> None:
"""
Test: OIDC user in multiple groups gets highest role.
User is in 'signoz-viewers' and 'signoz-admins'.
Expected: User gets ADMIN (highest of the two).
"""
email = "multi-group-user@oidc.integration.test"
create_user_idp_with_groups(email, "password123", True, ["signoz-viewers", "signoz-admins"])
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
def test_oidc_role_mapping_explicit_viewer_group(
signoz: SigNoz,
idp: TestContainerIDP,
driver: webdriver.Chrome,
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
) -> None:
"""
Test: OIDC user explicitly mapped to VIEWER via groups gets VIEWER.
Tests the bug where VIEWER mappings were ignored.
"""
email = "viewer-group-user@oidc.integration.test"
create_user_idp_with_groups(email, "password123", True, ["signoz-viewers"])
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "VIEWER"
def test_oidc_role_mapping_unmapped_group_uses_default(
signoz: SigNoz,
idp: TestContainerIDP,
driver: webdriver.Chrome,
create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
) -> None:
"""
Test: OIDC user in unmapped group falls back to default role.
"""
email = "unmapped-group-user@oidc.integration.test"
create_user_idp_with_groups(email, "password123", True, ["some-other-group"])
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "VIEWER"
def test_oidc_update_domain_with_use_role_claim(
signoz: SigNoz,
idp: TestContainerIDP,
get_token: Callable[[str, str], str],
get_oidc_settings: Callable[[str], dict],
) -> None:
"""
Updates OIDC domain to enable useRoleClaim.
"""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
domain = _get_oidc_domain(signoz, admin_token)
client_id = f"oidc.integration.test.{signoz.self.host_configs['8080'].address}:{signoz.self.host_configs['8080'].port}"
settings = get_oidc_settings(client_id)
response = requests.put(
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
json={
"config": {
"ssoEnabled": True,
"ssoType": "oidc",
"oidcConfig": {
"clientId": settings["client_id"],
"clientSecret": settings["client_secret"],
"issuer": f"{idp.container.container_configs['6060'].get(urlparse(settings['issuer']).path)}",
"issuerAlias": settings["issuer"],
"getUserInfo": True,
"claimMapping": {
"email": "email",
"name": "name",
"groups": "groups",
"role": "signoz_role",
},
},
"roleMapping": {
"defaultRole": "VIEWER",
"groupMappings": {
"signoz-admins": "ADMIN",
"signoz-editors": "EDITOR",
},
"useRoleAttribute": True,
},
},
},
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
assert response.status_code == HTTPStatus.NO_CONTENT
def test_oidc_role_mapping_role_claim_takes_precedence(
signoz: SigNoz,
idp: TestContainerIDP,
driver: webdriver.Chrome,
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
setup_user_profile: Callable[[], None],
) -> None:
"""
Test: useRoleAttribute takes precedence over group mappings.
User is in 'signoz-editors' group but has role claim 'ADMIN'.
Expected: User gets ADMIN (from role claim).
"""
setup_user_profile()
email = "role-claim-precedence@oidc.integration.test"
create_user_idp_with_role(email, "password123", True, "ADMIN", ["signoz-editors"])
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "ADMIN"
def test_oidc_role_mapping_invalid_role_claim_fallback(
signoz: SigNoz,
idp: TestContainerIDP,
driver: webdriver.Chrome,
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
setup_user_profile: Callable[[], None],
) -> None:
"""
Test: Invalid role claim falls back to group mappings.
User has invalid role 'SUPERADMIN' and is in 'signoz-editors'.
Expected: User gets EDITOR (from group mapping).
"""
setup_user_profile()
email = "invalid-role-user@oidc.integration.test"
create_user_idp_with_role(email, "password123", True, "SUPERADMIN", ["signoz-editors"])
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
def test_oidc_role_mapping_case_insensitive(
signoz: SigNoz,
idp: TestContainerIDP,
driver: webdriver.Chrome,
create_user_idp_with_role: Callable[[str, str, bool, str, List[str]], None],
idp_login: Callable[[str, str], None],
get_token: Callable[[str, str], str],
get_session_context: Callable[[str], str],
setup_user_profile: Callable[[], None],
) -> None:
"""
Test: Role claim matching is case-insensitive.
User has role 'editor' (lowercase).
Expected: User gets EDITOR role.
"""
setup_user_profile()
email = "lowercase-role-user@oidc.integration.test"
create_user_idp_with_role(email, "password123", True, "editor", [])
_perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
found_user = _get_user_by_email(signoz, admin_token, email)
assert found_user is not None
assert found_user["role"] == "EDITOR"
# def test_oidc_role_mapping_update_on_subsequent_login(
# signoz: SigNoz,
# idp: TestContainerIDP,
# driver: webdriver.Chrome,
# create_user_idp_with_groups: Callable[[str, str, bool, List[str]], None],
# add_user_to_group: Callable[[str, str], None],
# idp_login: Callable[[str, str], None],
# get_token: Callable[[str, str], str],
# get_session_context: Callable[[str], str],
# ) -> None:
# """
# Test: User's role should update on subsequent logins when IDP groups change.
# This tests the critical bug where GetOrCreateUser doesn't update roles.
# Steps:
# 1. User logs in with 'signoz-editors' group -> gets EDITOR role
# 2. User's group changes in IDP to 'signoz-admins'
# 3. User logs in again -> should get ADMIN role
# """
# email = "role-update-user@oidc.integration.test"
# # First login with EDITOR group
# create_user_idp_with_groups(email, "password123", True, ["signoz-editors"])
# _perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
# admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
# found_user = _get_user_by_email(signoz, admin_token, email)
# assert found_user is not None
# assert found_user["role"] == "EDITOR"
# # Add user to admin group in IDP
# add_user_to_group(email, "signoz-admins")
# # Clear browser session and login again
# driver.delete_all_cookies()
# _perform_oidc_login(signoz, idp, driver, get_session_context, idp_login, email, "password123")
# # Check if role was updated
# found_user = _get_user_by_email(signoz, admin_token, email)
# assert found_user is not None
# # After fix, this should be ADMIN. Currently stays EDITOR (bug).
# assert found_user["role"] == "ADMIN"
#!########################################################################
#!############## KEEP THIS IN THE END ALWAYS #############################
#!########################################################################
def test_cleanup_oidc_domain(
signoz: SigNoz,
get_token: Callable[[str, str], str],
) -> None:
"""Cleanup: Remove the OIDC domain after tests complete."""
admin_token = get_token(USER_ADMIN_EMAIL, USER_ADMIN_PASSWORD)
domain = _get_oidc_domain(signoz, admin_token)
if domain:
response = requests.delete(
signoz.self.host_configs["8080"].get(f"/api/v1/domains/{domain['id']}"),
headers={"Authorization": f"Bearer {admin_token}"},
timeout=2,
)
# 204 No Content or 200 OK are both valid
assert response.status_code in [HTTPStatus.OK, HTTPStatus.NO_CONTENT, HTTPStatus.NOT_FOUND]