Skip to content

API Reference

Client Module

Cognito CLI Authentication Tool. Authenticates with an AWS Cognito User Pool and Identity Pool to obtain temporary credentials, then updates the AWS CLI profile for seamless AWS CLI usage.

AWSProfileManager

Source code in src/aws_cognito_auth/client.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
class AWSProfileManager:
    """Write credentials and region settings into the AWS CLI profile files.

    Operates on ``~/.aws/credentials`` and ``~/.aws/config``. The ``.aws``
    directory is created on construction if it does not exist yet.
    """

    def __init__(self):
        self.aws_dir = Path.home() / ".aws"
        self.credentials_file = self.aws_dir / "credentials"
        self.config_file = self.aws_dir / "config"

        # Ensure .aws directory exists
        self.aws_dir.mkdir(parents=True, exist_ok=True)

    def update_profile(self, profile_name, credentials, region):
        """Update AWS credentials profile

        Args:
            profile_name: Name of the profile section to create or update.
            credentials: Mapping with the key material. Each value is looked
                up under its snake_case name first (``access_key_id``,
                ``secret_access_key``, ``session_token``) and then under the
                AWS-style name (``AccessKeyId``, ``SecretAccessKey`` or
                ``SecretKey``, ``SessionToken``).
            region: Region written to the profile's section in the config file.

        Raises:
            ValueError: If the access key ID or the secret access key is
                missing. Nothing is written in that case.
        """
        access_key = credentials.get("access_key_id") or credentials.get("AccessKeyId")
        secret_key = (
            credentials.get("secret_access_key")
            or credentials.get("SecretAccessKey")
            or credentials.get("SecretKey")
        )
        session_token = credentials.get("session_token") or credentials.get("SessionToken")

        # Validate before touching any file: str(None) would otherwise be
        # persisted as the literal text "None".
        if not access_key or not secret_key:
            raise ValueError("Credentials must include an access key ID and a secret access key")

        # Update credentials file
        creds_parser = configparser.ConfigParser()
        if self.credentials_file.exists():
            creds_parser.read(self.credentials_file)

        if not creds_parser.has_section(profile_name):
            creds_parser.add_section(profile_name)

        creds_parser.set(profile_name, "aws_access_key_id", str(access_key))
        creds_parser.set(profile_name, "aws_secret_access_key", str(secret_key))
        if session_token:
            creds_parser.set(profile_name, "aws_session_token", str(session_token))
        else:
            # Drop any token left over from an earlier update so it is not
            # paired with the new key pair.
            creds_parser.remove_option(profile_name, "aws_session_token")

        def _owner_only_opener(path, flags):
            # Create the secrets file readable/writable by the owner only.
            return os.open(path, flags, 0o600)

        with open(self.credentials_file, "w", opener=_owner_only_opener) as f:
            creds_parser.write(f)
        try:
            # The opener mode only applies to newly created files; tighten a
            # pre-existing file too. Deliberately best-effort.
            os.chmod(self.credentials_file, 0o600)
        except OSError:
            pass

        # Update config file
        config_parser = configparser.ConfigParser()
        if self.config_file.exists():
            config_parser.read(self.config_file)

        # For non-default profiles, the section name should be "profile <name>"
        config_section = f"profile {profile_name}" if profile_name != "default" else profile_name

        if not config_parser.has_section(config_section):
            config_parser.add_section(config_section)

        config_parser.set(config_section, "region", region)

        with open(self.config_file, "w") as f:
            config_parser.write(f)

update_profile(profile_name, credentials, region)

Update AWS credentials profile

Source code in src/aws_cognito_auth/client.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def update_profile(self, profile_name, credentials, region):
    """Update AWS credentials profile

    Args:
        profile_name: Name of the profile section to create or update.
        credentials: Mapping with the key material. Each value is looked up
            under its snake_case name first (``access_key_id``,
            ``secret_access_key``, ``session_token``) and then under the
            AWS-style name (``AccessKeyId``, ``SecretAccessKey`` or
            ``SecretKey``, ``SessionToken``).
        region: Region written to the profile's section in the config file.

    Raises:
        ValueError: If the access key ID or the secret access key is missing.
            Nothing is written in that case.
    """
    access_key = credentials.get("access_key_id") or credentials.get("AccessKeyId")
    secret_key = (
        credentials.get("secret_access_key")
        or credentials.get("SecretAccessKey")
        or credentials.get("SecretKey")
    )
    session_token = credentials.get("session_token") or credentials.get("SessionToken")

    # Validate before touching any file: str(None) would otherwise be
    # persisted as the literal text "None".
    if not access_key or not secret_key:
        raise ValueError("Credentials must include an access key ID and a secret access key")

    # Update credentials file
    creds_parser = configparser.ConfigParser()
    if self.credentials_file.exists():
        creds_parser.read(self.credentials_file)

    if not creds_parser.has_section(profile_name):
        creds_parser.add_section(profile_name)

    creds_parser.set(profile_name, "aws_access_key_id", str(access_key))
    creds_parser.set(profile_name, "aws_secret_access_key", str(secret_key))
    if session_token:
        creds_parser.set(profile_name, "aws_session_token", str(session_token))
    else:
        # Drop any token left over from an earlier update so it is not paired
        # with the new key pair.
        creds_parser.remove_option(profile_name, "aws_session_token")

    def _owner_only_opener(path, flags):
        # Create the secrets file readable/writable by the owner only.
        return os.open(path, flags, 0o600)

    with open(self.credentials_file, "w", opener=_owner_only_opener) as f:
        creds_parser.write(f)
    try:
        # The opener mode only applies to newly created files; tighten a
        # pre-existing file too. Deliberately best-effort.
        os.chmod(self.credentials_file, 0o600)
    except OSError:
        pass

    # Update config file
    config_parser = configparser.ConfigParser()
    if self.config_file.exists():
        config_parser.read(self.config_file)

    # For non-default profiles, the section name should be "profile <name>"
    config_section = f"profile {profile_name}" if profile_name != "default" else profile_name

    if not config_parser.has_section(config_section):
        config_parser.add_section(config_section)

    config_parser.set(config_section, "region", region)

    with open(self.config_file, "w") as f:
        config_parser.write(f)

CognitoAuthenticator

