Logo Search packages:      
Sourcecode: landscape-client version File versions

provider.py

from pwd import struct_passwd
from grp import struct_group
import csv
import subprocess
import logging


00008 class UserManagementError(Exception):
    """Catch all error for problems with User Management."""


00012 class UserNotFoundError(Exception):
    """Raised when a user couldn't be found by uid/username."""


00016 class GroupNotFoundError(Exception):
    """Raised when a group couldn't be found by gid/groupname."""


00020 class UserProviderBase(object):
    """This is a base class for user Providers."""

    def __init__(self, locked_users=None):
        self.locked_users = locked_users or []

        self._min_uid = 1000
        self._max_uid = 60000

00029     def get_users(self):
        """Returns a list of all local users on the computer.

        Each user is represented as a dict with the keys: C{username},
        C{name}, C{uid}, C{enabled}, C{location}, C{work-phone} and
        C{home-phone}.  
        """
        users = []
        found_usernames = set()
        for user in self.get_user_data():
            if not isinstance(user, struct_passwd):
                user = struct_passwd(user)
            if user.pw_name in found_usernames:
                continue
            gecos_data = [x.decode("utf-8", "replace") or None
                          for x in user.pw_gecos.split(",")[:4]]
            while len(gecos_data) < 4:
                gecos_data.append(None)
            name, location, work_phone, home_phone = tuple(gecos_data)
            enabled = user.pw_name not in self.locked_users
            users.append({"username": user.pw_name, "name": name,
                          "uid": user.pw_uid, "enabled": enabled,
                          "location": location, "work-phone": work_phone,
                          "home-phone": home_phone,
                          "primary-gid": user.pw_gid})
            found_usernames.add(user.pw_name)
        return users

00057     def get_groups(self):
        """Returns a list of groups on the computer.

        Each group is represented as a dict with the keys: C{name},
        C{gid} and C{members}.
        """
        user_names = set([x["username"] for x in self.get_users()])
        groups = []
        found_groupnames = set()
        for group in self.get_group_data():
            if not isinstance(group, struct_group):
                group = struct_group(group)
            if group.gr_name in found_groupnames:
                continue
            member_names = user_names.intersection(group.gr_mem)
            groups.append({"name": group.gr_name, "gid": group.gr_gid,
                           "members": list(member_names)})
            found_groupnames.add(group.gr_name)
        return groups

00077     def get_uid(self, username):
        """Returns the UID for C{username}.

        @raises UserNotFoundError: Raised if C{username} doesn't match a
            user on the computer.
        """
        for data in self.get_users():
            if data["username"] == username:
                return data["uid"]
        raise UserNotFoundError("UID not found for user %s." % username)

00088     def get_gid(self, groupname):
        """Returns the GID for C{groupname}.

        @raises UserManagementError: Raised if C{groupname} doesn't
            match a group on the computer.
        """
        for data in self.get_groups():
            if data["name"] == groupname:
                return data["gid"]
        raise GroupNotFoundError("Group not found for group %s." % groupname)

class UserProvider(UserProviderBase):

    popen = subprocess.Popen

    passwd_fields = ["username", "passwd", "uid", "primary-gid", "gecos",
                     "home", "shell"]
    group_fields = ["name", "passwd", "gid", "members"]

    def __init__(self, locked_users=[], passwd_file="/etc/passwd",
                 group_file="/etc/group"):
        super(UserProvider, self).__init__(locked_users)
        self._passwd_file = passwd_file
        self._group_file = group_file

    def get_user_data(self):
        """
        Parse passwd(5) formatted files and return tuples of user data in the
        form (username, password, uid, primary-group-id, gecos data, home
        directory, path to the user's shell)
        """
        user_data = []
        passwd_file = open(self._passwd_file, "r")
        reader = csv.DictReader(passwd_file, fieldnames=self.passwd_fields,
                                delimiter=":", quoting=csv.QUOTE_NONE)
        current_line = 0
        for row in reader:
            current_line += 1
            # This skips the NIS user marker in the passwd file.
            if (row["username"].startswith("+") or
                row["username"].startswith("-")):
                continue
            try:
                user_data.append((row["username"], row["passwd"],
                                  int(row["uid"]), int(row["primary-gid"]),
                                  row["gecos"], row["home"], row["shell"]))
            except (ValueError, TypeError):

                logging.warn("passwd file %s is incorrectly formatted: "
                             "line %d." % (self._passwd_file, current_line))

        passwd_file.close()
        return user_data

    def get_group_data(self):
        """
        Parse group(5) formatted files and return tuples of group data in the
        form (groupname, group password, group id and a list of member
        usernames).
        """
        group_data = []
        group_file = open(self._group_file, "r")
        reader = csv.DictReader(group_file, fieldnames=self.group_fields,
                                delimiter=":", quoting=csv.QUOTE_NONE)
        current_line = 0
        for row in reader:
            current_line += 1
            # Skip if we find the NIS marker
            if (row["name"].startswith("+") or
                row["name"].startswith("-")):
                continue
            try:
                group_data.append((row["name"], row["passwd"], int(row["gid"]), 
                                   row["members"].split(",")))
            except (AttributeError, ValueError):
                logging.warn("group file %s is incorrectly formatted: "
                             "line %d." % (self._group_file, current_line))
        group_file.close()
        return group_data
 

Generated by  Doxygen 1.6.0   Back to index