登陆Azure控制台后,在左侧导航栏选择大象图标的HDInsight集群菜单(默认情况下,需要点击更多,在弹出的菜单中才能看到此项,可以点击后面的星按钮添加收藏,以使此菜单出现在第一屏中)。
新建集群,类型选择Windows,可以自定义集群,在测试目的中选择最低配置的集群结点(节省银子,虽然是免费额度),如下图。
按下图所示选择结点的配置
第一次使用Azure一般都需要创建新的存储账户,这里的容器就相当于我们这个集群存储的根目录。在Azure中,使用Azure Storage Blob作为类似HDFS的存在。
最后的摘要页面也有明确提示,从集群创建到被删除这个过程中将会一直按照右下角显示价格进行计费,无论是否运行任务。所以对于Azure HDInsight的新用户记得用后要删除是很重要的。另外Azure PowerShell和Azure .NET SDK都提供了使用代码创建与删除HDInsight集群的方法,方便将集群的创建,任务部署与集群删除作为一系列自动化任务来完成。
点击创建按钮,集群创建工作随之开始进行。
创建前,可以点击下载模板链接,将集群创建参数作为模板保存,由于创建过程还是稍显复杂,而有了这个模板就可以在本地通过Azure CLI,PowerShell或C#代码来完成集群的创建。详见此博文
这个过程大约持续20分钟,直到仪表盘中的正在创建变成正在运行。
集群创建好后,可以进入集群的控制台看看:
除了首页为Windows平台Azure HDInsights专有,其他页面都是到YARN,HDFS(Azure Blob Storage)及Job History原有管理页面的链接。通过各自的管理页面,可以了解到MR任务的工作情况,数据存储等。
有了运行环境,我们来实现两个简单的基于.NET的MR任务,分别创建名为NetMapper和NetReducer的两个控制台应用程序。然后添加如下代码:
这些代码来自微软官方
// Mapper
class NetMapper
{
static void Main(string[] args)
{
if (args.Length > 0)
{
Console.SetIn(new StreamReader(args[0]));
}
string line;
while ((line = Console.ReadLine()) != null)
{
Console.WriteLine(line);
}
}
}
// Reducer
class NetReducer
{
static void Main(string[] args)
{
string line;
var count = 0;
if (args.Length > 0)
{
Console.SetIn(new StreamReader(args[0]));
}
while ((line = Console.ReadLine()) != null)
{
count += line.Count(cr => (cr == ' ' || cr == '\n'));
}
Console.WriteLine(count);
}
}
将这两个项目分别生成,得到NetMapper.exe与NetReducer.exe。
接着在解决方案中新建一个项目SubmitNet用于提交MR任务到Azure中,提交代码如下:
class Program
{
private static HDInsightJobManagementClient _hdiJobManagementClient;
private const string ExistingClusterUri = "test-netcore.azurehdinsight.net";
private const string ExistingClusterUsername = "admin"; //HDInsight集群默认用户名就是admin
private const string ExistingClusterPassword = "创建集群时设置的密码";
private const string DefaultStorageAccountName = "hdinsighthystar"; //存储账户名
private const string DefaultStorageAccountKey = "存储账户Key";
private const string DefaultStorageContainerName = "与HDInsight集群关联的容器的名称";
static void Main(string[] args)
{
Console.WriteLine("The application is running ...");
var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = ExistingClusterUsername, Password = ExistingClusterPassword };
_hdiJobManagementClient = new HDInsightJobManagementClient(ExistingClusterUri, clusterCredentials);
SubmitMRJob();
Console.WriteLine("Press ENTER to continue ...");
Console.ReadLine();
}
private static void SubmitMRJob()
{
var paras = new MapReduceStreamingJobSubmissionParameters
{
Files = new List<string>() { "/example/app/NetMapper.exe", "/example/app/NetReducer.exe" },
Mapper = "NetMapper.exe",
Reducer = "NetReducer.exe",
Input= "/example/data/gutenberg/davinci.txt",
Output = "/example/data/StreamingOutput/wc.txt"
};
Console.WriteLine("Submitting the MR job to the cluster...");
var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceStreamingJob(paras);
var jobId = jobResponse.JobSubmissionJsonResponse.Id;
Console.WriteLine("Response status code is " + jobResponse.StatusCode);
Console.WriteLine("JobId is " + jobId);
Console.WriteLine("Waiting for the job completion ...");
// Wait for job completion
var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
while (!jobDetail.Status.JobComplete)
{
Thread.Sleep(1000);
jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
}
// Get job output
var storageAccess = new AzureStorageAccess(DefaultStorageAccountName, DefaultStorageAccountKey,
DefaultStorageContainerName);
var output = (jobDetail.ExitValue == 0)
? _hdiJobManagementClient.JobManagement.GetJobOutput(jobId, storageAccess) // fetch stdout output in case of success
: _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); // fetch stderr output in case of failure
Console.WriteLine("Job output is: ");
using (var reader = new StreamReader(output, Encoding.UTF8))
{
string value = reader.ReadToEnd();
Console.WriteLine(value);
}
}
}