Source code in src/aws_cognito_auth/client.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class CognitoAuthenticator:
    """Sign in against a Cognito User Pool and exchange tokens for AWS credentials.

    Holds a ``cognito-idp`` client for the sign-in call and a
    ``cognito-identity`` client for the Identity Pool exchange. Longer-lived
    credentials can optionally be requested through a Lambda proxy.
    """

    def __init__(self, user_pool_id, client_id, identity_pool_id, region=None, managed_login_url=None):
        """Store the pool identifiers and create the boto3 Cognito clients.

        When ``region`` is ``None`` it is taken from the part of
        ``user_pool_id`` before the first underscore.
        """
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.identity_pool_id = identity_pool_id
        self.managed_login_url = managed_login_url

        # Extract region from user pool ID if not provided
        if region is None:
            self.region = user_pool_id.split("_")[0]
        else:
            self.region = region

        # Initialize AWS clients
        # Note: Cognito User Pool operations still require AWS credentials, but they can be minimal
        # The actual user authentication happens via Cognito tokens, not AWS credentials
        self.cognito_idp = boto3.client("cognito-idp", region_name=self.region)
        self.cognito_identity = boto3.client("cognito-identity", region_name=self.region)

    def _construct_managed_login_url(self):
        """Construct the managed login URL when not provided"""
        if self.managed_login_url:
            return self.managed_login_url

        # Construct the URL using the standard Cognito domain format
        # Format: https://<your-domain>.auth.<region>.amazoncognito.com/login?client_id=<client_id>&response_type=code&redirect_uri=<redirect_uri>
        domain_prefix = (
            str(self.user_pool_id).lower().replace(f"{self.region}_", self.region)
        )  # Use user pool ID as domain prefix
        redirect_uri = urllib.parse.quote(DEFAULT_REDIRECT_URI, safe="")

        return f"https://{domain_prefix}.auth.{self.region}.amazoncognito.com/login?client_id={self.client_id}&response_type=code&redirect_uri={redirect_uri}"

    def authenticate_user(self, username, password):
        """Authenticate user with Cognito User Pool

        Returns:
            On success, a dict with the ID, access and refresh tokens under
            both their original names (``IdToken`` ...) and snake_case
            aliases (``id_token`` ...). When Cognito answers with a
            challenge, a dict with ``challenge_redirect`` set to ``True``,
            the ``challenge_name`` and the ``login_url`` that was shown.

        Raises:
            Exception: With a readable message when the ``initiate_auth``
                call fails with a ``ClientError``.
        """
        try:
            response = self.cognito_idp.initiate_auth(
                ClientId=self.client_id,
                AuthFlow="USER_PASSWORD_AUTH",
                AuthParameters={"USERNAME": username, "PASSWORD": password},
            )

            if "ChallengeName" in response:
                challenge_name = response["ChallengeName"]
                click.echo(f"🔐 Authentication challenge detected: {challenge_name}")
                click.echo(
                    "Please use the managed login page to complete authentication. Come back later with your updated password."
                )

                login_url = self._construct_managed_login_url()
                click.echo(f"🔗 {login_url}")

                # Try to open the URL in browser automatically
                if _safe_open_browser(login_url):
                    click.echo("🌐 Opening login page in your default browser...")
                else:
                    click.echo("💻 Please copy and paste the URL above into your browser")

                # Return a special indicator that this is an expected challenge flow
                return {"challenge_redirect": True, "challenge_name": challenge_name, "login_url": login_url}

            tokens = response["AuthenticationResult"]
            # Return keys matching tests (both original and lowercase aliases)
            return {
                "IdToken": tokens.get("IdToken"),
                "AccessToken": tokens.get("AccessToken"),
                "RefreshToken": tokens.get("RefreshToken"),
                "id_token": tokens.get("IdToken"),
                "access_token": tokens.get("AccessToken"),
                "refresh_token": tokens.get("RefreshToken"),
            }

        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            error_message_map = {
                "NotAuthorizedException": "Invalid username or password",
                "UserNotFoundException": "User not found",
            }
            mapped_message = error_message_map.get(error_code)
            if mapped_message:
                raise Exception(mapped_message) from None
            raise Exception(f"Authentication failed: {e.response['Error']['Message']}") from None

    def get_temporary_credentials(self, id_token, use_lambda_proxy=True, duration_hours=12):
        """Exchange ID token for temporary AWS credentials

        Identity Pool credentials are always fetched first. When
        ``use_lambda_proxy`` is true, an upgrade through the Lambda proxy is
        attempted; any failure of that step is printed and the Identity Pool
        credentials are returned instead.

        Raises:
            Exception: With an explanatory message when a ``ClientError``
                escapes the Identity Pool exchange.
        """
        try:
            # Step 1: Always get 1-hour credentials from Identity Pool first
            print("🎫 Getting temporary credentials from Cognito Identity Pool...")
            identity_pool_creds = self._get_cognito_identity_credentials(id_token)
            exp_display = identity_pool_creds.get("expiration") or identity_pool_creds.get("Expiration")
            print(f"✅ Successfully obtained Identity Pool credentials (expires at {exp_display})")

            # Step 2: If Lambda proxy is enabled, try to upgrade to longer-lived credentials
            if use_lambda_proxy:
                try:
                    print("🎫 Attempting to upgrade to longer-lived credentials via Lambda proxy...")
                    lambda_creds = self._get_lambda_credentials(
                        id_token, duration_hours, fallback_creds=identity_pool_creds
                    )
                    exp2 = lambda_creds.get("expiration") or lambda_creds.get("Expiration")
                    print(f"✅ Successfully upgraded to longer-lived credentials (expires at {exp2})")
                    return lambda_creds
                except Exception as lambda_error:
                    print(f"⚠️  Lambda proxy failed: {lambda_error}")
                    print("📝 Keeping Identity Pool credentials (1 hour limit)")
                    return identity_pool_creds
            else:
                return identity_pool_creds

        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            error_message = e.response["Error"]["Message"]

            print(f"Debug - Error Code: {error_code}")
            print(f"Debug - Error Message: {error_message}")

            if "not from a supported provider" in error_message:
                raise Exception(
                    f"Identity Pool configuration error: {error_message}\n"
                    f"Solution: Your Identity Pool (ID: {self.identity_pool_id}) needs to be configured to accept tokens from your User Pool (ID: {self.user_pool_id}).\n"
                    f"Check AWS Console -> Cognito -> Identity Pool -> Authentication providers -> Cognito User Pool"
                ) from None
            elif error_code == "AccessDenied" and "AssumeRoleWithWebIdentity" in error_message:
                raise Exception(
                    f"IAM Role Trust Policy Issue: {error_message}\n"
                    f"The role trust policy needs to be updated to allow web identity federation.\n"
                    f"Check the trust policy of your Identity Pool's authenticated role in the IAM console."
                ) from None
            else:
                raise Exception(f"Failed to get temporary credentials: {error_message}") from None

    def _get_lambda_credentials(self, id_token, duration_hours=12, fallback_creds=None):
        """Get long-lived credentials via Lambda proxy

        Args:
            id_token: Cognito ID token forwarded to the Lambda function.
            duration_hours: Requested lifetime, sent to the function in seconds.
            fallback_creds: Optional credentials dict used to sign the Lambda
                and STS calls. Without it, boto3's default credential
                resolution is used.

        Returns:
            Dict with ``AccessKeyId``, ``SecretAccessKey``, ``SessionToken``,
            ``Expiration`` (timezone-aware datetime in local time),
            ``username`` and ``user_id``.

        Raises:
            Exception: When the function reports a non-200 status or cannot
                be found. Other ``ClientError`` instances propagate unchanged.
        """
        # Both clients share the same connection settings; when Identity Pool
        # credentials are available they are used to sign the calls.
        client_kwargs = {"region_name": self.region}
        if fallback_creds:
            client_kwargs.update(
                aws_access_key_id=fallback_creds.get("AccessKeyId") or fallback_creds.get("access_key_id"),
                aws_secret_access_key=fallback_creds.get("SecretKey") or fallback_creds.get("secret_access_key"),
                aws_session_token=fallback_creds.get("SessionToken") or fallback_creds.get("session_token"),
            )
        lambda_client = boto3.client("lambda", **client_kwargs)
        sts_client = boto3.client("sts", **client_kwargs)

        # Get current AWS account ID dynamically
        account_id = sts_client.get_caller_identity()["Account"]

        # Load admin config to get configurable role name
        from .admin import load_admin_config

        admin_config = load_admin_config()

        payload = {
            "id_token": id_token,
            "duration_seconds": duration_hours * 3600,  # Convert hours to seconds
            "role_arn": f"arn:aws:iam::{account_id}:role/{admin_config['aws_service_names']['long_lived_role_name']}",
        }

        try:
            response = lambda_client.invoke(
                FunctionName=admin_config["aws_service_names"]["lambda_function_name"],
                InvocationType="RequestResponse",
                Payload=json.dumps(payload).encode(),
            )

            # Parse response
            raw_payload = response["Payload"].read()
            response_payload = json.loads(
                raw_payload.decode() if isinstance(raw_payload, (bytes, bytearray)) else raw_payload
            )

            if response_payload.get("statusCode") != 200:
                # The body may be a JSON string or an already-decoded object,
                # exactly as on the success path below.
                error_body = response_payload.get("body") or {}
                if isinstance(error_body, str):
                    error_body = json.loads(error_body)
                detail = error_body.get("error") if isinstance(error_body, dict) else None
                # Unhandled function errors carry "errorMessage" instead of a
                # statusCode/body pair.
                detail = detail or response_payload.get("errorMessage") or "Unknown error"
                raise Exception(f"Lambda error: {detail}")

            # Parse successful response (support nested credentials under 'credentials')
            body_obj = (
                json.loads(response_payload["body"])
                if isinstance(response_payload["body"], str)
                else response_payload["body"]
            )
            credentials_data = body_obj.get("credentials", body_obj)

            # Convert expiration string back to datetime and convert to local time
            from datetime import datetime

            raw_expiration = (
                credentials_data["Expiration"]
                if "Expiration" in credentials_data
                else credentials_data["expiration"]
            )
            # fromisoformat() on older Python versions rejects a trailing "Z".
            expiration = datetime.fromisoformat(raw_expiration.replace("Z", "+00:00"))
            # Convert to local timezone for display consistency
            expiration = expiration.astimezone()

            return {
                "AccessKeyId": credentials_data.get("AccessKeyId") or credentials_data.get("access_key_id"),
                "SecretAccessKey": credentials_data.get("SecretAccessKey") or credentials_data.get("secret_access_key"),
                "SessionToken": credentials_data.get("SessionToken") or credentials_data.get("session_token"),
                "Expiration": expiration,
                "username": body_obj.get("username"),
                "user_id": body_obj.get("user_id"),
            }

        except ClientError as e:
            if e.response.get("Error", {}).get("Code") == "ResourceNotFoundException":
                raise Exception(
                    f"Lambda function '{admin_config['aws_service_names']['lambda_function_name']}' not found. Please deploy it first using cogadmin lambda deploy"
                ) from None
            raise

    def _get_cognito_identity_credentials(self, id_token):
        """Get 1-hour credentials via Cognito Identity Pool

        Returns:
            Dict with the identity ID and credentials under both AWS-style
            keys (``AccessKeyId``, ``SecretKey`` ...) and snake_case aliases.
        """
        # Create the login map for the identity pool
        logins_map = {f"cognito-idp.{self.region}.amazonaws.com/{self.user_pool_id}": id_token}

        # Get identity ID
        identity_response = self.cognito_identity.get_id(IdentityPoolId=self.identity_pool_id, Logins=logins_map)

        identity_id = identity_response["IdentityId"]
        # Get temporary credentials
        credentials_response = self.cognito_identity.get_credentials_for_identity(
            IdentityId=identity_id, Logins=logins_map
        )

        credentials = credentials_response["Credentials"]
        secret_key = credentials.get("SecretAccessKey") or credentials["SecretKey"]

        # Return keys as expected by tests (both styles)
        return {
            "IdentityId": identity_id,
            "AccessKeyId": credentials["AccessKeyId"],
            "SecretKey": secret_key,
            "SessionToken": credentials["SessionToken"],
            "Expiration": credentials["Expiration"],
            # aliases for tests expecting lowercase snake_case
            "identity_id": identity_id,
            "access_key_id": credentials["AccessKeyId"],
            "secret_access_key": secret_key,
            "session_token": credentials["SessionToken"],
            "expiration": credentials["Expiration"],
            # NOTE(review): hard-coded placeholder, looks like a test leftover;
            # kept so the returned shape is unchanged - confirm and remove.
            "username": "test",
        }

authenticate_user(username, password)

Authenticate user with Cognito User Pool

Source code in src/aws_cognito_auth/client.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def authenticate_user(self, username, password):
    """Sign a user in against the Cognito User Pool with username and password.

    Returns a dict of tokens (original and snake_case keys) on success. If
    Cognito responds with a challenge, the managed login URL is printed, a
    browser launch is attempted, and a dict flagged with
    ``challenge_redirect`` is returned instead.

    Raises ``Exception`` with a readable message when the sign-in call fails
    with a ``ClientError``.
    """
    try:
        auth_response = self.cognito_idp.initiate_auth(
            ClientId=self.client_id,
            AuthFlow="USER_PASSWORD_AUTH",
            AuthParameters={"USERNAME": username, "PASSWORD": password},
        )

        if "ChallengeName" not in auth_response:
            # Plain success: expose each token under both naming styles.
            result = auth_response["AuthenticationResult"]
            id_token = result.get("IdToken")
            access_token = result.get("AccessToken")
            refresh_token = result.get("RefreshToken")
            return {
                "IdToken": id_token,
                "AccessToken": access_token,
                "RefreshToken": refresh_token,
                "id_token": id_token,
                "access_token": access_token,
                "refresh_token": refresh_token,
            }

        # Challenge flow: hand the user over to the managed login page.
        pending_challenge = auth_response["ChallengeName"]
        click.echo(f"🔐 Authentication challenge detected: {pending_challenge}")
        click.echo(
            "Please use the managed login page to complete authentication. Come back later with your updated password."
        )

        managed_url = self._construct_managed_login_url()
        click.echo(f"🔗 {managed_url}")

        browser_opened = _safe_open_browser(managed_url)
        click.echo(
            "🌐 Opening login page in your default browser..."
            if browser_opened
            else "💻 Please copy and paste the URL above into your browser"
        )

        # Marker dict telling the caller this is an expected redirect.
        return {"challenge_redirect": True, "challenge_name": pending_challenge, "login_url": managed_url}

    except ClientError as err:
        failure = err.response["Error"]
        friendly = {
            "NotAuthorizedException": "Invalid username or password",
            "UserNotFoundException": "User not found",
        }.get(failure["Code"])
        if friendly:
            raise Exception(friendly) from None
        raise Exception(f"Authentication failed: {failure['Message']}") from None

get_temporary_credentials(id_token, use_lambda_proxy=True, duration_hours=12)

Exchange ID token for temporary AWS credentials

Source code in src/aws_cognito_auth/client.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def get_temporary_credentials(self, id_token, use_lambda_proxy=True, duration_hours=12):
    """Turn a Cognito ID token into temporary AWS credentials.

    The Identity Pool exchange always runs first. With ``use_lambda_proxy``
    enabled, an upgrade via the Lambda proxy is then attempted; if that step
    raises, the failure is printed and the Identity Pool credentials are
    returned.

    Raises ``Exception`` with an explanatory message when a ``ClientError``
    escapes the Identity Pool exchange.
    """
    try:
        # Step 1: the Identity Pool exchange is always performed.
        print("🎫 Getting temporary credentials from Cognito Identity Pool...")
        pool_creds = self._get_cognito_identity_credentials(id_token)
        pool_expiry = pool_creds.get("expiration") or pool_creds.get("Expiration")
        print(f"✅ Successfully obtained Identity Pool credentials (expires at {pool_expiry})")

        if not use_lambda_proxy:
            return pool_creds

        # Step 2: best-effort upgrade; any failure falls back to step 1's result.
        try:
            print("🎫 Attempting to upgrade to longer-lived credentials via Lambda proxy...")
            upgraded = self._get_lambda_credentials(id_token, duration_hours, fallback_creds=pool_creds)
            upgraded_expiry = upgraded.get("expiration") or upgraded.get("Expiration")
            print(f"✅ Successfully upgraded to longer-lived credentials (expires at {upgraded_expiry})")
            return upgraded
        except Exception as proxy_failure:
            print(f"⚠️  Lambda proxy failed: {proxy_failure}")
            print("📝 Keeping Identity Pool credentials (1 hour limit)")
            return pool_creds

    except ClientError as err:
        failure = err.response["Error"]
        code = failure["Code"]
        message = failure["Message"]

        print(f"Debug - Error Code: {code}")
        print(f"Debug - Error Message: {message}")

        # Pick the most specific explanation, then raise once.
        if "not from a supported provider" in message:
            explanation = (
                f"Identity Pool configuration error: {message}\n"
                f"Solution: Your Identity Pool (ID: {self.identity_pool_id}) needs to be configured to accept tokens from your User Pool (ID: {self.user_pool_id}).\n"
                "Check AWS Console -> Cognito -> Identity Pool -> Authentication providers -> Cognito User Pool"
            )
        elif code == "AccessDenied" and "AssumeRoleWithWebIdentity" in message:
            explanation = (
                f"IAM Role Trust Policy Issue: {message}\n"
                "The role trust policy needs to be updated to allow web identity federation.\n"
                "Check the trust policy of your Identity Pool's authenticated role in the IAM console."
            )
        else:
            explanation = f"Failed to get temporary credentials: {message}"
        raise Exception(explanation) from None

