Keycloak: Massenhaft die Gruppenzuordnung von Usern via REST-API ändern

Wenn man in Keycloak massenhaft Änderungen an Gruppenzuordnungen von bestehenden Usern vornehmen möchte, führt kein Weg an der Keycloak REST API vorbei. Zwar ermöglicht die REST API kein mit SQL vergleichbaren Massenänderungen, aber man kann sich zumindest mit wenigen Zeilen Code die einzelnen Requests zusammenbauen, um die gewünschte Massen-Änderung mit absolut überschaubarem Aufwand vorzunehmen.

In meinem konkreten Fall, sollte für knapp 10.000 in Keycloak bestehende User die Gruppenzuordnung geändert werden: Bei expliziten Usern sollte Gruppe A entfernt aber dafür Gruppe B ergänzt werden.

Da ich solche kleineren Tools am liebsten mit Python schreibe und Keycloak eine REST-API zur Verfügung stellt, teile ich hier meinen nicht perfekten, aber funktionalen Code, um massenhaft die Gruppenzuordnung bestehender User in Keycloak über die REST-API vorzunehmen.

Um über die Keycloak REST-API Änderungen an Usern vornehmen zu können, benötigt man einen User mit entsprechenden Role mappings.

Ebenso muss man wissen, das in dem von mir bereitgestellt Code kein Refresh des benötigten Tokens erfolgt, so dass man max. 300 Sekunden Zeit hat, um die Requests abzuarbeiten. Aus diesem Grund gehe ich in kleineren Schritten vor, und ändere die knapp 10.000 User nicht mit einem Lauf des Skriptes.

Ja, es gibt fertige Skripte und Sammlungen mit denen man relativ komfortable mit der REST-API von Keycloak kommunizieren kann. Ich gebe aber ungern Zugangsdaten zu Keycloak in mir „unbekannte“ Tools ein, so dass ich die benötigten Requests lieber kurz selber schreibe.

Anmeldung an der REST-API und setzen des Tokens für Folge-Operationen

Wir benötigen neben den Keycloak-spezifischen Daten lediglich noch die Python-Requests, um über die REST-API mit unserem Keycloak zu kommunizieren:

import requests

keycloak_root = "https://keycloak.url/auth"
keycloak_admin = "some_admin"
keycloak_admin_password = "some_admin_password"

GROUPTOADD_SLUG ="NEWGROUP"
GROUPTOADD_ID = "UUID"
GROUPTOREMOVE_SLUG = "OLDGROUP"
GROUPTOREMOVE_ID = "UUID"

first=0
max=10
runner=0

f = open("keycloak-log.txt", "a")

resp = requests.post(
    f"{keycloak_root}/realms/{keycloak_realm}/protocol/openid-connect/token",    
    data={
        "client_id": "admin-cli",
        "username": keycloak_admin,
        "password": keycloak_admin_password,
        "grant_type": "password"
    }
)
resp.raise_for_status()
data = resp.json()
access_token = data["access_token"]
print(f"{access_token[:20]}...{access_token[-20:]}")
print(f"Expires in {data['expires_in']}s")

# Predefine authorization headers for later use.
auth_headers = {
    "Authorization": f"Bearer {access_token}",
}

Bei erfolgreicher Anmeldung erhalten wir von Keycloak einen Access-Token, welcher in der Regel für 300 Sekunden gültig ist. Diesen Token müssen wir als Bearer im Auth-Header unserer Folge-Requests setzen.

REST-API Funktionen um Gruppen eines Users zu ermitteln, zu löschen und zu ergänzen

Als nächstes definieren wir drei Funktionen, mit denen wir

  1. die vorhandenen Gruppen eines Users ermitteln
  2. die Gruppe A vom User entfernen
  3. die Gruppe B dem User hinzufügen

können. Diese Funktionen werden wir dann später situativ aufrufen:

#get groups of user
def get_groups_of_user(userid):
    resp = requests.get(        
        f"{keycloak_root}/admin/realms/{keycloak_realm}/users/{userid}/groups",
        headers=auth_headers,
    )
    resp.raise_for_status()
    data=resp.json()
    return data

#add given group to given user
def add_group_to_user(userId, groupId):
    resp = requests.put(        
        f"{keycloak_root}/admin/realms/{keycloak_realm}/users/{userId}/groups/{groupId}",
        headers=auth_headers,
    )
    resp.raise_for_status()   
    #data=resp.json()
    return resp

#remove given group from given user
def remove_group_from_user(userId, groupId):
    resp = requests.delete(        
        f"{keycloak_root}/admin/realms/{keycloak_realm}/users/{userId}/groups/{groupId}",
        headers=auth_headers,
    )
    resp.raise_for_status()   
    #data=resp.json()
    return resp

Da wir die Funktionen zum Ermitteln, Entfernen und Ergänzen von Gruppen nun haben, geht’s ans eingemachte:

User Daten aus Keycloak über die REST-API aus Keycloak laden und verändern

Zunächst laden wir nun gemäß unseres Suchbegriffes die relevanten User über die REST-API. Da wir wissen, dass unser Token nur 300 Sekunden gilt, arbeiten wir mit den zuvor definierten Variablen first und max.

