You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
244 lines
9.1 KiB
244 lines
9.1 KiB
#!/usr/bin/python2.7
|
|
# Copyright 2019 The Chromium Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""This implements a simple tool to create policy blobs signed with a given
|
|
key. It can create both device and user policies. The output will consist of
|
|
two files a policy file and the owner.key file which contains the policy
|
|
signature.
|
|
|
|
The input file is JSON. The root dictionary contains a list under the
|
|
key "managed_users". Keys in the root dictionary identify request scopes.
|
|
The user-request scope is described by a dictionary that holds two
|
|
sub-dictionaries: "mandatory" and "recommended". Both these hold the policy
|
|
definitions as key/value stores, their format is identical to what the Linux
|
|
implementation reads from /etc.
|
|
The device-scope holds the policy-definition directly as key/value stores
|
|
in the protobuf-format.
|
|
|
|
Example:
|
|
|
|
{
|
|
"google/chromeos/device" : {
|
|
"guest_mode_enabled" : false
|
|
},
|
|
"google/chromeos/user" : {
|
|
"mandatory" : {
|
|
"HomepageLocation" : "http://www.chromium.org",
|
|
"IncognitoEnabled" : false
|
|
},
|
|
"recommended" : {
|
|
"JavascriptEnabled": false
|
|
}
|
|
}
|
|
}
|
|
|
|
"""
|
|
|
|
import optparse
|
|
import os
|
|
import re
|
|
import sys
|
|
import time
|
|
import tlslite
|
|
import tlslite.api
|
|
import tlslite.utils
|
|
|
|
# The name and availability of the json module varies in python versions.
|
|
try:
|
|
import simplejson as json
|
|
except ImportError:
|
|
try:
|
|
import json
|
|
except ImportError:
|
|
json = None
|
|
|
|
import asn1der
|
|
import device_management_backend_pb2 as dm
|
|
import cloud_policy_pb2 as cp
|
|
import chrome_device_policy_pb2 as dp
|
|
|
|
# ASN.1 object identifier for PKCS#1/RSA.
|
|
PKCS1_RSA_OID = '\x2a\x86\x48\x86\xf7\x0d\x01\x01\x01'
|
|
|
|
def SetProtobufMessageField(group_message, field, field_value):
|
|
'''Sets a field in a protobuf message.
|
|
|
|
Args:
|
|
group_message: The protobuf message.
|
|
field: The field of the message to set, it shuold be a member of
|
|
group_message.DESCRIPTOR.fields.
|
|
field_value: The value to set.
|
|
'''
|
|
if field.label == field.LABEL_REPEATED:
|
|
assert type(field_value) == list
|
|
entries = group_message.__getattribute__(field.name)
|
|
for list_item in field_value:
|
|
entries.append(list_item)
|
|
return
|
|
elif field.type == field.TYPE_BOOL:
|
|
assert type(field_value) == bool
|
|
elif field.type == field.TYPE_STRING:
|
|
assert type(field_value) == str or type(field_value) == unicode
|
|
elif field.type == field.TYPE_INT64:
|
|
assert type(field_value) == int
|
|
elif (field.type == field.TYPE_MESSAGE and
|
|
field.message_type.name == 'StringList'):
|
|
assert type(field_value) == list
|
|
entries = group_message.__getattribute__(field.name).entries
|
|
for list_item in field_value:
|
|
entries.append(list_item)
|
|
return
|
|
else:
|
|
raise Exception('Unknown field type %s' % field.type)
|
|
group_message.__setattr__(field.name, field_value)
|
|
|
|
def GatherDevicePolicySettings(settings, policies):
|
|
'''Copies all the policies from a dictionary into a protobuf of type
|
|
CloudDeviceSettingsProto.
|
|
|
|
Args:
|
|
settings: The destination ChromeDeviceSettingsProto protobuf.
|
|
policies: The source dictionary containing policies in JSON format.
|
|
'''
|
|
for group in settings.DESCRIPTOR.fields:
|
|
# Create protobuf message for group.
|
|
group_message = eval('dp.' + group.message_type.name + '()')
|
|
# Indicates if at least one field was set in |group_message|.
|
|
got_fields = False
|
|
# Iterate over fields of the message and feed them from the
|
|
# policy config file.
|
|
for field in group_message.DESCRIPTOR.fields:
|
|
field_value = None
|
|
if field.name in policies:
|
|
got_fields = True
|
|
field_value = policies[field.name]
|
|
SetProtobufMessageField(group_message, field, field_value)
|
|
if got_fields:
|
|
settings.__getattribute__(group.name).CopyFrom(group_message)
|
|
|
|
def GatherUserPolicySettings(settings, policies):
|
|
'''Copies all the policies from a dictionary into a protobuf of type
|
|
CloudPolicySettings.
|
|
|
|
Args:
|
|
settings: The destination: a CloudPolicySettings protobuf.
|
|
policies: The source: a dictionary containing policies under keys
|
|
'recommended' and 'mandatory'.
|
|
'''
|
|
for group in settings.DESCRIPTOR.fields:
|
|
# Create protobuf message for group.
|
|
group_message = eval('cp.' + group.message_type.name + '()')
|
|
# We assume that this policy group will be recommended, and only switch
|
|
# it to mandatory if at least one of its members is mandatory.
|
|
group_message.policy_options.mode = cp.PolicyOptions.RECOMMENDED
|
|
# Indicates if at least one field was set in |group_message|.
|
|
got_fields = False
|
|
# Iterate over fields of the message and feed them from the
|
|
# policy config file.
|
|
for field in group_message.DESCRIPTOR.fields:
|
|
field_value = None
|
|
if field.name in policies['mandatory']:
|
|
group_message.policy_options.mode = cp.PolicyOptions.MANDATORY
|
|
field_value = policies['mandatory'][field.name]
|
|
elif field.name in policies['recommended']:
|
|
field_value = policies['recommended'][field.name]
|
|
if field_value != None:
|
|
got_fields = True
|
|
SetProtobufMessageField(group_message, field, field_value)
|
|
if got_fields:
|
|
settings.__getattribute__(group.name).CopyFrom(group_message)
|
|
|
|
def ProcessCloudPolicy(policy_type,
|
|
policy_def, policy_key,
|
|
username,
|
|
output_path):
|
|
"""Creates a policy blob.
|
|
|
|
Encodes the policy into protobuf representation, signs it and saves it.
|
|
|
|
Args:
|
|
policy_type: can be 'google/chromeos/user' or 'google/chromeos/device'.
|
|
policy_def: The JSON file containing the policy definition.
|
|
policy_key: A private key to be used to sign the blob.
|
|
username: Username to be integrated in the policy blob.
|
|
output_path: A directory where to put the output files.
|
|
"""
|
|
policy = json.loads(open(policy_def).read())
|
|
policy_value = ''
|
|
if (policy_type in policy):
|
|
if policy_type == 'google/chromeos/user':
|
|
settings = cp.CloudPolicySettings()
|
|
GatherUserPolicySettings(settings, policy[policy_type])
|
|
policy_value = settings.SerializeToString()
|
|
elif policy_type == 'google/chromeos/device':
|
|
settings = dp.ChromeDeviceSettingsProto()
|
|
GatherDevicePolicySettings(settings, policy[policy_type])
|
|
policy_value = settings.SerializeToString()
|
|
|
|
key = tlslite.api.parsePEMKey(open(policy_key).read(), private=True)
|
|
|
|
algorithm = asn1der.Sequence(
|
|
[ asn1der.Data(asn1der.OBJECT_IDENTIFIER, PKCS1_RSA_OID),
|
|
asn1der.Data(asn1der.NULL, '') ])
|
|
rsa_pubkey = asn1der.Sequence([ asn1der.Integer(key.n),
|
|
asn1der.Integer(key.e) ])
|
|
pubkey = asn1der.Sequence([ algorithm, asn1der.Bitstring(rsa_pubkey) ])
|
|
key_version = 1
|
|
|
|
# Fill the policy data protobuf.
|
|
policy_data = dm.PolicyData()
|
|
policy_data.policy_type = policy_type
|
|
policy_data.timestamp = int(time.time() * 1000)
|
|
policy_data.request_token = "DEV_TOKEN"
|
|
policy_data.policy_value = policy_value
|
|
policy_data.machine_name = "MEAN_MACHINE"
|
|
policy_data.public_key_version = 1
|
|
policy_data.username = username
|
|
policy_data.device_id = "1337_1D"
|
|
signed_data = policy_data.SerializeToString()
|
|
|
|
response = dm.DeviceManagementResponse()
|
|
fetch_response = response.policy_response.responses.add()
|
|
fetch_response.policy_data = signed_data
|
|
fetch_response.policy_data_signature = bytes(
|
|
key.hashAndSign(signed_data))
|
|
fetch_response.new_public_key = pubkey
|
|
|
|
open("%s/policy" % output_path,"wb").
|
|
write(fetch_response.SerializeToString());
|
|
open("%s/owner.key" % output_path,"wb").write(pubkey);
|
|
|
|
def main(options):
|
|
ProcessCloudPolicy(options.policy_type,
|
|
options.policy_def, options.policy_key,
|
|
options.policy_user,
|
|
options.output_path);
|
|
|
|
if __name__ == '__main__':
|
|
option_parser = optparse.OptionParser()
|
|
option_parser.add_option('-k', '--policy-key', default="mykey",
|
|
dest='policy_key',
|
|
help='Specify a path to a PEM-encoded private key '
|
|
'to use for policy signing.')
|
|
option_parser.add_option('-p', '--policy-def', default="device_management",
|
|
dest='policy_def',
|
|
help='Specify a path to a PEM-encoded private key '
|
|
'to use for policy signing.')
|
|
option_parser.add_option('-u', '--policy-user', default='user@example.com',
|
|
dest='policy_user',
|
|
help='Specify the user name the server should '
|
|
'report back to the client as the user owning the '
|
|
'token used for making the policy request.')
|
|
option_parser.add_option('-o', '--output-path', default='.',
|
|
dest='output_path',
|
|
help='Specifies the directory to output policy '
|
|
'files to.')
|
|
option_parser.add_option('-t', '--type', default='google/chromeos/device',
|
|
dest='policy_type',
|
|
help='Specifies the type of policy to create.')
|
|
options, args = option_parser.parse_args()
|
|
|
|
sys.exit(main(options))
|