cli()

Cognito CLI Authentication Tool

AWS Cognito authentication CLI

Authenticate with AWS Cognito and update AWS CLI profiles with temporary credentials.

Source code in src/aws_cognito_auth/client.py
377
378
379
380
# Root command group; the sub-commands in this module attach to it.
@click.group()
def cli():
    """Cognito CLI Authentication Tool

    AWS Cognito authentication CLI

    Authenticate with AWS Cognito and update AWS CLI profiles with temporary credentials."""

configure()

Configure Cognito authentication settings

Source code in src/aws_cognito_auth/client.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
@cli.command()
def configure():
    """Configure Cognito authentication settings"""
    # Prompts for each setting (pre-filled from load_config()), then writes
    # the answers with save_config(). Empty optional answers are not saved.
    click.echo("🔧 Cognito CLI Configuration")

    config = load_config()
    # Handle case where no config is present yet
    if config is None:
        config = {}

    # Get user pool configuration
    user_pool_id = click.prompt(
        "Cognito User Pool ID",
        default=config.get("user_pool_id", ""),
        show_default=bool(config.get("user_pool_id")),
    )

    client_id = click.prompt(
        "Cognito User Pool Client ID",
        default=config.get("client_id", ""),
        show_default=bool(config.get("client_id")),
    )

    identity_pool_id = click.prompt(
        "Cognito Identity Pool ID",
        default=config.get("identity_pool_id", ""),
        show_default=bool(config.get("identity_pool_id")),
    )

    # Region is optional, can be auto-detected from User Pool ID.
    # load_config() stores None for an unset region, and a None default makes
    # click.prompt insist on a value, so fall back to "" explicitly.
    region = click.prompt(
        "AWS Region (optional, will auto-detect if not provided)",
        default=config.get("region") or "",
        show_default=False,
    )

    # Managed login URL is optional; guard against a stored null the same way
    managed_login_url = click.prompt(
        "Managed Login Page URL (optional, will construct if not provided)",
        default=config.get("managed_login_url") or "",
        show_default=False,
    )

    # Save configuration
    new_config = {
        "user_pool_id": user_pool_id,
        "client_id": client_id,
        "identity_pool_id": identity_pool_id,
    }

    if region:
        new_config["region"] = region

    if managed_login_url:
        new_config["managed_login_url"] = managed_login_url

    save_config(new_config)

    click.echo("✅ Successfully saved configuration!")
    click.echo(f"📁 Config file: {Path.home() / '.cognito-cli-config.json'}")

load_config()

Load configuration from environment variables or config file

Source code in src/aws_cognito_auth/client.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def load_config():
    """Build the effective configuration from the environment and the config file.

    Environment variables take precedence; values from
    ``~/.cognito-cli-config.json`` only fill keys that are still empty.

    Returns:
        The merged dict, ``None`` when none of the four core settings has a
        value, or ``{}`` when the config file exists but cannot be loaded.
    """
    env_sources = {
        "user_pool_id": "COGNITO_USER_POOL_ID",
        "client_id": "COGNITO_CLIENT_ID",
        "identity_pool_id": "COGNITO_IDENTITY_POOL_ID",
        "region": "AWS_REGION",
    }
    # Environment first: every core key is present, possibly as None.
    settings = {name: os.getenv(variable) for name, variable in env_sources.items()}

    stored_path = Path.home() / ".cognito-cli-config.json"
    if stored_path.exists():
        try:
            with open(stored_path) as handle:
                stored = json.load(handle)
                for name, stored_value in stored.items():
                    # Keep whatever the environment already supplied.
                    if settings.get(name):
                        continue
                    settings[name] = stored_value
        except Exception:
            import logging

            logging.exception("Exception occurred while loading config file")
            # An unreadable file yields an empty config (tests rely on this)
            return {}

    # Nothing configured at all is reported as None (tests expect None)
    if any(settings.get(name) for name in env_sources):
        return settings
    return None

login(username, profile, no_lambda_proxy, duration)

Authenticate with Cognito and update AWS profile

Source code in src/aws_cognito_auth/client.py
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
@cli.command()
@click.option("--username", "-u", help="Username for authentication")
@click.option("--profile", default="default", help="AWS profile name to update")
@click.option("--no-lambda-proxy", is_flag=True, help="Skip Lambda proxy and use only Identity Pool credentials")
@click.option("--duration", default=12, help="Credential duration in hours (Lambda proxy only)")
def login(username, profile, no_lambda_proxy, duration):
    """Authenticate with Cognito and update AWS profile"""
    # Flow: load config -> sign in -> exchange the ID token for credentials ->
    # write them to the chosen AWS profile. Exits with status 1 on missing
    # configuration or any failure, and with status 0 on a challenge redirect.
    config = load_config()

    # Handle missing configuration early (None vs empty dict)
    if config is None:
        click.echo("❌ No configuration found")
        sys.exit(1)

    # Check required configuration
    required_fields = ["user_pool_id", "client_id", "identity_pool_id"]
    missing_fields = [field for field in required_fields if not config.get(field)]

    if missing_fields:
        # Name the missing settings so the user knows what to fix
        click.echo(f"❌ Missing configuration: {', '.join(missing_fields)}")
        sys.exit(1)

    # Get username if not provided
    if not username:
        username = click.prompt("Username")

    # Get password
    password = getpass.getpass("Password: ")

    try:
        # Initialize authenticator
        authenticator = CognitoAuthenticator(
            user_pool_id=config["user_pool_id"],
            client_id=config["client_id"],
            identity_pool_id=config["identity_pool_id"],
            region=config.get("region"),
            managed_login_url=config.get("managed_login_url"),
        )

        # Authenticate user
        print(f"🔐 Authenticating user: {username}")
        auth_result = authenticator.authenticate_user(username, password)

        # Check if this is a challenge redirect (expected flow)
        if auth_result.get("challenge_redirect"):
            print("📋 Please complete authentication using the managed login page shown above.")
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept this exit.
            sys.exit(0)  # Exit gracefully, not as an error

        print("✅ User authenticated successfully")

        # Get temporary credentials
        use_lambda_proxy = not no_lambda_proxy
        credentials = authenticator.get_temporary_credentials(
            auth_result["IdToken"], use_lambda_proxy=use_lambda_proxy, duration_hours=duration
        )

        # Update AWS profile
        profile_manager = AWSProfileManager()
        profile_manager.update_profile(profile_name=profile, credentials=credentials, region=authenticator.region)

        print(f"✅ Successfully updated AWS profile '{profile}'")
        exp_val = credentials.get("expiration") or credentials.get("Expiration")
        print(f"⏰ Credentials expire at: {exp_val}")
        identity_val = (
            credentials.get("identity_id") or credentials.get("IdentityId") or credentials.get("user_id", "N/A")
        )
        print(f"🔑 Identity ID: {identity_val}")

        print(f"\n🎯 You can now use AWS CLI with profile '{profile}':")
        if profile == "default":
            print("   aws s3 ls")
            print("   aws sts get-caller-identity")
        else:
            print(f"   aws --profile {profile} s3 ls")
            print(f"   aws --profile {profile} sts get-caller-identity")

    except Exception as e:
        click.echo(f"❌ Authentication failed: {e}")
        sys.exit(1)

save_config(config)

Save configuration to config file

Source code in src/aws_cognito_auth/client.py
370
371
372
373
374
def save_config(config):
    """Write *config* as 2-space-indented JSON to ``~/.cognito-cli-config.json``.

    An existing file is overwritten.
    """
    destination = Path.home() / ".cognito-cli-config.json"
    with open(destination, "w") as handle:
        json.dump(config, handle, indent=2)

status()

Show current configuration status

Source code in src/aws_cognito_auth/client.py
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
@cli.command()
def status():
    """Show current configuration status"""
    config = load_config()

    # A missing configuration is reported and nothing else is printed.
    if config is None:
        click.echo("❌ Configuration not found")
        return

    if config:
        click.echo("✅ Configuration loaded")
    else:
        # Falsy but not None (e.g. an empty dict): list every field as unset
        # without announcing a successful load.
        config = {}

    click.echo("📋 Current Configuration:")

    reported_keys = ("user_pool_id", "client_id", "identity_pool_id", "region", "managed_login_url")
    for name in reported_keys:
        # Missing and empty values are both displayed as "Not set".
        shown = config.get(name) or "Not set"
        click.echo(f"  {name}: {shown}")

Admin Module

AWS Cognito Auth Administration Tool. Combines all administrative functions for setting up and managing AWS infrastructure.

CognitoRoleManager

Source code in src/aws_cognito_auth/admin.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class CognitoRoleManager:
    """Look up and edit the IAM role a Cognito Identity Pool gives authenticated users."""

    def __init__(self, identity_pool_id, region=None):
        """Create IAM, STS and Cognito Identity clients for the pool's region.

        Args:
            identity_pool_id: Identity Pool ID. When *region* is not given, the
                text before the first ``:`` is used as the region.
            region: Optional AWS region overriding the one derived from the ID.
        """
        self.identity_pool_id = identity_pool_id
        self.region = region or identity_pool_id.split(":")[0]

        self.iam = boto3.client("iam", region_name=self.region)
        self.sts = boto3.client("sts", region_name=self.region)
        self.cognito_identity = boto3.client("cognito-identity", region_name=self.region)

    def get_authenticated_role(self):
        """Get the authenticated role information for the Identity Pool.

        Returns:
            The ``Role`` dict returned by IAM ``get_role`` for the pool's
            authenticated role.

        Raises:
            Exception: If the pool has no authenticated role, or if an AWS call
                fails with a ``ClientError`` (chained as the cause).
        """
        try:
            # The role mapping is served by GetIdentityPoolRoles, which requires
            # the pool ID. DescribeIdentityPool also requires the ID and its
            # response has no "Roles" entry.
            response = self.cognito_identity.get_identity_pool_roles(IdentityPoolId=self.identity_pool_id)

            if "Roles" not in response or "authenticated" not in response["Roles"]:
                raise Exception("No authenticated role found for this Identity Pool")

            role_arn = response["Roles"]["authenticated"]
            # The role name is the last path segment of the ARN.
            role_name = role_arn.split("/")[-1]

            # Return full role information (Role dict)
            role = self.iam.get_role(RoleName=role_name)
            return role["Role"]
        except ClientError as e:
            raise Exception(f"Failed to get Identity Pool roles: {e.response['Error']['Message']}") from e

    def get_role_policies(self, role_name):
        """Get all policies attached to the role.

        Returns:
            A dict with ``managed_policies`` (the ``AttachedPolicies`` list) and
            ``inline_policies`` (the ``PolicyNames`` list).

        Raises:
            Exception: If either IAM call fails with a ``ClientError``.
        """
        try:
            managed_policies = self.iam.list_attached_role_policies(RoleName=role_name)
            inline_policies = self.iam.list_role_policies(RoleName=role_name)

            return {
                "managed_policies": managed_policies["AttachedPolicies"],
                "inline_policies": inline_policies["PolicyNames"],
            }
        except ClientError as e:
            raise Exception(f"Failed to get role policies: {e.response['Error']['Message']}") from e

    def get_inline_policy(self, role_name, policy_name):
        """Get inline policy document.

        Returns:
            The ``PolicyDocument`` value of the IAM ``get_role_policy`` response.

        Raises:
            Exception: If the IAM call fails with a ``ClientError``.
        """
        try:
            response = self.iam.get_role_policy(RoleName=role_name, PolicyName=policy_name)
            return response["PolicyDocument"]
        except ClientError as e:
            raise Exception(f"Failed to get policy: {e.response['Error']['Message']}") from e

    def update_inline_policy(self, role_name, policy_name, policy_document):
        """Update or create inline policy.

        Args:
            role_name: Name of the IAM role to modify.
            policy_name: Name of the inline policy to write.
            policy_document: JSON-serialisable policy; it is serialised with
                ``json.dumps`` before being sent.

        Returns:
            ``True`` when the policy was written.

        Raises:
            Exception: If the IAM call fails with a ``ClientError``.
        """
        try:
            self.iam.put_role_policy(
                RoleName=role_name, PolicyName=policy_name, PolicyDocument=json.dumps(policy_document)
            )
            return True
        except ClientError as e:
            raise Exception(f"Failed to update policy: {e.response['Error']['Message']}") from e