Durch die Daten aus der Response itterieren wir dann (for item in data) und entscheiden situativ, was zu tun ist.

Die neue Gruppe (Gruppe B) ergänzen wir pauschal immer.

Das Entfernen der Gruppe A führen wir aber nur aus, wenn diese überhaupt dem User zugeordnet ist:

#get users
resp = requests.get(    
    f"{keycloak_root}/admin/realms/{keycloak_realm}/users?username=3102&first={first}&max={max}",
    headers=auth_headers,
)
resp.raise_for_status()
data=resp.json()

f.write("Started new job; fist="+str(first)+" max="+str(max)+" end="+str(first+max)+"\n")

for item in data:
    print(runner, item['username'])
    runner+=1
    tmp_groups=get_groups_of_user(item['id'])        
    add_group_to_user(item['id'],GROUPTOADD_ID)
    f.write("added "+GROUPTOADD_SLUG+" to user "+item['id']+"/"+item['username']+"\n")    
    for group in tmp_groups:
        print(group['name'])
        if group['name'] == GROUPTOREMOVE_SLUG:
            print(GROUPTOREMOVE_SLUG, "given - REMOVE")
            remove_group_from_user(item['id'],GROUPTOREMOVE_ID)
            f.write("removed "+GROUPTOREMOVE_SLUG+" from user "+item['id']+"/"+item['username']+"\n")    

f.write("END of job\n")
f.close()

In einem Mini-Log schreiben wir uns noch ein paar nützliche Infos mit, damit wir im Falle eines Problems zumindest ein paar Ansätze für unsere Analyse haben.

Fertig.

Wie eingangs gesagt: Weder perfekter Code, noch wirklich sexy. Aber funktional und meiner Meinung nach mit absolut minimalem Aufwand umgesetzt, ohne die schützenswerten Zugangsdaten in Fremd-Code einfügen zu müssen.

Und hier der komplette Code, um via REST-API massenhaft bei Usern die Gruppenzuordnung zu ändern:

import requests

keycloak_root = "https://keycloak.url/auth"
keycloak_admin = "some_admin"
keycloak_admin_password = "some_admin_password"

GROUPTOADD_SLUG ="NEWGROUP"
GROUPTOADD_ID = "UUID"
GROUPTOREMOVE_SLUG = "OLDGROUP"
GROUPTOREMOVE_ID = "UUID"

first=0
max=10
runner=0

f = open("keycloak-log.txt", "a")

resp = requests.post(
    f"{keycloak_root}/realms/{keycloak_realm}/protocol/openid-connect/token",    
    data={
        "client_id": "admin-cli",
        "username": keycloak_admin,
        "password": keycloak_admin_password,
        "grant_type": "password"
    }
)
resp.raise_for_status()
data = resp.json()
access_token = data["access_token"]
print(f"{access_token[:20]}...{access_token[-20:]}")
print(f"Expires in {data['expires_in']}s")

# Predefine authorization headers for later use.
auth_headers = {
    "Authorization": f"Bearer {access_token}",
}

#get groups of user
def get_groups_of_user(userid):
    resp = requests.get(        
        f"{keycloak_root}/admin/realms/{keycloak_realm}/users/{userid}/groups",
        headers=auth_headers,
    )
    resp.raise_for_status()
    data=resp.json()
    return data

#add given group to given user
def add_group_to_user(userId, groupId):
    resp = requests.put(        
        f"{keycloak_root}/admin/realms/{keycloak_realm}/users/{userId}/groups/{groupId}",
        headers=auth_headers,
    )
    resp.raise_for_status()   
    #data=resp.json()
    return resp

#remove given group from given user
def remove_group_from_user(userId, groupId):
    resp = requests.delete(        
        f"{keycloak_root}/admin/realms/{keycloak_realm}/users/{userId}/groups/{groupId}",
        headers=auth_headers,
    )
    resp.raise_for_status()   
    #data=resp.json()
    return resp

#get users
resp = requests.get(    
    f"{keycloak_root}/admin/realms/{keycloak_realm}/users?username=3102&first={first}&max={max}",
    headers=auth_headers,
)
resp.raise_for_status()
data=resp.json()

f.write("Started new job; fist="+str(first)+" max="+str(max)+" end="+str(first+max)+"\n")

for item in data:
    print(runner, item['username'])
    runner+=1
    tmp_groups=get_groups_of_user(item['id'])        
    add_group_to_user(item['id'],GROUPTOADD_ID)
    f.write("added "+GROUPTOADD_SLUG+" to user "+item['id']+"/"+item['username']+"\n")    
    for group in tmp_groups:
        print(group['name'])
        if group['name'] == GROUPTOREMOVE_SLUG:
            print(GROUPTOREMOVE_SLUG, "given - REMOVE")
            remove_group_from_user(item['id'],GROUPTOREMOVE_ID)
            f.write("removed "+GROUPTOREMOVE_SLUG+" from user "+item['id']+"/"+item['username']+"\n")    

f.write("END of job\n")
f.close()

In folgendem Beitrag habe ich bereits beschrieben, wie man in Keycloak ein beliebiges User-Attribut im JWT ausgeben kann. Ggf. ist das für dich ja auch hilfreich, wenn du mit Keycloak arbeitest? 😉

Produktempfehlungen