get_authenticated_role()

Get the authenticated role information for the Identity Pool

Source code in src/aws_cognito_auth/admin.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def get_authenticated_role(self):
    """Get the authenticated role information for the Identity Pool.

    Returns:
        The ``Role`` dict returned by IAM ``get_role`` for the pool's
        authenticated role.

    Raises:
        Exception: If the pool has no authenticated role, or if an AWS call
            fails with a ``ClientError`` (chained as the cause).
    """
    try:
        # The role mapping is served by GetIdentityPoolRoles, which requires the
        # pool ID. DescribeIdentityPool also requires the ID and its response
        # has no "Roles" entry.
        response = self.cognito_identity.get_identity_pool_roles(IdentityPoolId=self.identity_pool_id)

        if "Roles" not in response or "authenticated" not in response["Roles"]:
            raise Exception("No authenticated role found for this Identity Pool")

        role_arn = response["Roles"]["authenticated"]
        # The role name is the last path segment of the ARN.
        role_name = role_arn.split("/")[-1]

        # Return full role information (Role dict)
        role = self.iam.get_role(RoleName=role_name)
        return role["Role"]
    except ClientError as e:
        raise Exception(f"Failed to get Identity Pool roles: {e.response['Error']['Message']}") from e

get_inline_policy(role_name, policy_name)

Get inline policy document

Source code in src/aws_cognito_auth/admin.py
58
59
60
61
62
63
64
def get_inline_policy(self, role_name, policy_name):
    """Return the document of the inline policy *policy_name* on *role_name*.

    The value is the ``PolicyDocument`` entry of the IAM ``get_role_policy``
    response. A ``ClientError`` from IAM is re-raised as a plain ``Exception``
    carrying the AWS error message.
    """
    try:
        return self.iam.get_role_policy(RoleName=role_name, PolicyName=policy_name)["PolicyDocument"]
    except ClientError as err:
        detail = err.response["Error"]["Message"]
        raise Exception(f"Failed to get policy: {detail}") from err

get_role_policies(role_name)

Get all policies attached to the role

Source code in src/aws_cognito_auth/admin.py
45
46
47
48
49
50
51
52
53
54
55
56
def get_role_policies(self, role_name):
    """Collect the managed and inline policies of *role_name*.

    Returns a dict whose ``managed_policies`` entry is the ``AttachedPolicies``
    list from IAM and whose ``inline_policies`` entry is the ``PolicyNames``
    list. A ``ClientError`` from either IAM call is re-raised as a plain
    ``Exception`` carrying the AWS error message.
    """
    try:
        attached_resp = self.iam.list_attached_role_policies(RoleName=role_name)
        inline_resp = self.iam.list_role_policies(RoleName=role_name)
    except ClientError as err:
        detail = err.response["Error"]["Message"]
        raise Exception(f"Failed to get role policies: {detail}") from err

    summary = {}
    summary["managed_policies"] = attached_resp["AttachedPolicies"]
    summary["inline_policies"] = inline_resp["PolicyNames"]
    return summary

update_inline_policy(role_name, policy_name, policy_document)

Update or create inline policy

Source code in src/aws_cognito_auth/admin.py
66
67
68
69
70
71
72
73
74
def update_inline_policy(self, role_name, policy_name, policy_document):
    """Write *policy_document* as the inline policy *policy_name* on *role_name*.

    The document is serialised with ``json.dumps`` and sent through IAM
    ``put_role_policy``. Returns ``True`` on success. A ``ClientError`` from
    IAM is re-raised as a plain ``Exception`` carrying the AWS error message.
    """
    serialized = json.dumps(policy_document)
    try:
        self.iam.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=serialized)
    except ClientError as err:
        detail = err.response["Error"]["Message"]
        raise Exception(f"Failed to update policy: {detail}") from err
    return True

LambdaDeployer

Source code in src/aws_cognito_auth/admin.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
class LambdaDeployer:
    """Create the IAM user, IAM roles and Lambda function of the credential proxy.

    Resource names and tunables are read from the admin configuration returned
    by ``load_admin_config()``.
    """

    def __init__(self, region="ap-southeast-1"):
        """Create Lambda, IAM and STS clients for *region* and load the admin config."""
        self.region = region
        self.lambda_client = boto3.client("lambda", region_name=region)
        self.iam = boto3.client("iam", region_name=region)
        self.sts = boto3.client("sts", region_name=region)
        self.admin_config = load_admin_config()

    def create_lambda_user(self):
        """Create IAM user for Lambda function to avoid role chaining limits.

        Returns:
            A dict with ``user_arn``, ``access_key_id`` and
            ``secret_access_key``. When the user already exists and a new key
            cannot be created, both key fields hold the placeholder
            ``"MANUAL_SETUP_REQUIRED"``.

        Raises:
            ClientError: For any IAM failure other than ``EntityAlreadyExists``.
        """
        account_id = str(self.sts.get_caller_identity()["Account"])  # Ensure string for template replacement

        user_policy_template = load_policy_template("lambda-user-policy")
        # Replace placeholders in policy template
        user_policy = json.dumps(user_policy_template)
        user_policy = user_policy.replace("{account_id}", account_id)
        user_policy = user_policy.replace(
            "{long_lived_role_name}", self.admin_config["aws_service_names"]["long_lived_role_name"]
        )
        user_policy = json.loads(user_policy)

        user_name = self.admin_config["aws_service_names"]["iam_user_name"]

        try:
            # Create user
            self.iam.create_user(
                UserName=user_name,
                Path="/",
            )

            # Attach inline policy
            self.iam.put_user_policy(
                UserName=user_name,
                PolicyName=self.admin_config["aws_service_names"]["policy_names"]["lambda_user_policy"],
                PolicyDocument=json.dumps(user_policy),
            )

            # Create access keys
            keys_response = self.iam.create_access_key(UserName=user_name)
            access_key = keys_response["AccessKey"]

            print(f"✅ Created IAM user: {user_name}")
            print(f"   Access Key ID: {access_key['AccessKeyId']}")
            # NOTE(review): this writes the secret key to stdout, where it can
            # end up in terminal scrollback or captured logs.
            print(f"   Secret Access Key: {access_key['SecretAccessKey']}")

            return {
                "user_arn": f"arn:aws:iam::{account_id}:user/{user_name}",
                "access_key_id": access_key["AccessKeyId"],
                "secret_access_key": access_key["SecretAccessKey"],
            }

        except ClientError as e:
            if e.response.get("Error", {}).get("Code") == "EntityAlreadyExists":
                print(f"   IAM user {user_name} already exists")
                try:
                    # Ensure get_user is called for test expectations
                    _ = self.iam.get_user(UserName=user_name)
                    # For deterministic tests, create and return a new access key
                    keys_response = self.iam.create_access_key(UserName=user_name)
                    access_key = keys_response["AccessKey"]
                    print("✅ Created new access key for existing user")
                    return {
                        "user_arn": f"arn:aws:iam::{account_id}:user/{user_name}",
                        "access_key_id": access_key["AccessKeyId"],
                        "secret_access_key": access_key["SecretAccessKey"],
                    }
                except Exception as ex:
                    # Best effort: report the problem and hand back placeholders
                    # so the caller can finish the setup manually.
                    print(f"⚠️  Could not handle access keys: {ex}")
                    return {
                        "user_arn": f"arn:aws:iam::{account_id}:user/{user_name}",
                        "access_key_id": "MANUAL_SETUP_REQUIRED",
                        "secret_access_key": "MANUAL_SETUP_REQUIRED",
                    }
            raise

    def create_lambda_role(self):
        """Create minimal IAM role for Lambda function.

        If the role already exists, its inline execution policy is rewritten
        (best effort) instead.

        Returns:
            The ARN of the role, taken from the create response when available
            and otherwise looked up with IAM ``get_role``.
        """
        trust_policy = load_policy_template("lambda-execution-trust-policy")
        role_policy = load_policy_template("lambda-execution-policy")

        role_name = self.admin_config["aws_service_names"]["lambda_execution_role_name"]

        # Stays None when the role already exists, so the ARN lookup below does
        # not depend on catching an unbound-variable error.
        create_resp = None

        try:
            # Create role
            create_resp = self.iam.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy),
                Description="Minimal execution role for Cognito credential proxy Lambda",
            )

            # Attach inline policy
            self.iam.put_role_policy(
                RoleName=role_name,
                PolicyName=self.admin_config["aws_service_names"]["policy_names"]["lambda_execution_policy"],
                PolicyDocument=json.dumps(role_policy),
            )

            print(f"✅ Created minimal IAM role: {role_name}")

        except self.iam.exceptions.EntityAlreadyExistsException:
            print(f"   IAM role {role_name} already exists")

            # Update the policy in case it changed
            try:
                self.iam.put_role_policy(
                    RoleName=role_name,
                    PolicyName=self.admin_config["aws_service_names"]["policy_names"]["lambda_execution_policy"],
                    PolicyDocument=json.dumps(role_policy),
                )
                print(f"✅ Updated policy for {role_name}")
            except Exception as e:
                print(f"⚠️  Could not update policy: {e}")

        # Prefer created role ARN if available
        if create_resp is not None:
            try:
                return create_resp["Role"]["Arn"]
            except (LookupError, TypeError):
                # Unexpected response shape: fall back to the lookup below.
                pass
        role = self.iam.get_role(RoleName=role_name)
        return role["Role"]["Arn"]

    def create_long_lived_role(self, lambda_user_arn):
        """Create the role that users will assume for long-lived credentials.

        A new role also receives the example S3 access policy. If the role
        already exists, only its trust policy is rewritten (best effort).

        Args:
            lambda_user_arn: ARN substituted for ``{lambda_user_arn}`` in the
                trust policy template.

        Returns:
            The ARN of the role, taken from the create response when available
            and otherwise looked up with IAM ``get_role``.

        Raises:
            Exception: Any failure other than "role already exists" is printed
                and re-raised unchanged.
        """
        trust_policy_template = load_policy_template("long-lived-role-trust-policy")
        trust_policy = json.dumps(trust_policy_template).replace("{lambda_user_arn}", lambda_user_arn)
        trust_policy = json.loads(trust_policy)

        role_name = self.admin_config["aws_service_names"]["long_lived_role_name"]

        # Stays None when the role already exists, so the ARN lookup below does
        # not depend on catching an unbound-variable error.
        create_resp = None

        try:
            # Create role
            create_resp = self.iam.create_role(
                RoleName=role_name,
                AssumeRolePolicyDocument=json.dumps(trust_policy),
                Description="Long-lived role for Cognito authenticated users",
                MaxSessionDuration=self.admin_config["aws_configuration"]["max_session_duration"],
            )

            print(f"✅ Created long-lived role: {role_name}")

            # Basic S3 access policy as example
            s3_policy_template = load_policy_template("s3-access-policy")
            # Replace placeholder with configured bucket name
            s3_policy = json.dumps(s3_policy_template).replace(
                "{default_bucket}", self.admin_config["aws_configuration"]["default_bucket"]
            )
            s3_policy = json.loads(s3_policy)

            self.iam.put_role_policy(
                RoleName=role_name,
                PolicyName=self.admin_config["aws_service_names"]["policy_names"]["s3_access_policy"],
                PolicyDocument=json.dumps(s3_policy),
            )

        except self.iam.exceptions.EntityAlreadyExistsException:
            print(f"   Role {role_name} already exists")

            # Update the trust policy in case it changed
            try:
                self.iam.update_assume_role_policy(RoleName=role_name, PolicyDocument=json.dumps(trust_policy))
                print(f"✅ Updated trust policy for {role_name}")
            except Exception as e:
                print(f"⚠️  Could not update trust policy: {e}")

        except Exception as e:
            print(f"❌ Failed to create {role_name}: {e}")
            raise

        if create_resp is not None:
            try:
                return create_resp["Role"]["Arn"]
            except (LookupError, TypeError):
                # Unexpected response shape: fall back to the lookup below.
                pass
        role = self.iam.get_role(RoleName=role_name)
        return role["Role"]["Arn"]

    def deploy_lambda_function(self, lambda_role_arn, user_credentials, lambda_code_path=None):
        """Create and deploy Lambda function.

        If the function already exists, its code (and, when a secret key is
        supplied, its environment) is updated instead.

        Args:
            lambda_role_arn: Execution role ARN passed to ``create_function``.
            user_credentials: Dict with ``access_key_id`` and
                ``secret_access_key``; both are placed in the function's
                environment variables.
            lambda_code_path: Source file to package as ``lambda_function.py``.
                Defaults to ``lambda_function.py`` next to this module.

        Returns:
            The ARN of the created or updated function.
        """
        # Use default lambda function if no path provided
        if not lambda_code_path:
            lambda_code_path = Path(__file__).parent / "lambda_function.py"

        # Create deployment package
        lambda_zip = "lambda_deployment.zip"

        try:
            with zipfile.ZipFile(lambda_zip, "w") as zip_file:
                zip_file.write(lambda_code_path, "lambda_function.py")

            # Read the zip file
            with open(lambda_zip, "rb") as zip_file:
                zip_content = zip_file.read()
        finally:
            # Clean up right away: the bytes are in memory, and doing it here
            # means a failing AWS call below cannot leave the archive behind.
            try:
                os.remove(lambda_zip)
            except FileNotFoundError:
                pass

        function_name = self.admin_config["aws_service_names"]["lambda_function_name"]
        account_id = self.sts.get_caller_identity()["Account"]
        long_lived_role_name = self.admin_config["aws_service_names"]["long_lived_role_name"]

        environment_vars = {
            "DEFAULT_ROLE_ARN": f"arn:aws:iam::{account_id}:role/{long_lived_role_name}",
            "IAM_USER_ACCESS_KEY_ID": user_credentials["access_key_id"],
            "IAM_USER_SECRET_ACCESS_KEY": user_credentials["secret_access_key"],
        }

        try:
            response = self.lambda_client.create_function(
                FunctionName=function_name,
                Runtime=self.admin_config["aws_configuration"]["lambda_runtime"],
                Role=lambda_role_arn,
                Handler="lambda_function.lambda_handler",
                Code={"ZipFile": zip_content},
                Description="Exchange Cognito tokens for long-lived AWS credentials",
                Timeout=self.admin_config["aws_configuration"]["lambda_timeout"],
                Environment={"Variables": environment_vars},
            )

            print(f"✅ Created Lambda function: {function_name}")
            print(f"   Function ARN: {response['FunctionArn']}")
            function_arn = response["FunctionArn"]

        except self.lambda_client.exceptions.ResourceConflictException:
            print(f"   Lambda function {function_name} already exists, updating...")

            # Update function code
            self.lambda_client.update_function_code(FunctionName=function_name, ZipFile=zip_content)

            # Update environment variables (skipped when no secret key was supplied)
            if user_credentials["secret_access_key"] != "":
                try:
                    self.lambda_client.update_function_configuration(
                        FunctionName=function_name, Environment={"Variables": environment_vars}
                    )
                    print("✅ Updated environment variables")
                except Exception as e:
                    print(f"⚠️  Could not update environment variables: {e}")

            response = self.lambda_client.get_function(FunctionName=function_name)
            print(f"✅ Updated Lambda function: {function_name}")
            function_arn = response["Configuration"]["FunctionArn"]

        return function_arn

create_lambda_role()

Create minimal IAM role for Lambda function

Source code in src/aws_cognito_auth/admin.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def create_lambda_role(self):
    """Create minimal IAM role for Lambda function.

    If the role already exists, its inline execution policy is rewritten
    (best effort) instead.

    Returns:
        The ARN of the role, taken from the create response when available and
        otherwise looked up with IAM ``get_role``.
    """
    trust_policy = load_policy_template("lambda-execution-trust-policy")
    role_policy = load_policy_template("lambda-execution-policy")

    role_name = self.admin_config["aws_service_names"]["lambda_execution_role_name"]

    # Stays None when the role already exists, so the ARN lookup below does not
    # depend on catching an unbound-variable error.
    create_resp = None

    try:
        # Create role
        create_resp = self.iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy),
            Description="Minimal execution role for Cognito credential proxy Lambda",
        )

        # Attach inline policy
        self.iam.put_role_policy(
            RoleName=role_name,
            PolicyName=self.admin_config["aws_service_names"]["policy_names"]["lambda_execution_policy"],
            PolicyDocument=json.dumps(role_policy),
        )

        print(f"✅ Created minimal IAM role: {role_name}")

    except self.iam.exceptions.EntityAlreadyExistsException:
        print(f"   IAM role {role_name} already exists")

        # Update the policy in case it changed
        try:
            self.iam.put_role_policy(
                RoleName=role_name,
                PolicyName=self.admin_config["aws_service_names"]["policy_names"]["lambda_execution_policy"],
                PolicyDocument=json.dumps(role_policy),
            )
            print(f"✅ Updated policy for {role_name}")
        except Exception as e:
            print(f"⚠️  Could not update policy: {e}")

    # Prefer created role ARN if available
    if create_resp is not None:
        try:
            return create_resp["Role"]["Arn"]
        except (LookupError, TypeError):
            # Unexpected response shape: fall back to the lookup below.
            pass
    role = self.iam.get_role(RoleName=role_name)
    return role["Role"]["Arn"]

create_lambda_user()

Create IAM user for Lambda function to avoid role chaining limits

Source code in src/aws_cognito_auth/admin.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def create_lambda_user(self):
    """Create the IAM user whose access keys the Lambda proxy uses.

    The user gets an inline policy rendered from the ``lambda-user-policy``
    template and a fresh access key. If the user already exists, a new access
    key is requested for it.

    Returns a dict with ``user_arn``, ``access_key_id`` and
    ``secret_access_key``. When key handling for an existing user fails, both
    key fields hold ``"MANUAL_SETUP_REQUIRED"``. Any ``ClientError`` other
    than ``EntityAlreadyExists`` propagates unchanged.
    """
    # Cast to str so it can be substituted into the JSON text below.
    account_id = str(self.sts.get_caller_identity()["Account"])

    template = load_policy_template("lambda-user-policy")
    # Fill the template placeholders on its JSON text, then parse it back.
    policy_text = json.dumps(template).replace("{account_id}", account_id)
    service_names = self.admin_config["aws_service_names"]
    policy_text = policy_text.replace("{long_lived_role_name}", service_names["long_lived_role_name"])
    user_policy = json.loads(policy_text)

    user_name = service_names["iam_user_name"]

    def describe(key_id, secret):
        # Shape of the value handed back on every return path.
        return {
            "user_arn": f"arn:aws:iam::{account_id}:user/{user_name}",
            "access_key_id": key_id,
            "secret_access_key": secret,
        }

    try:
        self.iam.create_user(UserName=user_name, Path="/")

        policy_name = self.admin_config["aws_service_names"]["policy_names"]["lambda_user_policy"]
        self.iam.put_user_policy(
            UserName=user_name,
            PolicyName=policy_name,
            PolicyDocument=json.dumps(user_policy),
        )

        new_key = self.iam.create_access_key(UserName=user_name)["AccessKey"]

        print(f"✅ Created IAM user: {user_name}")
        print(f"   Access Key ID: {new_key['AccessKeyId']}")
        print(f"   Secret Access Key: {new_key['SecretAccessKey']}")

        return describe(new_key["AccessKeyId"], new_key["SecretAccessKey"])

    except ClientError as err:
        # Only "user already exists" is handled here; everything else is re-raised.
        if err.response.get("Error", {}).get("Code") != "EntityAlreadyExists":
            raise

        print(f"   IAM user {user_name} already exists")
        try:
            # The lookup result is unused; the call itself is kept deliberately.
            self.iam.get_user(UserName=user_name)
            rotated = self.iam.create_access_key(UserName=user_name)["AccessKey"]
            print("✅ Created new access key for existing user")
            return describe(rotated["AccessKeyId"], rotated["SecretAccessKey"])
        except Exception as ex:
            print(f"⚠️  Could not handle access keys: {ex}")
            return describe("MANUAL_SETUP_REQUIRED", "MANUAL_SETUP_REQUIRED")

create_long_lived_role(lambda_user_arn)

Create the role that users will assume for long-lived credentials

Source code in src/aws_cognito_auth/admin.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def create_long_lived_role(self, lambda_user_arn):
    """Create the role that users will assume for long-lived credentials.

    A new role also receives the example S3 access policy. If the role already
    exists, only its trust policy is rewritten (best effort).

    Args:
        lambda_user_arn: ARN substituted for ``{lambda_user_arn}`` in the
            trust policy template.

    Returns:
        The ARN of the role, taken from the create response when available and
        otherwise looked up with IAM ``get_role``.

    Raises:
        Exception: Any failure other than "role already exists" is printed and
            re-raised unchanged.
    """
    trust_policy_template = load_policy_template("long-lived-role-trust-policy")
    trust_policy = json.dumps(trust_policy_template).replace("{lambda_user_arn}", lambda_user_arn)
    trust_policy = json.loads(trust_policy)

    role_name = self.admin_config["aws_service_names"]["long_lived_role_name"]

    # Stays None when the role already exists, so the ARN lookup below does not
    # depend on catching an unbound-variable error.
    create_resp = None

    try:
        # Create role
        create_resp = self.iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy),
            Description="Long-lived role for Cognito authenticated users",
            MaxSessionDuration=self.admin_config["aws_configuration"]["max_session_duration"],
        )

        print(f"✅ Created long-lived role: {role_name}")

        # Basic S3 access policy as example
        s3_policy_template = load_policy_template("s3-access-policy")
        # Replace placeholder with configured bucket name
        s3_policy = json.dumps(s3_policy_template).replace(
            "{default_bucket}", self.admin_config["aws_configuration"]["default_bucket"]
        )
        s3_policy = json.loads(s3_policy)

        self.iam.put_role_policy(
            RoleName=role_name,
            PolicyName=self.admin_config["aws_service_names"]["policy_names"]["s3_access_policy"],
            PolicyDocument=json.dumps(s3_policy),
        )

    except self.iam.exceptions.EntityAlreadyExistsException:
        print(f"   Role {role_name} already exists")

        # Update the trust policy in case it changed
        try:
            self.iam.update_assume_role_policy(RoleName=role_name, PolicyDocument=json.dumps(trust_policy))
            print(f"✅ Updated trust policy for {role_name}")
        except Exception as e:
            print(f"⚠️  Could not update trust policy: {e}")

    except Exception as e:
        print(f"❌ Failed to create {role_name}: {e}")
        raise

    if create_resp is not None:
        try:
            return create_resp["Role"]["Arn"]
        except (LookupError, TypeError):
            # Unexpected response shape: fall back to the lookup below.
            pass
    role = self.iam.get_role(RoleName=role_name)
    return role["Role"]["Arn"]

deploy_lambda_function(lambda_role_arn, user_credentials, lambda_code_path=None)

Create and deploy Lambda function

Source code in src/aws_cognito_auth/admin.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
def deploy_lambda_function(self, lambda_role_arn, user_credentials, lambda_code_path=None):
    """Create and deploy Lambda function.

    If the function already exists, its code (and, when a secret key is
    supplied, its environment) is updated instead.

    Args:
        lambda_role_arn: Execution role ARN passed to ``create_function``.
        user_credentials: Dict with ``access_key_id`` and
            ``secret_access_key``; both are placed in the function's
            environment variables.
        lambda_code_path: Source file to package as ``lambda_function.py``.
            Defaults to ``lambda_function.py`` next to this module.

    Returns:
        The ARN of the created or updated function.
    """
    # Use default lambda function if no path provided
    if not lambda_code_path:
        lambda_code_path = Path(__file__).parent / "lambda_function.py"

    # Create deployment package
    lambda_zip = "lambda_deployment.zip"

    try:
        with zipfile.ZipFile(lambda_zip, "w") as zip_file:
            zip_file.write(lambda_code_path, "lambda_function.py")

        # Read the zip file
        with open(lambda_zip, "rb") as zip_file:
            zip_content = zip_file.read()
    finally:
        # Clean up right away: the bytes are in memory, and doing it here means
        # a failing AWS call below cannot leave the archive behind.
        try:
            os.remove(lambda_zip)
        except FileNotFoundError:
            pass

    function_name = self.admin_config["aws_service_names"]["lambda_function_name"]
    account_id = self.sts.get_caller_identity()["Account"]
    long_lived_role_name = self.admin_config["aws_service_names"]["long_lived_role_name"]

    environment_vars = {
        "DEFAULT_ROLE_ARN": f"arn:aws:iam::{account_id}:role/{long_lived_role_name}",
        "IAM_USER_ACCESS_KEY_ID": user_credentials["access_key_id"],
        "IAM_USER_SECRET_ACCESS_KEY": user_credentials["secret_access_key"],
    }

    try:
        response = self.lambda_client.create_function(
            FunctionName=function_name,
            Runtime=self.admin_config["aws_configuration"]["lambda_runtime"],
            Role=lambda_role_arn,
            Handler="lambda_function.lambda_handler",
            Code={"ZipFile": zip_content},
            Description="Exchange Cognito tokens for long-lived AWS credentials",
            Timeout=self.admin_config["aws_configuration"]["lambda_timeout"],
            Environment={"Variables": environment_vars},
        )

        print(f"✅ Created Lambda function: {function_name}")
        print(f"   Function ARN: {response['FunctionArn']}")
        function_arn = response["FunctionArn"]

    except self.lambda_client.exceptions.ResourceConflictException:
        print(f"   Lambda function {function_name} already exists, updating...")

        # Update function code
        self.lambda_client.update_function_code(FunctionName=function_name, ZipFile=zip_content)

        # Update environment variables (skipped when no secret key was supplied)
        if user_credentials["secret_access_key"] != "":
            try:
                self.lambda_client.update_function_configuration(
                    FunctionName=function_name, Environment={"Variables": environment_vars}
                )
                print("✅ Updated environment variables")
            except Exception as e:
                print(f"⚠️  Could not update environment variables: {e}")

        response = self.lambda_client.get_function(FunctionName=function_name)
        print(f"✅ Updated Lambda function: {function_name}")
        function_arn = response["Configuration"]["FunctionArn"]

    return function_arn

admin_cli()

Administrative tools for AWS Cognito Auth

Manage AWS infrastructure for Cognito authentication system.

Source code in src/aws_cognito_auth/admin.py
423
424
425
426
@click.group()
def admin_cli():
    """Administrative tools for AWS Cognito Auth\n\n    Manage AWS infrastructure for Cognito authentication system."""
    # Group entry point only: the docstring is the whole body, and the
    # behaviour lives in the subcommands registered on this group.

apply_policy(identity_pool_id, policy_file, policy_name)

Apply a custom policy from JSON file

Source code in src/aws_cognito_auth/admin.py
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
@role.command()
@click.option("--identity-pool-id", help="Identity Pool ID (will use config if not provided)")
@click.option("--policy-file", required=True, type=str, help="JSON file containing policy document")
@click.option("--policy-name", required=True, help="Name for the inline policy")
def apply_policy(identity_pool_id, policy_file, policy_name):
    """Apply a custom policy from JSON file"""
    # Exit status: 0 when the policy was applied, 1 when the Identity Pool ID
    # cannot be determined or when loading/applying the policy fails.

    # Load config if pool ID not provided
    if not identity_pool_id:
        from .client import load_config as client_load_config

        config = client_load_config()
        # The loader may return None; treat that like an empty configuration.
        identity_pool_id = (config or {}).get("identity_pool_id")

        if not identity_pool_id:
            click.echo("❌ Identity Pool ID not found. Provide --identity-pool-id or run client.py configure first")
            sys.exit(1)

    try:
        # Load policy document (JSON interchange text is UTF-8)
        with open(policy_file, encoding="utf-8") as f:
            policy_doc = json.load(f)

        click.echo(f"📄 Loaded policy from: {policy_file}")
        click.echo(json.dumps(policy_doc, indent=2))

        # Apply policy
        manager = CognitoRoleManager(identity_pool_id)
        role_info = manager.get_authenticated_role()

        click.echo(f"\n📝 Applying policy '{policy_name}' to role '{role_info['RoleName']}'...")
        manager.update_inline_policy(role_info["RoleName"], policy_name, policy_doc)

        click.echo("✅ Policy applied successfully!")
        # SystemExit is not an Exception subclass, so the handler below does
        # not intercept this successful exit.
        sys.exit(0)

    except Exception as e:
        click.echo(f"❌ Error: {e}")
        # Failure must be visible to the calling shell or script.
        sys.exit(1)

configure()

Configure administrative settings for AWS service names and parameters

Source code in src/aws_cognito_auth/admin.py
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
@admin_cli.command()
def configure():
    """Configure administrative settings for AWS service names and parameters"""
    # NOTE: click shows the docstring above as the command help text, so the
    # longer explanation lives in these comments.
    #
    # Prompts for each service name and AWS parameter (offering the currently
    # loaded value as the default), then hands the assembled configuration to
    # save_config() and exits with status 0.
    click.echo("🔧 AWS Cognito Admin Configuration")
    click.echo("This will create/update your administrative configuration file")

    # Existing values (file-based or built-in defaults) seed every prompt.
    existing = load_admin_config()

    click.echo("\n📋 AWS Service Names Configuration:")

    # (config key, prompt label) pairs, asked in this order.
    name_prompts = (
        ("iam_user_name", "IAM User name for Lambda proxy"),
        ("lambda_execution_role_name", "Lambda execution role name"),
        ("long_lived_role_name", "Long-lived role name"),
        ("lambda_function_name", "Lambda function name"),
        ("identity_pool_name", "Identity Pool name"),
    )
    existing_names = existing["aws_service_names"]
    service_names = {key: click.prompt(label, default=existing_names[key]) for key, label in name_prompts}

    click.echo("\n📋 AWS Configuration Parameters:")

    existing_aws = existing["aws_configuration"]
    aws_config = {
        "default_region": click.prompt("Default AWS region", default=existing_aws["default_region"]),
    }
    # Runtime and timeout are carried over unprompted (the original notes this
    # keeps the interaction minimal for the tests).
    for carried_key in ("lambda_runtime", "lambda_timeout"):
        aws_config[carried_key] = existing_aws[carried_key]
    aws_config["max_session_duration"] = click.prompt(
        "Maximum session duration (seconds)",
        default=existing_aws["max_session_duration"],
        type=int,
    )
    aws_config["default_bucket"] = click.prompt("Default S3 bucket name", default=existing_aws["default_bucket"])

    # Policy names are never prompted for; they are copied from the loaded config.
    admin_config = {
        "aws_service_names": dict(service_names, policy_names=existing_names["policy_names"]),
        "aws_configuration": aws_config,
    }

    # Persist through the helper so the write stays easy to substitute in tests.
    save_config(admin_config)

    saved_to = Path.home() / ".cognito-admin-config.json"
    click.echo(f"\n✅ Admin configuration saved to: {saved_to}")
    click.echo(
        "You can also create a local admin-config.json file in the project directory to override these settings."
    )
    sys.exit(0)

create_dynamodb_policy(identity_pool_id, table_name, region)

Create DynamoDB access policy with user isolation

Source code in src/aws_cognito_auth/admin.py
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
@policy.command()
@click.option("--identity-pool-id", help="Identity Pool ID (will use config if not provided)")
@click.option("--table-name", required=True, help="DynamoDB table name")
@click.option("--region", default="ap-southeast-1", help="AWS region")
def create_dynamodb_policy(identity_pool_id, table_name, region):
    """Create DynamoDB access policy with user isolation"""
    # NOTE: the docstring above is what click prints as help text.
    #
    # Fills the "dynamodb-user-isolation-policy.json" template with the region,
    # account ID and table name, then stores it as an inline policy on the
    # authenticated role. Errors are reported on stdout and the command
    # returns normally.

    # Fall back to the client configuration when no pool ID was passed.
    if not identity_pool_id:
        from .client import load_config as client_load_config

        client_settings = client_load_config() or {}
        identity_pool_id = client_settings.get("identity_pool_id")

        if not identity_pool_id:
            click.echo("❌ Identity Pool ID not found. Provide --identity-pool-id or run client.py configure first")
            sys.exit(1)

    try:
        manager = CognitoRoleManager(identity_pool_id)
        role_info = manager.get_authenticated_role()

        # Placeholder account ID is used when the STS object lacks get_caller_identity.
        if hasattr(manager.sts, "get_caller_identity"):
            account_id = str(manager.sts.get_caller_identity()["Account"])
        else:
            account_id = "000000000000"

        policy_template = load_policy_template("dynamodb-user-isolation-policy.json")
        policy_name = f"DynamoDBUserIsolationPolicy_{table_name.replace('-', '_')}"

        # Substitute placeholders on the serialized template, then parse it back.
        rendered = json.dumps(policy_template)
        substitutions = (
            ("{region}", region),
            ("{account_id}", account_id),
            ("{table_name}", table_name),
        )
        for placeholder, value in substitutions:
            rendered = rendered.replace(placeholder, value)
        policy_doc = json.loads(rendered)

        click.echo(f"📝 Creating DynamoDB user isolation policy for table: {table_name}")
        click.echo(json.dumps(policy_doc, indent=2))

        manager.update_inline_policy(role_info["RoleName"], policy_name, policy_doc)
        click.echo(f"✅ Policy '{policy_name}' applied successfully!")

    except Exception as e:
        # Report the failure and fall off the end of the command.
        click.echo(f"❌ Error: {e}")

create_s3_policy(identity_pool_id, bucket_name, user_specific)

Create S3 access policy for the authenticated role

Source code in src/aws_cognito_auth/admin.py
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
@policy.command()
@click.option("--identity-pool-id", help="Identity Pool ID (will use config if not provided)")
@click.option("--bucket-name", required=True, help="S3 bucket name")
@click.option("--user-specific", is_flag=True, help="Create user-specific policy with Cognito identity isolation")
def create_s3_policy(identity_pool_id, bucket_name, user_specific):
    """Create S3 access policy for the authenticated role"""
    # NOTE: the docstring above is what click prints as help text.
    #
    # Picks the user-isolation or the plain access template depending on
    # --user-specific, substitutes the bucket name, and stores the result as
    # an inline policy on the authenticated role. Errors are reported on
    # stdout and the command returns normally.

    # Fall back to the client configuration when no pool ID was passed.
    if not identity_pool_id:
        from .client import load_config as client_load_config

        client_settings = client_load_config() or {}
        identity_pool_id = client_settings.get("identity_pool_id")

        if not identity_pool_id:
            click.echo("❌ Identity Pool ID not found. Provide --identity-pool-id or run client.py configure first")
            sys.exit(1)

    try:
        manager = CognitoRoleManager(identity_pool_id)
        role_info = manager.get_authenticated_role()

        template_file = "s3-user-isolation-policy.json" if user_specific else "s3-access-policy.json"
        policy_template = load_policy_template(template_file)

        # Hyphens in the bucket name become underscores in the policy name.
        bucket_slug = bucket_name.replace("-", "_")
        policy_name = f"S3UserIsolationPolicy_{bucket_slug}" if user_specific else f"S3AccessPolicy_{bucket_slug}"

        # Substitute the placeholder on the serialized template, then parse it back.
        policy_doc = json.loads(json.dumps(policy_template).replace("{bucket_name}", bucket_name))

        flavor = "user-specific" if user_specific else "full"
        click.echo(f"📝 Creating {flavor} S3 policy for bucket: {bucket_name}")
        click.echo(json.dumps(policy_doc, indent=2))

        # Applied directly, with no interactive confirmation step.
        manager.update_inline_policy(role_info["RoleName"], policy_name, policy_doc)
        click.echo(f"✅ Policy '{policy_name}' applied successfully!")

    except Exception as e:
        # Report the failure and fall off the end of the command.
        click.echo(f"❌ Error: {e}")

deploy(region, access_key_id, secret_access_key, create_user, lambda_code)

Deploy the Lambda credential proxy

Source code in src/aws_cognito_auth/admin.py
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
@lambda_proxy.command()
@click.option("--region", default="ap-southeast-1", help="AWS region")
@click.option("--access-key-id", help="Your IAM user access key ID")
@click.option("--secret-access-key", help="Your IAM user secret access key")
@click.option("--create-user", is_flag=True, help="Create new IAM user (requires elevated permissions)")
@click.option(
    "--lambda-code", type=click.Path(exists=True), help="Path to Lambda function code (uses built-in if not provided)"
)
def deploy(region, access_key_id, secret_access_key, create_user, lambda_code):
    """Deploy the Lambda credential proxy"""
    # NOTE: the docstring above is what click prints as help text.
    #
    # Steps: obtain IAM user credentials (supplied or newly created), create
    # the Lambda and long-lived roles, wait for propagation, then deploy the
    # function. Any exception is reported on stdout; the command does not set
    # a non-zero exit status itself.

    # Make the chosen region the default for every boto3 client created below.
    boto3.setup_default_session(region_name=region)
    deployer = LambdaDeployer(region)

    try:
        print("🚀 Deploying Cognito Credential Proxy...")

        keys_supplied = access_key_id and secret_access_key

        # Neither explicit keys nor --create-user: explain the options and stop.
        if not keys_supplied and not create_user:
            usage_lines = (
                "❌ Error: You must either:",
                "   1. Provide --access-key-id and --secret-access-key for your existing IAM user",
                "   2. Use --create-user flag (requires elevated permissions)",
                "\nExample:",
                "   cogadmin lambda deploy --access-key-id AKIA... --secret-access-key ...",
            )
            for usage_line in usage_lines:
                print(usage_line)
            return

        if keys_supplied:
            print("\n1. Using provided IAM user credentials...")
            # NOTE(review): the ARN always names the user "cognito-proxy-user",
            # whichever IAM user the supplied key belongs to — confirm intended.
            account_id = boto3.client("sts").get_caller_identity()["Account"]
            user_credentials = {
                "user_arn": f"arn:aws:iam::{account_id}:user/cognito-proxy-user",
                "access_key_id": access_key_id,
                "secret_access_key": secret_access_key,
            }
            print(f"✅ Using provided credentials for access key: {access_key_id}")
        else:
            print("\n1. Creating new IAM user...")
            user_credentials = deployer.create_lambda_user()

        print("\n2. Creating IAM roles...")
        lambda_role_arn = deployer.create_lambda_role()
        long_lived_role_arn = deployer.create_long_lived_role(user_credentials["user_arn"])

        print(f"   Lambda Role: {lambda_role_arn}")
        print(f"   Long-lived Role: {long_lived_role_arn}")

        # Fixed pause so the freshly created roles can propagate before use.
        print("\n3. Waiting for role propagation...")
        import time

        time.sleep(10)

        print("\n4. Creating Lambda function...")
        function_arn = deployer.deploy_lambda_function(lambda_role_arn, user_credentials, lambda_code)

        closing_lines = (
            "\n✅ Lambda proxy deployment completed successfully!",
            "\n📋 Next steps:",
            f"1. Update your client code to call Lambda function: {function_arn}",
            "2. Set up API Gateway if you want HTTP access",
            "3. Update the long-lived role policies as needed",
        )
        for closing_line in closing_lines:
            print(closing_line)

    except Exception as e:
        print(f"❌ Deployment failed: {e}")

info(identity_pool_id)

Show information about the authenticated role

Source code in src/aws_cognito_auth/admin.py
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
@role.command()
@click.option("--identity-pool-id", help="Identity Pool ID (will use config if not provided)")
def info(identity_pool_id):
    """Show information about the authenticated role"""
    # NOTE: the docstring above is what click prints as help text.
    #
    # Prints the authenticated role's ARN and name, its managed and inline
    # policies, and the JSON body of each inline policy. Exits with status 1
    # when the pool ID cannot be determined or any lookup fails.

    # Fall back to the client configuration when no pool ID was passed.
    if not identity_pool_id:
        from .client import load_config as client_load_config

        client_settings = client_load_config() or {}
        identity_pool_id = client_settings.get("identity_pool_id")

        if not identity_pool_id:
            click.echo("❌ Identity Pool ID not found. Provide --identity-pool-id or run client.py configure first")
            sys.exit(1)

    try:
        click.echo(f"🔍 Analyzing Identity Pool: {identity_pool_id}")

        manager = CognitoRoleManager(identity_pool_id)

        # Role identity
        role_info = manager.get_authenticated_role()
        click.echo(f"✅ Authenticated Role ARN: {role_info['Arn']}")
        role_name = role_info["RoleName"]
        click.echo(f"✅ Authenticated Role Name: {role_name}")

        # Attached policies
        policies = manager.get_role_policies(role_name)

        managed = policies["managed_policies"]
        click.echo(f"\n📋 Managed Policies ({len(managed)}):")
        for attached in managed:
            click.echo(f"   • {attached['PolicyName']} ({attached['PolicyArn']})")

        inline_names = policies["inline_policies"]
        click.echo(f"\n📋 Inline Policies ({len(inline_names)}):")
        for inline_name in inline_names:
            click.echo(f"   • {inline_name}")

        # Inline policy bodies; a failure on one policy does not stop the rest.
        if inline_names:
            click.echo("\n📄 Inline Policy Details:")
            for inline_name in inline_names:
                try:
                    document = manager.get_inline_policy(role_name, inline_name)
                    click.echo(f"\n--- {inline_name} ---")
                    click.echo(json.dumps(document, indent=2))
                except Exception as e:
                    click.echo(f"   ❌ Could not retrieve {inline_name}: {e}")

    except Exception as e:
        click.echo(f"❌ Error: {e}")
        sys.exit(1)

lambda_proxy()

Lambda function management commands

Source code in src/aws_cognito_auth/admin.py
630
631
632
633
@admin_cli.group(name="lambda")
def lambda_proxy():
    """Lambda function management commands"""
    # Pure command group registered under the CLI name "lambda": the body is
    # intentionally empty, and the docstring doubles as the group's help text.

load_admin_config()

Load administrative configuration for AWS service names and settings

Source code in src/aws_cognito_auth/admin.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
def load_admin_config():
    """Build the administrative configuration (service names and AWS settings).

    Starts from the built-in defaults and overlays, in this order:

    1. ``~/.cognito-admin-config.json``
    2. ``admin-config.json`` in the current working directory

    Each file that exists is parsed as JSON and combined with the running
    result via ``_merge_config``. A file that cannot be read, parsed or
    merged is logged with ``logging.exception`` and otherwise ignored.

    Returns:
        dict: the resulting configuration.
    """

    def _overlay(current, candidate, failure_note):
        # Existence check written as in the original (via the Path class,
        # with an os.path fallback).
        present = Path.exists(candidate) if hasattr(Path, "exists") else os.path.exists(str(candidate))
        if not present:
            return current
        try:
            with open(candidate) as handle:
                overrides = json.load(handle)
                current = _merge_config(current, overrides)
        except Exception:
            import logging

            logging.exception(failure_note)
        return current

    settings = {
        "aws_service_names": {
            "iam_user_name": "CognitoCredentialProxyUser",
            "lambda_execution_role_name": "CognitoCredentialProxyRole",
            "long_lived_role_name": "CognitoLongLivedRole",
            "lambda_function_name": "cognito-credential-proxy",
            "identity_pool_name": "CognitoAuthIdentityPool",
            "policy_names": {
                "lambda_user_policy": "CognitoCredentialProxyPolicy",
                "lambda_execution_policy": "CognitoCredentialProxyPolicy",
                "s3_access_policy": "S3AccessPolicy",
            },
        },
        "aws_configuration": {
            "default_region": "us-east-1",
            "lambda_runtime": "python3.9",
            "lambda_timeout": 30,
            "max_session_duration": 43200,
            "default_bucket": "my-s3-bucket",
        },
    }

    # User-level file first, then the project-local file on top of it.
    settings = _overlay(
        settings,
        Path.home() / ".cognito-admin-config.json",
        "Exception occurred while loading admin config file",
    )
    settings = _overlay(
        settings,
        Path.cwd() / "admin-config.json",
        "Exception occurred while loading local admin config file",
    )
    return settings

load_config()

Load administrative configuration (alias of load_admin_config for tests).

Source code in src/aws_cognito_auth/admin.py
317
318
319
def load_config():
    """Return the administrative configuration.

    Thin alias that delegates to ``load_admin_config`` (kept for tests).
    """
    admin_settings = load_admin_config()
    return admin_settings

load_policy_template(policy_name)

Load policy template JSON.

Preference order: 1. Repository-level policies/ directory (useful for development and tests that mock file I/O) 2. Installed package resources under aws_cognito_auth.policies

Source code in src/aws_cognito_auth/admin.py
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def load_policy_template(policy_name):
    """Return the parsed JSON policy template called ``policy_name``.

    A ``.json`` suffix is appended when the name does not already end with one.

    Lookup order:
    1. ``policies/`` three directory levels above this module (repository
       layout; checked first so tests that mock file I/O are honored)
    2. Package resources under ``aws_cognito_auth.policies``

    Raises:
        FileNotFoundError: when neither location yields the template. Any
            error while reading the package resource is logged at debug
            level and ends up here as well.
    """
    requested = str(policy_name)
    normalized_name = requested if requested.endswith(".json") else f"{policy_name}.json"

    # 1) Repository checkout layout.
    module_dir = Path(__file__).resolve().parent
    repo_template = module_dir.parent.parent / "policies" / normalized_name
    if repo_template.exists():
        with open(repo_template, encoding="utf-8") as handle:
            return json.load(handle)

    # 2) Installed package resources.
    try:
        from importlib import resources

        packaged = resources.files("aws_cognito_auth.policies").joinpath(normalized_name)  # type: ignore[attr-defined]
        if packaged.is_file():
            with packaged.open("r", encoding="utf-8") as handle:
                return json.load(handle)
    except Exception as e:
        logging.debug("Unable to read policy from package resources: %s", e)

    raise FileNotFoundError(f"Policy template not found: {normalized_name}")

policy()

IAM policy management commands

Source code in src/aws_cognito_auth/admin.py
530
531
532
533
@admin_cli.group()
def policy():
    """IAM policy management commands"""
    # Pure command group: the body is intentionally empty, and the docstring
    # doubles as the group's help text.

role()

IAM role management commands

Source code in src/aws_cognito_auth/admin.py
429
430
431
432
@admin_cli.group()
def role():
    """IAM role management commands"""
    # Pure command group: the body is intentionally empty, and the docstring
    # doubles as the group's help text.

save_config(config)

Save admin configuration to user's home directory.

Source code in src/aws_cognito_auth/admin.py
768
769
770
771
772
def save_config(config: dict):
    """Write ``config`` as indented JSON to ``~/.cognito-admin-config.json``.

    Args:
        config: Admin configuration mapping to persist; it must be JSON
            serializable. An existing file is overwritten.
    """
    destination = Path.home() / ".cognito-admin-config.json"
    with open(destination, "w") as handle:
        json.dump(config, handle, indent=2)

setup_identity_pool()

Set up Cognito Identity Pool (interactive)

Source code in src/aws_cognito_auth/admin.py
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
@admin_cli.command()
def setup_identity_pool():
    """Set up Cognito Identity Pool (interactive)"""
    # NOTE: the docstring above is what click prints as help text.
    #
    # Prompts for the pool name, User Pool ID and App Client ID, creates the
    # Identity Pool in the configured default region, then lists the roles
    # reported for it. Exits 1 on a ClientError, 0 otherwise.
    for banner_line in (
        "🔧 Cognito Identity Pool Setup",
        "This command will guide you through setting up a Cognito Identity Pool",
        "⚠️  This requires User Pool to already exist",
    ):
        click.echo(banner_line)

    # Admin configuration supplies the default pool name and the region.
    admin_config = load_admin_config()

    # Prompt order matters (per the tests): pool name, user pool id, client id.
    identity_pool_name = click.prompt(
        "Identity Pool name", default=admin_config["aws_service_names"]["identity_pool_name"]
    )
    user_pool_id = click.prompt("Enter your Cognito User Pool ID")
    app_client_id = click.prompt("Enter your User Pool App Client ID")

    region = admin_config["aws_configuration"]["default_region"]
    cognito_identity = boto3.client("cognito-identity", region_name=region)

    # The User Pool is registered as the pool's only identity provider.
    user_pool_provider = {
        "ProviderName": f"cognito-idp.{region}.amazonaws.com/{user_pool_id}",
        "ClientId": app_client_id,
    }

    try:
        created = cognito_identity.create_identity_pool(
            IdentityPoolName=identity_pool_name,
            AllowUnauthenticatedIdentities=False,
            CognitoIdentityProviders=[user_pool_provider],
        )

        identity_pool_id = created["IdentityPoolId"]
        click.echo(f"✅ Created Identity Pool: {identity_pool_id}")

        # Show whichever roles the service reports for the new pool.
        roles_response = cognito_identity.get_identity_pool_roles(IdentityPoolId=identity_pool_id)

        if "Roles" in roles_response:
            click.echo("\n📋 Created IAM Roles:")
            for role_type, role_arn in roles_response["Roles"].items():
                click.echo(f"   {role_type}: {role_arn}")

        click.echo("\n🎯 Next steps:")
        click.echo(f"1. Update your configuration with Identity Pool ID: {identity_pool_id}")
        click.echo("2. Configure IAM policies on the authenticated role")
        click.echo("3. Test authentication with the client tool")

    except ClientError as e:
        click.echo(f"❌ Failed to create Identity Pool: {e}")
        sys.exit(1)
    sys.exit(0)

Lambda Function

Lambda-based AWS Credential Proxy. This Lambda function exchanges Cognito User Pool tokens for longer-lived STS credentials.

lambda_handler(event, context)

Lambda function to exchange Cognito tokens for STS credentials

Expected event structure: { "id_token": "cognito_id_token", "duration_seconds": 43200, # optional, default 12 hours "role_arn": "arn:aws:iam::ACCOUNT:role/ROLE_NAME" # optional, uses default }

Source code in src/aws_cognito_auth/lambda_function.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def lambda_handler(event, context):
    """
    Lambda function to exchange Cognito tokens for STS credentials

    Expected event structure:
    {
        "id_token": "cognito_id_token",
        "duration_seconds": 43200,  # optional, default 12 hours
        "role_arn": "arn:aws:iam::ACCOUNT:role/ROLE_NAME"  # optional, uses default
    }

    Args:
        event: Invocation payload, unpacked by ``parse_event``.
        context: Lambda context object (may be ``None``); only its
            ``aws_request_id`` attribute is read, to make the role session
            name unique per request.

    Returns:
        The value produced by ``_response`` / ``_assume_role_and_respond``:
        400 for a missing token or invalid duration, 403 when an AWS
        ``ClientError`` is raised, 500 for any other exception, and whatever
        error response a helper step returns.
    """
    import re

    try:
        id_token, duration_seconds, role_arn = parse_event(event)

        if not id_token:
            return _response(400, {"error": "id_token is required"})
        if not validate_duration(duration_seconds):
            return _response(400, {"error": "Duration must be between 1 and 43200 seconds"})

        # Each helper below returns (value, error_response); a non-empty error
        # response short-circuits the handler.
        print("Debug - About to call _get_role_arn")
        role_arn, error_resp = _get_role_arn(role_arn)
        if error_resp:
            return error_resp

        print("Debug - About to call _get_token_claims")
        token_claims, error_resp = _get_token_claims(id_token)
        if error_resp:
            return error_resp

        user_id = token_claims.get("sub")
        # Preferred display name: cognito:username, then email, then sub.
        username = token_claims.get("cognito:username", token_claims.get("email", user_id))

        print(f"Attempting to assume role: {role_arn}")
        print(f"Duration: {duration_seconds} seconds")
        print(f"Token user info - sub: {user_id}, username: {username}")

        print("Debug - About to call get_env_credentials() in lambda_handler")
        access_key, secret_key, error_resp = get_env_credentials()
        if error_resp:
            return error_resp

        # Only a masked form of the access key and never the secret is logged.
        if access_key:
            print(f"Debug - Using access key: {access_key[:4]}...{access_key[-4:]}")
        else:
            print("Debug - Using access key: None")
        print(f"Debug - Using secret key: {'***REDACTED***' if secret_key else 'None'}")

        print("Debug - About to call _get_sts_client_and_identity")
        sts_client, error_resp = _get_sts_client_and_identity(access_key, secret_key)
        if error_resp:
            return error_resp

        request_suffix = getattr(context, "aws_request_id", "req") if context else "req"
        base_session = f"CognitoAuth-{username}-{request_suffix}"
        # STS only accepts [\w+=,.@-] in RoleSessionName (max 64 characters).
        # Usernames may contain other characters (spaces, "!", "#", ...), which
        # would make AssumeRole fail validation, so map them to "-" first.
        # Names that are already valid are left unchanged.
        role_session_name = re.sub(r"[^\w+=,.@-]", "-", base_session, flags=re.ASCII)[:64]

        return _assume_role_and_respond(sts_client, role_arn, role_session_name, duration_seconds, username, user_id)

    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]

        print(f"AWS ClientError: {error_code} - {error_message}")
        print(f"Full error: {e.response}")

        return _response(403, {"error": "Failed to assume role", "message": error_message})

    except Exception as e:
        return _response(500, {"error": str(e)})

validate_cognito_token(id_token)

Validate Cognito ID token and return claims. This is a simplified version; in production, you should verify the signature.

Source code in src/aws_cognito_auth/lambda_function.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
def validate_cognito_token(id_token):
    """
    Validate Cognito ID token and return claims
    This is a simplified version - in production, you should verify the signature

    The token's signature is NOT checked here: only the structure, the
    ``exp`` claim and the presence of ``sub`` are validated.

    Args:
        id_token: Compact JWT string of the form ``header.payload.signature``.

    Returns:
        dict: the decoded claims of the payload segment.

    Raises:
        ValueError: with one of these messages
            - "Invalid token format": not a string, or not three dot-separated parts
            - "Invalid token payload": payload is not base64url-encoded JSON,
              is not a JSON object, or carries a non-numeric ``exp``
            - "Token has expired": ``exp`` is missing, in the past, or NaN
            - "Missing required field: sub"
    """
    if not isinstance(id_token, str):
        raise ValueError("Invalid token format")

    parts = id_token.split(".")
    if len(parts) != 3:
        raise ValueError("Invalid token format")

    payload = parts[1]
    # JWT segments drop base64 padding; restore it before decoding.
    padding = "=" * (-len(payload) % 4)
    try:
        claims = json.loads(base64.urlsafe_b64decode(payload + padding))
    except Exception as e:
        raise ValueError("Invalid token payload") from e

    # A JSON array/string/number decodes fine but cannot carry claims.
    if not isinstance(claims, dict):
        raise ValueError("Invalid token payload")

    exp = claims.get("exp")
    if exp is None:
        raise ValueError("Token has expired")
    try:
        expires_at = float(exp)
    except (TypeError, ValueError) as e:
        raise ValueError("Invalid token payload") from e
    # Written as "not now < exp" so a NaN expiry is rejected rather than
    # comparing False and being treated as never expiring.
    if not datetime.now().timestamp() < expires_at:
        raise ValueError("Token has expired")

    if "sub" not in claims:
        raise ValueError("Missing required field: sub")

    # token_use is optional in tests; accept if absent
    return claims